1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
import fractions
from unittest import TestCase
from aiortc.codecs import depayload, get_decoder, get_encoder
from aiortc.jitterbuffer import JitterFrame
from aiortc.mediastreams import AUDIO_PTIME, VIDEO_TIME_BASE
from av import AudioFrame, VideoFrame
from av.packet import Packet
class CodecTestCase(TestCase):
    """
    Base test case providing helpers to build blank media fixtures and to
    round-trip frames through a codec's encoder and decoder.
    """

    def create_audio_frame(self, samples, pts, layout="mono", sample_rate=48000):
        """
        Create a single audio frame in "s16" format with zero-filled planes.

        :param samples: Number of samples in the frame.
        :param pts: Presentation timestamp assigned to the frame.
        :param layout: Channel layout name passed to `AudioFrame`.
        :param sample_rate: Sample rate; the frame's time base is set to
            `1 / sample_rate`.
        :return: The populated `AudioFrame`.
        """
        frame = AudioFrame(format="s16", layout=layout, samples=samples)
        for plane in frame.planes:
            # bytes(n) yields n zero bytes, i.e. the plane is blanked.
            plane.update(bytes(plane.buffer_size))
        frame.pts = pts
        frame.sample_rate = sample_rate
        frame.time_base = fractions.Fraction(1, sample_rate)
        return frame

    def create_audio_frames(self, layout, sample_rate, count):
        """
        Create consecutive blank audio frames.

        Each frame holds `int(AUDIO_PTIME * sample_rate)` samples and the
        pts of successive frames advances by that same amount, starting at 0.

        :param layout: Channel layout name for every frame.
        :param sample_rate: Sample rate for every frame.
        :param count: Number of frames to create.
        :return: A list of `count` audio frames.
        """
        samples_per_frame = int(AUDIO_PTIME * sample_rate)
        return [
            self.create_audio_frame(
                samples=samples_per_frame,
                pts=index * samples_per_frame,
                layout=layout,
                sample_rate=sample_rate,
            )
            for index in range(count)
        ]

    def create_packet(self, payload: bytes, pts: int) -> Packet:
        """
        Create a packet.

        The packet carries `payload`, the given `pts`, and a time base of
        1/1000.
        """
        packet = Packet(len(payload))
        packet.update(payload)
        packet.pts = pts
        packet.time_base = fractions.Fraction(1, 1000)
        return packet

    def create_video_frame(
        self, width, height, pts, format="yuv420p", time_base=VIDEO_TIME_BASE
    ):
        """
        Create a single blank video frame.

        :param width: Frame width.
        :param height: Frame height.
        :param pts: Presentation timestamp assigned to the frame.
        :param format: Pixel format name passed to `VideoFrame`.
        :param time_base: Time base assigned to the frame.
        :return: The `VideoFrame` with zero-filled planes.
        """
        frame = VideoFrame(width=width, height=height, format=format)
        for plane in frame.planes:
            # bytes(n) yields n zero bytes, i.e. the plane is blanked.
            plane.update(bytes(plane.buffer_size))
        frame.pts = pts
        frame.time_base = time_base
        return frame

    def create_video_frames(self, width, height, count, time_base=VIDEO_TIME_BASE):
        """
        Create consecutive blank video frames.

        Frame `i` gets `pts = int(i / time_base / 30)`, i.e. the frames are
        spaced 1/30 of a second apart when expressed in `time_base` units.

        :param width: Frame width.
        :param height: Frame height.
        :param count: Number of frames to create.
        :param time_base: Time base assigned to every frame.
        :return: A list of `count` video frames.
        """
        return [
            self.create_video_frame(
                width=width,
                height=height,
                pts=int(index / time_base / 30),
                time_base=time_base,
            )
            for index in range(count)
        ]

    def _decode_packages(self, codec, decoder, packages, timestamp):
        """
        Depacketize `packages` for `codec` and decode the joined payload.

        :return: Whatever `decoder.decode` returns for the resulting
            `JitterFrame`.
        """
        data = b"".join(depayload(codec, package) for package in packages)
        return decoder.decode(JitterFrame(data=data, timestamp=timestamp))

    def roundtrip_audio(
        self,
        codec,
        output_layout,
        output_sample_rate,
        input_layout="mono",
        input_sample_rate=8000,
        drop=None,
    ):
        """
        Round-trip an AudioFrame through encoder then decoder.

        Ten input frames are encoded. Every frame whose index is not in
        `drop` is depacketized, decoded and checked against the expected
        output format, layout, sample count, sample rate, pts and time base.

        :param codec: Codec passed to `get_encoder`, `get_decoder` and
            `depayload`.
        :param output_layout: Expected layout name of decoded frames.
        :param output_sample_rate: Expected sample rate of decoded frames.
        :param input_layout: Layout of the generated input frames.
        :param input_sample_rate: Sample rate of the generated input frames.
        :param drop: Optional container of frame indices which are encoded
            but never handed to the decoder; defaults to dropping nothing.
        """
        # Avoid a mutable default argument, which would be shared by all calls.
        dropped = () if drop is None else drop

        encoder = get_encoder(codec)
        decoder = get_decoder(codec)

        input_frames = self.create_audio_frames(
            layout=input_layout, sample_rate=input_sample_rate, count=10
        )

        output_sample_count = int(output_sample_rate * AUDIO_PTIME)
        expected_time_base = fractions.Fraction(1, output_sample_rate)

        for i, frame in enumerate(input_frames):
            # Encode every frame, including dropped ones, so the encoder
            # still sees the complete input sequence.
            packages, timestamp = encoder.encode(frame)

            if i in dropped:
                continue

            frames = self._decode_packages(codec, decoder, packages, timestamp)
            self.assertEqual(len(frames), 1)
            self.assertEqual(frames[0].format.name, "s16")
            self.assertEqual(frames[0].layout.name, output_layout)
            # Compare against the integer sample count rather than a float
            # product, consistent with the pts check below.
            self.assertEqual(frames[0].samples, output_sample_count)
            self.assertEqual(frames[0].sample_rate, output_sample_rate)
            self.assertEqual(frames[0].pts, i * output_sample_count)
            self.assertEqual(frames[0].time_base, expected_time_base)

    def roundtrip_video(self, codec, width, height, time_base=VIDEO_TIME_BASE):
        """
        Round-trip a VideoFrame through encoder then decoder.

        Thirty input frames are encoded, depacketized and decoded; each
        decoded frame is checked for dimensions, pts and time base.

        :param codec: Codec passed to `get_encoder`, `get_decoder` and
            `depayload`.
        :param width: Width of the generated input frames.
        :param height: Height of the generated input frames.
        :param time_base: Time base of the generated input frames; decoded
            frames are expected to use `VIDEO_TIME_BASE` regardless.
        """
        encoder = get_encoder(codec)
        decoder = get_decoder(codec)

        input_frames = self.create_video_frames(
            width=width, height=height, count=30, time_base=time_base
        )

        for i, frame in enumerate(input_frames):
            packages, timestamp = encoder.encode(frame)
            frames = self._decode_packages(codec, decoder, packages, timestamp)

            self.assertEqual(len(frames), 1)
            self.assertEqual(frames[0].width, frame.width)
            self.assertEqual(frames[0].height, frame.height)
            # NOTE(review): 3000 presumably is one 30 fps frame interval in
            # VIDEO_TIME_BASE units (assumes a 90 kHz clock) - confirm.
            self.assertEqual(frames[0].pts, i * 3000)
            self.assertEqual(frames[0].time_base, VIDEO_TIME_BASE)
|