File: load_simulations.py

package info (click to toggle)
python-sse-starlette 3.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,248 kB
  • sloc: python: 3,481; makefile: 131; sh: 57
file content (214 lines) | stat: -rw-r--r-- 6,574 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# demonstrations/production_scenarios/load_simulation.py
"""
DEMONSTRATION: Load Testing with Multiple Concurrent Clients

PURPOSE:
Shows how an SSE server behaves under load with many concurrent connections.

KEY LEARNING:
- Resource usage scales with client count
- Memory and connection management is critical
- Performance characteristics of SSE at scale

PATTERN:
Controlled load testing to understand SSE performance limits.
"""

import asyncio
import time
import httpx
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.routing import Route
from sse_starlette import EventSourceResponse


class LoadTestServer:
    """
    SSE server instrumented for load testing.

    Keeps simple in-process counters for open connections, total connections
    and events sent, and derives an events-per-second figure from them.

    Attributes:
        active_connections: Connections currently open.
        total_connections: Connections opened since construction.
        events_sent: Events recorded through ``event_sent()``.
        start_time: Wall-clock time of construction (``time.time()``).
    """

    def __init__(self):
        self.active_connections = 0
        self.total_connections = 0
        self.events_sent = 0
        self.start_time = time.time()
        # Uptime is measured on the monotonic clock so that wall-clock
        # adjustments cannot skew (or negate) the events-per-second figure.
        self._started_monotonic = time.monotonic()

    def connection_started(self):
        """Record a newly opened connection and print the current counts."""
        self.active_connections += 1
        self.total_connections += 1
        print(f"📈 Active: {self.active_connections}, Total: {self.total_connections}")

    def connection_ended(self):
        """Record a closed connection and print the remaining active count."""
        self.active_connections -= 1
        print(f"📉 Active: {self.active_connections}")

    def event_sent(self):
        """Record one event delivered to a client."""
        self.events_sent += 1

    @property
    def stats(self):
        """
        Return a snapshot of the counters.

        Returns:
            dict: ``active_connections``, ``total_connections``,
            ``events_sent``, ``uptime_seconds`` and ``events_per_second``
            (0 while no measurable time has elapsed).
        """
        uptime = time.monotonic() - self._started_monotonic
        return {
            "active_connections": self.active_connections,
            "total_connections": self.total_connections,
            "events_sent": self.events_sent,
            "uptime_seconds": uptime,
            "events_per_second": self.events_sent / uptime if uptime > 0 else 0
        }


# Global server instance: the module-level metrics object shared by the
# stream generator and the stats endpoint defined below
server = LoadTestServer()


async def load_test_stream(request: Request):
    """
    Stream optimized for load testing.

    Emits up to ten small events, pausing 0.1 s after each one, and stops
    early once the request reports that the client has disconnected.
    Processing per event is kept minimal to focus on connection handling.

    Args:
        request: The incoming request; polled for disconnection before each
            event is produced.

    Yields:
        dict: Event payload with ``data`` and ``id`` keys.
    """
    server.connection_started()

    try:
        # Send events with minimal delay for load testing
        for i in range(10):  # Limited events per connection
            if await request.is_disconnected():
                break

            yield {"data": f"Event {i}", "id": str(i)}
            # Runs only once the consumer resumes the generator, so an event
            # is counted after it has been handed over, not before.
            server.event_sent()
            await asyncio.sleep(0.1)  # Fast events for load testing

    finally:
        # Also runs when the generator is closed or cancelled mid-stream,
        # which keeps the active-connection count balanced.
        server.connection_ended()


async def sse_endpoint(request: Request):
    """Handle ``/events``: wrap a fresh load-test stream in an SSE response."""
    event_source = load_test_stream(request)
    return EventSourceResponse(event_source)


async def stats_endpoint(request: Request):
    """Handle ``/stats``: return the current server metrics as JSON."""
    from starlette.responses import JSONResponse

    snapshot = server.stats
    return JSONResponse(snapshot)


# Test application: /events serves the SSE stream, /stats serves the
# server metrics as JSON
app = Starlette(routes=[
    Route("/events", sse_endpoint),
    Route("/stats", stats_endpoint)
])


