File: websocket-inject-message.py

package info (click to toggle)
mitmproxy 8.1.1-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 38,952 kB
  • sloc: python: 53,389; javascript: 1,603; xml: 186; sh: 105; ansic: 68; makefile: 13
file content (37 lines) | stat: -rw-r--r-- 1,094 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
"""
Inject a WebSocket message into a running connection.

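This example shows two approaches: replying to an incoming message from the
websocket_message hook, and injecting messages periodically from a background
asyncio task that is started in the websocket_start hook.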
"""
import asyncio

from mitmproxy import ctx, http


# Simple example: Inject a message as a response to an event


def websocket_message(flow: http.HTTPFlow):
    """Replace text messages containing "secret" with an injected message.

    Looks at the most recent message of the flow. If it is a text message
    whose text contains "secret", the message is dropped and the payload
    b"ssssssh" is injected via the "inject.websocket" command instead.
    All other messages are left untouched.
    """
    assert flow.websocket is not None  # make type checker happy
    latest = flow.websocket.messages[-1]

    # Nothing to do for binary messages or text without the keyword.
    if not latest.is_text or "secret" not in latest.text:
        return

    latest.drop()
    # The direction flag is taken from the dropped message's from_client
    # attribute (presumably so the replacement goes back to the sender --
    # confirm against the inject.websocket command documentation).
    ctx.master.commands.call(
        "inject.websocket",
        flow,
        latest.from_client,
        b"ssssssh",
    )


# Complex example: Schedule a periodic timer


async def inject_async(flow: http.HTTPFlow):
    """Inject a rotating greeting into the flow about once per second.

    Keeps injecting via the "inject.websocket" command for as long as
    flow.websocket.timestamp_end is None. After every injection the text
    is rotated left by one character, so successive messages differ.
    """
    assert flow.websocket is not None  # make type checker happy
    banner = "hello from mitmproxy! "
    while True:
        # timestamp_end being set is used as the stop signal
        # (presumably it marks the connection as closed -- confirm).
        if flow.websocket.timestamp_end is not None:
            break
        payload = banner.encode()
        ctx.master.commands.call("inject.websocket", flow, True, payload)
        await asyncio.sleep(1)
        # Move the first character to the end for the next round.
        banner = banner[1:] + banner[:1]


# Strong references to the injection tasks that are still running. The event
# loop only keeps weak references to tasks, so a task nobody else references
# may be garbage-collected before it has finished.
_inject_tasks: "set[asyncio.Task]" = set()


def websocket_start(flow: http.HTTPFlow):
    """Schedule inject_async(flow) as a background asyncio task.

    The created task is stored in a module-level set until it completes,
    so it cannot be garbage-collected while it is still running.
    """
    task = asyncio.create_task(inject_async(flow))
    _inject_tasks.add(task)
    # Release the reference once the task is done so the set does not grow.
    task.add_done_callback(_inject_tasks.discard)