File: stream_parser.cc

package info (click to toggle)
chromium 139.0.7258.127-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 6,122,068 kB
  • sloc: cpp: 35,100,771; ansic: 7,163,530; javascript: 4,103,002; python: 1,436,920; asm: 946,517; xml: 746,709; pascal: 187,653; perl: 88,691; sh: 88,436; objc: 79,953; sql: 51,488; cs: 44,583; fortran: 24,137; makefile: 22,147; tcl: 15,277; php: 13,980; yacc: 8,984; ruby: 7,485; awk: 3,720; lisp: 3,096; lex: 1,327; ada: 727; jsp: 228; sed: 36
file content (210 lines) | stat: -rw-r--r-- 10,045 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chrome/browser/nearby_sharing/instantmessaging/stream_parser.h"

#include <stdint.h>

#include <string_view>
#include <utility>

#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/numerics/safe_conversions.h"
#include "net/base/io_buffer.h"
#include "third_party/protobuf/src/google/protobuf/io/coded_stream.h"
#include "third_party/protobuf/src/google/protobuf/wire_format_lite.h"

namespace {

// Shorthand for the protobuf wire-format helpers used below to decode tags and
// to read length-delimited fields.
using ::google::protobuf::internal::WireFormatLite;

// Extra capacity, in bytes, requested on top of what is strictly needed
// whenever the unparsed-data buffer is (re)sized. The slack limits the amount
// of times we need to resize the buffer when copying over data, which involves
// reallocating memory. We chose 512 because it is one of the larger standard
// sizes for a buffer, and we expect a lot of data to be received in the WebRTC
// signaling process.
constexpr int kReadBufferSpareCapacity = 512;

// The minimum number of buffered bytes required before attempting to parse the
// "messages" or the "noop" field of the StreamBody proto. It is 2 because the
// tag (field number plus wire type) occupies a single byte, and the smallest
// size information would be contained in another single byte.
constexpr int kMinimumBytesToParseNextMessagesField = 2;

}  // namespace

// Construction and destruction need no custom logic. The unparsed-data buffer
// is not allocated here; Append() creates it on first use.
StreamParser::StreamParser() = default;
StreamParser::~StreamParser() = default;

std::vector<
    chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse>
StreamParser::Append(std::string_view data) {
  if (!unparsed_data_buffer_) {
    unparsed_data_buffer_ = base::MakeRefCounted<net::GrowableIOBuffer>();
    unparsed_data_buffer_->SetCapacity(data.size() + kReadBufferSpareCapacity);
  } else if (unparsed_data_buffer_->RemainingCapacity() <
             static_cast<int>(data.size())) {
    unparsed_data_buffer_->SetCapacity(unparsed_data_buffer_->offset() +
                                       data.size() + kReadBufferSpareCapacity);
  }

  DCHECK_GE(unparsed_data_buffer_->RemainingCapacity(),
            static_cast<int>(data.size()));
  UNSAFE_TODO(memcpy(unparsed_data_buffer_->data(), data.data(), data.size()));
  unparsed_data_buffer_->set_offset(unparsed_data_buffer_->offset() +
                                    data.size());
  return ParseStreamIfAvailable();
}

// Parses every complete StreamBody field currently held in the unparsed-data
// buffer, removes the bytes that were handled, and returns the
// ReceiveMessagesResponse protos decoded from "messages" fields. Bytes that
// belong to an incomplete or unparseable field are left in the buffer.
std::vector<
    chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse>
StreamParser::ParseStreamIfAvailable() {
  DCHECK(unparsed_data_buffer_);
  std::vector<
      chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse>
      receive_messages_responses;

  base::span<uint8_t> unparsed_bytes_available =
      unparsed_data_buffer_->span_before_offset();
  if (unparsed_bytes_available.size() < kMinimumBytesToParseNextMessagesField) {
    return receive_messages_responses;
  }

  google::protobuf::io::CodedInputStream input_stream(
      unparsed_bytes_available.data(), unparsed_bytes_available.size());
  size_t bytes_consumed = 0;

  // We can't use StreamBody::ParseFromString() here, as it can't do partial
  // parsing, nor can it tell how many bytes are consumed.
  //
  // The early return above guarantees that there is data to look at, so the
  // loop always runs at least once.
  bool continue_parsing = true;
  while (continue_parsing) {
    chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse
        parsed_response;
    StreamParsingResult result =
        ParseNextMessagesFieldFromStream(&input_stream, &parsed_response);
    switch (result) {
      case StreamParser::StreamParsingResult::kSuccessfullyParsedResponse:
        // |parsed_response| is scoped to this iteration and not used again,
        // so hand it over instead of deep-copying the proto.
        receive_messages_responses.push_back(std::move(parsed_response));
        [[fallthrough]];
      case StreamParser::StreamParsingResult::kNoop:
        // A whole field was handled: remember how far we got and keep going
        // only while there are bytes left to look at.
        bytes_consumed =
            base::checked_cast<size_t>(input_stream.CurrentPosition());
        continue_parsing = bytes_consumed < unparsed_bytes_available.size();
        break;
      case StreamParser::StreamParsingResult::kNotEnoughDataYet:
      case StreamParser::StreamParsingResult::kParsingUnexpectedlyFailed:
        // |bytes_consumed| still marks the end of the last fully handled
        // field, so the bytes of the field that could not be handled stay in
        // the buffer.
        continue_parsing = false;
        break;
    }
  }

  if (bytes_consumed == 0) {
    return receive_messages_responses;
  }

  // Shift the unread data back to the beginning of the buffer for the next
  // iteration of reading data.
  base::span<uint8_t> bytes_not_consumed =
      unparsed_bytes_available.subspan(bytes_consumed);
  unparsed_bytes_available.copy_prefix_from(bytes_not_consumed);
  unparsed_data_buffer_->set_offset(bytes_not_consumed.size());

  return receive_messages_responses;
}

