File: streaming_retry.rb

package info (click to toggle)
ruby-aws-sdk-s3 1.170.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,740 kB
  • sloc: ruby: 16,388; makefile: 3
file content (139 lines) | stat: -rw-r--r-- 4,565 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# frozen_string_literal: true

require 'forwardable'

module Aws
  module S3
    module Plugins

      # A wrapper around BlockIO that adds no-ops for truncate and rewind
      # @api private
      class RetryableBlockIO
        extend Forwardable
        def_delegators :@block_io, :write, :read, :size

        def initialize(block_io)
          @block_io = block_io
        end

        def truncate(_integer); end

        def rewind; end
      end

      # A wrapper around ManagedFile that adds no-ops for truncate and rewind
      # @api private
      class RetryableManagedFile
        extend Forwardable
        def_delegators :@file, :write, :read, :size, :open?, :close

        def initialize(managed_file)
          @file = managed_file
        end

        def truncate(_integer); end

        def rewind; end
      end

      class NonRetryableStreamingError < StandardError

        def initialize(error)
          super('Unable to retry request - retry could result in processing duplicated chunks.')
          set_backtrace(error.backtrace)
          @original_error = error
        end

        attr_reader :original_error
      end

      # This handler works with the ResponseTarget plugin to provide smart
      # retries of S3 streaming operations that support the range parameter
      # (currently only: get_object).  When a 200 OK with a TruncatedBodyError
      # is received this handler will add a range header that excludes the
      # data that has already been processed (written to file or sent to
      # the target Proc).
      # It is important to not write data to the custom target in the case of
      # a non-success response. We do not want to write an XML error
      # message to someone's file or pass it to a user's Proc.
      # @api private
      class StreamingRetry < Seahorse::Client::Plugin

        class Handler < Seahorse::Client::Handler

          def call(context)
            target = context.params[:response_target] || context[:response_target]

            # retry is only supported when range is NOT set on the initial request
            if supported_target?(target) && !context.params[:range]
              add_event_listeners(context, target)
            end
            @handler.call(context)
          end

          private

          def add_event_listeners(context, target)
            context.http_response.on_headers(200..299) do
              case context.http_response.body
              when Seahorse::Client::BlockIO then
                context.http_response.body = RetryableBlockIO.new(context.http_response.body)
              when Seahorse::Client::ManagedFile then
                context.http_response.body = RetryableManagedFile.new(context.http_response.body)
              end
            end

            context.http_response.on_headers(400..599) do
              context.http_response.body = StringIO.new # something to write the error to
            end

            context.http_response.on_success(200..299) do
              body = context.http_response.body
              if body.is_a?(RetryableManagedFile) && body.open?
                body.close
              end
            end

            context.http_response.on_error do |error|
              if retryable_body?(context)
                if truncated_body?(error)
                  context.http_request.headers[:range] = "bytes=#{context.http_response.body.size}-"
                else
                  case context.http_response.body
                  when RetryableManagedFile
                    # call rewind on the underlying file
                    context.http_response.body.instance_variable_get(:@file).rewind
                  else
                    raise NonRetryableStreamingError, error
                  end
                end
              end
            end
          end

          def truncated_body?(error)
            error.is_a?(Seahorse::Client::NetworkingError) &&
              error.original_error.is_a?(
                Seahorse::Client::NetHttp::Handler::TruncatedBodyError
              )
          end

          def retryable_body?(context)
            context.http_response.body.is_a?(RetryableBlockIO) ||
              context.http_response.body.is_a?(RetryableManagedFile)
          end

          def supported_target?(target)
            case target
            when Proc, String, Pathname then true
            else false
            end
          end
        end

        handler(Handler, step: :sign, operations: [:get_object], priority: 10)

      end
    end
  end
end