File: stream_consumer.cc

package info (click to toggle)
chromium 139.0.7258.127-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 6,122,068 kB
  • sloc: cpp: 35,100,771; ansic: 7,163,530; javascript: 4,103,002; python: 1,436,920; asm: 946,517; xml: 746,709; pascal: 187,653; perl: 88,691; sh: 88,436; objc: 79,953; sql: 51,488; cs: 44,583; fortran: 24,137; makefile: 22,147; tcl: 15,277; php: 13,980; yacc: 8,984; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (225 lines) | stat: -rw-r--r-- 7,031 bytes parent folder | download | duplicates (8)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifdef UNSAFE_BUFFERS_BUILD
// TODO(crbug.com/40285824): Remove this and convert code to safer constructs.
#pragma allow_unsafe_buffers
#endif

#include "components/cast_streaming/browser/frame/stream_consumer.h"

#include <algorithm>

#include "base/containers/span.h"
#include "base/logging.h"
#include "base/numerics/safe_conversions.h"
#include "base/task/sequenced_task_runner.h"
#include "components/cast_streaming/browser/common/decoder_buffer_factory.h"
#include "components/cast_streaming/common/public/features.h"
#include "media/base/media_util.h"
#include "media/mojo/common/media_type_converters.h"
#include "third_party/openscreen/src/platform/base/span.h"

