File: network_interruption.py

package info (click to toggle)
python-sse-starlette 3.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,248 kB
  • sloc: python: 3,481; makefile: 131; sh: 57
file content (300 lines) | stat: -rw-r--r-- 10,319 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# demonstrations/production_scenarios/network_interruption.py
"""
DEMONSTRATION: Network Interruption Handling

PURPOSE:
Shows how SSE connections behave during network issues like packet loss,
temporary disconnections, and connection timeouts.

KEY LEARNING:
- Network issues cause immediate SSE connection failures
- Clients must implement reconnection logic
- Server-side timeouts help detect dead connections

PATTERN:
Simulating network conditions to test SSE resilience.
"""

import asyncio
import time
import httpx
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.routing import Route
from starlette.middleware.base import BaseHTTPMiddleware
from sse_starlette import EventSourceResponse


class NetworkSimulationMiddleware(BaseHTTPMiddleware):
    """
    HTTP middleware that can inject artificial latency or a 503 error
    into requests for the ``/events`` path.

    The three public attributes are ordinary instance state, so other code
    holding a reference to the instance can flip them at runtime:

    - ``simulate_delay``: when true, sleep ``delay_duration`` seconds first.
    - ``simulate_failure``: when true, answer 503 instead of calling the app.
    - ``delay_duration``: length of the simulated delay in seconds.
    """

    def __init__(self, app):
        super().__init__(app)
        # Both simulations start switched off.
        self.simulate_delay, self.simulate_failure = False, False
        self.delay_duration = 2.0

    async def dispatch(self, request, call_next):
        """Apply the enabled simulations to ``/events``; pass everything else through."""
        # Only the SSE endpoint is ever affected by the simulation flags.
        if request.url.path != "/events":
            return await call_next(request)

        # Latency is applied first, so a delayed request can still fail afterwards.
        if self.simulate_delay:
            print(f"๐ŸŒ Simulating {self.delay_duration}s network delay...")
            await asyncio.sleep(self.delay_duration)

        if not self.simulate_failure:
            return await call_next(request)

        # Short-circuit with an error response; the wrapped app is never called.
        print("๐Ÿ’ฅ Simulating network failure!")
        from starlette.responses import Response
        return Response("Network Error", status_code=503)


# Module-level handle on the middleware instance so that control_endpoint can
# toggle its simulation flags. It is None only until the app is assembled
# further down in this module, where the name is rebound to the
# NetworkSimulationMiddleware instance.
network_middleware = None


async def robust_stream(request: Request):
    """
    Async generator producing up to 29 numbered SSE events, one per second.

    Before each event the request is polled for a client disconnect, and the
    stream stops early when one is detected. Every event is a dict with
    ``data``, ``id`` and ``event`` keys. If an exception is raised inside the
    loop, a final ``error`` event is yielded and the exception is re-raised.
    The cleanup message is always printed when the generator finishes.
    """
    stream_id = id(request)
    print(f"๐Ÿ”— Stream {stream_id} started")

    try:
        sequence = 1
        while sequence < 30:
            # Poll on every iteration so a vanished client is noticed quickly.
            client_gone = await request.is_disconnected()
            if client_gone:
                print(f"๐Ÿ”Œ Client {stream_id} disconnected at event {sequence}")
                break

            # The event id mirrors the sequence number.
            payload = f"Event {sequence} - timestamp: {time.time():.2f}"
            yield {"data": payload, "id": str(sequence), "event": "data"}

            # Fixed one-second cadence between events.
            await asyncio.sleep(1)
            sequence += 1

    except Exception as exc:
        print(f"๐Ÿ’ฅ Stream {stream_id} error: {exc}")
        # Tell the client something went wrong before propagating the error.
        yield {"data": "Stream error occurred", "event": "error"}
        raise

    finally:
        print(f"๐Ÿงน Stream {stream_id} cleanup completed")


async def sse_endpoint(request: Request):
    """Serve the robust event stream as an SSE response with pings and a send timeout."""
    event_source = robust_stream(request)
    response_options = {
        "ping": 5,  # ping interval in seconds, used to detect dead connections
        "send_timeout": 10.0,  # give up on a send after 10 seconds
    }
    return EventSourceResponse(event_source, **response_options)


