1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
|
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "cast/streaming/compound_rtcp_parser.h"
#include <algorithm>
#include "cast/streaming/packet_util.h"
#include "cast/streaming/rtcp_session.h"
#include "util/osp_logging.h"
#include "util/std_util.h"
namespace openscreen {
namespace cast {
namespace {
// Use the Clock's minimum time value (an impossible value, waaaaay before epoch
// time) to represent unset time_point values.
constexpr auto kNullTimePoint = Clock::time_point::min();
// Canonicalizes the just-parsed list of packet-specific NACKs so that the
// CompoundRtcpParser::Client can make several simplifying assumptions when
// processing the results.
void CanonicalizePacketNackVector(std::vector<PacketNack>* packets) {
// First, sort all elements. The sort order is the normal lexicographical
// ordering, with one exception: The special kAllPacketsLost packet_id value
// should be treated as coming before all others. This special sort order
// allows the filtering algorithm below to be simpler, and only require one
// pass; and the final result will be the normal lexicographically-sorted
// output the CompoundRtcpParser::Client expects.
std::sort(packets->begin(), packets->end(),
[](const PacketNack& a, const PacketNack& b) {
// Since the comparator is a hot code path, use a simple modular
// arithmetic trick in lieu of extra branching: When comparing the
// tuples, map all packet_id values to packet_id + 1, mod 0x10000.
// This results in the desired sorting behavior since
// kAllPacketsLost (0xffff) wraps-around to 0x0000, and all other
// values become N + 1.
static_assert(static_cast<FramePacketId>(kAllPacketsLost + 1) <
FramePacketId{0x0000 + 1},
"comparison requires integer wrap-around");
return PacketNack{a.frame_id,
static_cast<FramePacketId>(a.packet_id + 1)} <
PacketNack{b.frame_id,
static_cast<FramePacketId>(b.packet_id + 1)};
});
// De-duplicate elements. Two possible cases:
//
// 1. Identical elements (same FrameId+FramePacketId).
// 2. If there are any elements with kAllPacketsLost as the packet ID,
// prune-out all other elements having the same frame ID, as they are
// redundant.
//
// This is done by walking forwards over the sorted vector and deciding which
// elements to keep. Those that are kept are stacked-up at the front of the
// vector. After the "to-keep" pass, the vector is truncated to remove the
// left-over garbage at the end.
auto have_it = packets->begin();
if (have_it != packets->end()) {
auto kept_it = have_it; // Always keep the first element.
for (++have_it; have_it != packets->end(); ++have_it) {
if (have_it->frame_id != kept_it->frame_id ||
(kept_it->packet_id != kAllPacketsLost &&
have_it->packet_id != kept_it->packet_id)) { // Keep it.
++kept_it;
*kept_it = *have_it;
}
}
packets->erase(++kept_it, packets->end());
}
}
} // namespace
CompoundRtcpParser::CompoundRtcpParser(RtcpSession* session,
CompoundRtcpParser::Client* client)
: session_(session),
client_(client),
latest_receiver_timestamp_(kNullTimePoint) {
OSP_DCHECK(session_);
OSP_DCHECK(client_);
}
// Nothing to clean up explicitly; the compiler-generated destructor suffices.
CompoundRtcpParser::~CompoundRtcpParser() = default;
// Parses |buffer| as a compound RTCP packet (zero or more RTCP packets laid
// end-to-end) and, only once the whole thing has parsed cleanly, notifies the
// Client of the results. |max_feedback_frame_id| is handed to ParseFeedback().
//
// Returns false if any contained packet is malformed; the Client is not called
// in that case. Returns true otherwise, including when the compound packet was
// well-formed but dropped because its reference time is older than one already
// seen.
bool CompoundRtcpParser::Parse(absl::Span<const uint8_t> buffer,
                               FrameId max_feedback_frame_id) {
  // Accumulators for the ParseXYZ() results. Nothing here reaches the Client
  // until every packet in |buffer| has been parsed successfully.
  Clock::time_point reference_time = kNullTimePoint;
  absl::optional<RtcpReportBlock> report;
  FrameId checkpoint;
  std::chrono::milliseconds playout_delay{};
  std::vector<FrameId> acked_frames;
  std::vector<PacketNack> nacks;
  bool saw_picture_loss = false;

  // Peel one RTCP packet off the front of |buffer| per iteration.
  while (!buffer.empty()) {
    const auto header = RtcpCommonHeader::Parse(buffer);
    if (!header) {
      return false;
    }
    buffer.remove_prefix(kRtcpCommonHeaderSize);

    // The header must not claim more payload than actually remains.
    if (static_cast<int>(buffer.size()) < header->payload_size) {
      return false;
    }
    const absl::Span<const uint8_t> payload =
        buffer.subspan(0, header->payload_size);
    buffer.remove_prefix(header->payload_size);

    // Anything unimplemented, or not part of the Cast Streaming spec, is
    // ignored and therefore leaves |parsed_ok| true.
    bool parsed_ok = true;
    switch (header->packet_type) {
      case RtcpPacketType::kReceiverReport:
        parsed_ok = ParseReceiverReport(payload, header->with.report_count,
                                        &report);
        break;

      case RtcpPacketType::kPayloadSpecific:
        if (header->with.subtype == RtcpSubtype::kPictureLossIndicator) {
          parsed_ok = ParsePictureLossIndicator(payload, &saw_picture_loss);
        } else if (header->with.subtype == RtcpSubtype::kFeedback) {
          parsed_ok =
              ParseFeedback(payload, max_feedback_frame_id, &checkpoint,
                            &playout_delay, &acked_frames, &nacks);
        }
        break;

      case RtcpPacketType::kExtendedReports:
        parsed_ok = ParseExtendedReports(payload, &reference_time);
        break;

      default:
        break;
    }
    if (!parsed_ok) {
      return false;
    }
  }

  // A well-behaved Cast Streaming Receiver always includes a reference time
  // report, which in effect timestamps the RTCP packets just parsed; the spec
  // does not strictly require it, though. When one is present, use it to
  // reject stale/out-of-order compound packets.
  const bool has_reference_time = reference_time != kNullTimePoint;
  if (has_reference_time) {
    // Every RTCP packet carries the peer's full current state, so dropping one
    // that was delayed or shuffled by the network loses nothing important;
    // processing it out-of-order, on the other hand, can be harmful.
    const bool is_stale = latest_receiver_timestamp_ != kNullTimePoint &&
                          reference_time < latest_receiver_timestamp_;
    if (is_stale) {
      return true;
    }
    latest_receiver_timestamp_ = reference_time;
    client_->OnReceiverReferenceTimeAdvanced(latest_receiver_timestamp_);
  }

  // The packet is known to be well-formed from here on: dispatch whatever was
  // found to the Client.
  if (report) {
    client_->OnReceiverReport(*report);
  }
  if (!checkpoint.is_null()) {
    client_->OnReceiverCheckpoint(checkpoint, playout_delay);
  }
  if (!acked_frames.empty()) {
    OSP_DCHECK(AreElementsSortedAndUnique(acked_frames));
    client_->OnReceiverHasFrames(std::move(acked_frames));
  }
  if (!nacks.empty()) {
    // Canonicalizing never empties a non-empty vector (the first element is
    // always kept), so no second emptiness check is needed.
    CanonicalizePacketNackVector(&nacks);
    client_->OnReceiverIsMissingPackets(std::move(nacks));
  }
  if (saw_picture_loss) {
    client_->OnReceiverIndicatesPictureLoss();
  }
  return true;
}
// Parses the payload of an RTCP Receiver Report packet.
//
// |in| is the packet payload (everything after the common header) and
// |num_report_blocks| is the report count taken from that header. If the
// leading SSRC equals the session's receiver SSRC, |*receiver_report| is set
// to whatever RtcpReportBlock::ParseOne() yields for the session's sender
// SSRC; otherwise |*receiver_report| is left untouched.
//
// Returns false only when |in| is shorter than kRtcpReceiverReportSize.
bool CompoundRtcpParser::ParseReceiverReport(
    absl::Span<const uint8_t> in,
    int num_report_blocks,
    absl::optional<RtcpReportBlock>* receiver_report) {
  // Compare as int, like every other size check in this file, rather than
  // relying on an implicit signed/unsigned conversion.
  if (static_cast<int>(in.size()) < kRtcpReceiverReportSize) {
    return false;
  }

  const bool is_from_receiver =
      ConsumeField<uint32_t>(&in) == session_->receiver_ssrc();
  if (is_from_receiver) {
    *receiver_report = RtcpReportBlock::ParseOne(in, num_report_blocks,
                                                 session_->sender_ssrc());
  }
  return true;
}
// Parses the payload of a Cast Feedback packet: the checkpoint frame ID, the
// target playout delay, the packet-level NACKs and, when present, the optional
// CST2 frame-level ACKs.
//
// |max_feedback_frame_id| must be non-null; the 8-bit checkpoint on the wire is
// expanded to a full FrameId less than or equal to it. On success the four
// output parameters are overwritten with this packet's values, unless the
// packet is ignored: either its SSRCs do not match the session, or it would
// move |*checkpoint_frame_id| backwards. Ignored packets still return true.
//
// Returns false if the payload is truncated or lacks the Cast identifier word.
// The outputs may have been partially modified in that case.
bool CompoundRtcpParser::ParseFeedback(
    absl::Span<const uint8_t> in,
    FrameId max_feedback_frame_id,
    FrameId* checkpoint_frame_id,
    std::chrono::milliseconds* target_playout_delay,
    std::vector<FrameId>* received_frames,
    std::vector<PacketNack>* packet_nacks) {
  OSP_DCHECK(!max_feedback_frame_id.is_null());

  if (static_cast<int>(in.size()) < kRtcpFeedbackHeaderSize) {
    return false;
  }

  // Ignore (without failing) feedback whose receiver or sender SSRC is not
  // the one this session uses.
  if (ConsumeField<uint32_t>(&in) != session_->receiver_ssrc()) {
    return true;
  }
  if (ConsumeField<uint32_t>(&in) != session_->sender_ssrc()) {
    return true;
  }

  if (ConsumeField<uint32_t>(&in) != kRtcpCastIdentifierWord) {
    return false;
  }

  const uint8_t truncated_checkpoint = ConsumeField<uint8_t>(&in);
  const FrameId feedback_frame_id =
      max_feedback_frame_id.ExpandLessThanOrEqual(truncated_checkpoint);
  const int loss_field_count = ConsumeField<uint8_t>(&in);
  const std::chrono::milliseconds playout_delay{ConsumeField<uint16_t>(&in)};

  // The Client discards frame data and tracking state on the assumption that
  // the checkpoint FrameId never decreases, so skip any feedback that would
  // move it backwards.
  const bool would_regress = !checkpoint_frame_id->is_null() &&
                             *checkpoint_frame_id > feedback_frame_id;
  if (would_regress) {
    return true;
  }

  // This feedback supersedes whatever an earlier one in the same compound
  // packet produced.
  *checkpoint_frame_id = feedback_frame_id;
  *target_playout_delay = playout_delay;
  received_frames->clear();
  packet_nacks->clear();

  const auto loss_fields_size = kRtcpFeedbackLossFieldSize * loss_field_count;
  if (static_cast<int>(in.size()) < loss_fields_size) {
    return false;
  }

  // Parse the NACKs. Each loss field names one frame, one packet, and a bit
  // vector flagging further missing packets that follow that packet.
  for (int remaining = loss_field_count; remaining > 0; --remaining) {
    const uint8_t truncated_frame_id = ConsumeField<uint8_t>(&in);
    const FrameId frame_id =
        feedback_frame_id.ExpandGreaterThan(truncated_frame_id);
    FramePacketId packet_id = ConsumeField<uint16_t>(&in);
    uint8_t bits = ConsumeField<uint8_t>(&in);
    packet_nacks->push_back(PacketNack{frame_id, packet_id});

    if (packet_id == kAllPacketsLost) {
      continue;  // The bit vector is not consulted for whole-frame NACKs.
    }

    // Bit k (counting from the least-significant bit) flags packet
    // |packet_id| + k + 1 as missing.
    // NOTE(review): if |packet_id| is near the top of its range, the increment
    // below can produce (or wrap past) kAllPacketsLost -- confirm whether such
    // input should be rejected instead.
    for (; bits != 0; bits >>= 1) {
      ++packet_id;
      if (bits & 1) {
        packet_nacks->push_back(PacketNack{frame_id, packet_id});
      }
    }
  }

  // Parse the optional CST2 feedback (frame-level ACKs). For backwards-
  // compatibility, trailing bytes that are too few, or that do not start with
  // 'CST2', are not treated as corrupted input.
  if (static_cast<int>(in.size()) < kRtcpFeedbackAckHeaderSize) {
    return true;
  }
  if (ConsumeField<uint32_t>(&in) != kRtcpCst2IdentifierWord) {
    return true;
  }

  // Skip over the "Feedback Count" field. It's currently unused, though it
  // might be useful for event tracing later...
  in.remove_prefix(sizeof(uint8_t));

  const int ack_bitvector_octet_count = ConsumeField<uint8_t>(&in);
  if (static_cast<int>(in.size()) < ack_bitvector_octet_count) {
    return false;
  }

  // Translate each set bit in the bit vector into a FrameId. See the
  // explanation of this wire format in rtp_defines.h for where the "plus two"
  // comes from.
  constexpr int kBitsPerOctet = 8;
  FrameId octet_base_frame_id = feedback_frame_id + 2;
  for (int octet = 0; octet < ack_bitvector_octet_count; ++octet) {
    uint8_t bits = ConsumeField<uint8_t>(&in);
    for (FrameId frame_id = octet_base_frame_id; bits != 0;
         ++frame_id, bits >>= 1) {
      if (bits & 1) {
        received_frames->push_back(frame_id);
      }
    }
    octet_base_frame_id += kBitsPerOctet;
  }

  return true;
}
// Parses the payload of an RTCP Extended Reports packet, looking for Receiver
// Reference Time Report blocks.
//
// If the packet's leading SSRC is not the session's receiver SSRC, the packet
// is ignored and true is returned. Otherwise each report block is walked in
// turn; for every Receiver Reference Time block, the 64-bit big-endian
// timestamp it carries is converted with the session's NTP converter and
// stored in |*receiver_reference_time| (so the last such block wins). Blocks
// of any other type are skipped.
//
// Returns false if the payload or any block is truncated, or if a Receiver
// Reference Time block does not have exactly 8 bytes of data.
bool CompoundRtcpParser::ParseExtendedReports(
    absl::Span<const uint8_t> in,
    Clock::time_point* receiver_reference_time) {
  if (static_cast<int>(in.size()) < kRtcpExtendedReportHeaderSize) {
    return false;
  }
  if (ConsumeField<uint32_t>(&in) != session_->receiver_ssrc()) {
    return true;  // Ignore report from unknown receiver.
  }

  // Expected data size of a Receiver Reference Time block: one 64-bit
  // timestamp, i.e. a length field of 2 words. Kept as an int so the
  // comparison below does not mix signed and unsigned operands.
  constexpr int kReferenceTimeDataSize = static_cast<int>(sizeof(uint64_t));

  while (!in.empty()) {
    // All extended report types have the same 4-byte subheader.
    if (static_cast<int>(in.size()) < kRtcpExtendedReportBlockHeaderSize) {
      return false;
    }
    const uint8_t block_type = ConsumeField<uint8_t>(&in);
    in.remove_prefix(sizeof(uint8_t));  // Skip the "reserved" byte.
    // The length field counts 32-bit words; convert it to bytes.
    const int block_data_size =
        static_cast<int>(ConsumeField<uint16_t>(&in)) * 4;
    if (static_cast<int>(in.size()) < block_data_size) {
      return false;
    }

    if (block_type == kRtcpReceiverReferenceTimeReportBlockType) {
      if (block_data_size != kReferenceTimeDataSize) {
        return false;  // Length field must always be 2 words.
      }
      *receiver_reference_time = session_->ntp_converter().ToLocalTime(
          ReadBigEndian<uint64_t>(in.data()));
    }
    // Any other type of extended report is ignored. Either way, step over the
    // block's data to reach the next block.
    in.remove_prefix(block_data_size);
  }

  return true;
}
// Parses the payload of a Picture Loss Indicator packet. Sets
// |*picture_loss_indicator| to true when the message's two leading SSRCs are,
// in order, the session's receiver SSRC and sender SSRC; otherwise leaves the
// flag untouched.
//
// Returns false only when |in| is shorter than
// kRtcpPictureLossIndicatorHeaderSize.
bool CompoundRtcpParser::ParsePictureLossIndicator(
    absl::Span<const uint8_t> in,
    bool* picture_loss_indicator) {
  if (static_cast<int>(in.size()) < kRtcpPictureLossIndicatorHeaderSize) {
    return false;
  }

  // A PLI that is not from the Receiver, or not addressed to this Sender, is
  // well-formed but of no interest.
  if (ConsumeField<uint32_t>(&in) != session_->receiver_ssrc()) {
    return true;
  }
  if (ConsumeField<uint32_t>(&in) != session_->sender_ssrc()) {
    return true;
  }

  *picture_loss_indicator = true;
  return true;
}
// Client has nothing to set up or tear down.
CompoundRtcpParser::Client::Client() = default;
CompoundRtcpParser::Client::~Client() = default;
// Each Client method below is defined here with an empty body, so these
// definitions do nothing when called.
//
// Called by Parse() when a compound packet carries a receiver reference time
// that is not older than the latest one previously seen.
void CompoundRtcpParser::Client::OnReceiverReferenceTimeAdvanced(
Clock::time_point reference_time) {}
// Called by Parse() when a receiver report block was parsed.
void CompoundRtcpParser::Client::OnReceiverReport(
const RtcpReportBlock& receiver_report) {}
// Called by Parse() when a Picture Loss Indicator matching the session's
// SSRCs was parsed.
void CompoundRtcpParser::Client::OnReceiverIndicatesPictureLoss() {}
// Called by Parse() when feedback with a non-null checkpoint frame ID was
// parsed; |playout_delay| is the target playout delay from that feedback.
void CompoundRtcpParser::Client::OnReceiverCheckpoint(
FrameId frame_id,
std::chrono::milliseconds playout_delay) {}
// Called by Parse() with the non-empty, sorted and unique list of frame IDs
// the feedback ACKed.
void CompoundRtcpParser::Client::OnReceiverHasFrames(
std::vector<FrameId> acks) {}
// Called by Parse() with the non-empty, canonicalized list of packet NACKs.
void CompoundRtcpParser::Client::OnReceiverIsMissingPackets(
std::vector<PacketNack> nacks) {}
} // namespace cast
} // namespace openscreen
|