File: stream_consumer.h

package info (click to toggle)
chromium 120.0.6099.224-1~deb11u1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 6,112,112 kB
  • sloc: cpp: 32,907,025; ansic: 8,148,123; javascript: 3,679,536; python: 2,031,248; asm: 959,718; java: 804,675; xml: 617,256; sh: 111,417; objc: 100,835; perl: 88,443; cs: 53,032; makefile: 29,579; fortran: 24,137; php: 21,162; tcl: 21,147; sql: 20,809; ruby: 17,735; pascal: 12,864; yacc: 8,045; lisp: 3,388; lex: 1,323; ada: 727; awk: 329; jsp: 267; csh: 117; exp: 43; sed: 37
file content (131 lines) | stat: -rw-r--r-- 5,171 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef COMPONENTS_CAST_STREAMING_BROWSER_FRAME_STREAM_CONSUMER_H_
#define COMPONENTS_CAST_STREAMING_BROWSER_FRAME_STREAM_CONSUMER_H_

#include "base/functional/callback.h"
#include "base/memory/raw_ptr.h"
#include "base/time/time.h"
#include "components/cast_streaming/browser/common/decoder_buffer_factory.h"
#include "media/mojo/mojom/media_types.mojom.h"
#include "mojo/public/cpp/system/data_pipe.h"
#include "mojo/public/cpp/system/simple_watcher.h"
#include "third_party/openscreen/src/cast/streaming/receiver.h"
#include "third_party/openscreen/src/cast/streaming/receiver_session.h"

namespace cast_streaming {

// Attaches to an Open Screen Receiver to receive buffers of encoded data and
// invokes |frame_received_cb_| with each buffer.
//
// Internally, this class writes buffers of encoded data directly to
// |data_pipe_| rather than using a helper class like MojoDecoderBufferWriter.
// This allows us to use |data_pipe_| as an end-to-end buffer to cap memory
// usage. Receiving new buffers is delayed until the pipe has free memory again.
// The Open Screen library takes care of discarding buffers that are too old and
// requesting new key frames as needed.
//
// Instances are neither copyable nor copy-assignable. Note that each instance
// embeds a BufferDataWrapper by value, whose backing array is
// |BufferDataWrapper::kMaxFrameSize| bytes, so the object itself is large.
class StreamConsumer final : public openscreen::cast::Receiver::Consumer {
 public:
  // Signature of the callback through which received frames are handed out,
  // one media::mojom::DecoderBufferPtr per invocation.
  using FrameReceivedCB =
      base::RepeatingCallback<void(media::mojom::DecoderBufferPtr)>;

  // |receiver| sends frames to this object. It must outlive this object (it is
  // held as a non-owning pointer).
  // |data_pipe| is the producer end of the pipe that frame data is written to.
  // On error, |data_pipe| will be closed.
  // |frame_received_cb| is called on every new frame, after a new frame has
  // been written to |data_pipe|.
  // |on_new_frame| will be called on every new frame.
  // |decoder_buffer_factory| is the factory used for creating DecoderBuffers;
  // ownership is transferred to this object.
  StreamConsumer(openscreen::cast::Receiver* receiver,
                 mojo::ScopedDataPipeProducerHandle data_pipe,
                 FrameReceivedCB frame_received_cb,
                 base::RepeatingClosure on_new_frame,
                 std::unique_ptr<DecoderBufferFactory> decoder_buffer_factory);
  ~StreamConsumer() override;

  // Not copyable.
  StreamConsumer(const StreamConsumer&) = delete;
  StreamConsumer& operator=(const StreamConsumer&) = delete;

  // Informs the StreamConsumer that a new frame should be read asynchronously.
  // Eventually, the |frame_received_cb_| will be called with the data for this
  // frame. |no_frames_available_cb| will be called if no frames are immediately
  // available when this read is first attempted.
  void ReadFrame(base::OnceClosure no_frames_available_cb);

  // Cancels any ongoing read call, then skips reading of all future frames with
  // an id less than |frame_id|.
  void FlushUntil(uint32_t frame_id);

