1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
import unittest
from unittest.mock import Mock, patch
import psutil
from patroni.exceptions import PostgresException
from patroni.postgresql.cancellable import CancellableSubprocess
class TestCancellableSubprocess(unittest.TestCase):
def setUp(self):
self.c = CancellableSubprocess()
def test_call(self):
self.c.cancel()
self.assertRaises(PostgresException, self.c.call)
def test__kill_children(self):
self.c._process_children = [Mock()]
self.c._kill_children()
self.c._process_children[0].kill.side_effect = psutil.AccessDenied()
self.c._kill_children()
self.c._process_children[0].kill.side_effect = psutil.NoSuchProcess(123)
self.c._kill_children()
@patch('patroni.postgresql.cancellable.polling_loop', Mock(return_value=[0, 0]))
def test_cancel(self):
self.c._process = Mock()
self.c._process.is_running.return_value = True
self.c._process.children.side_effect = psutil.NoSuchProcess(123)
self.c._process.suspend.side_effect = psutil.AccessDenied()
self.c.cancel()
self.c._process.is_running.side_effect = [True, False]
self.c.cancel()
|