1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
|
// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/nearby_sharing/instantmessaging/stream_parser.h"
#include <string_view>
#include "base/compiler_specific.h"
#include "base/containers/span.h"
#include "base/logging.h"
#include "base/numerics/checked_math.h"
#include "base/numerics/safe_conversions.h"
#include "net/base/io_buffer.h"
#include "third_party/protobuf/src/google/protobuf/io/coded_stream.h"
#include "third_party/protobuf/src/google/protobuf/wire_format_lite.h"
namespace {
using ::google::protobuf::internal::WireFormatLite;
// A buffer spare capacity limits the amount of times we need to resize the
// buffer when copying over data, which involves reallocating memory.
// We chose 512 because it is one of the larger standard sizes for
// a buffer, and we expect a lot of data to be received in the WebRTC
// signaling process.
constexpr int kReadBufferSpareCapacity = 512;
// The minimum number of bytes to parse the messages or the noop field of
// the StreamBody proto is 2 because the size of the tag and wire type is a
// single byte, and the smallest size information would be contained in another
// single byte.
constexpr int kMinimumBytesToParseNextMessagesField = 2;
} // namespace
// Defaulted out of line. The read buffer (|unparsed_data_buffer_|) is not
// allocated here; Append() creates it on first use.
StreamParser::StreamParser() = default;
StreamParser::~StreamParser() = default;
std::vector<
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse>
StreamParser::Append(std::string_view data) {
if (!unparsed_data_buffer_) {
unparsed_data_buffer_ = base::MakeRefCounted<net::GrowableIOBuffer>();
unparsed_data_buffer_->SetCapacity(data.size() + kReadBufferSpareCapacity);
} else if (unparsed_data_buffer_->RemainingCapacity() <
static_cast<int>(data.size())) {
unparsed_data_buffer_->SetCapacity(unparsed_data_buffer_->offset() +
data.size() + kReadBufferSpareCapacity);
}
DCHECK_GE(unparsed_data_buffer_->RemainingCapacity(),
static_cast<int>(data.size()));
UNSAFE_TODO(memcpy(unparsed_data_buffer_->data(), data.data(), data.size()));
unparsed_data_buffer_->set_offset(unparsed_data_buffer_->offset() +
data.size());
return ParseStreamIfAvailable();
}
std::vector<
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse>
StreamParser::ParseStreamIfAvailable() {
DCHECK(unparsed_data_buffer_);
std::vector<
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse>
receive_messages_responses;
base::span<uint8_t> unparsed_bytes_available =
unparsed_data_buffer_->span_before_offset();
if (unparsed_bytes_available.size() < kMinimumBytesToParseNextMessagesField) {
return receive_messages_responses;
}
google::protobuf::io::CodedInputStream input_stream(
unparsed_bytes_available.data(), unparsed_bytes_available.size());
size_t bytes_consumed = 0;
// We can't use StreamBody::ParseFromString() here, as it can't do partial
// parsing, nor can it tell how many bytes are consumed.
bool continue_parsing = unparsed_bytes_available.size() > 0;
while (continue_parsing) {
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse
parsed_response;
StreamParsingResult result =
ParseNextMessagesFieldFromStream(&input_stream, &parsed_response);
switch (result) {
case StreamParser::StreamParsingResult::kSuccessfullyParsedResponse:
receive_messages_responses.push_back(parsed_response);
[[fallthrough]];
case StreamParser::StreamParsingResult::kNoop:
bytes_consumed =
base::checked_cast<size_t>(input_stream.CurrentPosition());
continue_parsing = bytes_consumed < unparsed_bytes_available.size();
break;
case StreamParser::StreamParsingResult::kNotEnoughDataYet:
case StreamParser::StreamParsingResult::kParsingUnexpectedlyFailed:
continue_parsing = false;
break;
}
}
if (bytes_consumed == 0)
return receive_messages_responses;
// Shift the unread data back to the beginning of the buffer for the next
// iteration of reading data.
base::span<uint8_t> bytes_not_consumed =
unparsed_bytes_available.subspan(bytes_consumed);
unparsed_bytes_available.copy_prefix_from(bytes_not_consumed);
unparsed_data_buffer_->set_offset(bytes_not_consumed.size());
return receive_messages_responses;
}
// Reads at most one top-level StreamBody record from |input_stream|. For a
// "messages" record, the payload is decoded into |parsed_response|.
StreamParser::StreamParsingResult
StreamParser::ParseNextMessagesFieldFromStream(
    google::protobuf::io::CodedInputStream* input_stream,
    chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse*
        parsed_response) {
  namespace proto = chrome_browser_nearby_sharing_instantmessaging;

  // Wire-format background (see
  // https://developers.google.com/protocol-buffers/docs/encoding):
  // a serialized StreamBody is a sequence of key/value records,
  //   [tag][payload][tag][payload]...
  // where each tag packs a field number together with a wire type. Only two
  // StreamBody fields are handled here. Both are declared as `bytes` and are
  // therefore WIRETYPE_LENGTH_DELIMITED: a length followed by that many
  // payload bytes.
  //   - "messages" (field 1): the payload is a serialized
  //     ReceiveMessagesResponse holding an InboxMessage or a FastPathReady
  //     message, i.e.
  //       [field id="messages"|WIRETYPE_LENGTH_DELIMITED][bytes size][bytes]
  //   - "noop" (field 15): sent by the Tachyon server to keep the connection
  //     alive, i.e.
  //       [field id="noop"|WIRETYPE_LENGTH_DELIMITED][bytes size][bytes]

  // A zero tag means either no complete tag is available yet or the tag is
  // invalid; most likely more bytes still need to be appended.
  const uint32_t tag = input_stream->ReadTag();
  if (!tag)
    return StreamParsingResult::kNotEnoughDataYet;

  const int field = WireFormatLite::GetTagFieldNumber(tag);
  const bool is_noop = field == proto::StreamBody::kNoopFieldNumber;
  const bool is_known_field =
      is_noop || field == proto::StreamBody::kMessagesFieldNumber;

  // Anything other than a length-delimited "messages" or "noop" record is
  // rejected: either an unknown field, or a wire type that contradicts the
  // fields' `bytes` declaration.
  // TODO(crbug.com/1217150) Add a way to read through bytes of the unknown
  // fields to skip it, in order to be more robust to StreamBody changes.
  if (!is_known_field ||
      WireFormatLite::GetTagWireType(tag) !=
          WireFormatLite::WireType::WIRETYPE_LENGTH_DELIMITED) {
    return StreamParsingResult::kParsingUnexpectedlyFailed;
  }

  // The tag has already been consumed and validated, so only the length prefix
  // and the payload remain. A failed read is treated as "record not complete
  // yet".
  std::string payload;
  if (!WireFormatLite::ReadBytes(input_stream, &payload))
    return StreamParsingResult::kNotEnoughDataYet;

  // A keep-alive record carries no response. Reporting kNoop (rather than a
  // failure) lets ParseStreamIfAvailable() drop its bytes and keep reading.
  if (is_noop)
    return StreamParsingResult::kNoop;

  if (!parsed_response->ParseFromString(payload)) {
    LOG(ERROR) << "Failed to parse ReceiveMessagesResponse from stream body "
                  "message bytes.";
    return StreamParsingResult::kParsingUnexpectedlyFailed;
  }
  return StreamParsingResult::kSuccessfullyParsedResponse;
}
|