File: ng-load-tester.py

package info (click to toggle)
rtpengine 13.5.1.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 13,676 kB
  • sloc: ansic: 86,764; perl: 59,422; python: 3,193; sh: 1,030; makefile: 693; asm: 211
file content (135 lines) | stat: -rw-r--r-- 3,060 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import base64
import fastbencode
import json
import random
import socket


def conv(e):
    """Recursively convert the text inside *e* to ASCII byte strings.

    * ``str`` values are encoded to ``bytes``.
    * ``dict`` values are rebuilt as plain dicts whose keys are encoded to
      ``bytes`` and whose values are converted recursively.
    * ``list`` values are rebuilt as plain lists of converted items.
    * Anything else (ints, bytes, ...) is returned unchanged.

    Subclasses of ``str``, ``dict`` and ``list`` are converted too; an exact
    type comparison would let them through untouched.

    Raises:
        UnicodeEncodeError: if a string contains non-ASCII characters.
        TypeError: if a dict key is not a string.
    """
    if isinstance(e, str):
        return bytes(e, "ASCII")
    if isinstance(e, dict):
        return {bytes(k, "ASCII"): conv(v) for k, v in e.items()}
    if isinstance(e, list):
        return [conv(v) for v in e]
    return e


def benc_enc(msg):
    """Return *msg* bencoded, after ``conv`` has turned its strings into bytes."""
    as_bytes = conv(msg)
    return fastbencode.bencode(as_bytes)


def json_enc(msg):
    """Return *msg* serialized as JSON text and encoded to ASCII bytes."""
    text = json.dumps(msg)
    return text.encode("ASCII")


# --- Test configuration -----------------------------------------------------

# Destination of every request datagram.
addr = "127.0.0.1"
port = 2223

# Body encoding: "bencode" or "json".
fmt = "bencode"
# Number of rounds, and number of requests sent in each round.
iters = 200
requests = 5000
# Which request template to send: "offer", "answer" or "statistics".
cmd = "offer"

# --- Request template ---------------------------------------------------------

if cmd == "offer":
    # No "call-id" here: the send loop adds a fresh one to every request.
    msg = {
        "command": "offer",
        "from-tag": "bar",
        "to-tag": "meh",
        "sdp": """
    v=0
    o=- 1695296331 1695296331 IN IP4 192.168.1.202
    s=-
    t=0 0
    c=IN IP4 192.168.1.202
    m=audio 45825 RTP/AVP 0 8 101
    a=rtpmap:0 PCMU/8000/1
    a=rtpmap:8 PCMA/8000/1
    a=rtpmap:101 telephone-event/8000
    a=sendrecv
    """,
        "replace": ["origin"],
    }
elif cmd == "answer":
    # Fixed "call-id": the same encoded body is reused for every request.
    msg = {
        "command": "answer",
        "call-id": "foo",
        "from-tag": "bar",
        "to-tag": "meh",
        "sdp": """
    v=0
    o=- 1695296331 1695296331 IN IP4 192.168.1.202
    s=-
    t=0 0
    c=IN IP4 192.168.1.202
    m=audio 45825 UDP/TLS/RTP/SAVPF 0 8 101
    a=setup:active
    a=fingerprint:sha-256 49:05:98:B2:15:43:1C:9C:4F:29:07:60:F8:63:77:16:80:F9:44:C0:97:8E:E5:48:D6:71:B4:03:10:85:D6:E3
    a=rtpmap:0 PCMU/8000/1
    a=rtpmap:8 PCMA/8000/1
    a=rtpmap:101 telephone-event/8000
    a=rtcp-mux
    a=rtcprsize
    a=sendrecv
    """,
        "flags": ["generate RTCP", "pad crypto", "symmetric codecs"],
        "ICE": "remove",
        "codec": {
            "mask": ["opus", "PCMA", "PCMU"],
            "transcode": ["G722", "AMR"],
            "strip": ["AMR-WB", "EVS"],
        },
        "transport-protocol": "RTP/AVP",
        "replace": ["origin"],
        "rtcp-mux": ["demux"],
    }
elif cmd == "statistics":
    msg = {"command": "statistics"}
else:
    # Fail here with a clear message instead of a NameError on `msg` later.
    raise ValueError(
        f"unsupported cmd {cmd!r}; expected 'offer', 'answer' or 'statistics'"
    )

# UDP socket used for all requests; created only once the configuration is
# known to be usable.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# Pick the body encoder for the configured format. Both encoders take the
# message dict and return bytes.
if fmt == "json":
    fn = json_enc
elif fmt == "bencode":
    fn = benc_enc
else:
    # A bare `raise` has no active exception here and would only produce
    # "RuntimeError: No active exception to reraise".
    raise ValueError(f"unsupported fmt {fmt!r}; expected 'json' or 'bencode'")

# A template that already carries a call-id is identical for every request,
# so encode it once up front. Otherwise `enc` stays None and the send loop
# encodes each request separately with its own call-id.
enc = fn(msg) if "call-id" in msg else None

def _random_token():
    """Return 6 random bytes base64-encoded (8 ASCII bytes, no padding)."""
    return base64.b64encode(random.randbytes(6))


def _exchange(payload):
    """Send one request and block until a reply datagram arrives.

    The datagram is a fresh random token, a space, then *payload* (bytes).
    NOTE(review): the token is presumably the per-request cookie of
    rtpengine's NG control protocol -- confirm against the protocol docs.
    The reply (up to 4096 bytes) is read and discarded; there is no timeout,
    so a lost reply blocks forever.
    """
    sock.sendto(_random_token() + b" " + payload, (addr, port))
    sock.recvfrom(4096)


# `with` closes the socket when the run finishes or an exception propagates.
with sock:
    for _ in range(iters):
        # Call-ids generated in this round, deleted again at its end.
        call_ids = []

        for _ in range(requests):
            if enc is not None:
                # Pre-encoded body with a fixed call-id: nothing to delete.
                _exchange(enc)
            else:
                # Decode to text: str() on bytes would give the repr
                # "b'...'" rather than the token itself.
                call_id = _random_token().decode("ASCII")
                call_ids.append(call_id)
                _exchange(fn({**msg, "call-id": call_id}))

        for call_id in call_ids:
            _exchange(
                fn({"command": "delete", "call-id": call_id, "delete-delay": 0})
            )

print("done")