File: multipart_stream_uploader.rb

package info (click to toggle)
ruby-aws-sdk-s3 1.170.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,740 kB
  • sloc: ruby: 16,388; makefile: 3
file content (202 lines) | stat: -rw-r--r-- 6,281 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# frozen_string_literal: true

require 'thread'
require 'set'
require 'tempfile'
require 'stringio'

module Aws
  module S3
    # @api private
    class MultipartStreamUploader
      # Default upper bound, in bytes, on how much of the stream is copied into
      # a single part body; used when no :part_size option is passed to
      # #initialize.
      # @api private
      PART_SIZE = 5 * 1024 * 1024 # 5 MiB

      # Default number of uploader threads; used when no :thread_count option
      # is passed to #initialize.
      # @api private
      THREAD_COUNT = 10

      # Basename prefix given to Tempfile.new when part bodies are buffered in
      # tempfiles.
      # NOTE(review): the name looks like a misspelling of TEMPFILE_PREFIX; it
      # is left as is because #read_to_part_body refers to it by this name.
      # @api private
      TEMPFILE_PREIX = 'aws-sdk-s3-upload_stream'.freeze

      # Member names of the create_multipart_upload input shape; #create_opts
      # copies only these keys from the caller's options.
      # @api private
      CREATE_OPTIONS =
        Set.new(Client.api.operation(:create_multipart_upload).input.shape.member_names)

      # Member names of the upload_part input shape; #upload_part_opts copies
      # only these keys from the caller's options.
      # @api private
      UPLOAD_PART_OPTIONS =
        Set.new(Client.api.operation(:upload_part).input.shape.member_names)

      # Member names of the complete_multipart_upload input shape;
      # #complete_opts copies only these keys from the caller's options.
      # @api private
      COMPLETE_UPLOAD_OPTIONS =
        Set.new(Client.api.operation(:complete_multipart_upload).input.shape.member_names)

      # Builds an uploader. Any option that is missing or nil falls back to
      # the corresponding class default.
      #
      # @param options [Hash]
      # @option options [Client] :client (Client.new) client that performs
      #   every multipart request
      # @option options [Boolean] :tempfile when truthy, part bodies are
      #   buffered in Tempfile objects instead of in-memory StringIO buffers
      # @option options [Integer] :part_size (PART_SIZE) maximum number of
      #   bytes copied from the stream into one part
      # @option options [Integer] :thread_count (THREAD_COUNT) number of
      #   uploader threads to start
      def initialize(options = {})
        client, tempfile, part_size, thread_count =
          options.values_at(:client, :tempfile, :part_size, :thread_count)

        @client = client || Client.new
        @tempfile = tempfile
        @part_size = part_size || PART_SIZE
        @thread_count = thread_count || THREAD_COUNT
      end

      # @return [Client] the client through which every multipart request
      #   (create, upload part, complete, abort) is sent
      attr_reader :client

      # Runs a complete multipart upload: starts it, uploads whatever the block
      # writes to the IO it is given, then completes it. The whole sequence is
      # wrapped in the 'S3_TRANSFER' user-agent metric.
      #
      # @param options [Hash] request options; each step only forwards the
      #   keys that belong to its own operation
      # @option options [required,String] :bucket
      # @option options [required,String] :key
      # @option options [Integer] :thread_count (THREAD_COUNT)
      #   NOTE(review): #upload_parts passes the options through
      #   #upload_part_opts before #upload_in_threads looks for :thread_count,
      #   so this key presumably never reaches it - confirm; the constructor
      #   option of the same name is what sets the default.
      # @yieldparam write_stream [IO] write end of the pipe feeding the upload
      # @return [Seahorse::Client::Response] - the CompleteMultipartUploadResponse
      def upload(options = {}, &block)
        Aws::Plugins::UserAgent.metric('S3_TRANSFER') do
          upload_id = initiate_upload(options)
          complete_upload(upload_id, upload_parts(upload_id, options, &block), options)
        end
      end

      private

      # Starts the multipart upload, forwarding only the options selected by
      # #create_opts.
      #
      # @param options [Hash] caller-supplied upload options
      # @return the upload_id reported by the create_multipart_upload response
      def initiate_upload(options)
        response = @client.create_multipart_upload(create_opts(options))
        response.upload_id
      end

      # Completes the multipart upload with the given parts.
      #
      # @param upload_id identifier of the multipart upload to complete
      # @param parts [Array<Hash>] part descriptions sent as
      #   multipart_upload[:parts]
      # @param options [Hash] caller-supplied options; only the keys selected
      #   by #complete_opts are forwarded
      # @return the response of the complete_multipart_upload call
      def complete_upload(upload_id, parts, options)
        request = complete_opts(options).merge(
          upload_id: upload_id,
          multipart_upload: { parts: parts }
        )
        @client.complete_multipart_upload(**request)
      end

      # Feeds the caller's data to the uploader threads through a pipe and
      # gathers the uploaded parts.
      #
      # The block receives the write end of the pipe; the threads started by
      # #upload_in_threads consume the read end. The write end is closed as
      # soon as the block returns or raises.
      #
      # If anything fails, the multipart upload is aborted through
      # #abort_upload. Before that happens every worker thread is joined, so
      # the abort request is not sent while a part is still being uploaded and
      # the error list is no longer being appended to.
      #
      # @param upload_id identifier of the multipart upload
      # @param options [Hash] caller-supplied options; the keys selected by
      #   #upload_part_opts are forwarded with every part
      # @yieldparam write_pipe [IO] write end of the pipe
      # @return [Array<Hash>] completed parts sorted by :part_number
      # @raise whatever #abort_upload raises when the block or a worker failed
      def upload_parts(upload_id, options, &block)
        completed = Queue.new
        thread_errors = []
        threads = []
        errors = begin
          IO.pipe do |read_pipe, write_pipe|
            threads = upload_in_threads(
              read_pipe, completed,
              upload_part_opts(options).merge(upload_id: upload_id),
              thread_errors
            )
            begin
              block.call(write_pipe)
            ensure
              # Ensure the pipe is closed to avoid https://github.com/jruby/jruby/issues/6111
              write_pipe.close
            end
            threads.map(&:value).compact
          end
        rescue => e
          # IO.pipe has closed both ends by the time we get here, so workers
          # stop once the part they are handling is done. Wait for them:
          # otherwise the abort below could race with in-flight part uploads
          # and thread_errors could still be growing while it is read.
          threads.each(&:join)
          thread_errors + [e]
        end

        if errors.empty?
          Array.new(completed.size) { completed.pop }.sort_by { |part| part[:part_number] }
        else
          abort_upload(upload_id, options, errors)
        end
      end

      # Aborts the multipart upload and then always raises a
      # MultipartUploadError describing what went wrong.
      #
      # @param upload_id identifier of the multipart upload to abort
      # @param options [Hash] must provide :bucket and :key of the upload
      # @param errors [Array<Exception>] the failures that caused the abort
      # @raise [MultipartUploadError] carrying +errors+; when the abort request
      #   itself fails, that failure is appended to the list and mentioned
      #   first in the message
      def abort_upload(upload_id, options, errors)
        begin
          @client.abort_multipart_upload(
            bucket: options[:bucket],
            key: options[:key],
            upload_id: upload_id
          )
        rescue MultipartUploadError
          # pass an already wrapped failure through untouched
          raise
        rescue => abort_error
          abort_msg = "failed to abort multipart upload: #{abort_error.message}. "\
            "Multipart upload failed: #{errors.map(&:message).join('; ')}"
          raise MultipartUploadError.new(abort_msg, errors + [abort_error])
        end

        raise MultipartUploadError.new(
          "multipart upload failed: #{errors.map(&:message).join('; ')}", errors
        )
      end

      # Picks the entries of +options+ whose keys are listed in CREATE_OPTIONS.
      #
      # @param options [Hash] caller-supplied upload options
      # @return [Hash] new hash holding only the listed keys that are present
      #   in +options+ (present keys are kept even when their value is nil)
      def create_opts(options)
        CREATE_OPTIONS.each_with_object({}) do |name, selected|
          selected[name] = options[name] if options.key?(name)
        end
      end

      # Picks the entries of +options+ whose keys are listed in
      # UPLOAD_PART_OPTIONS.
      #
      # @param options [Hash] caller-supplied upload options
      # @return [Hash] new hash holding only the listed keys that are present
      #   in +options+ (present keys are kept even when their value is nil)
      def upload_part_opts(options)
        UPLOAD_PART_OPTIONS.each_with_object({}) do |name, selected|
          selected[name] = options[name] if options.key?(name)
        end
      end

      # Picks the entries of +options+ whose keys are listed in
      # COMPLETE_UPLOAD_OPTIONS.
      #
      # @param options [Hash] caller-supplied upload options
      # @return [Hash] new hash holding only the listed keys that are present
      #   in +options+ (present keys are kept even when their value is nil)
      def complete_opts(options)
        COMPLETE_UPLOAD_OPTIONS.each_with_object({}) do |name, selected|
          selected[name] = options[name] if options.key?(name)
        end
      end

      # Copies up to @part_size bytes from +read_pipe+ into a fresh, rewound
      # buffer that becomes the body of one part.
      #
      # The buffer is a Tempfile when the uploader was created with a truthy
      # :tempfile option and an in-memory StringIO otherwise. A Tempfile that
      # is not handed to the caller - because nothing was read or because
      # reading raised - is closed and unlinked here so it does not linger on
      # disk.
      #
      # @param read_pipe [IO] read end of the upload pipe
      # @return [Tempfile, StringIO, nil] the buffer, or nil when the pipe is
      #   already closed or no bytes were copied
      def read_to_part_body(read_pipe)
        return if read_pipe.closed?

        temp_io = @tempfile ? Tempfile.new(TEMPFILE_PREIX) : StringIO.new(String.new)
        handed_off = false
        begin
          temp_io.binmode
          bytes_copied = IO.copy_stream(read_pipe, temp_io, @part_size)
          temp_io.rewind
          handed_off = !bytes_copied.zero?
          handed_off ? temp_io : nil
        ensure
          # Runs for the empty read as well as for any error raised above; the
          # caller only takes over cleanup of a buffer it actually receives.
          if !handed_off && Tempfile === temp_io
            temp_io.close
            temp_io.unlink
          end
        end
      end

      # Starts the uploader threads. Each thread repeatedly takes the next
      # part body from +read_pipe+, uploads it with @client.upload_part and
      # records the outcome in +completed+.
      #
      # @param read_pipe [IO] read end of the upload pipe
      # @param completed [Queue] receives one hash per uploaded part with
      #   :etag, :part_number and, when :checksum_algorithm is set in
      #   +options+, the matching :checksum_* value from the response
      # @param options [Hash] parameters merged into every upload_part
      #   request; a :thread_count entry, if present, overrides @thread_count
      # @param thread_errors [Array] receives the error of every failed thread
      # @return [Array<Thread>] the started threads; a thread's value is nil
      #   on success or the StandardError that stopped it
      def upload_in_threads(read_pipe, completed, options, thread_errors)
        lock = Mutex.new
        part_number = 0
        worker_count = options.fetch(:thread_count, @thread_count)
        worker_count.times.map do
          Thread.new do
            begin
              loop do
                # Reading and numbering share one lock, so part numbers follow
                # the order in which the bytes left the pipe.
                body, number = lock.synchronize do
                  [read_to_part_body(read_pipe), part_number += 1]
                end
                # Part number 1 is uploaded even when there is no body.
                break if !body && number != 1

                begin
                  request = options.merge(body: body, part_number: number)
                  response = @client.upload_part(request)
                  uploaded = { etag: response.etag, part_number: request[:part_number] }

                  # get the requested checksum from the response
                  if request[:checksum_algorithm]
                    checksum_key = :"checksum_#{request[:checksum_algorithm].downcase}"
                    uploaded[checksum_key] = response[checksum_key]
                  end
                  completed.push(uploaded)
                ensure
                  # release the buffer whether or not the upload succeeded
                  case body
                  when Tempfile
                    body.close
                    body.unlink
                  when StringIO
                    body.string.clear
                  end
                end
              end
              nil
            rescue => failure
              # keep other threads from uploading other parts
              lock.synchronize do
                thread_errors.push(failure)
                read_pipe.close_read unless read_pipe.closed?
              end
              failure
            end
          end
        end
      end
    end
  end
end