File: connection_driver.rb

package info (click to toggle)
qpid-proton 0.37.0-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 18,384 kB
  • sloc: ansic: 37,828; cpp: 37,140; python: 15,302; ruby: 6,018; xml: 477; sh: 320; pascal: 52; makefile: 18
file content (212 lines) | stat: -rw-r--r-- 7,672 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

require 'socket'

module Qpid::Proton

  # Associate an AMQP {Connection} and {Transport} with an {IO}
  #
  # - {#read} reads AMQP binary data from the {IO} and generates events
  # - {#tick} generates timing-related events
  # - {#event} gets events to be dispatched to {Handler::MessagingHandler}s
  # - {#write} writes AMQP binary data to the {IO}
  #
  # Thread safety: The {ConnectionDriver} is not thread safe but separate
  # {ConnectionDriver} instances can be processed concurrently. The
  # {Container} handles multiple connections concurrently in multiple threads.
  #
  class ConnectionDriver
    # Create a {Connection} and {Transport} associated with +io+
    # @param io [IO] An {IO} or {IO}-like object that responds
    #   to {IO#read_nonblock} and {IO#write_nonblock}
    def initialize(io)
      @impl = Cproton.pni_connection_driver or raise NoMemoryError
      @io = io
      @rbuf = ""              # String for re-usable read buffer
    end

    # @return [Connection]
    def connection()
      @connection ||= Connection.wrap(Cproton.pni_connection_driver_connection(@impl))
    end

    # @return [Transport]
    def transport()
      @transport ||= Transport.wrap(Cproton.pni_connection_driver_transport(@impl))
    end

    # @return [IO] Allows ConnectionDriver to be passed directly to {IO#select}
    def to_io() @io; end

    # @return [Bool] True if the driver can read more data
    def can_read?() Cproton.pni_connection_driver_read_size(@impl) > 0; end

    # @return [Bool] True if the driver has data to write
    def can_write?() Cproton.pni_connection_driver_write_size(@impl) > 0; end

    # True if the ConnectionDriver has nothing left to do: both sides of the
    # transport are closed and there are no events to dispatch.
    def finished?() Cproton.pn_connection_driver_finished(@impl); end

    # Get the next event to dispatch, nil if no events available
    def event()
      e = Cproton.pn_connection_driver_next_event(@impl)
      Event.new(e) if e
    end

    # True if {#event} will return non-nil
    def event?() Cproton.pn_connection_driver_has_event(@impl); end

    # Iterator for all available events
    def each_event()
      while e = event
        yield e
      end
    end

    # Non-blocking read from {#io}, generate events for {#event}
    # IO errors are returned as transport errors by {#event}, not raised
    def read
      size = Cproton.pni_connection_driver_read_size(@impl)
      return if size <= 0
      @io.read_nonblock(size, @rbuf) # Use the same string rbuf for reading each time
      Cproton.pni_connection_driver_read_copy(@impl, @rbuf) unless @rbuf.empty?
    rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
      # Try again later.
    rescue EOFError         # EOF is not an error
      close_read
    rescue IOError, SystemCallError => e
      close e
    end

    # Non-blocking write to {#io}
    # IO errors are returned as transport errors by {#event}, not raised
    def write
      data = Cproton.pn_connection_driver_write_buffer(@impl)
      return unless data && data.size > 0
      n = @io.write_nonblock(data)
      Cproton.pn_connection_driver_write_done(@impl, n) if n > 0
    rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
      # Try again later.
    rescue IOError, SystemCallError => e
      close e
    end

    # Handle time-related work, for example idle-timeout events.
    # May generate events for {#event} and change {#can_read?}, {#can_write?}
    #
    # @param [Time] now the current time, defaults to {Time#now}.
    #
    # @return [Time] time of the next scheduled event, or nil if there are no
    # scheduled events. If non-nil you must call {#tick} again no later than
    # this time.
    def tick(now=Time.now)
      transport = Cproton.pni_connection_driver_transport(@impl)
      ms = Cproton.pn_transport_tick(transport, (now.to_r * 1000).to_i)
      @next_tick = ms.zero? ? nil : Time.at(ms.to_r / 1000);
      unless @next_tick
        idle = Cproton.pn_transport_get_idle_timeout(transport);
        @next_tick = now + (idle.to_r / 1000) unless idle.zero?
      end
      @next_tick
    end

    # Time returned by the last call to {#tick}
    attr_accessor :next_tick

    # Disconnect the write side of the transport, *without* sending an AMQP
    # close frame. To close politely, you should use {Connection#close}, the
    # transport will close itself once the protocol close is complete.
    #
    def close_write error=nil
      set_error error
      Cproton.pn_connection_driver_write_close(@impl)
      @io.close_write rescue nil # Allow double-close
    end

    # Is the read side of the driver closed?
    def read_closed?() Cproton.pn_connection_driver_read_closed(@impl); end

    # Is the write side of the driver closed?
    def write_closed?() Cproton.pn_connection_driver_read_closed(@impl); end

    # Disconnect the read side of the transport, without waiting for an AMQP
    # close frame. See comments on {#close_write}
    def close_read error=nil
      set_error error
      Cproton.pn_connection_driver_read_close(@impl)
      @io.close_read rescue nil # Allow double-close
    end

    # Disconnect both sides of the transport sending/waiting for AMQP close
    # frames. See comments on {#close_write}
    def close error=nil
      close_write error
      close_read
    end

    private

    def set_error err
      transport.condition ||= Condition.convert(err, "proton:io") if err
    end
  end

  # A {ConnectionDriver} that feeds raw proton events to a handler.
  class HandlerDriver < ConnectionDriver
    # Combine an {IO} with a handler and provide
    # a simplified way to run the driver via {#process}
    #
    # @param io [IO]
    # @param handler [Handler::MessagingHandler] to receive raw events in  {#dispatch} and {#process}
    def initialize(io, handler)
      super(io)
      @handler = handler
      @adapter = Handler::Adapter.adapt(handler)
    end

    # @return [MessagingHandler] The handler dispatched to by {#process}
    attr_reader :handler

    # Dispatch all available raw proton events from {#event} to {#handler}
    def dispatch()
      each_event do |e|
        case e.method           # Events that affect the driver
        when :on_transport_tail_closed then close_read
        when :on_transport_head_closed then close_write
        when :on_transport_closed then @io.close rescue nil # Allow double-close
        end
        e.dispatch @adapter
      end
    end

    # Do {#read}, {#tick}, {#write} and {#dispatch} without blocking.
    # @param [Time] now the current time
    # @return [Time] Latest time to call {#process} again for scheduled events,
    #   or nil if there are no scheduled events
    def process(now=Time.now)
      read
      dispatch
      next_tick = tick(now)
      dispatch
      write
      dispatch
      return next_tick
    end
  end
end