1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
|
import fractions
from typing import Optional
from aiortc.codecs import depayload, get_decoder, get_encoder
from aiortc.jitterbuffer import JitterFrame
from aiortc.mediastreams import AUDIO_PTIME, VIDEO_TIME_BASE
from aiortc.rtcrtpparameters import RTCRtpCodecParameters
from av import AudioFrame, VideoFrame
from av.frame import Frame
from av.packet import Packet
from .utils import TestCase
class CodecTestCase(TestCase):
def assertAudioFrame(
self,
frame: Frame,
*,
layout: str,
pts: int,
samples: int,
sample_rate: int,
data: Optional[bytes],
) -> None:
assert isinstance(frame, AudioFrame)
self.assertEqual(frame.format.name, "s16")
self.assertEqual(frame.layout.name, layout)
self.assertEqual(frame.pts, pts)
self.assertEqual(frame.samples, samples)
self.assertEqual(frame.sample_rate, sample_rate)
self.assertEqual(frame.time_base, fractions.Fraction(1, sample_rate))
if data is not None:
plane_data = bytes(frame.planes[0])
self.assertEqual(plane_data[: len(data)], data)
def create_audio_frame(
self, samples: int, pts: int, layout: str = "mono", sample_rate: int = 48000
) -> AudioFrame:
frame = AudioFrame(format="s16", layout=layout, samples=samples)
for p in frame.planes:
p.update(bytes(p.buffer_size))
frame.pts = pts
frame.sample_rate = sample_rate
frame.time_base = fractions.Fraction(1, sample_rate)
return frame
def create_audio_frames(
self, layout: str, sample_rate: int, count: int
) -> list[AudioFrame]:
frames = []
timestamp = 0
samples_per_frame = int(AUDIO_PTIME * sample_rate)
for i in range(count):
frames.append(
self.create_audio_frame(
samples=samples_per_frame,
pts=timestamp,
layout=layout,
sample_rate=sample_rate,
)
)
timestamp += samples_per_frame
return frames
def create_packet(self, payload: bytes, pts: int) -> Packet:
"""
Create a packet.
"""
packet = Packet(len(payload))
packet.update(payload)
packet.pts = pts
packet.time_base = fractions.Fraction(1, 1000)
return packet
def create_video_frame(
self,
width: int,
height: int,
pts: int,
format: str = "yuv420p",
time_base: fractions.Fraction = VIDEO_TIME_BASE,
) -> VideoFrame:
"""
Create a single blank video frame.
"""
frame = VideoFrame(width=width, height=height, format=format)
for p in frame.planes:
p.update(bytes(p.buffer_size))
frame.pts = pts
frame.time_base = time_base
return frame
def create_video_frames(
self,
width: int,
height: int,
count: int,
time_base: fractions.Fraction = VIDEO_TIME_BASE,
) -> list[VideoFrame]:
"""
Create consecutive blank video frames.
"""
frames = []
for i in range(count):
frames.append(
self.create_video_frame(
width=width,
height=height,
pts=int(i / time_base / 30),
time_base=time_base,
)
)
return frames
def roundtrip_audio(
self,
codec: RTCRtpCodecParameters,
layout: str,
sample_rate: int,
drop: list[int] = [],
) -> None:
"""
Round-trip an AudioFrame through encoder then decoder.
"""
encoder = get_encoder(codec)
decoder = get_decoder(codec)
samples = int(sample_rate * AUDIO_PTIME)
time_base = fractions.Fraction(1, sample_rate)
input_frames = self.create_audio_frames(
layout=layout, sample_rate=sample_rate, count=10
)
for i, frame in enumerate(input_frames):
# encode
packages, timestamp = encoder.encode(frame)
self.assertEqual(timestamp, i * codec.clockRate * AUDIO_PTIME)
if i not in drop:
# depacketize
data = b""
for package in packages:
data += depayload(codec, package)
# decode
frames = decoder.decode(JitterFrame(data=data, timestamp=timestamp))
self.assertEqual(len(frames), 1)
assert isinstance(frames[0], AudioFrame)
self.assertEqual(frames[0].format.name, "s16")
self.assertEqual(frames[0].layout.name, layout)
self.assertEqual(frames[0].samples, samples)
self.assertEqual(frames[0].sample_rate, sample_rate)
self.assertEqual(frames[0].pts, i * samples)
self.assertEqual(frames[0].time_base, time_base)
def roundtrip_video(
self,
codec: RTCRtpCodecParameters,
width: int,
height: int,
time_base: fractions.Fraction = VIDEO_TIME_BASE,
) -> None:
"""
Round-trip a VideoFrame through encoder then decoder.
"""
encoder = get_encoder(codec)
decoder = get_decoder(codec)
input_frames = self.create_video_frames(
width=width, height=height, count=30, time_base=time_base
)
for i, frame in enumerate(input_frames):
# encode
packages, timestamp = encoder.encode(frame)
# depacketize
data = b""
for package in packages:
data += depayload(codec, package)
# decode
frames = decoder.decode(JitterFrame(data=data, timestamp=timestamp))
self.assertEqual(len(frames), 1)
assert isinstance(frames[0], VideoFrame)
self.assertEqual(frames[0].width, frame.width)
self.assertEqual(frames[0].height, frame.height)
self.assertAlmostEqual(frames[0].pts, i * 3000, delta=1)
self.assertEqual(frames[0].time_base, VIDEO_TIME_BASE)
|