File: bidirectional_stream.h

package info (click to toggle)
chromium 138.0.7204.157-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 6,071,864 kB
  • sloc: cpp: 34,936,859; ansic: 7,176,967; javascript: 4,110,704; python: 1,419,953; asm: 946,768; xml: 739,967; pascal: 187,324; sh: 89,623; perl: 88,663; objc: 79,944; sql: 50,304; cs: 41,786; fortran: 24,137; makefile: 21,806; php: 13,980; tcl: 13,166; yacc: 8,925; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (249 lines) | stat: -rw-r--r-- 8,781 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef COMPONENTS_GRPC_SUPPORT_BIDIRECTIONAL_STREAM_H_
#define COMPONENTS_GRPC_SUPPORT_BIDIRECTIONAL_STREAM_H_

#include <memory>
#include <vector>

#include "base/memory/raw_ptr.h"
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "base/synchronization/lock.h"
#include "net/http/bidirectional_stream.h"
#include "net/third_party/quiche/src/quiche/common/http/http_header_block.h"
#include "net/url_request/url_request_context_getter.h"

namespace base {
class Location;
}  // namespace base

namespace net {
class HttpRequestHeaders;
class WrappedIOBuffer;
}  // namespace net

namespace grpc_support {

// An adapter to net::BidirectionalStream.
//
// The app is expected to initiate the next step like ReadData.
//
// Threading notes:
// * Created and configured from any thread.
// * All callbacks into the Delegate are done on the network thread.
// * Public methods can be called on any thread.  In particular, Start,
//   ReadData, and WriteData can be called on any thread (including network
//   thread), and will post calls to the corresponding
//   {Start|ReadData|WriteData}OnNetworkThread to the network thread.
// * Owner of `BidirectionalStream` needs to ensure that the object is destroyed
//   on the network thread.
class BidirectionalStream : public net::BidirectionalStream::Delegate {
 public:
  class Delegate {
   public:
    virtual void OnStreamReady() = 0;

    virtual void OnHeadersReceived(
        const quiche::HttpHeaderBlock& response_headers,
        const char* negotiated_protocol) = 0;

    virtual void OnDataRead(char* data, int size) = 0;

    virtual void OnDataSent(const char* data) = 0;

    virtual void OnTrailersReceived(
        const quiche::HttpHeaderBlock& trailers) = 0;

    virtual void OnSucceeded() = 0;

    virtual void OnFailed(int error) = 0;

    virtual void OnCanceled() = 0;
  };

  BidirectionalStream(net::URLRequestContextGetter* request_context_getter,
                      Delegate* delegate);

  BidirectionalStream(const BidirectionalStream&) = delete;
  BidirectionalStream& operator=(const BidirectionalStream&) = delete;

  ~BidirectionalStream() override;

  // Disables automatic flushing of each buffer passed to WriteData().
  void disable_auto_flush(bool disable_auto_flush) {
    disable_auto_flush_ = disable_auto_flush;
  }

  // Delays sending request headers until first call to Flush().
  void delay_headers_until_flush(bool delay_headers_until_flush) {
    delay_headers_until_flush_ = delay_headers_until_flush;
  }

  // Validates method and headers, initializes and starts the request. If
  // |end_of_stream| is true, then stream is half-closed after sending header
  // frame and no data is expected to be written.
  // Returns 0 if request is valid and started successfully,
  // Returns -1 if |method| is not valid HTTP method name.
  // Returns position of invalid header value in |headers| if header name is
  // not valid.
  int Start(const char* url,
            int priority,
            const char* method,
            const net::HttpRequestHeaders& headers,
            bool end_of_stream);

  // Reads more data into |buffer| up to |capacity| bytes.
  bool ReadData(char* buffer, int capacity);

  // Writes |count| bytes of data from |buffer|. The |end_of_stream| is
  // passed to remote to indicate end of stream.
  bool WriteData(const char* buffer, int count, bool end_of_stream);

  // Sends buffers passed to WriteData().
  void Flush();

  // Cancels the request. The OnCanceled callback is invoked when request is
  // caneceled, and not other callbacks are invoked afterwards..
  void Cancel();

