1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
|
// Copyright 2025 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "services/network/partial_decoder.h"
#include "base/check_op.h"
#include "net/base/io_buffer.h"
#include "net/filter/filter_source_stream.h"
namespace network {
PartialDecoderResult::PartialDecoderResult(
base::queue<scoped_refptr<net::IOBufferWithSize>> raw_buffers,
const std::optional<net::Error>& completion_status)
: completion_status_(completion_status) {
// Wrap each raw buffer in a `DrainableIOBuffer` to allow for partial
// consumption in `ConsumeRawData`.
while (!raw_buffers.empty()) {
auto buffer = raw_buffers.front();
CHECK_NE(buffer->size(), 0);
raw_buffers_.push(
base::MakeRefCounted<net::DrainableIOBuffer>(buffer, buffer->size()));
raw_buffers.pop();
}
}
PartialDecoderResult::PartialDecoderResult(PartialDecoderResult&& other) =
default;
PartialDecoderResult& PartialDecoderResult::operator=(PartialDecoderResult&&) =
default;
PartialDecoderResult::~PartialDecoderResult() = default;
bool PartialDecoderResult::HasRawData() const {
return !raw_buffers_.empty();
}
size_t PartialDecoderResult::ConsumeRawData(base::span<uint8_t> out) {
base::span<uint8_t> remaining = out;
// Copy data from the raw buffers into `out` until either all raw data is
// consumed or `out` is full.
while (!raw_buffers_.empty() && !remaining.empty()) {
// Get the next raw buffer.
auto buf = raw_buffers_.front();
// Calculate the number of bytes to write, which is the minimum of the
// remaining space in `out` and the remaining bytes in the current buffer.
size_t write_size = std::min(
remaining.size(), base::checked_cast<size_t>(buf->BytesRemaining()));
auto [destination, rest] = remaining.split_at(write_size);
// Copy the data from the raw buffer to the output span.
destination.copy_from_nonoverlapping(buf->first(write_size));
// Mark the bytes as consumed in the DrainableIOBuffer.
buf->DidConsume(write_size);
// If the current raw buffer is fully consumed, remove it.
if (buf->BytesRemaining() == 0) {
raw_buffers_.pop();
}
// Update the remaining span.
remaining = rest;
}
return out.size() - remaining.size();
}
PartialDecoder::RecordingStream::RecordingStream(
base::RepeatingCallback<int(net::IOBuffer*, int)> read_callback)
: SourceStream(net::SourceStreamType::kNone),
read_callback_(std::move(read_callback)) {}
PartialDecoder::RecordingStream::~RecordingStream() = default;
int PartialDecoder::RecordingStream::Read(
net::IOBuffer* dest_buffer,
int buffer_size,
net::CompletionOnceCallback callback) {
// Call the underlying read callback to fetch more data.
int result = read_callback_.Run(dest_buffer, buffer_size);
if (result == net::ERR_IO_PENDING) {
// If the read is pending, store the destination buffer and callback for
// later use in `OnReadCompleted`.
pending_dest_buffer_ = dest_buffer;
pending_callback_ = std::move(callback);
return result;
}
HandleReadCompleted(result, dest_buffer);
return result;
}
std::string PartialDecoder::RecordingStream::Description() const {
return std::string();
}
bool PartialDecoder::RecordingStream::MayHaveMoreBytes() const {
return true;
}
void PartialDecoder::RecordingStream::OnReadCompleted(int result) {
CHECK_NE(result, net::ERR_IO_PENDING);
CHECK(pending_dest_buffer_);
HandleReadCompleted(result, pending_dest_buffer_.get());
// Clear the pending buffer and callback, then invoke the callback to signal
// completion to the `decoding_stream_`.
pending_dest_buffer_ = nullptr;
std::move(pending_callback_).Run(result);
}
base::queue<scoped_refptr<net::IOBufferWithSize>>
PartialDecoder::RecordingStream::TakeRawBuffers() {
return std::move(raw_buffers_);
}
void PartialDecoder::RecordingStream::HandleReadCompleted(
int result,
net::IOBuffer* dest_buffer) {
CHECK_NE(result, net::ERR_IO_PENDING);
if (result > 0) {
// Record the raw data read result.
auto new_buffer = base::MakeRefCounted<net::IOBufferWithSize>(result);
new_buffer->span().copy_from(
dest_buffer->first(base::checked_cast<size_t>(result)));
raw_buffers_.push(std::move(new_buffer));
} else {
// If the read completed, store the completion status.
completion_status_ = static_cast<net::Error>(result);
}
}
PartialDecoder::PartialDecoder(
base::RepeatingCallback<int(net::IOBuffer*, int)> read_raw_data_callback,
const std::vector<net::SourceStreamType>& types,
size_t decoded_buffer_size)
: decoded_buffer_(base::MakeRefCounted<net::GrowableIOBuffer>()) {
decoded_buffer_->SetCapacity(base::checked_cast<int>(decoded_buffer_size));
// Create a `RecordingStream` to intercept and record the raw data from the
// underlying read callback.
auto recording_stream =
std::make_unique<RecordingStream>(std::move(read_raw_data_callback));
recording_stream_ = recording_stream.get();
// Create a decoding stream that uses the `RecordingStream` as its input.
// This stream will apply the specified decoders (if any) to the recorded
// raw data.
decoding_stream_ = net::FilterSourceStream::CreateDecodingSourceStream(
std::move(recording_stream), types);
}
PartialDecoder::~PartialDecoder() = default;
int PartialDecoder::ReadDecodedDataMore(
base::OnceCallback<void(int)> callback) {
CHECK(HasRemainingBuffer());
// Attempt to read more decoded data from the `decoding_stream_` into the
// remaining capacity of the `decoded_buffer_`.
int result = decoding_stream_->Read(
decoded_buffer_.get(), decoded_buffer_->RemainingCapacity(),
base::BindOnce(&PartialDecoder::OnReadDecodedDataAsyncComplete,
base::Unretained(this)));
if (result == net::ERR_IO_PENDING) {
// If the read is pending, store the callback.
pending_read_decoded_data_more_callback_ = std::move(callback);
} else if (result > 0) {
// If data was read synchronously, update the offset of the
// `decoded_buffer_` to reflect the new data.
decoded_buffer_->set_offset(decoded_buffer_->offset() + result);
}
return result;
}
void PartialDecoder::OnReadDecodedDataAsyncComplete(int result) {
if (result > 0) {
// If data was read, update the offset of the `decoded_buffer_`.
decoded_buffer_->set_offset(decoded_buffer_->offset() + result);
}
CHECK(pending_read_decoded_data_more_callback_);
// Run the stored callback to notify the caller that the read is complete.
std::move(pending_read_decoded_data_more_callback_).Run(result);
}
// Forwards the completion of a raw data read to the recording stream.
void PartialDecoder::OnReadRawDataCompleted(int bytes_read) {
CHECK(read_in_progress());
CHECK(recording_stream_);
// The `recording_stream_` will handle recording the raw data and invoking
// the original read callback.
recording_stream_->OnReadCompleted(bytes_read);
}
bool PartialDecoder::read_in_progress() const {
return !!pending_read_decoded_data_more_callback_;
}
bool PartialDecoder::HasRemainingBuffer() const {
return decoded_buffer_->RemainingCapacity() > 0;
}
base::span<const uint8_t> PartialDecoder::decoded_data() const {
return decoded_buffer_->span_before_offset();
}
PartialDecoderResult PartialDecoder::TakeResult() && {
return PartialDecoderResult(recording_stream_->TakeRawBuffers(),
recording_stream_->completion_status());
}
} // namespace network
|