1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
|
import argparse
import asyncio
import logging
import math
import cv2
import numpy
from aiortc import (
RTCIceCandidate,
RTCPeerConnection,
RTCSessionDescription,
VideoStreamTrack,
)
from aiortc.contrib.media import MediaBlackhole, MediaPlayer, MediaRecorder
from aiortc.contrib.signaling import BYE, add_signaling_arguments, create_signaling
from av import VideoFrame
class FlagVideoStreamTrack(VideoStreamTrack):
"""
A video track that returns an animated flag.
"""
def __init__(self):
super().__init__() # don't forget this!
self.counter = 0
height, width = 480, 640
# generate flag
data_bgr = numpy.hstack(
[
self._create_rectangle(
width=213, height=480, color=(255, 0, 0)
), # blue
self._create_rectangle(
width=214, height=480, color=(255, 255, 255)
), # white
self._create_rectangle(width=213, height=480, color=(0, 0, 255)), # red
]
)
# shrink and center it
M = numpy.float32([[0.5, 0, width / 4], [0, 0.5, height / 4]])
data_bgr = cv2.warpAffine(data_bgr, M, (width, height))
# compute animation
omega = 2 * math.pi / height
id_x = numpy.tile(numpy.array(range(width), dtype=numpy.float32), (height, 1))
id_y = numpy.tile(
numpy.array(range(height), dtype=numpy.float32), (width, 1)
).transpose()
self.frames = []
for k in range(30):
phase = 2 * k * math.pi / 30
map_x = id_x + 10 * numpy.cos(omega * id_x + phase)
map_y = id_y + 10 * numpy.sin(omega * id_x + phase)
self.frames.append(
VideoFrame.from_ndarray(
cv2.remap(data_bgr, map_x, map_y, cv2.INTER_LINEAR), format="bgr24"
)
)
async def recv(self):
pts, time_base = await self.next_timestamp()
frame = self.frames[self.counter % 30]
frame.pts = pts
frame.time_base = time_base
self.counter += 1
return frame
def _create_rectangle(self, width, height, color):
data_bgr = numpy.zeros((height, width, 3), numpy.uint8)
data_bgr[:, :] = color
return data_bgr
async def run(pc, player, recorder, signaling, role):
def add_tracks():
if player and player.audio:
pc.addTrack(player.audio)
if player and player.video:
pc.addTrack(player.video)
else:
pc.addTrack(FlagVideoStreamTrack())
@pc.on("track")
def on_track(track):
print("Receiving %s" % track.kind)
recorder.addTrack(track)
# connect signaling
await signaling.connect()
if role == "offer":
# send offer
add_tracks()
await pc.setLocalDescription(await pc.createOffer())
await signaling.send(pc.localDescription)
# consume signaling
while True:
obj = await signaling.receive()
if isinstance(obj, RTCSessionDescription):
await pc.setRemoteDescription(obj)
await recorder.start()
if obj.type == "offer":
# send answer
add_tracks()
await pc.setLocalDescription(await pc.createAnswer())
await signaling.send(pc.localDescription)
elif isinstance(obj, RTCIceCandidate):
await pc.addIceCandidate(obj)
elif obj is BYE:
print("Exiting")
break
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Video stream from the command line")
parser.add_argument("role", choices=["offer", "answer"])
parser.add_argument("--play-from", help="Read the media from a file and sent it.")
parser.add_argument("--record-to", help="Write received media to a file.")
parser.add_argument("--verbose", "-v", action="count")
add_signaling_arguments(parser)
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
# create signaling and peer connection
signaling = create_signaling(args)
pc = RTCPeerConnection()
# create media source
if args.play_from:
player = MediaPlayer(args.play_from)
else:
player = None
# create media sink
if args.record_to:
recorder = MediaRecorder(args.record_to)
else:
recorder = MediaBlackhole()
# run event loop
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(
run(
pc=pc,
player=player,
recorder=recorder,
signaling=signaling,
role=args.role,
)
)
except KeyboardInterrupt:
pass
finally:
# cleanup
loop.run_until_complete(recorder.stop())
loop.run_until_complete(signaling.close())
loop.run_until_complete(pc.close())
|