File: _delivery.py

package info (click to toggle)
qpid-proton 0.37.0-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 18,384 kB
  • sloc: ansic: 37,828; cpp: 37,140; python: 15,302; ruby: 6,018; xml: 477; sh: 320; pascal: 52; makefile: 18
file content (442 lines) | stat: -rw-r--r-- 14,981 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from cproton import PN_ACCEPTED, PN_MODIFIED, PN_RECEIVED, PN_REJECTED, PN_RELEASED, pn_delivery_abort, \
    pn_delivery_aborted, pn_delivery_attachments, pn_delivery_link, pn_delivery_local, pn_delivery_local_state, \
    pn_delivery_partial, pn_delivery_pending, pn_delivery_readable, pn_delivery_remote, pn_delivery_remote_state, \
    pn_delivery_settle, pn_delivery_settled, pn_delivery_tag, pn_delivery_update, pn_delivery_updated, \
    pn_delivery_writable, pn_disposition_annotations, pn_disposition_condition, pn_disposition_data, \
    pn_disposition_get_section_number, pn_disposition_get_section_offset, pn_disposition_is_failed, \
    pn_disposition_is_undeliverable, pn_disposition_set_failed, pn_disposition_set_section_number, \
    pn_disposition_set_section_offset, pn_disposition_set_undeliverable, pn_disposition_type, pn_work_next

from ._condition import cond2obj, obj2cond
from ._data import dat2obj, obj2dat
from ._wrapper import Wrapper

from typing import Dict, List, Optional, Type, Union, TYPE_CHECKING, Any

if TYPE_CHECKING:
    from ._condition import Condition
    from ._data import PythonAMQPData, symbol
    from ._endpoints import Receiver, Sender  # circular import
    from ._reactor import Connection, Session, Transport


