File: receiver.rb

package info (click to toggle)
qpid-proton 0.37.0-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 18,384 kB
  • sloc: ansic: 37,828; cpp: 37,140; python: 15,302; ruby: 6,018; xml: 477; sh: 320; pascal: 52; makefile: 18
file content (125 lines) | stat: -rw-r--r-- 4,371 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


module Qpid::Proton

  # The receiving endpoint.
  #
  # @see Sender
  #
  class Receiver < Link

    # @private
    PROTON_METHOD_PREFIX = "pn_link"
    # @private
    include Util::Wrapper

    # Open {Receiver} link
    #
    # @overload open_receiver(address)
    #   @param address [String] address of the source to receive from
    # @overload open_receiver(opts)
    #   @param opts [Hash] Receiver options, see {Receiver#open}
    #   @option opts [Integer] :credit_window automatically maintain this much credit
    #     for messages to be pre-fetched while the current message is processed.
    #   @option opts [Boolean] :auto_accept if true, deliveries that are not settled by
    #     the application in {MessagingHandler#on_message} are automatically accepted.
    #   @option opts [Boolean] :dynamic (false) dynamic property for source {Terminus#dynamic}
    #   @option opts [String,Hash] :source source address or source options, see {Terminus#apply}
    #   @option opts [String,Hash] :target target address or target options, see {Terminus#apply}
    #   @option opts [String] :name (generated) unique name for the link.
    def open(opts=nil)
      opts ||= {}
      opts = { :source => opts } if opts.is_a? String
      @credit_window =  opts.fetch(:credit_window, 10)
      @auto_accept = opts.fetch(:auto_accept, true)
      source.apply(opts[:source])
      target.apply(opts[:target])
      source.dynamic = !!opts[:dynamic]
      super()
      self
    end

    # @return [Integer] credit window, see {#open}
    attr_reader :credit_window

    # @return [Boolean] auto_accept flag, see {#open}
    attr_reader :auto_accept

    # @!attribute drain
    #
    # The drain mode.
    #
    # If a receiver is in drain mode, then the sending endpoint of a link must
    # immediately use up all available credit on the link. If this is not
    # possible, the excess credit must be returned by invoking #drained.
    #
    # Only the receiving endpoint can set the drain mode.
    #
    # @return [Boolean] True if drain mode is set.
    #
    proton_set_get :drain

    # @!attribute [r] draining?
    #
    # Returns if a link is currently draining.
    #
    # A link is defined to be draining when drain mode is set to true and
    # the sender still has excess credit.
    #
    # @return [Boolean] True if the receiver is currently draining.
    #
    proton_caller :draining?

    # Grants credit for incoming deliveries.
    #
    # @param n [Integer] The amount to increment the link credit.
    #
    def flow(n)
      Cproton.pn_link_flow(@impl, n)
    end

    # Allows receiving up to the specified limit of data from the remote
    # endpoint.
    #
    # Note that large messages can be streamed across the network, so just
    # because there is no data to read does not imply the message is complete.
    #
    # To ensure the entirety of the message data has been read, either call
    # #receive until nil is returned, or verify that #partial? is false and
    # Delivery#pending is 0.
    #
    # @param limit [Integer] The maximum bytes to receive.
    #
    # @return [Integer, nil] The number of bytes received, or nil if the end of
    # the stream was reached.
    #
    # @see Deliver#pending To see how much buffer space is needed.
    #
    # @raise [LinkError] If an error occurs.
    #
    def receive(limit)
      (n, bytes) = Cproton.pn_link_recv(@impl, limit)
      return nil if n == Qpid::Proton::Error::EOS
      raise LinkError.new("[#{n}]: #{Cproton.pn_link_error(@impl)}") if n < 0
      return bytes
    end

  end

end