 private:
  // States of BidirectionalStream are tracked in |read_state_| and
  // |write_state_|.
  // The write state is separated as it changes independently of the read state.
  // There is one initial state: NOT_STARTED. There is one normal final state:
  // SUCCESS, reached after READING_DONE and WRITING_DONE. There are two
  // exceptional final states: CANCELED and ERROR, which can be reached from
  // any other non-final state.
  enum State {
    // Initial state, stream not started.
    NOT_STARTED,
    // Stream started, request headers are being sent.
    STARTED,
    // Waiting for ReadData() to be called.
    WAITING_FOR_READ,
    // Reading from the remote, OnDataRead callback will be invoked when done.
    READING,
    // There is no more data to read and stream is half-closed by the remote
    // side.
    READING_DONE,
    // Stream is canceled.
    CANCELED,
    // Error has occured, stream is closed.
    ERR,
    // Reading and writing are done, and the stream is closed successfully.
    SUCCESS,
    // Waiting for Flush() to be called.
    WAITING_FOR_FLUSH,
    // Writing to the remote, callback will be invoked when done.
    WRITING,
    // There is no more data to write and stream is half-closed by the local
    // side.
    WRITING_DONE,
  };

  // Container to hold buffers and sizes of the pending data to be written.
  class WriteBuffers {
   public:
    WriteBuffers();

    WriteBuffers(const WriteBuffers&) = delete;
    WriteBuffers& operator=(const WriteBuffers&) = delete;

    ~WriteBuffers();

    // Clears Write Buffers list.
    void Clear();

    // Appends |buffer| of |buffer_size| length to the end of buffer list.
    void AppendBuffer(const scoped_refptr<net::IOBuffer>& buffer,
                      int buffer_size);

    void MoveTo(WriteBuffers* target);

    // Returns true of Write Buffers list is empty.
    bool Empty() const;

    const std::vector<scoped_refptr<net::IOBuffer>>& buffers() const {
      return write_buffer_list;
    }

    const std::vector<int>& lengths() const { return write_buffer_len_list; }

   private:
    // Every IOBuffer in |write_buffer_list| points to the memory owned by the
    // application.
    std::vector<scoped_refptr<net::IOBuffer>> write_buffer_list;
    // A list of the length of each IOBuffer in |write_buffer_list|.
    std::vector<int> write_buffer_len_list;
  };

  // net::BidirectionalStream::Delegate implementations:
  void OnStreamReady(bool request_headers_sent) override;
  void OnHeadersReceived(
      const quiche::HttpHeaderBlock& response_headers) override;
  void OnDataRead(int bytes_read) override;
  void OnDataSent() override;
  void OnTrailersReceived(const quiche::HttpHeaderBlock& trailers) override;
  void OnFailed(int error) override;
  // Helper method to derive OnSucceeded.
  void MaybeOnSucceded();

  void StartOnNetworkThread(
      std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info);
  void ReadDataOnNetworkThread(scoped_refptr<net::WrappedIOBuffer> read_buffer,
                               int buffer_size);
  void WriteDataOnNetworkThread(scoped_refptr<net::WrappedIOBuffer> read_buffer,
                                int buffer_size,
                                bool end_of_stream);
  void FlushOnNetworkThread();
  void SendFlushingWriteData();
  void CancelOnNetworkThread();

  bool IsOnNetworkThread();
  void PostToNetworkThread(const base::Location& from_here,
                           base::OnceClosure task);

  // Read state is tracking reading flow. Only accessed on network thread.
  //                         | <--- READING <--- |
  //                         |                   |
  //                         |                   |
  // NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS
  State read_state_;

  // Write state is tracking writing flow.  Only accessed on network thread.
  //                         | <--- WRITING <---  |
  //                         |                    |
  //                         |                    |
  // NOT_STARTED -> STARTED --> WAITING_FOR_FLUSH -> WRITING_DONE -> SUCCESS
  State write_state_;

  bool write_end_of_stream_;
  bool request_headers_sent_;

  bool disable_auto_flush_;
  bool delay_headers_until_flush_;

  const raw_ptr<net::URLRequestContextGetter> request_context_getter_;

  scoped_refptr<net::WrappedIOBuffer> read_buffer_;

  // Write data that is pending the flush.
  std::unique_ptr<WriteBuffers> pending_write_data_;
  // Write data that is flushed, but not sending yet.
  std::unique_ptr<WriteBuffers> flushing_write_data_;
  // Write data that is sending.
  std::unique_ptr<WriteBuffers> sending_write_data_;

  std::unique_ptr<net::BidirectionalStream> bidi_stream_;
  raw_ptr<Delegate> delegate_;

  base::WeakPtr<BidirectionalStream> weak_this_;
  base::WeakPtrFactory<BidirectionalStream> weak_factory_{this};
};

}  // namespace grpc_support

#endif  // COMPONENTS_GRPC_SUPPORT_BIDIRECTIONAL_STREAM_H_