File: chunked.rb

package info (click to toggle)
ruby-async-http 0.59.5-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid
  • size: 572 kB
  • sloc: ruby: 4,164; javascript: 40; makefile: 4
file content (86 lines) | stat: -rwxr-xr-x 2,157 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env ruby

require 'async'
require 'async/clock'
require 'async/barrier'
require 'async/semaphore'
require_relative '../../lib/async/http/endpoint'
require_relative '../../lib/async/http/client'

# Downloads a large file by splitting it into fixed-size byte ranges, fetching the
# ranges concurrently with HTTP range requests, and writing every part directly at
# its offset in the output file ("products.csv" in the current directory).
#
# Raises a RuntimeError when the initial HEAD request fails, when the server does
# not advertise `accept-ranges: bytes`, or when the content length is unknown.
Async do
	url = "https://static.openfoodfacts.org/data/en.openfoodfacts.org.products.csv"
	
	endpoint = Async::HTTP::Endpoint.parse(url)
	client = Async::HTTP::Client.new(endpoint)
	
	headers = {'user-agent' => 'curl/7.69.1', 'accept' => '*/*'}
	
	file = File.open("products.csv", "w")
	Console.logger.info(self) {"Saving download to #{Dir.pwd}"}
	
	content_length = nil
	
	# Probe the resource first: we need range support and the total size to plan the parts.
	begin
		head_response = client.head(endpoint.path, headers)
		
		unless head_response.success?
			raise "HEAD request failed with status #{head_response.status}!"
		end
		
		# The header may be absent entirely, hence the safe navigation.
		unless head_response.headers['accept-ranges']&.include?('bytes')
			raise "Does not advertise support for accept-ranges: bytes!"
		end
		
		unless content_length = head_response.body&.length
			raise "Could not determine length of response!"
		end
	ensure
		head_response&.close
	end
	
	Console.logger.info(self) {"Content length: #{content_length/(1024**2)}MiB"}
	
	chunk_size = 1024*1024
	
	# Half-open ranges (start...end) covering [0, content_length) in chunk_size steps;
	# the final part is clamped to the end of the content.
	parts = (0...content_length).step(chunk_size).map do |offset|
		offset...[offset + chunk_size, content_length].min
	end
	
	start_time = Async::Clock.now
	amount = 0
	
	Console.logger.info(self) {"Breaking download into #{parts.size} parts..."}
	
	# The semaphore caps the number of in-flight requests; the barrier lets us wait for all of them.
	semaphore = Async::Semaphore.new(8)
	barrier = Async::Barrier.new(parent: semaphore)
	
	parts.each do |part|
		barrier.async do
			# HTTP byte ranges are inclusive on both ends, while `part` excludes its end.
			range = "bytes=#{part.begin}-#{part.end - 1}"
			
			Console.logger.info(self) {"Issuing range request range: #{range}"}
			
			# Keep the response in a task-local variable: the tasks run concurrently and
			# must not share (and overwrite) a single variable from the enclosing scope.
			part_response = nil
			
			begin
				part_response = client.get(endpoint.path, [
					["range", range],
					*headers
				])
				
				# Only 206 Partial Content guarantees that the body is exactly the requested range.
				if part_response.status == 206
					Console.logger.info(self) {"Got response: #{part_response}... writing data for #{part}."}
					written = file.pwrite(part_response.read, part.begin)
					
					amount += written
					
					duration = Async::Clock.now - start_time
					Console.logger.info(self) {"Rate: #{((amount.to_f/(1024**2))/duration).round(2)}MiB/s"}
				else
					Console.logger.error(self) {"Range request #{range} failed with status #{part_response.status}; part #{part} was not written."}
				end
			ensure
				# Release the connection even when the body was not read.
				part_response&.close
			end
		end
	end
	
	barrier.wait
ensure
	file&.close
	client&.close
end