import os
import time
import multiprocessing

import six
if six.PY3:  # pragma: no cover
    from dbm import whichdb
else:
    from whichdb import whichdb

import py
import pytest

from doit.dependency import Dependency
from doit.task import Task

def get_abspath(relativePath):
    """return abs file path relative to this file"""
    return os.path.join(os.path.dirname(__file__), relativePath)

# fixture to create a sample file to be used as file_dep
@pytest.fixture
def dependency1(request):
    path = get_abspath("data/dependency1")
    if os.path.exists(path):
        os.remove(path)
    with open(path, "w") as ff:
        ff.write("whatever" + str(time.asctime()))

    def remove_dependency():
        if os.path.exists(path):
            os.remove(path)
    request.addfinalizer(remove_dependency)

    return path

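# Illustrative usage (a sketch, not part of this file): a test just lists the
# fixture as an argument and receives the path of the freshly written file.
#
#     def test_uses_file_dep(dependency1):
#         assert os.path.exists(dependency1)
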
# fixture to create a sample path to be used as a target
@pytest.fixture
def target1(request):
    path = get_abspath("data/target1")
    if os.path.exists(path):  # pragma: no cover
        os.remove(path)

    def remove_path():
        if os.path.exists(path):
            os.remove(path)
    request.addfinalizer(remove_path)

    return path

# fixture for "doit.db". create/remove for every test
def remove_db(filename):
"""remove db file from anydbm"""
# dbm on some systems add '.db' on others add ('.dir', '.pag')
extensions = ['', #dbhash #gdbm
'.bak', #dumbdb
'.dat', #dumbdb
'.dir', #dumbdb #dbm2
'.db', #dbm1
'.pag', #dbm2
]
for ext in extensions:
if os.path.exists(filename + ext):
os.remove(filename + ext)
# dbm backends use different file extensions
db_ext = {'dbhash': [''],
          'gdbm': [''],
          'dbm': ['.db', '.dir'],
          'dumbdbm': ['.dat'],
          # for python3
          'dbm.ndbm': ['.db'],
          }
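# whichdb() reports the backend module name as a string (e.g. 'dbm.ndbm' on
# python3); that name is used as the key into db_ext so tests know which
# on-disk files back a given db, falling back to [''] for unknown backends.
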
@pytest.fixture
def depfile(request):
    if hasattr(request, 'param'):
        dep_class = request.param
    else:
        dep_class = Dependency

    # copied from tempdir plugin
    name = request._pyfuncitem.name
    name = py.std.re.sub(r"[\W]", "_", name)
    my_tmpdir = request.config._tmpdirhandler.mktemp(name, numbered=True)
    dep_file = dep_class(os.path.join(my_tmpdir.strpath, "testdb"))
    dep_file.whichdb = whichdb(dep_file.name)
    dep_file.name_ext = db_ext.get(dep_file.whichdb, [''])

    def remove_depfile():
        if not dep_file._closed:
            dep_file.close()
        remove_db(dep_file.name)
    request.addfinalizer(remove_depfile)

    return dep_file

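# Illustrative usage (a sketch, not part of this file): the backend defaults
# to Dependency, but a test module can swap it through indirect
# parametrization with a hypothetical subclass MyDependency:
#
#     @pytest.mark.parametrize('depfile', [MyDependency], indirect=True)
#     def test_with_custom_backend(depfile):
#         assert not depfile._closed
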
@pytest.fixture
def depfile_name(request):
    # copied from tempdir plugin
    name = request._pyfuncitem.name
    name = py.std.re.sub(r"[\W]", "_", name)
    my_tmpdir = request.config._tmpdirhandler.mktemp(name, numbered=True)
    depfile_name = os.path.join(my_tmpdir.strpath, "testdb")

    def remove_depfile():
        remove_db(depfile_name)
    request.addfinalizer(remove_depfile)

    return depfile_name

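# Illustrative usage (a sketch, not part of this file): handy when the test
# wants to build the Dependency object itself from a fresh path.
#
#     def test_create_db(depfile_name):
#         dep = Dependency(depfile_name)
#         dep.close()
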
@pytest.fixture
def restore_cwd(request):
    """restore cwd to its initial value after test finishes."""
    previous = os.getcwd()

    def restore_cwd():
        os.chdir(previous)
    request.addfinalizer(restore_cwd)

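# Illustrative usage (a sketch, not part of this file): a test that changes
# directory only needs to request the fixture; the finalizer restores cwd.
#
#     def test_in_other_dir(restore_cwd, tmpdir):
#         os.chdir(tmpdir.strpath)
#         ...
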
# create a list of sample tasks
def tasks_sample():
    tasks_sample = [
        # 0
        Task("t1", [""], doc="t1 doc string"),
        # 1
        Task("t2", [""], file_dep=['tests/data/dependency1'],
             doc="t2 doc string"),
        # 2
        Task("g1", None, doc="g1 doc string", has_subtask=True),
        # 3
        Task("g1.a", [""], doc="g1.a doc string", is_subtask=True),
        # 4
        Task("g1.b", [""], doc="g1.b doc string", is_subtask=True),
        # 5
        Task("t3", [""], doc="t3 doc string", task_dep=["t1"]),
        ]
    tasks_sample[2].task_dep = ['g1.a', 'g1.b']
    return tasks_sample

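# Illustrative usage (a sketch, not part of this file): tests call the helper
# and index into the list following the "# 0".."# 5" comments above.
#
#     def test_group_task_dep():
#         tasks = tasks_sample()
#         assert tasks[2].task_dep == ['g1.a', 'g1.b']
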
# monkey patch multiprocessing to enable code coverage in sub-processes
# NOTE: doesn't work with pytest-xdist (actually execnet)
def coverage_multiprocessing_process():  # pragma: no cover
    try:
        import coverage as _coverage
        _coverage  # reference the name just to silence "unused import"
    except ImportError:
        return

    from coverage.collector import Collector
    from coverage.control import coverage
    # detect if coverage was running in forked process
    if Collector._collectors:
        original = multiprocessing.Process._bootstrap

        class Process_WithCoverage(multiprocessing.Process):
            def _bootstrap(self):
                cov = coverage(data_suffix=True)
                cov.start()
                try:
                    return original(self)
                finally:
                    cov.stop()
                    cov.save()
        return Process_WithCoverage

ProcessCoverage = coverage_multiprocessing_process()
if ProcessCoverage:
    multiprocessing.Process = ProcessCoverage
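# With the patch in place, processes spawned during the tests record their own
# coverage data files (data_suffix=True), which can later be merged with
# 'coverage combine'.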