1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
|
# frozen_string_literal: true
# Released under the MIT License.
# Copyright, 2019-2025, by Samuel Williams.
require_relative "readable"
require_relative "writable"
require_relative "stream"
module Protocol
module HTTP
module Body
# A body that invokes a block that can read and write to a stream.
#
# In some cases, it's advantageous to directly read and write to the underlying stream if possible. For example, HTTP/1 upgrade requests, WebSockets, and similar. To handle that case, response bodies can implement {stream?} and return `true`. When {stream?} returns true, the body **should** be consumed by calling `call(stream)`. Server implementations may choose to always invoke `call(stream)` if it's efficient to do so. Bodies that don't support it will fall back to using {each}.
#
# When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server.
module Streamable
# Generate a new streaming request body using the given block to generate the body.
#
# @parameter block [Proc] The block that generates the body.
# @returns [RequestBody] The streaming request body.
def self.request(&block)
RequestBody.new(block)
end
# Generate a new streaming response body using the given block to generate the body.
#
# @parameter request [Request] The request.
# @parameter block [Proc] The block that generates the body.
# @returns [ResponseBody] The streaming response body.
def self.response(request, &block)
ResponseBody.new(block, request.body)
end
# A output stream that can be written to by a block.
class Output
# Schedule the block to be executed in a fiber.
#
# @parameter input [Readable] The input stream.
# @parameter block [Proc] The block that generates the output.
# @returns [Output] The output stream.
def self.schedule(input, block)
self.new(input, block).tap(&:schedule)
end
# Initialize the output stream with the given input and block.
#
# @parameter input [Readable] The input stream.
# @parameter block [Proc] The block that generates the output.
def initialize(input, block)
@output = Writable.new
@stream = Stream.new(input, @output)
@block = block
end
# Schedule the block to be executed in a fiber.
#
# @returns [Fiber] The fiber.
def schedule
@fiber ||= Fiber.schedule do
@block.call(@stream)
end
end
# Read from the output stream (may block).
def read
@output.read
end
# Close the output stream.
#
# @parameter error [Exception | Nil] The error that caused this stream to be closed, if any.
def close(error = nil)
@output.close_write(error)
end
end
# Raised when a streaming body is consumed more than once.
class ConsumedError < StandardError
end
# A streaming body that can be read from and written to.
class Body < Readable
# Initialize the body with the given block and input.
#
# @parameter block [Proc] The block that generates the body.
# @parameter input [Readable] The input stream, if known.
def initialize(block, input = nil)
@block = block
@input = input
@output = nil
end
# @returns [Boolean] Whether the body can be streamed, which is true.
def stream?
true
end
# Invokes the block in a fiber which yields chunks when they are available.
def read
# We are reading chunk by chunk, allocate an output stream and execute the block to generate the chunks:
if @output.nil?
if @block.nil?
raise ConsumedError, "Streaming body has already been consumed!"
end
@output = Output.schedule(@input, @block)
@block = nil
end
@output.read
end
# Invoke the block with the given stream. The block can read and write to the stream, and must close the stream when finishing.
#
# @parameter stream [Stream] The stream to read and write to.
def call(stream)
if @block.nil?
raise ConsumedError, "Streaming block has already been consumed!"
end
block = @block
@input = @output = @block = nil
# Ownership of the stream is passed into the block, in other words, the block is responsible for closing the stream.
block.call(stream)
rescue => error
# If, for some reason, the block raises an error, we assume it may not have closed the stream, so we close it here:
stream.close
raise
end
# Close the input. The streaming body will eventually read all the input.
#
# @parameter error [Exception | Nil] The error that caused this stream to be closed, if any.
def close_input(error = nil)
if input = @input
@input = nil
input.close(error)
end
end
# Close the output, the streaming body will be unable to write any more output.
#
# @parameter error [Exception | Nil] The error that caused this stream to be closed, if any.
def close_output(error = nil)
if output = @output
@output = nil
output.close(error)
end
end
# Inspect the streaming body.
#
# @returns [String] a string representation of the streaming body.
def inspect
if @block
"#<#{self.class} block available, not consumed>"
elsif @output
"#<#{self.class} block consumed, output active>"
else
"#<#{self.class} block consumed, output closed>"
end
end
end
# A response body is used on the server side to generate the response body using a block.
class ResponseBody < Body
# Close will be invoked when all the output is written.
def close(error = nil)
self.close_output(error)
end
end
# A request body is used on the client side to generate the request body using a block.
#
# As the response body isn't available until the request is sent, the response body must be {stream}ed into the request body.
class RequestBody < Body
# Initialize the request body with the given block.
#
# @parameter block [Proc] The block that generates the body.
def initialize(block)
super(block, Writable.new)
end
# Close will be invoked when all the input is read.
def close(error = nil)
self.close_input(error)
end
# Stream the response body into the block's input.
def stream(body)
body&.each do |chunk|
@input.write(chunk)
end
rescue => error
ensure
@input.close_write(error)
end
end
end
end
end
end
|