class NamedInt(int):
    values: Dict[int, 'DispositionType'] = {}

    def __new__(cls: Type['DispositionType'], i: int, name: str) -> 'DispositionType':
        ni = super(NamedInt, cls).__new__(cls, i)
        cls.values[i] = ni
        return ni

    def __init__(self, i: int, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        return self.name

    def __str__(self) -> str:
        return self.name

    @classmethod
    def get(cls, i: int) -> Union[int, 'DispositionType']:
        return cls.values.get(i, i)


class DispositionType(NamedInt):
    values = {}


class Disposition(object):
    """
    A delivery state.

    Dispositions record the current state or final outcome of a
    transfer. Every delivery contains both a local and remote
    disposition. The local disposition holds the local state of the
    delivery, and the remote disposition holds the last known remote
    state of the delivery.
    """

    RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
    """
    A non terminal state indicating how much (if any) message data
    has been received for a delivery.
    """

    ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
    """
    A terminal state indicating that the delivery was successfully
    processed. Once in this state there will be no further state
    changes prior to the delivery being settled.
    """

    REJECTED = DispositionType(PN_REJECTED, "REJECTED")
    """
    A terminal state indicating that the delivery could not be
    processed due to some error condition. Once in this state
    there will be no further state changes prior to the delivery
    being settled.
    """

    RELEASED = DispositionType(PN_RELEASED, "RELEASED")
    """
    A terminal state indicating that the delivery is being
    returned to the sender. Once in this state there will be no
    further state changes prior to the delivery being settled.
    """

    MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")
    """
    A terminal state indicating that the delivery is being
    returned to the sender and should be annotated by the
    sender prior to further delivery attempts. Once in this
    state there will be no further state changes prior to the
    delivery being settled.
    """

    def __init__(self, impl, local):
        self._impl = impl
        self.local = local
        self._data = None
        self._condition = None
        self._annotations = None

    @property
    def type(self) -> Union[int, DispositionType]:
        """
        Get the type of this disposition object.

        Defined values are:

        * :const:`RECEIVED`
        * :const:`ACCEPTED`
        * :const:`REJECTED`
        * :const:`RELEASED`
        * :const:`MODIFIED`
        """
        return DispositionType.get(pn_disposition_type(self._impl))

    @property
    def section_number(self) -> int:
        """The section number associated with a disposition."""
        return pn_disposition_get_section_number(self._impl)

    @section_number.setter
    def section_number(self, n: int) -> None:
        pn_disposition_set_section_number(self._impl, n)

    @property
    def section_offset(self) -> int:
        """The section offset associated with a disposition."""
        return pn_disposition_get_section_offset(self._impl)

    @section_offset.setter
    def section_offset(self, n: int) -> None:
        pn_disposition_set_section_offset(self._impl, n)

    @property
    def failed(self) -> bool:
        """The failed flag for this disposition."""
        return pn_disposition_is_failed(self._impl)

    @failed.setter
    def failed(self, b: bool) -> None:
        pn_disposition_set_failed(self._impl, b)

    @property
    def undeliverable(self) -> bool:
        """The undeliverable flag for this disposition."""
        return pn_disposition_is_undeliverable(self._impl)

    @undeliverable.setter
    def undeliverable(self, b: bool) -> None:
        pn_disposition_set_undeliverable(self._impl, b)

    @property
    def data(self) -> Optional[List[int]]:
        """Access the disposition as a :class:`Data` object.

        Dispositions are an extension point in the AMQP protocol. The
        disposition interface provides setters/getters for those
        dispositions that are predefined by the specification, however
        access to the raw disposition data is provided so that other
        dispositions can be used.

        The :class:`Data` object returned by this operation is valid until
        the parent delivery is settled.
        """
        if self.local:
            return self._data
        else:
            return dat2obj(pn_disposition_data(self._impl))

    @data.setter
    def data(self, obj: List[int]) -> None:
        if self.local:
            self._data = obj
        else:
            raise AttributeError("data attribute is read-only")

    @property
    def annotations(self) -> Optional[Dict['symbol', 'PythonAMQPData']]:
        """The annotations associated with a disposition.

        The :class:`Data` object retrieved by this operation may be modified
        prior to updating a delivery. When a delivery is updated, the
        annotations described by the :class:`Data` are reported to the peer
        if applicable to the current delivery state, e.g. states such as
        :const:`MODIFIED`. The :class:`Data` must be empty or contain a symbol
        keyed map.

        The :class:`Data` object returned by this operation is valid until
        the parent delivery is settled.
        """
        if self.local:
            return self._annotations
        else:
            return dat2obj(pn_disposition_annotations(self._impl))

    @annotations.setter
    def annotations(self, obj: Dict[str, 'PythonAMQPData']) -> None:
        if self.local:
            self._annotations = obj
        else:
            raise AttributeError("annotations attribute is read-only")

    @property
    def condition(self) -> Optional['Condition']:
        """The condition object associated with a disposition.

        The :class:`Condition` object retrieved by this operation may be
        modified prior to updating a delivery. When a delivery is updated,
        the condition described by the disposition is reported to the peer
        if applicable to the current delivery state, e.g. states such as
        :const:`REJECTED`.
        """
        if self.local:
            return self._condition
        else:
            return cond2obj(pn_disposition_condition(self._impl))

    @condition.setter
    def condition(self, obj: 'Condition') -> None:
        if self.local:
            self._condition = obj
        else:
            raise AttributeError("condition attribute is read-only")


class Delivery(Wrapper):
    """
    Tracks and/or records the delivery of a message over a link.
    """

    RECEIVED = Disposition.RECEIVED
    """
    A non terminal state indicating how much (if any) message data
    has been received for a delivery.
    """

    ACCEPTED = Disposition.ACCEPTED
    """
    A terminal state indicating that the delivery was successfully
    processed. Once in this state there will be no further state
    changes prior to the delivery being settled.
    """

    REJECTED = Disposition.REJECTED
    """
    A terminal state indicating that the delivery could not be
    processed due to some error condition. Once in this state
    there will be no further state changes prior to the delivery
    being settled.
    """

    RELEASED = Disposition.RELEASED
    """
    A terminal state indicating that the delivery is being
    returned to the sender. Once in this state there will be no
    further state changes prior to the delivery being settled.
    """

    MODIFIED = Disposition.MODIFIED
    """
    A terminal state indicating that the delivery is being
    returned to the sender and should be annotated by the
    sender prior to further delivery attempts. Once in this
    state there will be no further state changes prior to the
    delivery being settled.
    """

    @staticmethod
    def wrap(impl):
        if impl is None:
            return None
        else:
            return Delivery(impl)

    def __init__(self, impl):
        Wrapper.__init__(self, impl, pn_delivery_attachments)

    def _init(self) -> None:
        self.local = Disposition(pn_delivery_local(self._impl), True)
        self.remote = Disposition(pn_delivery_remote(self._impl), False)

    @property
    def tag(self) -> str:
        """
        The identifier for the delivery.
        """
        return pn_delivery_tag(self._impl)

    @property
    def writable(self) -> bool:
        """
        ``True`` for an outgoing delivery to which data can now be written,
        ``False`` otherwise..
        """
        return pn_delivery_writable(self._impl)

    @property
    def readable(self) -> bool:
        """
        ``True`` for an incoming delivery that has data to read,
        ``False`` otherwise..
        """
        return pn_delivery_readable(self._impl)

    @property
    def updated(self) -> bool:
        """
        ``True`` if the state of the delivery has been updated
        (e.g. it has been settled and/or accepted, rejected etc),
        ``False`` otherwise.
        """
        return pn_delivery_updated(self._impl)

    def update(self, state: Union[int, DispositionType]) -> None:
        """
        Set the local state of the delivery e.g. :const:`ACCEPTED`,
        :const:`REJECTED`, :const:`RELEASED`.

        :param state: State of delivery
        """
        obj2dat(self.local._data, pn_disposition_data(self.local._impl))
        obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
        obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
        pn_delivery_update(self._impl, state)

    @property
    def pending(self) -> int:
        """
        The amount of pending message data for a delivery.
        """
        return pn_delivery_pending(self._impl)

    @property
    def partial(self) -> bool:
        """
        ``True`` for an incoming delivery if not all the data is
        yet available, ``False`` otherwise.
        """
        return pn_delivery_partial(self._impl)

    @property
    def local_state(self) -> DispositionType:
        """A local state of the delivery."""
        return DispositionType.get(pn_delivery_local_state(self._impl))

    @property
    def remote_state(self) -> Union[int, DispositionType]:
        """A remote state of the delivery as indicated by the remote peer."""
        return DispositionType.get(pn_delivery_remote_state(self._impl))

    @property
    def settled(self) -> bool:
        """
        ``True`` if the delivery has been settled by the remote peer,
        ``False`` otherwise.
        """
        return pn_delivery_settled(self._impl)

    def settle(self) -> None:
        """
        Settles the delivery locally. This indicates the application
        considers the delivery complete and does not wish to receive any
        further events about it. Every delivery should be settled locally.
        """
        pn_delivery_settle(self._impl)

    @property
    def aborted(self) -> bool:
        """
        ``True`` if the delivery has been aborted, ``False`` otherwise.
        """
        return pn_delivery_aborted(self._impl)

    def abort(self) -> None:
        """
        Aborts the delivery.  This indicates the application wishes to
        invalidate any data that may have already been sent on this delivery.
        The delivery cannot be aborted after it has been completely delivered.
        """
        pn_delivery_abort(self._impl)

    @property
    def work_next(self) -> Optional['Delivery']:
        """Deprecated: use on_message(), on_accepted(), on_rejected(),
        on_released(), and on_settled() instead.

        The next :class:`Delivery` on the connection that has pending
        operations.
        """
        return Delivery.wrap(pn_work_next(self._impl))

    @property
    def link(self) -> Union['Receiver', 'Sender']:
        """
        The :class:`Link` on which the delivery was sent or received.
        """
        from . import _endpoints
        return _endpoints.Link.wrap(pn_delivery_link(self._impl))

    @property
    def session(self) -> 'Session':
        """
        The :class:`Session` over which the delivery was sent or received.
        """
        return self.link.session

    @property
    def connection(self) -> 'Connection':
        """
        The :class:`Connection` over which the delivery was sent or received.
        """
        return self.session.connection

    @property
    def transport(self) -> 'Transport':
        """
        The :class:`Transport` bound to the :class:`Connection` over which
        the delivery was sent or received.
        """
        return self.connection.transport