 private:
  // Wrapper around a data buffer used for storing the data of a DecoderBuffer
  // received from Openscreen. The storage is a fixed-size array held inline in
  // the wrapper, so no separate allocation is made for frame data.
  class BufferDataWrapper : public DecoderBufferFactory::FrameContents {
   public:
    ~BufferDataWrapper() override;

    // Returns up to |max_size| more bytes of the underlying array, invalidating
    // these bytes in the underlying buffer.
    // NOTE(review): presumably this advances |pending_buffer_offset_| and
    // reduces |pending_buffer_remaining_bytes_| by the size of the returned
    // span - confirm against the implementation.
    base::span<uint8_t> Consume(uint32_t max_size);

    // DecoderBufferFactory::FrameContents overrides. The contract of each
    // method is defined by that interface, which is declared in
    // decoder_buffer_factory.h.
    base::span<uint8_t> Get() override;
    bool Reset(uint32_t new_size) override;
    void Clear() override;
    uint32_t Size() const override;

   private:
    // Maximum frame size that OnFramesReady() can accept (512 KiB).
    static constexpr size_t kMaxFrameSize = 512 * 1024;

    // Buffer backing the spans created by this class. It has no initializer
    // here, so its contents are only meaningful for the range described by the
    // two counters below.
    uint8_t pending_buffer_[kMaxFrameSize];

    // Current offset for data in |pending_buffer_|.
    uint32_t pending_buffer_offset_ = 0;

    // Remaining bytes to write from |pending_buffer_|.
    uint32_t pending_buffer_remaining_bytes_ = 0;
  };

  // Closes |data_pipe_| and resets the Consumer in |receiver_|. No frames will
  // be received after this call.
  void CloseDataPipeOnError();

  // Callback when |data_pipe_| can be written to again after it was full.
  // NOTE(review): |result| is presumably the status reported by
  // |pipe_watcher_| - confirm how non-OK values are handled.
  void OnPipeWritable(MojoResult result);

  // Processes a ready frame, if both one is ready and a read callback is
  // pending.
  void MaybeSendNextFrame();

  // Writes buffered frame data to |data_pipe_|.
  // NOTE(review): the meaning of the returned bool is not visible from this
  // header (presumably it reports whether the write succeeded or completed) -
  // confirm against the implementation before relying on it.
  bool WriteBufferToDataPipe();

  // openscreen::cast::Receiver::Consumer implementation.
  // See that interface for the meaning of |next_frame_buffer_size|.
  void OnFramesReady(int next_frame_buffer_size) override;

  // NOTE: the data members below are constructed in declaration order and
  // destroyed in reverse order (e.g. |pipe_watcher_| is destroyed before
  // |data_pipe_|). Do not reorder them without checking the implementation.

  // This receiver should skip all frames with id less than this value. Set by a
  // call to FlushUntil() and 0 when no flush is ongoing.
  uint32_t skip_until_frame_id_ = 0;

  // Source of frames. Not owned; required by the constructor contract to
  // outlive this object.
  const raw_ptr<openscreen::cast::Receiver> receiver_;

  // Producer end of the data pipe that encoded frame data is written to.
  mojo::ScopedDataPipeProducerHandle data_pipe_;

  // Invoked with each received frame.
  const FrameReceivedCB frame_received_cb_;

  // Holds the data of a DecoderBuffer received from Openscreen.
  BufferDataWrapper data_wrapper_;

  // Provides notifications about |data_pipe_| readiness.
  mojo::SimpleWatcher pipe_watcher_;

  // Whether a read is currently pending.
  // NOTE(review): presumably set by ReadFrame() and cleared once the frame has
  // been delivered or the read is cancelled by FlushUntil() - confirm.
  bool is_read_pending_ = false;

  // NOTE(review): presumably the |no_frames_available_cb| passed to the most
  // recent ReadFrame() call, held until it is run - confirm.
  base::OnceClosure no_frames_available_cb_;

  // Closure called on every new frame.
  base::RepeatingClosure on_new_frame_;

  // Factory to use for creating DecoderBuffers.
  std::unique_ptr<DecoderBufferFactory> decoder_buffer_factory_;
};

}  // namespace cast_streaming

#endif  // COMPONENTS_CAST_STREAMING_BROWSER_FRAME_STREAM_CONSUMER_H_