StreamParser::StreamParsingResult
StreamParser::ParseNextMessagesFieldFromStream(
    google::protobuf::io::CodedInputStream* input_stream,
    chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse*
        parsed_response) {
  // Reads exactly one top-level field of a StreamBody proto from
  // |input_stream| and reports what was found.
  //
  // On the wire a proto is a sequence of key/value pairs:
  //      [tag][field data][tag][field data]...
  // where each tag packs the field number together with the wire type. Only
  // two StreamBody fields are handled here: "messages" (field 1), which
  // carries a serialized ReceiveMessagesResponse, and "noop" (field 15), which
  // the Tachyon server sends to keep the connection alive. Both are declared
  // as `bytes`, so each is expected to be encoded as
  //     [field number|WIRETYPE_LENGTH_DELIMITED][bytes size][byte data]
  // See https://developers.google.com/protocol-buffers/docs/encoding for
  // further explanation.
  //
  // Results:
  //   kSuccessfullyParsedResponse - a "messages" field was read and decoded
  //                                 into |parsed_response|.
  //   kNoop                       - a "noop" field was read; |parsed_response|
  //                                 is left untouched.
  //   kNotEnoughDataYet           - the tag or the payload could not be read.
  //   kParsingUnexpectedlyFailed  - unknown field, unexpected wire type, or a
  //                                 payload that is not a valid
  //                                 ReceiveMessagesResponse.
  using Result = StreamParser::StreamParsingResult;
  using StreamBody = chrome_browser_nearby_sharing_instantmessaging::StreamBody;

  // A tag of zero means there is no valid tag or not enough bytes to read
  // one, so we likely need to wait for more bytes to be appended.
  const uint32_t tag = input_stream->ReadTag();
  if (tag == 0) {
    return Result::kNotEnoughDataYet;
  }

  // A complete tag naming any other field is something we are not prepared to
  // handle.
  // TODO(crbug.com/1217150) Add a way to read through bytes of the unknown
  // fields to skip it, in order to be more robust to StreamBody changes.
  const int field_number = WireFormatLite::GetTagFieldNumber(tag);
  const bool is_messages_field =
      field_number == StreamBody::kMessagesFieldNumber;
  const bool is_noop_field = field_number == StreamBody::kNoopFieldNumber;
  if (!(is_messages_field || is_noop_field)) {
    return Result::kParsingUnexpectedlyFailed;
  }

  // Both supported fields are `bytes`, so anything other than a
  // length-delimited encoding does not match what we expect to receive.
  const bool is_length_delimited =
      WireFormatLite::GetTagWireType(tag) ==
      WireFormatLite::WireType::WIRETYPE_LENGTH_DELIMITED;
  if (!is_length_delimited) {
    return Result::kParsingUnexpectedlyFailed;
  }

  // Read the length prefix and payload. ReadBytes() relies on the tag having
  // already been consumed and validated, which is why the checks above come
  // first. A failure here most likely means the field has not fully arrived.
  std::string field_payload;
  const bool payload_read =
      WireFormatLite::ReadBytes(input_stream, &field_payload);
  if (!payload_read) {
    return Result::kNotEnoughDataYet;
  }

  // A "noop" is not an error and carries no ReceiveMessagesResponse. Its
  // bytes have been read from the stream at this point; the distinct result
  // lets the caller drop them from its buffer without expecting a response.
  if (is_noop_field) {
    return Result::kNoop;
  }

  // What remains is a complete "messages" payload: decode it for the caller.
  if (parsed_response->ParseFromString(field_payload)) {
    return Result::kSuccessfullyParsedResponse;
  }

  LOG(ERROR) << "Failed to parse ReceiveMessagesResponse from stream body "
                "message bytes.";
  return Result::kParsingUnexpectedlyFailed;
}