File: test_multi_loop.py

package info (click to toggle)
python-sse-starlette 3.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,248 kB
  • sloc: python: 3,481; makefile: 131; sh: 57
file content (196 lines) | stat: -rw-r--r-- 6,464 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""
Tests for multi-loop and thread safety scenarios to verify Issue #140 fix.
"""

import asyncio
import threading
import time
from typing import List

import pytest

from sse_starlette.sse import AppStatus, EventSourceResponse, _exit_event_context


class TestMultiLoopSafety:
    """Test suite for multi-loop and thread safety."""

    def setup_method(self):
        """Reset AppStatus before each test."""
        AppStatus.should_exit = False

    def teardown_method(self):
        """Clean up after each test."""
        AppStatus.should_exit = False

    def test_context_isolation_same_thread(self):
        """Test that exit events are isolated between different contexts in same thread."""

        async def create_and_check_event():
            # Each call should get its own event
            event1 = AppStatus.get_or_create_exit_event()
            event2 = AppStatus.get_or_create_exit_event()

            # Should be the same within same context
            assert event1 is event2
            return event1

        # Run in different asyncio event loops (different contexts)
        loop1 = asyncio.new_event_loop()
        asyncio.set_event_loop(loop1)
        try:
            event_a = loop1.run_until_complete(create_and_check_event())
        finally:
            loop1.close()

        loop2 = asyncio.new_event_loop()
        asyncio.set_event_loop(loop2)
        try:
            event_b = loop2.run_until_complete(create_and_check_event())
        finally:
            loop2.close()

        # Events from different loops should be different objects
        assert event_a is not event_b

    def test_thread_isolation(self):
        """Test that exit events are isolated between different threads."""
        events: List = []
        errors: List = []

        def create_event_in_thread():
            """Create an event in a new thread with its own event loop."""
            try:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

                async def get_event():
                    return AppStatus.get_or_create_exit_event()

                event = loop.run_until_complete(get_event())
                events.append(event)
                loop.close()
            except Exception as e:
                errors.append(e)

        # Create events in multiple threads
        threads = []
        for _ in range(3):
            thread = threading.Thread(target=create_event_in_thread)
            threads.append(thread)
            thread.start()

        # Wait for all threads to complete
        for thread in threads:
            thread.join()

        # Should have no errors
        assert not errors, f"Errors occurred: {errors}"

        # Should have 3 different events
        assert len(events) == 3
        assert len(set(id(event) for event in events)) == 3, "Events should be unique"

    def test_exit_signal_propagation_multiple_contexts(self):
        """Test that exit signals properly propagate to multiple contexts."""

        # Test that exit signal set before waiting works correctly
        AppStatus.should_exit = True

        async def quick_exit_test():
            await EventSourceResponse._listen_for_exit_signal()
            return "exited"

        # Test in single loop first
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            result = loop.run_until_complete(quick_exit_test())
            assert result == "exited"
        finally:
            loop.close()

    @pytest.mark.asyncio
    async def test_context_cleanup(self):
        """Test that context variables are properly cleaned up."""

        # Create an event in current context
        initial_event = AppStatus.get_or_create_exit_event()
        assert _exit_event_context.get() is initial_event

        # Verify we can create new contexts
        async def inner_context():
            # This should create a new event in the task context
            return AppStatus.get_or_create_exit_event()

        # Create task which runs in a copied context
        task_event = await asyncio.create_task(inner_context())

        # The task should have access to the same event (context is copied)
        assert task_event is initial_event

    @pytest.mark.asyncio
    async def test_exit_before_event_creation(self):
        """Test that exit signal works even when set before event creation."""

        # Set exit before any event is created
        AppStatus.should_exit = True

        # This should return immediately without waiting
        start_time = time.time()
        await EventSourceResponse._listen_for_exit_signal()
        elapsed = time.time() - start_time

        # Should return almost immediately (less than 0.1 seconds)
        assert elapsed < 0.1

    @pytest.mark.asyncio
    async def test_race_condition_protection(self):
        """Test protection against race conditions during event setup."""

        # Set exit before creating tasks to avoid hanging
        AppStatus.should_exit = True

        # Multiple concurrent calls should all work correctly
        tasks = [
            asyncio.create_task(EventSourceResponse._listen_for_exit_signal())
            for _ in range(3)
        ]

        # All tasks should complete quickly
        results = await asyncio.gather(*tasks)
        assert len(results) == 3

    def test_no_global_state_pollution(self):
        """Test that global state is not polluted between test runs."""

        # Verify clean state
        assert not AppStatus.should_exit
        assert _exit_event_context.get(None) is None

        # Create an event
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:

            async def create_event():
                return AppStatus.get_or_create_exit_event()

            event = loop.run_until_complete(create_event())
            assert event is not None
        finally:
            loop.close()

        # After loop closes, context should be clean for new contexts
        # (This test verifies we don't have lingering global state)
        loop2 = asyncio.new_event_loop()
        asyncio.set_event_loop(loop2)
        try:

            async def create_new_event():
                return AppStatus.get_or_create_exit_event()

            new_event = loop2.run_until_complete(create_new_event())
            assert new_event is not None
        finally:
            loop2.close()