File: stream.rb

package info (click to toggle)
ruby-protocol-http 0.55.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 840 kB
  • sloc: ruby: 6,904; makefile: 4
file content (426 lines) | stat: -rw-r--r-- 12,663 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2019-2025, by Samuel Williams.
# Copyright, 2023, by Genki Takiuchi.
# Copyright, 2025, by William T. Nelson.

require_relative "buffered"

module Protocol
	module HTTP
		module Body
			# The input stream is an IO-like object which contains the raw HTTP POST data. When applicable, its external encoding must be “ASCII-8BIT” and it must be opened in binary mode, for Ruby 1.9 compatibility. The input stream must respond to gets, each, read and rewind.
			class Stream
				# The default line separator, used by {gets}.
				NEWLINE = "\n"
				
				# Build a stream on top of an optional input body and an output body.
				#
				# @parameter input [Readable | Nil] The body that read operations pull chunks from.
				# @parameter output [Writable] The body that write operations are forwarded to.
				# @raises [ArgumentError] If the output does not respond to `write`.
				def initialize(input = nil, output = Buffered.new)
					@input, @output = input, output
					
					unless output.respond_to?(:write)
						raise ArgumentError, "Non-writable output!"
					end
					
					# Nothing has been closed yet:
					@closed_read = false
					@closed = false
					
					# Data that a read operation fetched but has not returned yet:
					@buffer = nil
				end
				
				# @attribute [Readable] The input stream.
				attr :input
				
				# @attribute [Writable] The output stream.
				attr :output
				
				# This provides a read-only interface for data, which is surprisingly tricky to implement correctly.
				module Reader
					# Read data from the underlying stream.
					#
					# If given a non-negative length, it will read at most that many bytes from the stream. If the stream is at EOF, it will return nil.
					#
					# If the length is not given, it will read all data until EOF, or return an empty string if the stream is already at EOF.
					#
					# If buffer is given, then the read data will be placed into buffer instead of a newly created String object.
					#
					# Bytes fetched beyond the requested length are kept for the next read operation. Nothing is kept when the data fits exactly, so later reads never see an empty left-over.
					#
					# @parameter length [Integer | Nil] The amount of data to read, or nil to read until EOF.
					# @parameter buffer [String | Nil] The buffer which will receive the data.
					# @returns [String | Nil] A buffer containing the data, or nil at EOF when a length was given.
					def read(length = nil, buffer = nil)
						# A zero-length read returns immediately, without touching the stream or the given buffer.
						return "" if length == 0
						
						buffer ||= String.new.force_encoding(Encoding::BINARY)
						
						# Take any previously buffered data and replace it into the given buffer.
						if @buffer
							buffer.replace(@buffer)
							@buffer = nil
						else
							buffer.clear
						end
						
						if length
							while buffer.bytesize < length and chunk = read_next
								buffer << chunk
							end
							
							# This ensures the subsequent `slice!` works on bytes rather than characters.
							buffer.force_encoding(Encoding::BINARY)
							
							# Only keep a left-over when there really are surplus bytes; storing an empty string here would make other read operations return it as if it were data.
							if buffer.bytesize > length
								# This will be at least one copy:
								@buffer = buffer.byteslice(length, buffer.bytesize)
								
								# This should be zero-copy:
								buffer.slice!(length, buffer.bytesize)
							end
							
							return buffer.empty? ? nil : buffer
						else
							while chunk = read_next
								buffer << chunk
							end
							
							return buffer
						end
					end
					
					# Read some bytes from the stream.
					#
					# If the length is given, at most length bytes will be read. Otherwise, one chunk of data from the underlying stream will be read.
					#
					# Will avoid reading from the underlying stream if there is buffered data available. An empty left-over buffer carries no data and is skipped.
					#
					# The chunk obtained from the underlying stream is never modified in place: when it has to be truncated, the returned data and the retained remainder are both copies.
					#
					# @parameter length [Integer | Nil] The maximum number of bytes to read.
					# @parameter buffer [String | Nil] The buffer which will receive the data.
					# @returns [String | Nil] The data, or nil if no more data is available.
					# @raises [ArgumentError] If the length is negative.
					def read_partial(length = nil, buffer = nil)
						raise ArgumentError, "negative length #{length} given" if length and length < 0
						
						# Prefer data left over from a previous read operation:
						data = @buffer
						@buffer = nil
						
						data = read_next if data.nil? or data.empty?
						
						unless data
							buffer&.clear
							return nil
						end
						
						if length and data.bytesize > length
							# Keep the surplus for the next read operation:
							@buffer = data.byteslice(length, data.bytesize)
							
							# Slicing into a new string (rather than `slice!`) leaves the source chunk untouched, which also makes frozen chunks safe:
							data = data.byteslice(0, length).force_encoding(Encoding::BINARY)
						end
						
						if buffer
							buffer.replace(data)
							return buffer
						else
							return data
						end
					end
					
					# Like {read_partial}, except that running out of data is reported by raising instead of returning nil.
					#
					# @parameter length [Integer] The maximum number of bytes to read.
					# @parameter buffer [String | Nil] The buffer to read into.
					# @returns [String] The data that was read.
					# @raises [EOFError] If {read_partial} yields no data.
					def readpartial(length, buffer = nil)
						data = read_partial(length, buffer)
						
						raise EOFError, "End of file reached!" unless data
						
						return data
					end
					
					# Iterate over each chunk of data from the input stream.
					#
					# Data left over from a previous read operation is yielded first, followed by every chunk the input produces until it returns nil.
					#
					# @yields {|chunk| ...} Each chunk of data.
					# @returns [Enumerator | Nil] An enumerator if no block is given.
					def each(&block)
						return to_enum unless block_given?
						
						if buffer = @buffer
							# Detach the left-over before yielding it, so that it is not delivered a second time if the block breaks out, raises, or reads from this stream.
							@buffer = nil
							
							# An empty left-over carries no data:
							yield buffer unless buffer.empty?
						end
						
						while chunk = read_next
							yield chunk
						end
					end
					
					# Return up to length bytes, taken from the buffered data if there is any, or else from the next chunk of the input.
					#
					# At most one chunk is fetched per call. Whatever exceeds length stays buffered for later read operations.
					#
					# @parameter length [Integer] The maximum number of bytes to read.
					# @parameter buffer [String | Nil] The buffer to read into.
					# @parameter exception [Boolean | Nil] Accepted for compatibility with IO-like objects; not used here.
					# @returns [String | Nil] The data, or nil if no data is available.
					def read_nonblock(length, buffer = nil, exception: nil)
						pending = (@buffer ||= read_next)
						
						unless pending
							buffer&.clear
							return
						end
						
						if pending.bytesize > length
							# Hand out the head and retain the tail:
							head = pending.byteslice(0, length)
							@buffer = pending.byteslice(length, pending.bytesize)
						else
							# Everything fits, so nothing remains buffered:
							head = pending
							@buffer = nil
						end
						
						return head unless buffer
						
						buffer.replace(head)
						return buffer
					end
					
					# Read data from the stream until encountering pattern.
					#
					# Chunks are appended to the internal buffer until the pattern is found. If the input ends first, nil is returned and the data read so far stays buffered for other read operations.
					#
					# NOTE(review): the search uses `String#index` while slicing uses byte offsets, so this presumably expects binary (or ASCII-only) chunks - confirm against callers.
					#
					# @parameter pattern [String] The pattern to match.
					# @parameter offset [Integer] The offset to start searching from.
					# @parameter chomp [Boolean] Whether to remove the pattern from the returned data.
					# @returns [String | Nil] The contents of the stream up to and including the pattern (excluding it if chomp is true), or nil if the pattern was not found. The pattern is always consumed.
					def read_until(pattern, offset = 0, chomp: false)
						# We don't want to split on the pattern, so we subtract the size of the pattern.
						split_offset = pattern.bytesize - 1
						
						@buffer ||= read_next
						return nil if @buffer.nil?
						
						until index = @buffer.index(pattern, offset)
							# Resume the search where a pattern straddling the chunk boundary could begin:
							offset = @buffer.bytesize - split_offset
							
							offset = 0 if offset < 0
							
							if chunk = read_next
								# The buffer may be a frozen chunk handed out by the input; appending needs a mutable copy.
								@buffer = @buffer.dup if @buffer.frozen?
								@buffer << chunk
							else
								return nil
							end
						end
						
						# Freeze the buffer, as this enables us to use byteslice without generating a hidden copy:
						@buffer.freeze
						
						tail = index + pattern.bytesize
						matched = @buffer.byteslice(0, chomp ? index : tail)
						
						@buffer = @buffer.byteslice(tail, @buffer.bytesize)
						
						# Do not retain an empty remainder; other read operations would return it as if it were data.
						@buffer = nil if @buffer&.empty?
						
						return matched
					end
					
					# Read a single line from the stream.
					#
					# The returned line never exceeds limit bytes: if the separator would end beyond the limit, the line is cut at the limit and the rest stays buffered. At EOF, whatever remains in the buffer is returned as the final line.
					#
					# NOTE(review): the search uses `String#index` while slicing uses byte offsets, so this presumably expects binary (or ASCII-only) chunks - confirm against callers.
					#
					# @parameter separator [String | Integer | Nil] The line separator, defaults to `\n`. An integer is interpreted as the limit. If nil, this behaves like {read_partial}.
					# @parameter limit [Integer | Nil] The maximum number of bytes to read.
					# @parameter chomp [Boolean] Whether to remove the separator from the returned line.
					# @returns [String | Nil] The line, or nil if there is no more data.
					def gets(separator = NEWLINE, limit = nil, chomp: false)
						# If the separator is an integer, it is actually the limit:
						if separator.is_a?(Integer)
							limit = separator
							separator = NEWLINE
						end
						
						# If no separator is given, this is the same as a read operation:
						if separator.nil?
							# `read(limit)` would block until the limit is reached, which is not usually desirable behaviour.
							return read_partial(limit)
						end
						
						# We don't want to split on the separator, so we subtract the size of the separator:
						split_offset = separator.bytesize - 1
						
						@buffer ||= read_next
						return nil if @buffer.nil?
						
						offset = 0
						until index = @buffer.index(separator, offset)
							# Enough bytes are buffered to fill the limit without a complete separator, so stop searching:
							break if limit and @buffer.bytesize >= limit
							
							# Resume the search where a separator straddling the chunk boundary could begin:
							offset = @buffer.bytesize - split_offset
							offset = 0 if offset < 0
							
							if chunk = read_next
								# The buffer may be a frozen chunk handed out by the input; appending needs a mutable copy.
								@buffer = @buffer.dup if @buffer.frozen?
								@buffer << chunk
							else
								# No more data could be read, return the remaining data (or nil if there is none):
								remaining = @buffer
								@buffer = nil
								
								return remaining.empty? ? nil : remaining
							end
						end
						
						# Freeze the buffer, as this enables us to use byteslice without generating a hidden copy:
						@buffer.freeze
						
						# Byte position just past the separator; stays nil when the limit was reached before a separator was found.
						line_end = index + separator.bytesize if index
						
						if limit and (line_end.nil? or line_end > limit)
							# The complete line does not fit within the limit, so cut it there:
							line = @buffer.byteslice(0, limit)
							line_end = limit
						else
							line = @buffer.byteslice(0, chomp ? index : line_end)
						end
						
						@buffer = @buffer.byteslice(line_end, @buffer.bytesize)
						
						# Do not retain an empty remainder; other read operations would return it as if it were data.
						@buffer = nil if @buffer&.empty?
						
						return line
					end
				end
				
				include Reader
				
				# Forward the given data to the output body.
				#
				# @parameter buffer [String] The data to write.
				# @raises [IOError] If there is no output, e.g. because it has been closed.
				# @returns [Integer] The size of the given data in bytes.
				def write(buffer)
					output = @output
					
					unless output
						raise IOError, "Stream is not writable, output has been closed!"
					end
					
					output.write(buffer)
					
					return buffer.bytesize
				end
				
				# Non-blocking flavoured alias of {write}, provided so this object can stand in for IO-like objects.
				#
				# @parameter buffer [String] The data to write.
				# @parameter exception [Boolean | Nil] Accepted for compatibility with IO-like objects; currently ignored.
				# @returns [Integer] The number of bytes written, as reported by {write}.
				def write_nonblock(buffer, exception: nil)
					return write(buffer)
				end
				
				# Append data to the stream by delegating to {write}.
				#
				# @parameter buffer [String] The data to write.
				# @returns [Integer] Whatever {write} returns, i.e. the number of bytes written (not the stream itself).
				def <<(buffer)
					return write(buffer)
				end
				
				# Write lines to the stream.
				#
				# The current implementation buffers the lines and writes them in a single operation. Each argument is converted with `to_s` and followed by the separator.
				#
				# @parameter arguments [Array(Object)] The lines to write.
				# @parameter separator [String] The line separator, defaults to `\n`.
				# @returns [Integer] The number of bytes written, as reported by {write}.
				def puts(*arguments, separator: NEWLINE)
					buffer = ::String.new
					
					arguments.each do |argument|
						# Convert explicitly: `String#<<` would append an Integer as a codepoint and reject nil.
						buffer << argument.to_s << separator
					end
					
					write(buffer)
				end
				
				# Flush the output stream.
				#
				# Nothing is done here at present; the method exists so the stream can be used where IO-like objects are expected.
				#
				# @returns [Nil]
				def flush
					nil
				end
				
				# Close the input body.
				#
				# Detaches the input, discards any buffered data, marks the stream as read-closed and then closes the input. Does nothing if there is no input.
				#
				# If an error was encountered while processing the data read from this stream, pass it here so that it reaches the input body.
				#
				# @parameter error [Exception | Nil] The error that was encountered, if any.
				def close_read(error = nil)
					input = @input
					return unless input
					
					# Update our own state first, then let go of the input:
					@buffer = nil
					@closed_read = true
					@input = nil
					
					input.close(error)
				end
				
				# Close the output body.
				#
				# Detaches the output and then calls `close_write` on it. Does nothing if there is no output.
				#
				# If an error was encountered while generating the data written to this stream, pass it here so that it reaches the output body.
				#
				# @parameter error [Exception | Nil] The error that was encountered, if any.
				def close_write(error = nil)
					output = @output
					return unless output
					
					@output = nil
					
					output.close_write(error)
				end
				
				# Close the input and output bodies.
				#
				# The output is closed even if closing the input raises, so that a failure on the read side does not leave the write side open. The stream is marked as closed in every case.
				#
				# @parameter error [Exception | Nil] The error that caused this stream to be closed, if any.
				# @returns [Nil]
				def close(error = nil)
					begin
						close_read(error)
					ensure
						# Runs whether or not closing the input succeeded:
						close_write(error)
					end
					
					return nil
				ensure
					@closed = true
				end
				
				# Report the closed flag, which starts out false and is set by {close}.
				#
				# @returns [Boolean] Whether the stream has been closed.
				def closed?
					return @closed
				end
				
				# Describe the stream for debugging: its class, how many bytes are buffered, and which closed flags are set.
				#
				# @returns [String] A string representation of the stream.
				def inspect
					buffered = if @buffer
						"#{@buffer.bytesize} bytes buffered"
					else
						"no buffer"
					end
					
					# Collect the labels of the flags that are set, in a fixed order:
					flags = [("closed" if @closed), ("read-closed" if @closed_read)].compact
					state = flags.empty? ? "open" : flags.join(", ")
					
					return "#<#{self.class} #{buffered}, #{state}>"
				end
				
				# Check whether the output has any chunks remaining.
				#
				# Once the output has been detached by {close_write} there is nothing left to report, so the stream counts as empty instead of raising on a missing output.
				#
				# @returns [Boolean] True if there is no output or the output is empty.
				def empty?
					output = @output
					
					return true if output.nil?
					
					return output.empty?
				end
				
				private
				
				# Fetch the next chunk from the input.
				#
				# Without an input this returns nil, unless the input was explicitly closed, in which case it raises.
				#
				# @returns [String | Nil] The next chunk of data, as returned by the input's `read`.
				# @raises [IOError] If the input stream was explicitly closed.
				def read_next
					return @input.read if @input
					
					raise IOError, "Stream is not readable, input has been closed!" if @closed_read
				end
			end
		end
	end
end