File: test_coverup_17.py

package info (click to toggle)
scalene 1.5.51-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 15,528 kB
  • sloc: cpp: 22,930; python: 13,403; javascript: 11,769; ansic: 817; makefile: 196; sh: 45
file content (56 lines) | stat: -rw-r--r-- 1,905 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# file scalene/scalene_sigqueue.py:8-48
# lines [10, 11, 12, 13, 17, 21, 26, 27, 28, 32, 33, 36, 37, 43, 44, 45, 46, 47, 48]
# branches ['26->exit', '26->27', '32->exit', '32->33', '43->44', '45->46', '45->47']

import pytest
import threading
from typing import Optional, Any, Generic, TypeVar
from scalene.scalene_sigqueue import ScaleneSigQueue
import queue

# Type parameter used as the Generic[...] argument of the test class below.
T = TypeVar('T')

class TestScaleneSigQueue(Generic[T]):
    """Exercise the start / put / get / stop cycle of ``ScaleneSigQueue``.

    NOTE: ``__test__ = False`` tells pytest not to collect this class, so
    ``test_scalene_sigqueue`` only runs when it is invoked explicitly.
    """

    # Prevent pytest from considering this class as a test
    __test__ = False

    def test_scalene_sigqueue(self):
        """Check background processing, then direct ``put``/``get`` use.

        The test first starts the queue and verifies that an item put on it
        reaches the process function as positional arguments. After stopping
        the queue it checks that ``put`` and ``get`` round-trip items when no
        worker thread is consuming them.
        """
        expected_args = (1, 2, 3)
        received = []
        processed = threading.Event()

        def process_function(*args):
            # This runs on the queue's worker thread. An AssertionError raised
            # here would not propagate to the test, so only record the call
            # and let the test thread do the checking.
            received.append(args)
            processed.set()

        # Create an instance of ScaleneSigQueue
        sigqueue = ScaleneSigQueue(process_function)

        try:
            # Start the queue processing
            sigqueue.start()
            assert (
                sigqueue.thread is not None and sigqueue.thread.is_alive()
            ), "The thread should be started and alive."

            # Put an item into the queue and wait until the worker has
            # handled it; the timeout only bounds the failure case, a healthy
            # run returns as soon as the item is processed.
            sigqueue.put(expected_args)
            assert processed.wait(timeout=5.0), (
                "The process function was not called in time."
            )

            # Stop the queue processing
            sigqueue.stop()
            assert sigqueue.thread is None, (
                "The thread should be stopped and set to None."
            )

            # Checked on the test thread so a mismatch actually fails the test.
            assert received == [expected_args], (
                "The process function did not receive the expected arguments."
            )

            # Test get method
            sigqueue.put(None)
            item = sigqueue.get()
            assert item is None, "The get method should return None."

            # Test put method
            sigqueue.put(expected_args)
            item = sigqueue.get()
            assert item == expected_args, (
                "The put method should add the item to the queue."
            )
        finally:
            # Clean up even when an assertion above failed, so a started
            # worker thread is not left behind.
            sigqueue.stop()

@pytest.fixture(autouse=True)
def run_around_tests():
    """Autouse fixture wrapping each test with empty setup and teardown."""
    # Setup phase: nothing to prepare before the test body runs.
    yield None
    # Teardown phase: intentionally empty, because test_scalene_sigqueue
    # stops the queue it creates on its own.