File: connection_handler_impl.h

package info (click to toggle)
chromium 138.0.7204.183-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 6,071,908 kB
  • sloc: cpp: 34,937,088; ansic: 7,176,967; javascript: 4,110,704; python: 1,419,953; asm: 946,768; xml: 739,971; pascal: 187,324; sh: 89,623; perl: 88,663; objc: 79,944; sql: 50,304; cs: 41,786; fortran: 24,137; makefile: 21,806; php: 13,980; tcl: 13,166; yacc: 8,925; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (146 lines) | stat: -rw-r--r-- 5,600 bytes parent folder | download | duplicates (9)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Copyright 2013 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef GOOGLE_APIS_GCM_ENGINE_CONNECTION_HANDLER_IMPL_H_
#define GOOGLE_APIS_GCM_ENGINE_CONNECTION_HANDLER_IMPL_H_

#include <stdint.h>

#include <vector>

#include "base/memory/weak_ptr.h"
#include "base/task/sequenced_task_runner.h"
#include "base/time/time.h"
#include "base/timer/timer.h"
#include "google_apis/gcm/engine/connection_handler.h"
#include "net/traffic_annotation/network_traffic_annotation.h"

namespace mcs_proto {
class LoginRequest;
}  // namespace mcs_proto

namespace gcm {

class SocketInputStream;
class SocketOutputStream;

class GCM_EXPORT ConnectionHandlerImpl : public ConnectionHandler {
 public:
  // Must be called on |io_task_runner|.
  // |io_task_runner|: for running IO tasks. When provided, it could be a
  //     wrapper on top of base::SingleThreadTaskRunner::GetCurrentDefault() to
  //     provide power management featueres so that a delayed task posted to it
  //     can wake the system up from sleep to perform the task.
  // |read_callback| will be invoked with the contents of any received protobuf
  // message.
  // |write_callback| will be invoked anytime a message has been successfully
  // sent. Note: this just means the data was sent to the wire, not that the
  // other end received it.
  // |connection_callback| will be invoked with any fatal read/write errors
  // encountered.
  ConnectionHandlerImpl(scoped_refptr<base::SequencedTaskRunner> io_task_runner,
                        base::TimeDelta read_timeout,
                        const ProtoReceivedCallback& read_callback,
                        const ProtoSentCallback& write_callback,
                        const ConnectionChangedCallback& connection_callback);

  ConnectionHandlerImpl(const ConnectionHandlerImpl&) = delete;
  ConnectionHandlerImpl& operator=(const ConnectionHandlerImpl&) = delete;

  ~ConnectionHandlerImpl() override;

  // ConnectionHandler implementation.
  void Init(const mcs_proto::LoginRequest& login_request,
            mojo::ScopedDataPipeConsumerHandle receive_stream,
            mojo::ScopedDataPipeProducerHandle send_stream) override;
  void Reset() override;
  bool CanSendMessage() const override;
  void SendMessage(const google::protobuf::MessageLite& message) override;

 private:
  // State machine for handling incoming data. See WaitForData(..) for usage.
  enum ProcessingState {
    // Processing the version, tag, and size packets (assuming minimum length
    // size packet). Only used during the login handshake.
    MCS_VERSION_TAG_AND_SIZE = 0,
    // Processing the tag and size packets (assuming minimum length size
    // packet). Used for normal messages.
    MCS_TAG_AND_SIZE,
    // Processing the size packet alone.
    MCS_SIZE,
    // Processing the protocol buffer bytes (for those messages with non-zero
    // sizes).
    MCS_PROTO_BYTES
  };

  // Sends the protocol version and login request. First step in the MCS
  // connection handshake.
  void Login(const google::protobuf::MessageLite& login_request);

  // SendMessage continuation. Invoked when Socket::Write completes.
  void OnMessageSent();

  // Starts the message processing process, which is comprised of the tag,
  // message size, and bytes packet types.
  void GetNextMessage();

  // Performs any necessary SocketInputStream refreshing until the data
  // associated with |packet_type| is fully ready, then calls the appropriate
  // OnGot* message to process the packet data. If the read times out,
  // will close the stream and invoke the connection callback.
  void WaitForData(ProcessingState state);

  // Incoming data helper methods.
  void OnGotVersion();
  void OnGotMessageTag();
  void OnGotMessageSize();
  void OnGotMessageBytes();

  // Timeout handler.
  void OnTimeout();

  // Closes the current connection.
  void CloseConnection();

  const scoped_refptr<base::SequencedTaskRunner> io_task_runner_;

  // Timeout policy: the timeout is only enforced while waiting on the
  // handshake (version and/or LoginResponse) or once at least a tag packet has
  // been received. It is reset every time new data is received, and is
  // only stopped when a full message is processed.
  // TODO(zea): consider enforcing a separate timeout when waiting for
  // a message to send.
  base::RetainingOneShotTimer read_timeout_timer_;

  // This connection's input/output streams.
  std::unique_ptr<SocketInputStream> input_stream_;
  std::unique_ptr<SocketOutputStream> output_stream_;

  // Whether the MCS login handshake has successfully completed. See Init(..)
  // description for more info on what the handshake involves.
  bool handshake_complete_;

  // State for the message currently being processed, if there is one.
  uint8_t message_tag_;
  uint32_t message_size_;

  ProtoReceivedCallback read_callback_;
  ProtoSentCallback write_callback_;
  ConnectionChangedCallback connection_callback_;

  // The number of bytes of the size packet read so far without finishing the
  // read. This can be up to but no larger than 5 (the max number of bytes for
  // a varint32).
  uint8_t size_packet_so_far_;
  // A temporary input buffer for holding large messages. Will hold
  // message_size_ bytes for messages larger than the normal size limit (and
  // will be empty otherwise).
  std::vector<uint8_t> payload_input_buffer_;

  base::WeakPtrFactory<ConnectionHandlerImpl> weak_ptr_factory_{this};
};

}  // namespace gcm

#endif  // GOOGLE_APIS_GCM_ENGINE_CONNECTION_HANDLER_IMPL_H_