File: non_tls_server.py

package info (click to toggle)
aws-crt-python 0.16.8%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 78,328 kB
  • sloc: ansic: 330,743; python: 18,949; makefile: 6,271; sh: 3,712; asm: 754; cpp: 699; ruby: 208; java: 77; perl: 73; javascript: 46; xml: 11
file content (56 lines) | stat: -rw-r--r-- 1,290 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# example server from http://python-hyper.org/projects/h2/en/stable/basic-usage.html

import json
import socket

import h2.connection
import h2.events
import h2.config


def send_response(conn, event):
    """Answer the request carried by *event* with a fixed 200 response.

    The response body is always ``b"success"``; its length is advertised
    in a ``content-length`` header. Only ``conn.send_headers`` and
    ``conn.send_data`` are called here -- this function does not touch
    any socket itself.

    Args:
        conn: Object exposing ``send_headers`` and ``send_data``.
        event: Object whose ``stream_id`` names the stream to reply on.
    """
    target_stream = event.stream_id
    body = b"success"
    response_headers = [
        (':status', '200'),
        ('content-length', str(len(body))),
    ]

    conn.send_headers(stream_id=target_stream, headers=response_headers)
    # end_stream=True: nothing more will be sent on this stream.
    conn.send_data(stream_id=target_stream, data=body, end_stream=True)


def handle(sock):
    """Serve a single accepted connection until the peer closes it.

    A server-side ``H2Connection`` is created for the connection, its
    initial bytes are written to *sock*, and then received bytes are fed
    to it in a loop. Every ``RequestReceived`` event is answered through
    ``send_response``; whatever the connection has pending afterwards is
    written back to the socket.

    The socket is closed when this function returns or raises, so the
    descriptor is released deterministically instead of whenever the
    object happens to be garbage collected.

    Args:
        sock: A connected socket, as returned by ``socket.accept()``.
    """
    config = h2.config.H2Configuration(client_side=False)
    conn = h2.connection.H2Connection(config=config)

    # Leaving the ``with`` block closes the socket, including when recv,
    # sendall or the h2 state machine raises.
    with sock:
        conn.initiate_connection()
        sock.sendall(conn.data_to_send())

        while True:
            data = sock.recv(65535)
            if not data:
                # An empty read means the peer closed its end.
                break

            events = conn.receive_data(data)
            for event in events:
                if isinstance(event, h2.events.RequestReceived):
                    send_response(conn, event)

            # Flush anything produced while processing this chunk
            # (responses, plus whatever else the connection generated).
            data_to_send = conn.data_to_send()
            if data_to_send:
                sock.sendall(data_to_send)


# Listening socket. AF_INET / SOCK_STREAM are socket.socket()'s defaults,
# spelled out here for clarity.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Allow rebinding the port right after a previous run has exited.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('0.0.0.0', 3280))
sock.listen(5)

# Connections are served strictly one at a time: the next accept() only
# happens once handle() has returned for the previous peer.
while True:
    handle(sock.accept()[0])