async def control_endpoint(request: Request):
    """
    Control endpoint to simulate network conditions.

    Query parameters:
        delay:    "true"/"false" (case-insensitive) toggles the simulated delay.
        duration: delay length in seconds; only read when ``delay`` is present
                  and defaults to 2.0 when omitted.
        failure:  "true"/"false" (case-insensitive) toggles the simulated failure.

    Returns:
        A JSON body with the current simulation settings, or a 400 JSON error
        when ``duration`` is not a finite, non-negative number. On a 400 no
        setting is changed.
    """
    import math
    from starlette.responses import JSONResponse
    from urllib.parse import parse_qs

    query = parse_qs(str(request.query_params))

    if "delay" in query:
        # Validate before mutating anything so a bad request leaves the
        # middleware untouched. nan/inf must be rejected explicitly: float()
        # accepts them, but they cannot be used as a sleep duration or be
        # serialised in the JSON status below.
        raw_duration = query.get("duration", ["2.0"])[0]
        try:
            duration = float(raw_duration)
        except ValueError:
            duration = None

        if duration is None or not math.isfinite(duration) or duration < 0:
            return JSONResponse(
                {"error": "duration must be a finite, non-negative number of seconds"},
                status_code=400,
            )

        network_middleware.simulate_delay = query["delay"][0].lower() == "true"
        network_middleware.delay_duration = duration

    if "failure" in query:
        network_middleware.simulate_failure = query["failure"][0].lower() == "true"

    return JSONResponse({
        "network_delay": network_middleware.simulate_delay,
        "delay_duration": network_middleware.delay_duration,
        "network_failure": network_middleware.simulate_failure
    })


# Build the Starlette app and wrap it directly in the simulation middleware.
# The middleware instance is kept in `network_middleware` so the control
# endpoint can change its flags, and the same object is exported as `app`
# because it is the outermost ASGI callable.
network_middleware = NetworkSimulationMiddleware(
    Starlette(
        routes=[
            Route("/events", sse_endpoint),
            Route("/control", control_endpoint),
        ]
    )
)
app = network_middleware


class ResilientSSEClient:
    """
    Client with automatic reconnection and error handling.
    Demonstrates production-ready SSE client patterns.

    Attributes:
        base_url: Server base URL; ``/events`` is appended when connecting.
        max_retries: Number of retries after the first failed attempt.
        events_received: Count of complete events (not raw lines) seen so far.
        connection_attempts: Total number of connection attempts made.
        last_event_id: Most recent ``id`` field seen, sent back as the
            ``Last-Event-ID`` header on the next connection.
    """

    def __init__(self, base_url, max_retries=3):
        self.base_url = base_url
        self.max_retries = max_retries
        self.events_received = 0
        self.connection_attempts = 0
        self.last_event_id = None
        # ``data`` lines of the event currently being assembled.
        self._pending_data = []

    async def connect_with_retry(self):
        """
        Connect with exponential backoff retry logic.

        Makes up to ``max_retries + 1`` attempts. Returns when a connection
        ends without raising.

        Raises:
            Exception: whatever the final attempt raised, once all attempts
                are exhausted.
        """

        for attempt in range(self.max_retries + 1):
            self.connection_attempts += 1

            try:
                print(f"๐Ÿ”„ Connection attempt {attempt + 1}/{self.max_retries + 1}")
                await self._connect()
                break  # Success

            except Exception as e:
                print(f"โŒ Attempt {attempt + 1} failed: {type(e).__name__}")

                if attempt < self.max_retries:
                    # Exponential backoff: 1s, 2s, 4s, 8s...
                    delay = 2 ** attempt
                    print(f"โณ Retrying in {delay}s...")
                    await asyncio.sleep(delay)
                else:
                    print("๐Ÿ’€ All retry attempts exhausted")
                    raise

    def _feed_line(self, line):
        """
        Process one line of the event stream.

        Returns the joined ``data`` payload when ``line`` is the blank line
        that terminates an event carrying data; otherwise returns None.
        Comment lines (starting with ``:``, e.g. keep-alive pings) and blocks
        without any ``data`` field never count as events.
        """
        line = line.rstrip("\r\n")

        if not line.strip():
            # A blank line ends the current block; dispatch only if it had data.
            if not self._pending_data:
                return None
            payload = "\n".join(self._pending_data)
            self._pending_data = []
            self.events_received += 1
            return payload

        if line.startswith(":"):
            return None  # comment / ping

        field, _, value = line.partition(":")
        # A single space after the colon is a separator, not part of the value.
        if value.startswith(" "):
            value = value[1:]

        if field == "data":
            self._pending_data.append(value)
        elif field == "id":
            # Remember the id for resumption on the next connection.
            self.last_event_id = value
        return None

    async def _connect(self):
        """
        Single connection attempt.

        Streams ``{base_url}/events`` until the response ends, updating
        ``events_received`` and ``last_event_id`` along the way.

        Raises:
            httpx.HTTPStatusError: if the response status is not 200.
        """
        headers = {}
        # A half-received event from a dropped connection must not leak into
        # the first event of the new one.
        self._pending_data = []

        # Include Last-Event-ID for resumption
        if self.last_event_id:
            headers["Last-Event-ID"] = self.last_event_id
            print(f"๐Ÿ“ Resuming from event ID: {self.last_event_id}")

        async with httpx.AsyncClient(timeout=15.0) as client:
            async with client.stream("GET", f"{self.base_url}/events", headers=headers) as response:

                # Check response status
                if response.status_code != 200:
                    raise httpx.HTTPStatusError(
                        f"HTTP {response.status_code}",
                        request=response.request,
                        response=response,
                    )

                print("โœ… Connected successfully")

                async for line in response.aiter_lines():
                    payload = self._feed_line(line)
                    if payload is None:
                        continue

                    print(f"๐Ÿ“จ Event {self.events_received}: {payload[:50]}...")

                    # Simulate client processing
                    await asyncio.sleep(0.1)


