File: vpn.py

package info (click to toggle)
python-aiortc 1.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,052 kB
  • sloc: python: 20,890; javascript: 321; makefile: 21; sh: 1
file content (109 lines) | stat: -rw-r--r-- 2,879 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import argparse
import asyncio
import logging

import tuntap
from aiortc import RTCIceCandidate, RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.signaling import BYE, add_signaling_arguments, create_signaling

# Obtain the logger from the logging registry so it is attached to the root
# logger and picks up handlers installed by logging.basicConfig(). A directly
# constructed logging.Logger has no parent and no handlers, so its INFO
# records would never be emitted.
logger = logging.getLogger("vpn")


def channel_log(channel, t, message):
    """Log one event for a data channel at INFO level.

    :param channel: object exposing a ``label`` attribute, used to tag the entry.
    :param t: short marker written between the channel label and the message.
    :param message: payload to report; it is rendered with ``repr()``.
    """
    # Pass the values as logging arguments instead of pre-formatting the
    # string, so the text is only built when the record is actually emitted.
    logger.info("channel(%s) %s %r", channel.label, t, message)


async def consume_signaling(pc, signaling):
    """Apply incoming signaling messages to *pc* until BYE is received.

    Session descriptions are set as the remote description (an offer is
    answered through *signaling*), ICE candidates are added to the peer
    connection, and the BYE sentinel ends the loop. Any other object is
    ignored.
    """
    while True:
        incoming = await signaling.receive()

        if isinstance(incoming, RTCSessionDescription):
            await pc.setRemoteDescription(incoming)
            if incoming.type != "offer":
                continue

            # an offer must be answered
            answer = await pc.createAnswer()
            await pc.setLocalDescription(answer)
            await signaling.send(pc.localDescription)
            continue

        if isinstance(incoming, RTCIceCandidate):
            await pc.addIceCandidate(incoming)
            continue

        if incoming is BYE:
            print("Exiting")
            return


def tun_start(tap, channel):
    """Open *tap* and relay data between it and *channel* in both directions.

    Messages received on the channel are written to ``tap.fd``; whenever
    ``tap.fd`` becomes readable, up to ``tap.mtu`` bytes are read and sent
    over the channel. ``tap.up()`` is called once both relays are registered.
    """
    tap.open()

    # channel -> tap: every incoming message is written to the device
    subscribe = channel.on("message")
    subscribe(tap.fd.write)

    # tap -> channel: forward whatever the device hands us, skipping empty reads
    def forward_from_tap():
        packet = tap.fd.read(tap.mtu)
        if not packet:
            return
        channel.send(packet)

    asyncio.get_event_loop().add_reader(tap.fd, forward_from_tap)

    tap.up()


async def run_answer(pc, signaling, tap):
    """Run the answering side.

    Connects *signaling*, starts the tap relay on any remotely created data
    channel labelled "vpntap", then processes signaling messages until the
    signaling loop ends.
    """
    await signaling.connect()

    def handle_remote_channel(channel):
        channel_log(channel, "-", "created by remote party")
        # only the VPN channel is bridged to the tap device
        if channel.label != "vpntap":
            return
        tun_start(tap, channel)

    pc.on("datachannel")(handle_remote_channel)

    await consume_signaling(pc, signaling)


async def run_offer(pc, signaling, tap):
    """Run the offering side.

    Connects *signaling*, creates the "vpntap" data channel, arranges for the
    tap relay to start when that channel opens, sends the offer, then
    processes signaling messages until the signaling loop ends.
    """
    await signaling.connect()

    vpn_channel = pc.createDataChannel("vpntap")
    channel_log(vpn_channel, "-", "created by local party")

    def start_relay():
        tun_start(tap, vpn_channel)

    vpn_channel.on("open")(start_relay)

    # publish our offer to the remote party
    offer = await pc.createOffer()
    await pc.setLocalDescription(offer)
    await signaling.send(pc.localDescription)

    await consume_signaling(pc, signaling)


if __name__ == "__main__":
    # Command line: a mandatory role plus whatever options the signaling
    # helper registers on the parser.
    parser = argparse.ArgumentParser(description="VPN over data channel")
    parser.add_argument("role", choices=["offer", "answer"])
    parser.add_argument("--verbose", "-v", action="count")
    add_signaling_arguments(parser)
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)

    tap = tuntap.Tun(name="revpn-%s" % args.role)

    # Create the event loop explicitly and install it as the current loop
    # before any asyncio-based object is built. asyncio.get_event_loop() is
    # deprecated when no loop is running and raises RuntimeError on recent
    # Python versions, so it can no longer be relied on to create one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    signaling = create_signaling(args)
    pc = RTCPeerConnection()
    if args.role == "offer":
        coro = run_offer(pc, signaling, tap)
    else:
        coro = run_answer(pc, signaling, tap)

    # run event loop
    try:
        loop.run_until_complete(coro)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; fall through to cleanup.
        pass
    finally:
        try:
            loop.run_until_complete(pc.close())
            loop.run_until_complete(signaling.close())
            tap.close()
        finally:
            # Release the loop's resources even if one of the closers fails.
            loop.close()