File: multi.rb

package info (click to toggle)
ruby-curb 1.2.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 860 kB
  • sloc: ansic: 5,798; ruby: 4,466; makefile: 4
file content (294 lines) | stat: -rw-r--r-- 9,547 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# frozen_string_literal: true
module Curl
  class Multi
    # Raised from the ensure clause of Curl::Multi.download when one or more
    # output files could not be closed; the collected exceptions are in #errors.
    class DownloadError < RuntimeError
      attr_accessor :errors
    end

    class << self
      # call-seq:
      #   Curl::Multi.get(['url1','url2','url3','url4','url5'], :follow_location => true) do|easy|
      #     easy
      #   end
      #
      # Blocking call to fetch multiple URLs in parallel.
      #
      # urls: an array of URLs to GET
      # easy_options: options applied to every easy handle
      # multi_options: options to set on the Curl::Multi handle
      # blk: optional; called with each Curl::Easy as its request completes
      def get(urls, easy_options={}, multi_options={}, &blk)
        url_confs = urls.map { |url| {:url => url, :method => :get}.merge(easy_options) }
        self.http(url_confs, multi_options) { |c, _code, _method| blk.call(c) if blk }
      end

      # call-seq:
      #
      #   Curl::Multi.post([{:url => 'url1', :post_fields => {'field1' => 'value1', 'field2' => 'value2'}},
      #                     {:url => 'url2', :post_fields => {'field1' => 'value1', 'field2' => 'value2'}},
      #                     {:url => 'url3', :post_fields => {'field1' => 'value1', 'field2' => 'value2'}}],
      #                    { :follow_location => true, :multipart_form_post => true },
      #                    {:pipeline => Curl::CURLPIPE_HTTP1}) do|easy|
      #     easy_handle_on_request_complete
      #   end
      #
      # Blocking call to POST multiple forms in parallel.
      #
      # urls_with_config: an array of hashes, each with a :url and the :post_fields to send
      # easy_options: a set of common options to set on all easy handles
      # multi_options: options to set on the Curl::Multi handle
      # blk: optional; called with each Curl::Easy as its request completes
      #
      def post(urls_with_config, easy_options={}, multi_options={}, &blk)
        url_confs = urls_with_config.map { |uconf| uconf.merge(:method => :post).merge(easy_options) }
        # the block is optional, as it is for .get
        self.http(url_confs, multi_options) { |c, _code, _method| blk.call(c) if blk }
      end

      # call-seq:
      #
      #   Curl::Multi.put([{:url => 'url1', :put_data => "some message"},
      #                    {:url => 'url2', :put_data => IO.read('filepath')},
      #                    {:url => 'url3', :put_data => "maybe another string or socket?"}],
      #                    {:follow_location => true},
      #                    {:pipeline => Curl::CURLPIPE_HTTP1}) do|easy|
      #     easy_handle_on_request_complete
      #   end
      #
      # Blocking call to PUT multiple payloads in parallel.
      #
      # urls_with_config: an array of hashes, each with a :url and the :put_data to send
      # easy_options: a set of common options to set on all easy handles
      # multi_options: options to set on the Curl::Multi handle
      # blk: optional; called with each Curl::Easy as its request completes
      #
      def put(urls_with_config, easy_options={}, multi_options={}, &blk)
        url_confs = urls_with_config.map { |uconf| uconf.merge(:method => :put).merge(easy_options) }
        # the block is optional, as it is for .get
        self.http(url_confs, multi_options) { |c, _code, _method| blk.call(c) if blk }
      end


      # call-seq:
      #
      # Curl::Multi.http( [
      #   { :url => 'url1', :method => :post,
      #     :post_fields => {'field1' => 'value1', 'field2' => 'value2'} },
      #   { :url => 'url2', :method => :get,
      #     :follow_location => true, :max_redirects => 3 },
      #   { :url => 'url3', :method => :put, :put_data => File.open('file.txt','rb') },
      #   { :url => 'url4', :method => :head }
      # ], {:pipeline => Curl::CURLPIPE_HTTP1})
      #
      # Blocking call to issue multiple HTTP requests with varying verbs.
      #
      # urls_with_config: an array of hashes, each holding a :url plus easy handle
      #   options, and the special option :method, which can be one of
      #   [:get, :post, :put, :delete, :head]. When no verb is provided
      #   (e.g. :method => nil), GET is used.
      # multi_options: options for the multi handle (:max_connects defaults to 10)
      # blk: a callback, yielded (easy, response_code, method) as each handle completes
      #
      # Neither urls_with_config nor multi_options is modified.
      # Returns the result of Curl::Multi#perform when every request fits in the
      # initial batch, otherwise nil.
      #
      def http(urls_with_config, multi_options={}, &blk)
        m = Curl::Multi.new

        # Work on a copy: the queue is consumed with #pop below.
        pending = urls_with_config.dup

        # maintain a sane number of easy handles
        max_connects = multi_options.key?(:max_connects) ? multi_options[:max_connects] : 10
        multi_options = multi_options.merge(:max_connects => max_connects)

        free_handles = [] # easy handles whose transfer finished and can be reused

        # configure the multi handle
        multi_options.each { |k, v| m.send("#{k}=", v) }
        callbacks = [:on_progress, :on_debug, :on_failure, :on_success, :on_redirect, :on_missing, :on_body, :on_header]

        # Configure `easy` (or a fresh Curl::Easy when nil) from `conf` and add it to the multi handle.
        add_free_handle = proc do |conf, easy|
          c       = conf.dup # avoid being destructive to input
          url     = c.delete(:url)
          method  = c.delete(:method)
          headers = c.delete(:headers)

          easy ||= Curl::Easy.new

          easy.url = url

          # assign callbacks
          callbacks.each do |cb|
            cbproc = c.delete(cb)
            easy.send(cb, &cbproc) if cbproc
          end

          case method
          when :post
            fields = c.delete(:post_fields)
            # build the body from escaped name=value pairs joined by '&'
            easy.post_body = fields.map { |name, value| "#{easy.escape(name)}=#{easy.escape(value)}" }.join('&')
          when :put
            easy.put_data = c.delete(:put_data)
          when :head
            easy.head = true
          when :delete
            easy.delete = true
          else
            # :get, and nil (no verb given), need no extra setup: treated as GET
          end

          # headers is a special key
          # NOTE(review): a recycled handle keeps whatever headers/post_body/head
          # were set for its previous request; nothing here resets them.
          headers.each { |k, v| easy.headers[k] = v } if headers

          #
          # use the remaining options as specific configuration to the easy handle
          # bad options should raise an undefined method error
          #
          c.each { |k, v| easy.send("#{k}=", v) }

          easy.on_complete { |curl|
            free_handles << curl
            blk.call(curl, curl.response_code, method) if blk
          }
          m.add(easy)
        end

        # Start up to max_connects transfers. Always start at least one so a
        # non-positive setting cannot leave the loop below spinning with no
        # active handle to free.
        [max_connects.to_i, 1].max.times do
          conf = pending.pop
          add_free_handle.call(conf, nil) if conf
          break if pending.empty?
        end

        # Recycle one finished handle for the next queued request.
        consume_free_handles = proc do
          if pending.size > 0 && free_handles.size > 0
            easy = free_handles.pop
            conf = pending.pop
            add_free_handle.call(conf, easy) if conf
          end
        end

        if pending.empty?
          m.perform
        else
          until pending.empty?
            m.perform do
              consume_free_handles.call
            end
            consume_free_handles.call
          end
          # The consume call after #perform (or one made after perform's last
          # pass) may queue the final request and empty `pending`, ending the
          # loop before that handle has run; drain it here.
          m.perform unless m.idle?
          nil # keep the historical nil return for this path
        end
      end

      # call-seq:
      #
      # Curl::Multi.download(['http://example.com/p/a/t/h/file1.txt','http://example.com/p/a/t/h/file2.txt']){|c|}
      #
      # will create 2 new files file1.txt and file2.txt
      #
      # 2 files will be opened, and remain open until the call completes
      # (when a block is given, each file is closed before the block is called for it)
      #
      # urls: URL strings, or hashes with a :url plus per-request options
      #   (use hashes for :post or :put requests, e.g. to carry the post fields)
      # download_paths: optional array of output paths, matched to urls by index;
      #   defaults to File.basename of each url
      # blk: optional; called with (easy, path) as each download completes
      #
      # Raises Curl::Multi::DownloadError if any output file failed to close.
      #
      def download(urls,easy_options={},multi_options={},download_paths=nil,&blk)
        errors = []
        files = []
        urls_with_config = []
        url_to_download_paths = {}

        urls.each_with_index do |urlcfg, i|
          url = urlcfg.is_a?(Hash) ? urlcfg[:url] : urlcfg

          download_path = (download_paths && download_paths[i]) || File.basename(url)

          file = File.open(download_path, "wb")
          files << file
          # on_body handler: write the chunk and report how many bytes were handled
          writer = lambda { |data| file.write(data); data.size }

          if urlcfg.is_a?(Hash)
            urls_with_config << urlcfg.merge({:on_body => writer}.merge(easy_options))
          else
            urls_with_config << {:url => url, :on_body => writer, :method => :get}.merge(easy_options)
          end
          # store for later; NOTE(review): duplicate urls overwrite each other here
          url_to_download_paths[url] = {:path => download_path, :file => file}
        end

        if blk
          # when injecting the block, ensure file is closed before yielding
          Curl::Multi.http(urls_with_config, multi_options) do |c, _code, _method|
            info = url_to_download_paths[c.url]
            begin
              file = info[:file]
              files.reject! { |f| f == file }
              file.close
            rescue => e
              errors << e
            end
            blk.call(c, info[:path])
          end
        else
          Curl::Multi.http(urls_with_config, multi_options)
        end

      ensure
        # close anything still open, collecting (not raising) individual failures
        files.each { |f|
          begin
            f.close
          rescue => e
            errors << e
          end
        }
        if errors.any?
          de = Curl::Multi::DownloadError.new
          de.errors = errors
          raise de
        end
      end
    end

    # Remove every tracked easy handle from this multi handle.
    def cancel!
      # iterate a snapshot: #remove deletes from the same hash
      requests.values.each { |easy| remove(easy) }
      requests # the (now empty) request table, as Hash#each used to return
    end

    # True when no easy handles are tracked by this multi handle.
    def idle?
      requests.empty?
    end

    # Tracked easy handles, keyed by object_id.
    def requests
      @requests ||= {}
    end

    # Track and attach `easy`; adding an already-tracked handle is a no-op.
    def add(easy)
      return self if requests[easy.object_id]
      requests[easy.object_id] = easy
      _add(easy)
      self
    end

    # Untrack and detach `easy`; removing an untracked handle is a no-op.
    def remove(easy)
      return self if !requests[easy.object_id]
      requests.delete(easy.object_id)
      _remove(easy)
      self
    end

    # Detach every tracked handle, forget them, then close the multi handle.
    def close
      requests.values.each { |easy|
        _remove(easy)
      }
      @requests = {}
      _close
      self
    end


  end
end