1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
|
import time
from multiprocessing import Process
import pytest
from doit.cmdparse import DefaultUpdate
from doit.exceptions import InvalidCommand
from doit.task import Task
from doit.cmd_base import TaskLoader
from doit import cmd_auto
class TestFindFileDeps(object):
def find_deps(self, sel_tasks):
tasks = {
't1': Task("t1", [""], file_dep=['f1']),
't2': Task("t2", [""], file_dep=['f2'], task_dep=['t1']),
't3': Task("t3", [""], file_dep=['f3'], setup=['t1']),
}
return cmd_auto.Auto._find_file_deps(tasks, sel_tasks)
def test_find_file_deps(self):
assert set(['f1']) == self.find_deps(['t1'])
assert set(['f1', 'f2']) == self.find_deps(['t2'])
assert set(['f1', 'f3']) == self.find_deps(['t3'])
class TestDepChanged(object):
def test_changed(self, dependency1):
started = time.time()
assert not cmd_auto.Auto._dep_changed([dependency1], started, [])
assert cmd_auto.Auto._dep_changed([dependency1], started-100, [])
assert not cmd_auto.Auto._dep_changed([dependency1], started-100,
[dependency1])
class FakeLoader(TaskLoader):
def __init__(self, task_list, dep_file):
self.task_list = task_list
self.dep_file = dep_file
def load_tasks(self, cmd, params, args):
return self.task_list, {'verbosity':2, 'dep_file':self.dep_file}
class TestAuto(object):
def test_invalid_args(self, dependency1, depfile_name):
t1 = Task("t1", [""], file_dep=[dependency1])
cmd = cmd_auto.Auto(task_loader=FakeLoader([t1], depfile_name))
pytest.raises(InvalidCommand, cmd.execute, None, 't2')
def test_run_wait(self, dependency1, target1, depfile_name):
def ok():
with open(target1, 'w') as fp:
fp.write('ok')
t1 = Task("t1", [ok], file_dep=[dependency1])
cmd = cmd_auto.Auto(task_loader=FakeLoader([t1], depfile_name))
run_wait_proc = Process(target=cmd.run_watch,
args=(DefaultUpdate(), []))
run_wait_proc.start()
# wait until task is executed
for x in range(5):
try:
got = open(target1, 'r').read()
print(got)
if got == 'ok':
break
except:
print('busy')
time.sleep(0.1)
else: # pragma: no cover
raise Exception("target not created")
# write on file to terminate process
fd = open(dependency1, 'w')
fd.write("hi" + str(time.asctime()))
fd.close()
run_wait_proc.join(.5)
if run_wait_proc.is_alive(): # pragma: no cover
# this test is very flaky so we give it one more chance...
# write on file to terminate process
fd = open(dependency1, 'w')
fd.write("hi" + str(time.asctime()))
fd.close()
run_wait_proc.join(1)
if run_wait_proc.is_alive(): # pragma: no cover
run_wait_proc.terminate()
raise Exception("process not terminated")
assert 0 == run_wait_proc.exitcode
def test_execute(self, monkeypatch):
# use dumb operation instead of executing RUN command and waiting event
def fake_run(self, params, args): # pragma: no cover
5 + 2
monkeypatch.setattr(cmd_auto.Auto, 'run_watch', fake_run)
# after join raise exception to stop AUTO command
original = cmd_auto.Process.join
def join_interrupt(self):
original(self)
raise KeyboardInterrupt()
monkeypatch.setattr(cmd_auto.Process, 'join', join_interrupt)
cmd = cmd_auto.Auto()
cmd.execute(None, None)
|