File: operations.rb

package info (click to toggle)
ruby-ethon 0.16.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 676 kB
  • sloc: ruby: 5,403; sh: 9; makefile: 8
file content (228 lines) | stat: -rw-r--r-- 6,244 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# frozen_string_literal: true
module Ethon
  class Multi # :nodoc
    # This module contains logic to run a multi.
    module Operations
      # Debug log message written by #perform before it enters its run loop.
      STARTED_MULTI = "ETHON: started MULTI"
      # Debug log message written by #perform once its run loop has finished.
      PERFORMED_MULTI = "ETHON: performed MULTI"

      # Return the multi handle, creating it on first use.
      #
      # The pointer obtained from Curl.multi_init is wrapped in an
      # FFI::AutoPointer together with Curl.multi_cleanup, and the wrapper
      # is memoized so later calls return the very same object.
      #
      # @example Return multi handle.
      #   multi.handle
      #
      # @return [ FFI::AutoPointer ] The multi handle.
      def handle
        return @handle if defined?(@handle) && @handle

        @handle = FFI::AutoPointer.new(Curl.multi_init, Curl.method(:multi_cleanup))
      end

      # Allocate the native buffers needed by the current execution mode.
      #
      # In :perform mode this creates the timeout pointer, the timeval
      # struct, the three fd sets and the max-fd pointer. In :socket_action
      # mode only the running-count pointer is created. Any other mode
      # leaves the instance untouched.
      #
      # @example Initialize variables.
      #   multi.init_vars
      #
      # @return [ void ]
      def init_vars
        case @execution_mode
        when :perform
          @timeout = ::FFI::MemoryPointer.new(:long)
          @timeval = Curl::Timeval.new
          @fd_read, @fd_write, @fd_excep = Curl::FDSet.new, Curl::FDSet.new, Curl::FDSet.new
          @max_fd = ::FFI::MemoryPointer.new(:int)
        when :socket_action
          @running_count_pointer = FFI::MemoryPointer.new(:int)
        end
      end

      # Drive the multi until nothing is ongoing any more.
      #
      # Each iteration runs the transfers, asks for the current timeout and,
      # unless that timeout is zero, resets the fd sets and waits on them.
      # A debug message is logged before and after the loop.
      #
      # @return [ nil ]
      #
      # @example Perform multi.
      #   multi.perform
      def perform
        ensure_execution_mode(:perform)
        Ethon.logger.debug(STARTED_MULTI)

        while ongoing?
          run
          wait_ms = get_timeout
          # A zero timeout means there is no reason to wait: loop right away.
          unless wait_ms == 0
            reset_fds
            set_fds(wait_ms)
          end
        end

        Ethon.logger.debug(PERFORMED_MULTI)
        nil
      end

      # Deprecated no-op kept for backwards compatibility.
      #
      # Only logs a deprecation warning through Ethon.logger; no multi state
      # is touched. Always returns nil, regardless of what the logger's
      # #warn returns.
      #
      # @return [ nil ]
      #
      # @example Prepare multi.
      #   multi.prepare
      #
      # @deprecated It is no longer necessary to call prepare.
      def prepare
        Ethon.logger.warn(
          "ETHON: It is no longer necessary to call " \
          "Multi#prepare. It's going to be removed " \
          "in future versions."
        )
        nil
      end

      # Continue execution with an external IO loop.
      #
      # The file descriptor handed to Curl.multi_socket_action is
      # Curl::SOCKET_TIMEOUT when no io is given, io itself when it is an
      # Integer, and io.fileno otherwise. The readiness argument is passed
      # through unchanged. Afterwards the running count is refreshed from
      # the running-count pointer and finished transfers are processed.
      #
      # @example When no sockets are ready yet, or to begin.
      #   multi.socket_action
      #
      # @example When a socket is readable
      #   multi.socket_action(io_object, [:in])
      #
      # @example When a socket is readable and writable
      #   multi.socket_action(io_object, [:in, :out])
      #
      # @return [ Symbol ] The Curl.multi_socket_action return code.
      def socket_action(io = nil, readiness = 0)
        ensure_execution_mode(:socket_action)

        descriptor =
          case io
          when nil then ::Ethon::Curl::SOCKET_TIMEOUT
          when Integer then io
          else io.fileno
          end

        result = Curl.multi_socket_action(handle, descriptor, readiness, @running_count_pointer)
        @running_count = @running_count_pointer.read_int
        check
        result
      end

      # Return whether the multi still contains requests or not.
      #
      # The multi counts as ongoing while it holds easy handles, while no
      # running count has been recorded yet, or while the recorded running
      # count is above zero.
      #
      # @example Return if ongoing.
      #   multi.ongoing?
      #
      # @return [ Boolean ] True if ongoing, else false.
      def ongoing?
        return true if easy_handles.size > 0
        return true unless defined?(@running_count)

        running_count > 0
      end

      private

      # Ask Curl.multi_timeout for the current timeout value.
      #
      # The value is read as a long from the timeout pointer; a negative
      # value is replaced by 1.
      #
      # @example Get timeout.
      #   multi.get_timeout
      #
      # @return [ Integer ] The timeout.
      #
      # @raise [ Ethon::Errors::MultiTimeout ] If getting the timeout fails.
      def get_timeout
        status = Curl.multi_timeout(handle, @timeout)
        raise Errors::MultiTimeout.new(status) unless status == :ok

        reported = @timeout.read_long
        reported < 0 ? 1 : reported
      end

      # Reset file descriptors.
      #
      # Clears the read, write and exception fd sets, in that order.
      #
      # @example Reset fds.
      #   multi.reset_fds
      #
      # @return [ void ]
      def reset_fds
        fd_sets = [@fd_read, @fd_write, @fd_excep]
        # Keep yielding the result of the final #clear call, as before.
        fd_sets.map(&:clear).last
      end

      # Fill the fd sets from the multi handle and wait on them.
      #
      # When Curl.multi_fdset reports a max fd of -1 this sleeps for one
      # millisecond instead of selecting. Otherwise the timeout is split
      # into the seconds and microseconds fields of the timeval and
      # Curl.select is called, retrying for as long as it fails with EINTR.
      #
      # @example Set fds.
      #   multi.set_fds(timeout)
      #
      # @param [ Integer ] timeout The wait time; it is divided by 1000 for
      #   the seconds field, i.e. treated as milliseconds.
      #
      # @return [ void ]
      #
      # @raise [ Ethon::Errors::MultiFdset ] If setting the file descriptors fails.
      # @raise [ Ethon::Errors::Select ] If select fails.
      def set_fds(timeout)
        status = Curl.multi_fdset(handle, @fd_read, @fd_write, @fd_excep, @max_fd)
        raise Errors::MultiFdset.new(status) unless status == :ok

        highest_fd = @max_fd.read_int
        return sleep(0.001) if highest_fd == -1

        @timeval[:sec] = timeout / 1000
        @timeval[:usec] = (timeout * 1000) % 1000000

        nfds = highest_fd + 1
        ready = Curl.select(nfds, @fd_read, @fd_write, @fd_excep, @timeval)
        # An interrupted select is not an error: simply try again.
        while ready < 0 && ::FFI.errno == Errno::EINTR::Errno
          ready = Curl.select(nfds, @fd_read, @fd_write, @fd_excep, @timeval)
        end
        raise Errors::Select.new(::FFI.errno) if ready < 0
      end

      # Process the messages queued on the multi handle.
      #
      # Messages are read with Curl.multi_info_read until a null message
      # comes back. Messages whose code is not :done are skipped. For each
      # :done message the matching easy handle is looked up, receives its
      # return code, is logged, deleted from the multi and then completed.
      #
      # @example Check.
      #   multi.check
      #
      # @return [ void ]
      def check
        remaining = ::FFI::MemoryPointer.new(:int)
        until (message = Curl.multi_info_read(handle, remaining)).null?
          next unless message[:code] == :done

          finished = easy_handles.find { |candidate| candidate.handle == message[:easy_handle] }
          finished.return_code = message[:data][:code]
          Ethon.logger.debug { "ETHON:         performed #{finished.log_inspect}" }
          delete(finished)
          finished.complete
        end
      end

      # Trigger the multi, then process whatever finished.
      #
      # The trigger is repeated, reusing one running-count pointer, for as
      # long as it answers :call_multi_perform. Afterwards #check is called.
      #
      # @example Run
      #   multi.run
      #
      # @return [ void ]
      def run
        count_pointer = FFI::MemoryPointer.new(:int)
        status = trigger(count_pointer)
        status = trigger(count_pointer) while status == :call_multi_perform
        check
      end

      # Call Curl.multi_perform once and record the running count.
      #
      # After the call, the integer stored in the given pointer is copied
      # into @running_count.
      #
      # @example Trigger.
      #   multi.trigger(running_count_pointer)
      #
      # @param [ FFI::MemoryPointer ] running_count_pointer Pointer handed
      #   to Curl.multi_perform and read back as an int afterwards.
      #
      # @return [ Symbol ] The Curl.multi_perform return code.
      def trigger(running_count_pointer)
        Curl.multi_perform(handle, running_count_pointer).tap do
          @running_count = running_count_pointer.read_int
        end
      end

      # Return number of running requests.
      #
      # Yields nil, and initializes @running_count to nil, when no count
      # has been recorded yet.
      #
      # @example Return count.
      #   multi.running_count
      #
      # @return [ Integer, nil ] Number running requests, if known.
      def running_count
        @running_count = nil unless defined?(@running_count) && @running_count
        @running_count
      end
    end
  end
end