1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
|
import textwrap
import subprocess
import sys
import unittest
try:
from unittest import mock
except ImportError:
import mock
import mitogen.master
import testlib
import plain_old_module
import simple_pkg.a
class NeutralizeMainTest(testlib.RouterMixin, testlib.TestCase):
    """Tests for ModuleResponder.neutralize_main() and its main-guard message."""

    klass = mitogen.master.ModuleResponder

    # Fake script source whose entry point is decorated with @mitogen.main().
    # NOTE(review): every line of the literal sits at one indent level, so
    # dedent() yields them flush-left; the assertions below do not depend on
    # the layout inside the literal.
    HAS_MITOGEN_MAIN = mitogen.core.b(
        textwrap.dedent("""
            herp derp
            def myprog():
            pass
            @mitogen.main(maybe_some_option=True)
            def main(router):
            pass
        """)
    )

    # Fake script source that ends with an "if __name__ == '__main__'" guard.
    HAS_EXEC_GUARD = mitogen.core.b(
        textwrap.dedent("""
            herp derp
            def myprog():
            pass
            def main():
            pass
            if __name__ == '__main__':
            main()
        """)
    )

    def call(self, *args, **kwargs):
        # Forward the arguments to neutralize_main() on a responder built
        # around a mock router.
        responder = self.klass(mock.Mock())
        return responder.neutralize_main(*args, **kwargs)

    def test_missing_exec_guard(self):
        # Run the fixture script in a child interpreter: it is expected to
        # exit with status 1 and to print main_guard_msg (formatted with the
        # script path) on stderr.
        script = testlib.data_path('main_with_no_exec_guard.py')
        child = subprocess.Popen(
            [sys.executable, script],
            stderr=subprocess.PIPE,
        )
        _, err = child.communicate()
        self.assertEqual(1, child.returncode)
        self.assertIn(self.klass.main_guard_msg % (script,), err.decode())

    def test_mitogen_main(self):
        # Source containing @mitogen.main() must come back unchanged.
        result = self.call("derp.py", self.HAS_MITOGEN_MAIN)
        self.assertEqual(result, self.HAS_MITOGEN_MAIN)

        # Source with an exec guard is expected to end at main()'s body,
        # i.e. without the guard and the call that followed it.
    def test_exec_guard(self):
        neutralized = self.call("derp.py", self.HAS_EXEC_GUARD)
        tokens = neutralized.decode().split()
        self.assertEqual(tokens[-3:], ['def', 'main():', 'pass'])
