1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
|
# file scalene/scalene_sigqueue.py:8-48
# lines [10, 11, 12, 13, 17, 21, 26, 27, 28, 32, 33, 36, 37, 43, 44, 45, 46, 47, 48]
# branches ['26->exit', '26->27', '32->exit', '32->33', '43->44', '45->46', '45->47']
import pytest
import threading
from typing import Optional, Any, Generic, TypeVar
from scalene.scalene_sigqueue import ScaleneSigQueue
import queue
T = TypeVar('T')
class TestScaleneSigQueue(Generic[T]):
# Prevent pytest from considering this class as a test
__test__ = False
def test_scalene_sigqueue(self):
# Define a process function that will be called by the queue
def process_function(*args):
assert args == (1, 2, 3), "The process function did not receive the expected arguments."
# Create an instance of ScaleneSigQueue
sigqueue = ScaleneSigQueue(process_function)
# Start the queue processing
sigqueue.start()
assert sigqueue.thread is not None and sigqueue.thread.is_alive(), "The thread should be started and alive."
# Put an item into the queue
sigqueue.put((1, 2, 3))
# Allow some time for the thread to process the item
threading.Event().wait(0.1)
# Stop the queue processing
sigqueue.stop()
assert sigqueue.thread is None, "The thread should be stopped and set to None."
# Test get method
sigqueue.put(None)
item = sigqueue.get()
assert item is None, "The get method should return None."
# Test put method
sigqueue.put((1, 2, 3))
item = sigqueue.get()
assert item == (1, 2, 3), "The put method should add the item to the queue."
# Clean up
sigqueue.stop()
@pytest.fixture(autouse=True)
def run_around_tests():
# Before each test
yield
# After each test
# No cleanup needed as the test_scalene_sigqueue function handles its own cleanup
|