1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
|
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CAST_STREAMING_RECEIVER_H_
#define CAST_STREAMING_RECEIVER_H_
#include <stdint.h>
#include <array>
#include <chrono>
#include <memory>
#include <utility>
#include <vector>
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "cast/streaming/clock_drift_smoother.h"
#include "cast/streaming/compound_rtcp_builder.h"
#include "cast/streaming/environment.h"
#include "cast/streaming/frame_collector.h"
#include "cast/streaming/frame_id.h"
#include "cast/streaming/packet_receive_stats_tracker.h"
#include "cast/streaming/receiver_base.h"
#include "cast/streaming/rtcp_common.h"
#include "cast/streaming/rtcp_session.h"
#include "cast/streaming/rtp_packet_parser.h"
#include "cast/streaming/sender_report_parser.h"
#include "cast/streaming/session_config.h"
#include "cast/streaming/ssrc.h"
#include "platform/api/time.h"
#include "util/alarm.h"
namespace openscreen {
namespace cast {
struct EncodedFrame;
class ReceiverPacketRouter;
// The Cast Streaming Receiver, a peer corresponding to some Cast Streaming
// Sender at the other end of a network link.
//
// Cast Streaming is a transport protocol which divides up the frames for one
// media stream (e.g., audio or video) into multiple RTP packets containing an
// encrypted payload. The Receiver is the peer responsible for collecting the
// RTP packets, decrypting the payload, and re-assembling a frame that can be
// passed to a decoder and played out.
//
// A Sender ↔ Receiver pair is used to transport each media stream. Typically,
// there are two pairs in a normal system, one for the audio stream and one for
// video stream. A local player is responsible for synchronizing the playout of
// the frames of each stream to achieve lip-sync. See the discussion in
// encoded_frame.h for how the |reference_time| and |rtp_timestamp| of the
// EncodedFrames are used to achieve this.
//
// See the Receiver Demo app for a reference implementation that both shows and
// explains how Receivers are properly configured and started, integrated with a
// decoder, and the resulting decoded media is played out. Also, here is a
// general usage example:
//
// class MyPlayer : public openscreen::cast::Receiver::Consumer {
// public:
// explicit MyPlayer(Receiver* receiver) : receiver_(receiver) {
// recevier_->SetPlayerProcessingTime(std::chrono::milliseconds(10));
// receiver_->SetConsumer(this);
// }
//
// ~MyPlayer() override {
// receiver_->SetConsumer(nullptr);
// }
//
// private:
// // Receiver::Consumer implementation.
// void OnFramesReady(int next_frame_buffer_size) override {
// std::vector<uint8_t> buffer;
// buffer.resize(next_frame_buffer_size);
// openscreen::cast::EncodedFrame encoded_frame =
// receiver_->ConsumeNextFrame(absl::Span<uint8_t>(buffer));
//
// display_.RenderFrame(decoder_.DecodeFrame(encoded_frame.data));
//
// // Note: An implementation could call receiver_->AdvanceToNextFrame()
// // and receiver_->ConsumeNextFrame() in a loop here, to consume all the
// // remaining frames that are ready.
// }
//
// Receiver* const receiver_;
// MyDecoder decoder_;
// MyDisplay display_;
// };
//
// Internally, a queue of complete and partially-received frames is maintained.
// The queue is a circular queue of FrameCollectors that each maintain the
// individual receive state of each in-flight frame. There are three conceptual
// "pointers" that indicate what assumptions and operations are made on certain
// ranges of frames in the queue:
//
// 1. Latest Frame Expected: The FrameId of the latest frame whose existence
// is known to this Receiver. This is the highest FrameId seen in any
// successfully-parsed RTP packet.
// 2. Checkpoint Frame: Indicates that all of the RTP packets for all frames
// up to and including the one having this FrameId have been successfully
// received and processed.
// 3. Last Frame Consumed: The FrameId of last frame consumed (see
// ConsumeNextFrame()). Once a frame is consumed, all internal resources
// related to the frame can be freed and/or re-used for later frames.
class Receiver : public ReceiverBase {
public:
using ReceiverBase::Consumer;
// Constructs a Receiver that attaches to the given |environment| and
// |packet_router|. The config contains the settings that were
// agreed-upon by both sides from the OFFER/ANSWER exchange (i.e., the part of
// the overall end-to-end connection process that occurs before Cast Streaming
// is started).
Receiver(Environment* environment,
ReceiverPacketRouter* packet_router,
SessionConfig config);
~Receiver() override;
// ReceiverBase overrides.
const SessionConfig& config() const override;
int rtp_timebase() const override;
Ssrc ssrc() const override;
void SetConsumer(Consumer* consumer) override;
void SetPlayerProcessingTime(Clock::duration needed_time) override;
void RequestKeyFrame() override;
int AdvanceToNextFrame() override;
EncodedFrame ConsumeNextFrame(absl::Span<uint8_t> buffer) override;
// Allows setting picture loss indication for testing. In production, this
// should be done using the config.
void SetPliEnabledForTesting(bool is_pli_enabled) {
is_pli_enabled_ = is_pli_enabled;
}
// The default "player processing time" amount. See SetPlayerProcessingTime().
static constexpr std::chrono::milliseconds kDefaultPlayerProcessingTime =
ReceiverBase::kDefaultPlayerProcessingTime;
// Returned by AdvanceToNextFrame() when there are no frames currently ready
// for consumption.
static constexpr int kNoFramesReady = ReceiverBase::kNoFramesReady;
protected:
friend class ReceiverPacketRouter;
// Called by ReceiverPacketRouter to provide this Receiver with what looks
// like a RTP/RTCP packet meant for it specifically (among other Receivers).
void OnReceivedRtpPacket(Clock::time_point arrival_time,
std::vector<uint8_t> packet);
void OnReceivedRtcpPacket(Clock::time_point arrival_time,
std::vector<uint8_t> packet);
private:
// An entry in the circular queue (see |pending_frames_|).
struct PendingFrame {
FrameCollector collector;
// The Receiver's [local] Clock time when this frame was originally captured
// at the Sender. This is computed and assigned when the RTP packet with ID
// 0 is processed. Add the target playout delay to this to get the target
// playout time.
absl::optional<Clock::time_point> estimated_capture_time;
PendingFrame();
~PendingFrame();
// Reset this entry to its initial state, freeing resources.
void Reset();
};
// Get/Set the checkpoint FrameId. This indicates that all of the packets for
// all frames up to and including this FrameId have been successfully received
// (or otherwise do not need to be re-transmitted).
FrameId checkpoint_frame() const { return rtcp_builder_.checkpoint_frame(); }
void set_checkpoint_frame(FrameId frame_id) {
rtcp_builder_.SetCheckpointFrame(frame_id);
}
// Send an RTCP packet to the Sender immediately, to acknowledge the complete
// reception of one or more additional frames, to reply to a Sender Report, or
// to request re-transmits. Calling this also schedules additional RTCP
// packets to be sent periodically for the life of this Receiver.
void SendRtcp();
// Helpers to map the given |frame_id| to the element in the |pending_frames_|
// circular queue. There are both const and non-const versions, but neither
// mutate any state (i.e., they are just look-ups).
const PendingFrame& GetQueueEntry(FrameId frame_id) const;
PendingFrame& GetQueueEntry(FrameId frame_id);
// Record that the target playout delay has changed starting with the given
// FrameId.
void RecordNewTargetPlayoutDelay(FrameId as_of_frame,
std::chrono::milliseconds delay);
// Examine the known target playout delay changes to determine what setting is
// in-effect for the given frame.
std::chrono::milliseconds ResolveTargetPlayoutDelay(FrameId frame_id) const;
// Called to move the checkpoint forward. This scans the queue, starting from
// |new_checkpoint|, to find the latest in a contiguous sequence of completed
// frames. Then, it records that frame as the new checkpoint, and immediately
// sends a feedback RTCP packet to the Sender.
void AdvanceCheckpoint(FrameId new_checkpoint);
// Helper to force-drop all frames before |first_kept_frame|, even if they
// were never consumed. This will also auto-cancel frames that were never
// completely received, artificially moving the checkpoint forward, and
// notifying the Sender of that. The caller of this method is responsible for
// making sure that frame data dependencies will not be broken by dropping the
// frames.
void DropAllFramesBefore(FrameId first_kept_frame);
// Sets the |consumption_alarm_| to check whether any frames are ready,
// including possibly skipping over late frames in order to make not-yet-late
// frames become ready. The default argument value means "without delay."
void ScheduleFrameReadyCheck(Clock::time_point when = Alarm::kImmediately);
const ClockNowFunctionPtr now_;
ReceiverPacketRouter* const packet_router_;
const SessionConfig config_;
RtcpSession rtcp_session_;
SenderReportParser rtcp_parser_;
CompoundRtcpBuilder rtcp_builder_;
PacketReceiveStatsTracker stats_tracker_; // Tracks transmission stats.
RtpPacketParser rtp_parser_;
const int rtp_timebase_; // RTP timestamp ticks per second.
const FrameCrypto crypto_; // Decrypts assembled frames.
bool is_pli_enabled_; // Whether picture loss indication is enabled.
// Buffer for serializing/sending RTCP packets.
const int rtcp_buffer_capacity_;
const std::unique_ptr<uint8_t[]> rtcp_buffer_;
// Schedules tasks to ensure RTCP reports are sent within a bounded interval.
// Not scheduled until after this Receiver has processed the first packet from
// the Sender.
Alarm rtcp_alarm_;
Clock::time_point last_rtcp_send_time_ = Clock::time_point::min();
// The last Sender Report received and when the packet containing it had
// arrived. This contains lip-sync timestamps used as part of the calculation
// of playout times for the received frames, as well as ping-pong data bounced
// back to the Sender in the Receiver Reports. It is nullopt until the first
// parseable Sender Report is received.
absl::optional<SenderReportParser::SenderReportWithId> last_sender_report_;
Clock::time_point last_sender_report_arrival_time_;
// Tracks the offset between the Receiver's [local] clock and the Sender's
// clock. This is invalid until the first Sender Report has been successfully
// processed (i.e., |last_sender_report_| is not nullopt).
ClockDriftSmoother smoothed_clock_offset_;
// The ID of the latest frame whose existence is known to this Receiver. This
// value must always be greater than or equal to |checkpoint_frame()|.
FrameId latest_frame_expected_ = FrameId::leader();
// The ID of the last frame consumed. This value must always be less than or
// equal to |checkpoint_frame()|, since it's impossible to consume incomplete
// frames!
FrameId last_frame_consumed_ = FrameId::leader();
// The ID of the latest key frame known to be in-flight. This is used by
// RequestKeyFrame() to ensure the PLI condition doesn't get set again until
// after the consumer has seen a key frame that would clear the condition.
FrameId last_key_frame_received_;
// The frame queue (circular), which tracks which frames are in-flight, stores
// data for partially-received frames, and holds onto completed frames until
// the consumer consumes them.
//
// Use GetQueueEntry() to access a slot. The currently-active slots are those
// for the frames after |last_frame_consumed_| and up-to/including
// |latest_frame_expected_|.
std::array<PendingFrame, kMaxUnackedFrames> pending_frames_{};
// Tracks the recent changes to the target playout delay, which is controlled
// by the Sender. The FrameId indicates the first frame where a new delay
// setting takes effect. This vector is never empty, is kept sorted, and is
// pruned to remain as small as possible.
//
// The target playout delay is the amount of time between a frame's
// capture/recording on the Sender and when it should be played-out at the
// Receiver.
std::vector<std::pair<FrameId, std::chrono::milliseconds>>
playout_delay_changes_;
// The consumer to notify when there are one or more frames completed and
// ready to be consumed.
Consumer* consumer_ = nullptr;
// The additional time needed to decode/play-out each frame after being
// consumed from this Receiver.
Clock::duration player_processing_time_ = kDefaultPlayerProcessingTime;
// Scheduled to check whether there are frames ready and, if there are, to
// notify the Consumer via OnFramesReady().
Alarm consumption_alarm_;
// The interval between sending ACK/NACK feedback RTCP messages while
// incomplete frames exist in the queue.
//
// TODO(jophba): This should be a function of the current target playout
// delay, similar to the Sender's kickstart interval logic.
static constexpr std::chrono::milliseconds kNackFeedbackInterval{30};
};
} // namespace cast
} // namespace openscreen
#endif // CAST_STREAMING_RECEIVER_H_
|