File: conftest.py

package info (click to toggle)
flufl.lock 9.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 256 kB
  • sloc: python: 847; makefile: 12
file content (98 lines) | stat: -rw-r--r-- 2,961 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import os
import time
import multiprocessing

from sybil import Sybil
from psutil import pid_exists
from doctest import ELLIPSIS, NORMALIZE_WHITESPACE, REPORT_NDIFF
from datetime import timedelta
from tempfile import TemporaryDirectory
from contextlib import ExitStack
from flufl.lock import SEP, Lock
from sybil.parsers.doctest import DocTestParser
from sybil.parsers.codeblock import PythonCodeBlockParser

DOCTEST_FLAGS = ELLIPSIS | NORMALIZE_WHITESPACE | REPORT_NDIFF


def child_locker(filename, lifetime, queue, extra_sleep):
    # First, acquire the file lock.
    with Lock(filename, lifetime):
        # Now inform the parent that we've acquired the lock.
        queue.put(True)
        # Keep the file lock for a while.
        time.sleep(lifetime.seconds + extra_sleep)


def acquire(filename, *, lifetime=None, extra_sleep=-1):
    """Acquire the named lock file in a subprocess."""
    queue = multiprocessing.Queue()
    proc = multiprocessing.Process(
        target=child_locker,
        args=(filename, timedelta(seconds=lifetime), queue, extra_sleep))
    proc.start()
    while not queue.get():
        time.sleep(0.1)


def child_waitfor(filename, lifetime, queue):
    t0 = time.time()
    # Try to acquire the lock.
    with Lock(filename, lifetime):
        # Tell the parent how long it took to acquire the lock.
        queue.put(time.time() - t0)


def waitfor(filename, lifetime):
    """Fire off a child that waits for a lock."""
    queue = multiprocessing.Queue()
    proc = multiprocessing.Process(
        target=child_waitfor,
        args=(filename, lifetime, queue))
    proc.start()
    return queue.get()


def simulate_process_crash(lockfile):
    # We simulate an unclean crash of the process holding the lock by
    # deliberately fiddling with the pid in the lock file, ensuring that no
    # process with the new pid exists.
    with open(lockfile) as fp:
        filename = fp.read().strip()
    lockfile, hostname, pid_str, random = filename.split(SEP)
    pid = int(pid_str)
    while pid_exists(pid):
        pid += 1
    with open(lockfile, 'w') as fp:
        fp.write(SEP.join((lockfile, hostname, str(pid), random)))


class DoctestNamespace:
    def __init__(self):
        self._resources = ExitStack()

    def setup(self, namespace):
        namespace['acquire'] = acquire
        namespace['temporary_lockfile'] = self.temporary_lockfile
        namespace['waitfor'] = waitfor
        namespace['simulate_process_crash'] = simulate_process_crash

    def teardown(self, namespace):
        self._resources.close()

    def temporary_lockfile(self):
        lock_dir = self._resources.enter_context(TemporaryDirectory())
        return os.path.join(lock_dir, 'test.lck')


namespace = DoctestNamespace()


pytest_collect_file = Sybil(
    parsers=[
        DocTestParser(optionflags=DOCTEST_FLAGS),
        PythonCodeBlockParser(),
        ],
    pattern='*.rst',
    setup=namespace.setup,
    ).pytest()