File: test_coverup_2.py

package info (click to toggle)
scalene 1.5.51-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 15,528 kB
  • sloc: cpp: 22,930; python: 13,403; javascript: 11,769; ansic: 817; makefile: 196; sh: 45
file content (73 lines) | stat: -rw-r--r-- 2,293 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# file scalene/scalene_sigqueue.py:8-48
# lines [8, 9, 10, 11, 12, 13, 15, 17, 19, 21, 23, 26, 27, 28, 30, 32, 33, 36, 37, 39, 43, 44, 45, 46, 47, 48]
# branches ['26->exit', '26->27', '32->exit', '32->33', '43->44', '45->46', '45->47']

import pytest
import threading
from typing import Any, Optional, Generic, TypeVar
from scalene.scalene_sigqueue import ScaleneSigQueue
import queue

T = TypeVar('T')

class TestScaleneSigQueue(Generic[T]):
    """Queue whose items are handed to a callback on a background thread.

    Items are tuples that get unpacked into ``process(*item)``; ``None`` is
    reserved as the "stop" sentinel and is never passed to the callback.

    NOTE(review): presumably a local copy of
    ``scalene.scalene_sigqueue.ScaleneSigQueue`` -- confirm against that module.
    """

    # The name starts with "Test", so tell pytest explicitly not to collect it.
    __test__ = False

    def __init__(self, process: Any) -> None:
        """Create an idle queue; ``process`` is invoked once per queued item."""
        self.process = process
        self.queue: queue.SimpleQueue[Optional[T]] = queue.SimpleQueue()
        # Re-entrant lock held for the duration of each ``process`` call.
        self.lock = threading.RLock()
        # Worker thread; ``None`` whenever no worker is running.
        self.thread: Optional[threading.Thread] = None

    def put(self, item: Optional[T]) -> None:
        """Enqueue ``item`` (``None`` asks the worker to stop)."""
        self.queue.put(item)

    def get(self) -> Optional[T]:
        """Remove and return the next queued item, blocking until one exists."""
        return self.queue.get()

    def start(self) -> None:
        """Launch the worker thread unless one is already registered."""
        if self.thread:
            return
        # Daemon thread: a worker that is never joined must not keep the
        # interpreter alive at exit.
        worker = threading.Thread(target=self.run, daemon=True)
        self.thread = worker
        worker.start()

    def stop(self) -> None:
        """Ask the worker to finish and wait for it; no-op when not started."""
        if not self.thread:
            return
        self.queue.put(None)
        # Joining matters: every thread has to be gone before a fork(), or the
        # child may inherit locked mutexes and other inconsistent state.
        self.thread.join()
        self.thread = None

    def run(self) -> None:
        """Worker loop: process queued items until the ``None`` sentinel.

        Runs on the thread created by ``start``.
        """
        pending = self.queue.get()
        while pending is not None:
            with self.lock:
                self.process(*pending)
            pending = self.queue.get()

def test_scalene_sigqueue():
    """Queued tuples reach the callback in FIFO order before the worker stops."""
    collected = []

    def record(*payload):
        # Keep the unpacked arguments exactly as the worker delivered them.
        collected.append(payload)

    worker_queue = TestScaleneSigQueue(record)
    worker_queue.start()

    for work_item in ((1, 2, 3), (4, 5, 6)):
        worker_queue.put(work_item)
    # Explicit stop sentinel; stop() below enqueues one of its own as well.
    worker_queue.put(None)

    worker_queue.stop()

    assert collected == [(1, 2, 3), (4, 5, 6)]