# Unit tests for kiwi.command_process (CommandProcess, CommandIterator).
import logging
from unittest.mock import (
patch, Mock
)
from pytest import (
raises, fixture
)
from builtins import bytes
from kiwi.command_process import (
CommandProcess,
CommandIterator
)
from kiwi.exceptions import KiwiCommandError
class TestCommandProcess:
    """Tests for ``CommandProcess`` and ``CommandIterator``.

    The wrapped command is a mock whose ``process.poll``, availability
    checks and stream ``read`` callables are replaced by small closures
    that hand out pre-seeded values, one per call.
    """

    @fixture(autouse=True)
    def inject_fixtures(self, caplog):
        """Store pytest's ``caplog`` fixture for use in log assertions."""
        self._caplog = caplog

    def fake_matcher(self, item, output):
        """Matcher stub that reports a match for every item/output pair."""
        return True

    def poll(self):
        """Return the next scripted ``process.poll()`` result."""
        return self.data_flow.pop()

    def outavailable(self):
        """Report that stdout data is always available."""
        return True

    def erravailable(self):
        """Report that stderr data is always available."""
        return True

    def outdata(self):
        """Return the next scripted stdout chunk."""
        return self.data_out.pop()

    def errdata(self):
        """Return the next scripted stderr chunk."""
        return self.data_err.pop()

    def create_flow_method(self, method):
        """Wrap ``method`` in a callable that ignores one optional argument.

        The wrapper lets a zero-argument helper stand in for callables
        that are invoked with an argument, such as ``read(1)``.
        """
        def create_method(arg=None):
            return method()
        return create_method

    def setup(self):
        """Seed the scripted data and build the replacement callables.

        All three lists are consumed from the END via ``list.pop()``:
        polling yields ``None`` seven times and then ``True``, stdout
        reads spell ``data\\n`` and stderr reads spell ``error\\n``, each
        followed by empty reads.
        """
        self.data_flow = [True, None, None, None, None, None, None, None]
        self.data_out = [b'', b'', b'\n', b'a', b't', b'a', b'd']
        self.data_err = [b'', b'\n', b'r', b'o', b'r', b'r', b'e']
        self.flow = self.create_flow_method(self.poll)
        self.flow_out_available = self.create_flow_method(self.outavailable)
        self.flow_err_available = self.create_flow_method(self.erravailable)
        self.flow_out = self.create_flow_method(self.outdata)
        self.flow_err = self.create_flow_method(self.errdata)

    def setup_method(self, cls):
        """Per-test hook invoked by pytest; delegates to ``setup``."""
        self.setup()

    def _wire_command(self, process, returncode):
        """Attach the scripted callables and exit code to ``process``.

        :param process: the ``CommandProcess`` under test
        :param int returncode: value exposed as ``process.returncode``
            of the wrapped command
        """
        command = process.command.command
        command.process.poll = self.flow
        command.output_available = self.flow_out_available
        command.error_available = self.flow_err_available
        command.output.read = self.flow_out
        command.error.read = self.flow_err
        command.process.returncode = returncode

    @patch('kiwi.command.Command')
    def test_returncode(self, mock_command):
        """``returncode()`` is expected to mirror the process attribute."""
        command = Mock()
        mock_command.return_value = command
        process = CommandProcess(command)
        assert process.returncode() == command.process.returncode

    @patch('kiwi.command.Command')
    def test_poll_show_progress(self, mock_command):
        """With exit code 0 the output line is expected in the debug log."""
        match_method = CommandProcess(mock_command).create_match_method(
            self.fake_matcher
        )
        process = CommandProcess(mock_command)
        self._wire_command(process, returncode=0)
        with self._caplog.at_level(logging.DEBUG):
            process.poll_show_progress(['a', 'b'], match_method, True)
            assert 'system: data' in self._caplog.text

    @patch('kiwi.command.Command')
    def test_poll_show_progress_raises(self, mock_command):
        """With exit code 1 a ``KiwiCommandError`` is expected."""
        match_method = CommandProcess(mock_command).create_match_method(
            self.fake_matcher
        )
        process = CommandProcess(mock_command)
        self._wire_command(process, returncode=1)
        with raises(KiwiCommandError):
            process.poll_show_progress(['a', 'b'], match_method)

    @patch('kiwi.command.Command')
    def test_poll(self, mock_command):
        """With exit code 0 the output line is expected in the debug log."""
        process = CommandProcess(mock_command)
        self._wire_command(process, returncode=0)
        with self._caplog.at_level(logging.DEBUG):
            process.poll()
            assert 'system: data' in self._caplog.text

    @patch('kiwi.command.Command')
    def test_poll_raises(self, mock_command):
        """With exit code 1 a ``KiwiCommandError`` is expected."""
        process = CommandProcess(mock_command)
        # NOTE: the stdout reader is a plain closure, not a Mock, so no
        # ``return_value`` is configured on it; data comes from data_out.
        self._wire_command(process, returncode=1)
        with raises(KiwiCommandError):
            process.poll()

    @patch('kiwi.command.Command')
    def test_poll_and_watch(self, mock_command):
        """Output is expected between log markers, stderr in the result."""
        process = CommandProcess(mock_command)
        self._wire_command(process, returncode=1)
        with self._caplog.at_level(logging.DEBUG):
            result = process.poll_and_watch()
            assert '--------------out start-------------' in self._caplog.text
            assert 'data' in self._caplog.text
            assert '--------------out stop--------------' in self._caplog.text
        assert result.stderr == 'error\n'

    @patch('kiwi.command.Command')
    def test_create_match_method(self, mock_command):
        """The created match method is expected to call the given matcher."""
        match_method = CommandProcess(mock_command).create_match_method(
            self.fake_matcher
        )
        assert match_method('a', 'b') is True

    def test_command_iterator(self):
        """``__iter__`` is expected to return the iterator itself."""
        iterator = CommandIterator(Mock())
        assert iterator.__iter__() == iterator

    def test_kill(self):
        """``kill()`` is expected to kill the wrapped process once."""
        iterator = CommandIterator(Mock())
        iterator.kill()
        iterator.command.process.kill.assert_called_once_with()

    def test_get_pid(self):
        """``get_pid()`` is expected to return the wrapped process pid."""
        iterator = CommandIterator(Mock())
        assert iterator.get_pid() == iterator.command.process.pid
|