class GoodModulesTest(testlib.RouterMixin, testlib.TestCase):
    """Check responder counters after well-behaved modules are requested."""

    def test_plain_old_module(self):
        # The simplest case: a top-level module with no interesting imports
        # or package machinery damage.
        child = self.router.local()
        self.assertEqual(256, child.call(plain_old_module.pow, 2, 8))

        # One extra module is expected on interpreters older than 2.6.
        extra = int(sys.version_info < (2, 6))  # mitogen.os_fork
        responder = self.router.responder
        self.assertEqual(1 + extra, responder.get_module_count)
        self.assertEqual(1 + extra, responder.good_load_module_count)
        self.assertLess(300, responder.good_load_module_size)

    def test_simple_pkg(self):
        # Ensure success of a simple package containing two submodules, one
        # of which imports the other.
        child = self.router.local()
        self.assertEqual(
            3,
            child.call(simple_pkg.a.subtract_one_add_two, 2),
        )

        extra = int(sys.version_info < (2, 6))  # mitogen.os_fork
        responder = self.router.responder
        self.assertEqual(2 + extra, responder.get_module_count)
        self.assertEqual(3 + extra, responder.good_load_module_count)
        self.assertEqual(0, responder.bad_load_module_count)
        self.assertLess(450, responder.good_load_module_size)

    def test_self_contained_program(self):
        # Ensure a program composed of a single script can be imported
        # successfully: the fixture must exit 0 and print the expected line.
        script = testlib.data_path('self_contained_program.py')
        child = subprocess.Popen(
            [sys.executable, script],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        out, _ = child.communicate()
        self.assertEqual(child.returncode, 0)
        self.assertEqual(out.decode(), "['__main__', 50]\n")
class BrokenModulesTest(testlib.TestCase):
    """Drive ModuleResponder._on_get_module() directly against a mock router."""

    def _request_module(self, fullname):
        # Shared setup for the tests below: build a request message whose
        # data is `fullname`, hand it to a fresh ModuleResponder wired to a
        # mock router, and return (router, responder) for inspection.
        stream = mock.Mock()
        stream.protocol.sent_modules = set()

        router = mock.Mock()
        # Every stream lookup resolves to the single mock stream above.
        router.stream_by_id = lambda n: stream

        request = mitogen.core.Message(
            data=mitogen.core.b(fullname),
            reply_to=50,
        )
        request.router = router

        responder = mitogen.master.ModuleResponder(router)
        responder._on_get_module(request)
        return router, responder

    def test_obviously_missing(self):
        # Ensure we don't crash in the case of a module legitimately being
        # unavailable. Should never happen in the real world.
        router, responder = self._request_module('non_existent_module')

        self.assertEqual(1, len(router._async_route.mock_calls))
        self.assertEqual(1, responder.get_module_count)
        self.assertEqual(0, responder.good_load_module_count)
        self.assertEqual(0, responder.good_load_module_size)
        self.assertEqual(1, responder.bad_load_module_count)

        # Index 1 of a mock call record is its positional arguments; the
        # single argument is the reply message.
        call = router._async_route.mock_calls[0]
        reply, = call[1]
        self.assertEqual(mitogen.core.LOAD_MODULE, reply.handle)
        self.assertEqual(('non_existent_module', None, None, None, ()),
                         reply.unpickle())

    @unittest.skipIf(
        condition=sys.version_info < (2, 6),
        reason='Ancient Python lacked "from . import foo"',
    )
    def test_ansible_six_messed_up_path(self):
        # The copy of six.py shipped with Ansible appears in a package whose
        # __path__ subsequently ends up empty, which prevents pkgutil from
        # finding its submodules. After ansible.compat.six is initialized in
        # the parent, attempts to execute six/__init__.py on the slave will
        # cause an attempt to request ansible.compat.six._six from the master.
        import six_brokenpkg  # imported before the request is issued

        router, responder = self._request_module('six_brokenpkg._six')

        self.assertEqual(1, len(router._async_route.mock_calls))
        self.assertEqual(1, responder.get_module_count)
        self.assertEqual(1, responder.good_load_module_count)
        self.assertEqual(0, responder.bad_load_module_count)

        call = router._async_route.mock_calls[0]
        reply, = call[1]
        self.assertEqual(mitogen.core.LOAD_MODULE, reply.handle)

        tup = reply.unpickle()
        self.assertIsInstance(tup, tuple)
class ForwardTest(testlib.RouterMixin, testlib.TestCase):
    """Tests around ModuleResponder.forward_modules() and its statistics."""

    @unittest.skip('Fails on Debian')
    def test_forward_to_nonexistent_context(self):
        # Forwarding to a context built from an arbitrary ID (123) is
        # expected to produce a 'dropping forward of' log line.
        missing = mitogen.core.Context(self.router, 123)
        log = testlib.LogCapturer()
        log.start()

        def forward():
            return self.router.responder.forward_modules(
                missing,
                ['mitogen.core'],
            )

        self.broker.defer_sync(forward)
        captured = log.stop()
        self.assertIn('dropping forward of', captured)

    def test_stats(self):
        # Forwarding stats broken because forwarding is broken. See #469.
        parent = self.router.local()
        child = self.router.local(via=parent)

        extra = int(sys.version_info < (2, 6))
        self.assertEqual(256, child.call(plain_old_module.pow, 2, 8))

        responder = self.router.responder
        self.assertEqual(2 + extra, responder.get_module_count)
        self.assertEqual(2 + extra, responder.good_load_module_count)
        # Total loaded size must fall strictly between 10000 and 40000.
        self.assertLess(10000, responder.good_load_module_size)
        self.assertGreater(40000, responder.good_load_module_size)
class BlacklistTest(testlib.TestCase):
    """Placeholders for whitelist/blacklist tests; all are currently skipped."""

    # Each body fails unconditionally, so removing a skip decorator without
    # writing the test is caught immediately.

    @unittest.skip('implement me')
    def test_whitelist_no_blacklist(self):
        assert False

    @unittest.skip('implement me')
    def test_whitelist_has_blacklist(self):
        assert False

    @unittest.skip('implement me')
    def test_blacklist_no_whitelist(self):
        assert False

    @unittest.skip('implement me')
    def test_blacklist_has_whitelist(self):
        assert False
|