class LoadTestClient:
    """
    Client that simulates realistic SSE usage patterns.

    Opens one streaming connection to ``{base_url}/events``, counts the
    events it receives and records when the run started and finished.

    Attributes:
        client_id: Identifier used in error output.
        base_url: Root URL of the server; ``/events`` is appended to it.
        events_received: Number of complete SSE events received.
        start_time: ``time.time()`` when ``run()`` began, or ``None``.
        end_time: ``time.time()`` when ``run()`` finished, or ``None``.
    """

    def __init__(self, client_id, base_url):
        self.client_id = client_id
        self.base_url = base_url
        self.events_received = 0
        self.start_time = None
        self.end_time = None
        # True once the event currently being read has produced a data field.
        self._pending_data = False

    def _feed_line(self, line):
        """
        Consume one line of the event stream and update the event count.

        An SSE event is a group of field lines (``id:``, ``data:`` ...)
        terminated by a blank line, so counting every non-empty line would
        count one event several times and would also count comment lines.
        An event is counted when its terminating blank line arrives, and only
        if the group contained a ``data`` field.

        Args:
            line: One line of the stream, with or without its line ending.

        Returns:
            bool: True if this line completed an event that was counted.
        """
        line = line.rstrip("\r\n")
        if line:
            # Comment lines start with ":" and therefore have an empty field
            # name, so they never mark the event as carrying data.
            if line.partition(":")[0] == "data":
                self._pending_data = True
            return False

        if not self._pending_data:
            return False

        self._pending_data = False
        self.events_received += 1
        return True

    async def run(self):
        """
        Run the client simulation.

        Streams ``/events`` until the server closes the connection. Any
        exception is reported by type name rather than raised, so one failing
        client does not abort a whole load test; ``end_time`` is always set.
        """
        self.start_time = time.time()
        self._pending_data = False

        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                async with client.stream("GET", f"{self.base_url}/events") as response:
                    async for line in response.aiter_lines():
                        if self._feed_line(line):
                            # Simulate client processing time
                            await asyncio.sleep(0.01)

        except Exception as e:
            print(f"❌ Client {self.client_id} error: {type(e).__name__}")

        finally:
            self.end_time = time.time()

    @property
    def duration(self):
        """Return the run time in seconds, or 0 if the run has not finished."""
        # Compare against None explicitly: a timestamp of 0.0 is a valid value.
        if self.start_time is None or self.end_time is None:
            return 0
        return self.end_time - self.start_time


async def run_load_test(num_clients=10, base_url="http://localhost:8000"):
    """
    Run load test with specified number of concurrent clients.

    Starts all clients concurrently, polls the server's ``/stats`` endpoint
    once per second while they run, then prints a summary.

    Args:
        num_clients: Number of concurrent SSE clients to start.
        base_url: Root URL of the server under test.
    """
    print(f"🚀 Starting load test with {num_clients} clients...")

    clients = [LoadTestClient(i, base_url) for i in range(num_clients)]

    async def progress_monitor():
        """Poll and print server stats every second until cancelled."""
        # One HTTP client is reused for every poll instead of one per second.
        async with httpx.AsyncClient() as http_client:
            while True:
                await asyncio.sleep(1)
                try:
                    response = await http_client.get(f"{base_url}/stats")
                    stats = response.json()
                    print(f"📊 Active: {stats['active_connections']}, "
                          f"Events/sec: {stats['events_per_second']:.1f}")
                except (httpx.HTTPError, ValueError, KeyError, TypeError) as exc:
                    # Monitoring is best-effort: report and keep polling.
                    # Cancellation is deliberately not caught here.
                    print(f"⚠️ Stats unavailable: {type(exc).__name__}")

    # Run load test. The monitor runs in the background and is stopped as soon
    # as the clients finish, so the measured duration covers the clients only.
    started = time.perf_counter()
    monitor = asyncio.create_task(progress_monitor())
    try:
        await asyncio.gather(
            *(client.run() for client in clients),
            return_exceptions=True
        )
    finally:
        monitor.cancel()
        # Collect the cancellation so no "never retrieved" warning is logged.
        await asyncio.gather(monitor, return_exceptions=True)
    total_duration = time.perf_counter() - started

    # Analyze results
    total_events = sum(c.events_received for c in clients)
    successful_count = sum(1 for c in clients if c.events_received > 0)
    # Guard the average: zero clients would otherwise divide by zero.
    average_events = total_events / len(clients) if clients else 0.0

    print("\n📈 Load Test Results:")
    print(f"   Clients: {num_clients}")
    print(f"   Successful: {successful_count}")
    print(f"   Duration: {total_duration:.1f}s")
    print(f"   Total events: {total_events}")
    print(f"   Avg events per client: {average_events:.1f}")


if __name__ == "__main__":
    # DEMONSTRATION STEPS:
    # 1. Run server: python this_script.py (in one terminal)
    # 2. Run clients: python this_script.py test 20 (in another terminal)
    # 3. Monitor resource usage with system tools
    # 4. Experiment with different client counts
    #
    # PERFORMANCE INSIGHTS:
    # - Memory usage scales linearly with connections
    # - CPU usage depends on event frequency
    # - Network becomes bottleneck with many clients
    # - Connection limits are OS and application dependent
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == "test":
        # Run as load test client
        try:
            num_clients = int(sys.argv[2]) if len(sys.argv) > 2 else 10
        except ValueError:
            sys.exit(f"Client count must be an integer, got {sys.argv[2]!r}")
        asyncio.run(run_load_test(num_clients))
    else:
        # Run as server. uvicorn is imported only here so that client mode
        # does not require it to be installed.
        import uvicorn

        print("🚀 Starting SSE load test server...")
        # Use the actual invocation path so the hint is always runnable.
        print(f"📋 Run load test with: python {sys.argv[0]} test 20")
        uvicorn.run(app, host="localhost", port=8000, log_level="error")