namespace cast_streaming {

// Out-of-line defaulted destructor: BufferDataWrapper performs no custom
// teardown beyond destroying its own members.
StreamConsumer::BufferDataWrapper::~BufferDataWrapper() = default;

// Returns a view over the not-yet-consumed portion of the pending buffer. The
// view starts at the current read offset and spans all remaining bytes; the
// wrapper's state is left untouched.
base::span<uint8_t> StreamConsumer::BufferDataWrapper::Get() {
  uint8_t* const unread_begin = &pending_buffer_[pending_buffer_offset_];
  return base::span<uint8_t>(unread_begin, pending_buffer_remaining_bytes_);
}

// Removes up to |max_size| bytes from the front of the unconsumed region and
// returns a view over exactly the bytes that were removed. Requests larger
// than the amount of data left are clamped to what remains.
base::span<uint8_t> StreamConsumer::BufferDataWrapper::Consume(
    uint32_t max_size) {
  // Never hand out more than what is still pending.
  const uint32_t chunk_size =
      std::min<uint32_t>(max_size, pending_buffer_remaining_bytes_);

  // Capture the start of the chunk before the read position is advanced.
  uint8_t* const chunk_begin = &pending_buffer_[pending_buffer_offset_];

  pending_buffer_remaining_bytes_ -= chunk_size;
  pending_buffer_offset_ += chunk_size;
  return base::span<uint8_t>(chunk_begin, chunk_size);
}

// Re-initializes the wrapper so that it exposes |new_size| unconsumed bytes
// starting at the beginning of the buffer. Returns false, leaving the current
// state untouched, when |new_size| exceeds kMaxFrameSize; returns true
// otherwise.
bool StreamConsumer::BufferDataWrapper::Reset(uint32_t new_size) {
  const bool fits_in_buffer = new_size <= kMaxFrameSize;
  if (fits_in_buffer) {
    pending_buffer_offset_ = 0;
    pending_buffer_remaining_bytes_ = new_size;
  }
  return fits_in_buffer;
}

// Discards any unconsumed data by resetting the wrapper to a size of zero.
void StreamConsumer::BufferDataWrapper::Clear() {
  constexpr uint32_t kEmptySize = 0;

  // The call is kept outside of the DCHECK so that the reset still happens in
  // builds where DCHECK conditions are not evaluated.
  const bool reset_ok = Reset(kEmptySize);
  DCHECK(reset_ok);
}

// Returns the number of bytes that have not been consumed yet, i.e. the length
// of the span that Get() currently returns.
uint32_t StreamConsumer::BufferDataWrapper::Size() const {
  return pending_buffer_remaining_bytes_;
}

// Takes ownership of the producer end of |data_pipe|, of the callbacks and of
// |decoder_buffer_factory|, registers this object as the consumer of
// |receiver| and starts watching the pipe for writability. The watcher is
// manually armed, so OnPipeWritable() only fires after an explicit
// ArmOrNotify(). |receiver| and |decoder_buffer_factory| must be non-null.
StreamConsumer::StreamConsumer(
    openscreen::cast::Receiver* receiver,
    mojo::ScopedDataPipeProducerHandle data_pipe,
    FrameReceivedCB frame_received_cb,
    base::RepeatingClosure on_new_frame,
    std::unique_ptr<DecoderBufferFactory> decoder_buffer_factory)
    : receiver_(receiver),
      data_pipe_(std::move(data_pipe)),
      frame_received_cb_(std::move(frame_received_cb)),
      pipe_watcher_(FROM_HERE,
                    mojo::SimpleWatcher::ArmingPolicy::MANUAL,
                    base::SequencedTaskRunner::GetCurrentDefault()),
      on_new_frame_(std::move(on_new_frame)),
      decoder_buffer_factory_(std::move(decoder_buffer_factory)) {
  DCHECK(receiver_);
  DCHECK(decoder_buffer_factory_);

  receiver_->SetConsumer(this);

  // The callback binds |this| unretained; the watcher it is handed to is a
  // member of this object.
  auto on_writable = base::BindRepeating(&StreamConsumer::OnPipeWritable,
                                         base::Unretained(this));
  const MojoResult watch_result = pipe_watcher_.Watch(
      data_pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, std::move(on_writable));

  // Without a working watcher the pipe cannot be used, so give it up.
  if (watch_result != MOJO_RESULT_OK) {
    CloseDataPipeOnError();
  }
}

// Defaulted destructor: members are released by their own destructors.
//
// NOTE: Do NOT call into |receiver_| methods here, as the object may no longer
// be valid at time of this object's destruction.
StreamConsumer::~StreamConsumer() = default;

// Requests delivery of the next frame. Only one read may be outstanding at a
// time. |no_frames_available_cb| is stored and run by MaybeSendNextFrame() if
// the receiver reports that no frame is ready.
void StreamConsumer::ReadFrame(base::OnceClosure no_frames_available_cb) {
  // A second read must not be issued while one is still outstanding.
  DCHECK(!is_read_pending_);
  DCHECK(!no_frames_available_cb_);

  no_frames_available_cb_ = std::move(no_frames_available_cb);
  is_read_pending_ = true;

  // Try to satisfy the request right away.
  MaybeSendNextFrame();
}

// Gives up on the data pipe after a failure: emits a debug-level warning
// tagged with the receiver's ssrc, cancels the writability watcher and then
// resets the producer handle.
void StreamConsumer::CloseDataPipeOnError() {
  DLOG(WARNING) << "[ssrc:" << receiver_->ssrc() << "] Data pipe closed.";
  // Stop watching before the handle being watched is released.
  pipe_watcher_.Cancel();
  data_pipe_.reset();
}

// Watcher callback for |data_pipe_|. Writes as much of the pending frame data
// as the pipe accepts. If data is still left afterwards the watcher is
// re-armed; once everything has been written, the next frame (if a read is
// pending) is attempted. Any failure closes the pipe.
void StreamConsumer::OnPipeWritable(MojoResult result) {
  DCHECK(data_pipe_);

  // The watcher itself reported a problem.
  if (result != MOJO_RESULT_OK) {
    CloseDataPipeOnError();
    return;
  }

  size_t written = 0;
  const MojoResult write_result = data_pipe_->WriteData(
      data_wrapper_.Get(), MOJO_WRITE_DATA_FLAG_NONE, written);
  if (write_result != MOJO_RESULT_OK) {
    CloseDataPipeOnError();
    return;
  }

  // Drop the bytes the pipe accepted from the pending buffer.
  data_wrapper_.Consume(base::checked_cast<uint32_t>(written));

  if (data_wrapper_.empty()) {
    // The whole frame has been flushed; move on to the next one.
    MaybeSendNextFrame();
  } else {
    // More data to write: wait for the pipe to become writable again.
    pipe_watcher_.ArmOrNotify();
  }
}

// Frame-availability notification; presumably invoked by the receiver this
// object registered itself with via SetConsumer() (confirm against the
// openscreen Receiver::Consumer interface). |next_frame_buffer_size| is not
// used here: MaybeSendNextFrame() obtains the size itself from
// AdvanceToNextFrame().
void StreamConsumer::OnFramesReady(int next_frame_buffer_size) {
  MaybeSendNextFrame();
}

// Records |frame_id| as the first frame id that should be delivered;
// MaybeSendNextFrame() drops frames with a smaller id. If a read is currently
// outstanding it is completed immediately by running |frame_received_cb_|
// with a null buffer.
void StreamConsumer::FlushUntil(uint32_t frame_id) {
  skip_until_frame_id_ = frame_id;

  // Nothing to abort when no read is in flight.
  if (!is_read_pending_) {
    return;
  }

  is_read_pending_ = false;
  no_frames_available_cb_.Reset();
  frame_received_cb_.Run(nullptr);
}

// Attempts to satisfy an outstanding ReadFrame() request with the next frame
// available from |receiver_|.
//
// This is a no-op unless a read is pending and every byte of the previously
// delivered frame has been written to the data pipe. If the receiver reports
// that no frame is ready, |no_frames_available_cb_| is run (when set) and the
// read stays pending.
//
// Otherwise the frame is copied into |data_wrapper_|. It is dropped, and the
// next frame is tried, if its id is below |skip_until_frame_id_| or if it
// cannot be converted into a media::DecoderBuffer. For a good frame, the
// payload is written to |data_pipe_| (the remainder is flushed later from
// OnPipeWritable()) and the frame's metadata is handed to
// |frame_received_cb_|. A frame larger than the wrapper accepts, or a pipe
// write error, closes the data pipe.
void StreamConsumer::MaybeSendNextFrame() {
  if (!is_read_pending_ || !data_wrapper_.empty()) {
    return;
  }

  const int current_frame_buffer_size = receiver_->AdvanceToNextFrame();
  if (current_frame_buffer_size == openscreen::cast::Receiver::kNoFramesReady) {
    if (no_frames_available_cb_) {
      std::move(no_frames_available_cb_).Run();
    }
    return;
  }

  on_new_frame_.Run();

  if (!data_wrapper_.Reset(current_frame_buffer_size)) {
    LOG(ERROR) << "[ssrc:" << receiver_->ssrc() << "] "
               << "Frame size too big: " << current_frame_buffer_size;
    CloseDataPipeOnError();
    return;
  }

  openscreen::cast::EncodedFrame encoded_frame;

  // Write to temporary storage in case we need to drop this frame.
  base::span<uint8_t> span = data_wrapper_.Get();
  encoded_frame = receiver_->ConsumeNextFrame(
      openscreen::ByteBuffer(span.data(), span.size()));

  // If the frame occurs before the id we want to flush until, drop it and try
  // again.
  // TODO(crbug.com/1412561): Move this logic to Openscreen.
  if (encoded_frame.frame_id <
      openscreen::cast::FrameId(int64_t{skip_until_frame_id_})) {
    VLOG(1) << "Skipping Frame " << encoded_frame.frame_id;

    data_wrapper_.Clear();
    MaybeSendNextFrame();
    return;
  }

  // Create the buffer, retrying if this fails.
  scoped_refptr<media::DecoderBuffer> decoder_buffer =
      decoder_buffer_factory_->ToDecoderBuffer(encoded_frame, data_wrapper_);
  if (!decoder_buffer) {
    data_wrapper_.Clear();
    MaybeSendNextFrame();
    // The retry above has fully handled this read request (or left it
    // pending). Falling through would clobber its state and dereference the
    // null |decoder_buffer| below.
    return;
  }

  // At this point, the frame is known to be "good".
  skip_until_frame_id_ = 0;
  no_frames_available_cb_.Reset();

  // Write the frame's data to Mojo.
  size_t bytes_written = 0;
  auto result = data_pipe_->WriteData(data_wrapper_.Get(),
                                      MOJO_WRITE_DATA_FLAG_NONE, bytes_written);
  if (result == MOJO_RESULT_SHOULD_WAIT) {
    // The pipe is full right now: nothing was written, so wait until it
    // becomes writable and let OnPipeWritable() push the data.
    pipe_watcher_.ArmOrNotify();
    bytes_written = 0;
  } else if (result != MOJO_RESULT_OK) {
    CloseDataPipeOnError();
    return;
  }
  data_wrapper_.Consume(base::checked_cast<uint32_t>(bytes_written));

  // Return the frame.
  is_read_pending_ = false;
  frame_received_cb_.Run(media::mojom::DecoderBuffer::From(*decoder_buffer));

  // Wait for the mojo pipe to be writable if there is still pending data to
  // write.
  if (!data_wrapper_.empty()) {
    pipe_watcher_.ArmOrNotify();
  }
}

}  // namespace cast_streaming