File: test_async_executor.py

package info (click to toggle)
patroni 4.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,704 kB
  • sloc: python: 29,743; sh: 573; makefile: 29
file content (33 lines) | stat: -rw-r--r-- 766 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import unittest

from threading import Thread
from unittest.mock import Mock, patch

from patroni.async_executor import AsyncExecutor, CriticalTask


class TestAsyncExecutor(unittest.TestCase):
    """Smoke tests for ``AsyncExecutor``.

    None of these tests makes an explicit assertion; each one passes as long
    as the exercised calls return without raising.
    """

    def setUp(self):
        """Build a fresh executor from two mock collaborators for every test."""
        first_dependency = Mock()
        second_dependency = Mock()
        self.a = AsyncExecutor(first_dependency, second_dependency)

    def test_run_async(self):
        """Call ``run_async`` while ``Thread.start`` is replaced by a mock."""
        task = Mock(return_value=True)
        # Thread.start is swapped for a mock only for the duration of the call.
        with patch.object(Thread, 'start', Mock()):
            self.a.run_async(task)

    def test_run(self):
        """Call ``run`` with a callable that raises ``Exception``.

        The test passes only if ``run`` does not let the exception escape.
        """
        failing_task = Mock(side_effect=Exception())
        self.a.run(failing_task)

    def test_cancel(self):
        """Call ``cancel`` before and after scheduling, then ``run`` a mock."""
        executor = self.a
        executor.cancel()
        executor.schedule('foo')
        executor.cancel()
        executor.run(Mock())


class TestCriticalTask(unittest.TestCase):
    """Tests for ``CriticalTask``."""

    def test_completed_task(self):
        """``cancel`` must return a falsy value once ``complete`` was called."""
        task = CriticalTask()
        task.complete(1)
        cancel_result = task.cancel()
        self.assertFalse(cancel_result)