async def demonstrate_network_issues(base_url="http://localhost:8000"):
    """
    Demonstrates different network failure scenarios.

    Runs three scenarios in order against a running demo server, sharing one
    ResilientSSEClient so its counters accumulate across scenarios:

    1. a normal connection observed for 5 seconds,
    2. a connection with a simulated 3 second delay, observed for 10 seconds,
    3. simulated failures that exhaust the retries, followed by a recovery check.

    Each scenario that changes the server's simulated conditions restores them
    before the next one runs.

    Args:
        base_url: Base URL of the demo server exposing ``/events`` and
            ``/control``.
    """
    print("๐ŸŒ Network Interruption Demonstrations\n")

    client = ResilientSSEClient(base_url)

    async def set_conditions(query):
        """Send one request to the control endpoint with the given query string."""
        async with httpx.AsyncClient() as http_client:
            await http_client.get(f"{base_url}/control?{query}")

    async def scenario_1_normal_connection():
        """Normal operation baseline."""
        print("๐Ÿ“ก Scenario 1: Normal Connection")

        # Reset network conditions
        await set_conditions("delay=false&failure=false")

        try:
            # Connect for 5 seconds; the stream outlives that window, so
            # hitting the timeout is the expected, successful outcome.
            await asyncio.wait_for(client.connect_with_retry(), timeout=5.0)
        except asyncio.TimeoutError:
            print("โœ… Normal connection worked for 5 seconds")

        print(f"๐Ÿ“Š Events received: {client.events_received}\n")

    async def scenario_2_network_delay():
        """Connection with network delays."""
        print("๐Ÿ“ก Scenario 2: Network Delays")

        # Enable network delay
        await set_conditions("delay=true&duration=3.0")

        events_before = client.events_received
        start_time = time.time()

        try:
            await asyncio.wait_for(client.connect_with_retry(), timeout=10.0)
        except asyncio.TimeoutError:
            duration = time.time() - start_time
            print(f"โฑ๏ธ  Connection with delays lasted {duration:.1f}s")
        finally:
            # Do not let the delay leak into the following scenario.
            await set_conditions("delay=false")

        # Report only what this scenario added, not the running total.
        print(f"๐Ÿ“Š Additional events: {client.events_received - events_before}\n")

    async def scenario_3_connection_failure():
        """Connection failures with retry."""
        print("๐Ÿ“ก Scenario 3: Connection Failures")

        # Enable network failures
        await set_conditions("failure=true")

        try:
            await client.connect_with_retry()
        except Exception as e:
            print(f"๐Ÿ’€ Expected failure: {type(e).__name__}")
        finally:
            # Restore normal operation even if something unexpected happened.
            await set_conditions("delay=false&failure=false")

        print("๐Ÿ”„ Testing recovery after failure...")
        events_before = client.events_received
        try:
            await asyncio.wait_for(client.connect_with_retry(), timeout=3.0)
            recovered = True  # stream ended on its own without an error
        except asyncio.TimeoutError:
            # The stream runs longer than the 3 second window, so a timeout
            # is normal; recovery succeeded if events arrived in the meantime.
            recovered = client.events_received > events_before
        except Exception:
            recovered = False

        if recovered:
            print("โœ… Successfully recovered!")
        else:
            print("โŒ Recovery failed")

    # Run scenarios
    await scenario_1_normal_connection()
    await scenario_2_network_delay()
    await scenario_3_connection_failure()

    print(f"๐Ÿ“Š Total events received: {client.events_received}")
    print(f"๐Ÿ”„ Total connection attempts: {client.connection_attempts}")


if __name__ == "__main__":
    # DEMONSTRATION STEPS:
    # 1. Start server: python network_interruption.py
    # 2. Run client test: python network_interruption.py demo
    #    (equivalently: python -c "import asyncio; from network_interruption
    #    import demonstrate_network_issues; asyncio.run(demonstrate_network_issues())")
    # 3. Observe how client handles different network conditions
    #
    # PRODUCTION INSIGHTS:
    # - Always implement client-side retry logic
    # - Use Last-Event-ID header for stream resumption
    # - Set appropriate timeouts on both client and server
    # - Monitor connection health with pings
    # - Handle partial message delivery gracefully
    import uvicorn
    import sys

    # The first command-line argument selects the mode: "demo" runs the
    # client scenarios, anything else (or nothing) starts the server.
    if sys.argv[1:2] != ["demo"]:
        print("๐Ÿš€ Starting network interruption test server...")
        print("๐Ÿ“‹ Run demo with: python network_interruption.py demo")
        print("๐ŸŽ›๏ธ  Control network: curl 'http://localhost:8000/control?delay=true&duration=5'")
        uvicorn.run(app, host="localhost", port=8000, log_level="error")
    else:
        asyncio.run(demonstrate_network_issues())