File: rpc_demuxer_stream_handler.h

package info (click to toggle)
chromium 139.0.7258.127-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 6,122,068 kB
  • sloc: cpp: 35,100,771; ansic: 7,163,530; javascript: 4,103,002; python: 1,436,920; asm: 946,517; xml: 746,709; pascal: 187,653; perl: 88,691; sh: 88,436; objc: 79,953; sql: 51,488; cs: 44,583; fortran: 24,137; makefile: 22,147; tcl: 15,277; php: 13,980; yacc: 8,984; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (207 lines) | stat: -rw-r--r-- 8,056 bytes parent folder | download | duplicates (9)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
// Copyright 2022 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef COMPONENTS_CAST_STREAMING_BROWSER_CONTROL_REMOTING_RPC_DEMUXER_STREAM_HANDLER_H_
#define COMPONENTS_CAST_STREAMING_BROWSER_CONTROL_REMOTING_RPC_DEMUXER_STREAM_HANDLER_H_

#include <stdint.h>

#include <memory>
#include <optional>

#include "base/functional/callback.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/time/time.h"
#include "base/timer/timer.h"
#include "components/cast_streaming/browser/common/demuxer_stream_client.h"
#include "media/base/audio_decoder_config.h"
#include "media/base/video_decoder_config.h"
#include "media/cast/openscreen/rpc_call_message_handler.h"
#include "third_party/openscreen/src/cast/streaming/public/rpc_messenger.h"

// Forward declaration. This header only refers to base::SequencedTaskRunner
// through scoped_refptr<> parameters and members, so the full definition is
// not needed here.
namespace base {
class SequencedTaskRunner;
}  // namespace base

// Forward declaration. This header only names openscreen::cast::RpcMessage as
// the pointee of a std::unique_ptr<> in a callback signature, so the full
// definition is not needed here.
namespace openscreen::cast {
class RpcMessage;
}  // namespace openscreen::cast

namespace cast_streaming::remoting {

// Wrapper around all RPC operations associated with a DemuxerStream. This one
// instance handles interactions with both audio and video DemuxerStream
// instances.
class RpcDemuxerStreamHandler
    : public media::cast::RpcDemuxerStreamCBMessageHandler {
 public:
  // Class responsible for handling callbacks from this class upon a change in
  // config.
  class Client {
   public:
    virtual ~Client();

    virtual void OnNewAudioConfig(media::AudioDecoderConfig new_config) = 0;
    virtual void OnNewVideoConfig(media::VideoDecoderConfig new_config) = 0;
  };

  // Creates a new instance of this class. |client| is expected to outlive this
  // class.
  using HandleFactory =
      base::RepeatingCallback<openscreen::cast::RpcMessenger::Handle()>;
  using RpcProcessMessageCB = base::RepeatingCallback<void(
      openscreen::cast::RpcMessenger::Handle,
      std::unique_ptr<openscreen::cast::RpcMessage>)>;
  RpcDemuxerStreamHandler(scoped_refptr<base::SequencedTaskRunner> task_runner,
                          Client* client,
                          HandleFactory handle_factory,
                          RpcProcessMessageCB process_message_cb);

  ~RpcDemuxerStreamHandler() override;

  // To be called when the RPC_ACQUIRE_DEMUXER message is received. Acts to
  // immediately create and send an RPC message for initialization of each
  // valid demuxer stream.
  void OnRpcAcquireDemuxer(
      openscreen::cast::RpcMessenger::Handle audio_stream_handle,
      openscreen::cast::RpcMessenger::Handle video_stream_handle);

  // To be called when the RPC_DS_ENABLEBITSTREAMCONVERTER_CALLBACK message is
  // received.
  void OnRpcBitstreamConverterEnabled(
      openscreen::cast::RpcMessenger::Handle handle,
      bool success);

  base::WeakPtr<DemuxerStreamClient> GetAudioClient();
  base::WeakPtr<DemuxerStreamClient> GetVideoClient();

 private:
  friend class RpcDemuxerStreamHandlerTest;

  class MessageProcessor : public DemuxerStreamClient {
   public:
    enum class Type { kUnknown = 0, kAudio, kVideo };

    // Creates a new instance.
    //
    // |local_handle| is a new handle created for this instance, and with which
    // remote messages arriving here will have their handle set.
    // |remote_handle| is the handle received from the remote sender in the
    // OnRpcAcquiredDemuxer() call and used as the handle for sending messages
    // back to the sender.
    MessageProcessor(scoped_refptr<base::SequencedTaskRunner> task_runner,
                     Client* client,
                     RpcProcessMessageCB process_message_cb,
                     openscreen::cast::RpcMessenger::Handle local_handle,
                     openscreen::cast::RpcMessenger::Handle remote_handle,
                     Type type);
    ~MessageProcessor() override;

    bool OnRpcInitializeCallback(
        std::optional<media::AudioDecoderConfig> audio_config,
        std::optional<media::VideoDecoderConfig> video_config);
    bool OnRpcReadUntilCallback(
        std::optional<media::AudioDecoderConfig> audio_config,
        std::optional<media::VideoDecoderConfig> video_config,
        uint32_t total_frames_received);
    void OnBitstreamConverterEnabled(bool success);

    base::WeakPtr<MessageProcessor> GetWeakPtr();

    uint32_t total_frames_received() const { return total_frames_received_; }

    openscreen::cast::RpcMessenger::Handle local_handle() const {
      return local_handle_;
    }
    openscreen::cast::RpcMessenger::Handle remote_handle() const {
      return remote_handle_;
    }

    bool is_read_until_call_pending() const {
      return is_read_until_call_pending_;
    }
    bool is_read_until_call_ongoing() const {
      return is_read_until_call_ongoing_;
    }

   private:
    void OnBufferRequestTimeout();

    // DemuxerStreamClient implementation.
    //
    // OnNoBuffersAvailable() has the following send behavior:
    // - When first called, an immediate call will be made.
    // - If called again before the ACK for the above call has been received,
    //   the |is_read_until_call_pending_| flag is set and a new call will be
    //   made when this ACK is received.
    // - If the ACK is not received quickly, retries will be made at regular
    //   intervals with increasingly large requested frame counts.
    //
    // This retry behavior is required due to occasionally dropped messages
    // immediately following a FlushUntil() call.
    void EnableBitstreamConverter(BitstreamConverterEnabledCB cb) override;
    void OnNoBuffersAvailable() override;
    void OnError() override;

    base::TimeTicks last_request_time_;

    scoped_refptr<base::SequencedTaskRunner> task_runner_;
    raw_ptr<Client> client_;
    RpcProcessMessageCB process_message_cb_;
    openscreen::cast::RpcMessenger::Handle local_handle_;
    openscreen::cast::RpcMessenger::Handle remote_handle_;
    Type type_ = Type::kUnknown;

    uint32_t total_frames_received_ = 0;

    // Whether or not this class is currently waiting for an Ack from the
    // sender side for a READUNTIL call.
    bool is_read_until_call_ongoing_ = false;

    // Whether or not a new READUNTIL call should be made immediately following
    // the ACK for the currently ongoing READUNTIL call.
    bool is_read_until_call_pending_ = false;

    // The number of consecuitive READUNTIL calls for which an ACK has not been
    // received within a reasonable amount of time.
    int failed_consecutive_read_until_requests_ = 0;

    // Timer to re-execute a call if no response is received.
    base::OneShotTimer call_timeout_timer_;

    // Most recent callback for EnableBitstreamConverter().
    BitstreamConverterEnabledCB bitstream_converter_enabled_cb_;

    base::WeakPtrFactory<MessageProcessor> weak_factory_;
  };

  // Helpers for the above methods of the same name.
  void RequestMoreBuffers(MessageProcessor* message_processor);
  void OnError(MessageProcessor* message_processor);

  // RpcDemuxerStreamCBMessageHandler overrides.
  void OnRpcInitializeCallback(
      openscreen::cast::RpcMessenger::Handle handle,
      std::optional<media::AudioDecoderConfig> audio_config,
      std::optional<media::VideoDecoderConfig> video_config) override;
  void OnRpcReadUntilCallback(
      openscreen::cast::RpcMessenger::Handle handle,
      std::optional<media::AudioDecoderConfig> audio_config,
      std::optional<media::VideoDecoderConfig> video_config,
      uint32_t total_frames_received) override;
  void OnRpcEnableBitstreamConverterCallback(
      openscreen::cast::RpcMessenger::Handle handle,
      bool succeeded) override;

  scoped_refptr<base::SequencedTaskRunner> task_runner_;
  const raw_ptr<Client> client_;
  HandleFactory handle_factory_;
  RpcProcessMessageCB process_message_cb_;

  std::unique_ptr<MessageProcessor> audio_message_processor_;
  std::unique_ptr<MessageProcessor> video_message_processor_;
};

}  // namespace cast_streaming::remoting

#endif  // COMPONENTS_CAST_STREAMING_BROWSER_CONTROL_REMOTING_RPC_DEMUXER_STREAM_HANDLER_H_