File: connection.py

package info (click to toggle)
python-pyngus 2.2.2-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 580 kB
  • sloc: python: 5,436; sh: 25; makefile: 23
file content (888 lines) | stat: -rw-r--r-- 35,641 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
#    Licensed to the Apache Software Foundation (ASF) under one
#    or more contributor license agreements.  See the NOTICE file
#    distributed with this work for additional information
#    regarding copyright ownership.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

# Names exported by ``from <this module> import *``.
__all__ = [
    "ConnectionEventHandler",
    "Connection"
]

import heapq
import logging
import proton
import warnings
import ssl

from pyngus.endpoint import Endpoint
from pyngus.link import _Link
from pyngus.link import _SessionProxy

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# (major, minor) version of the imported proton library.  Each component
# falls back to 0 when proton does not expose the corresponding attribute.
# Compared against tuples such as (0, 10) to select version-specific code.
_PROTON_VERSION = (int(getattr(proton, "VERSION_MAJOR", 0)),
                   int(getattr(proton, "VERSION_MINOR", 0)))


class _CallbackLock(object):
    """Context manager that tracks how deeply execution is nested inside
    application callbacks.

    The ``in_callback`` counter is non-zero while at least one ``with`` block
    using this object is active; non-reentrant Pyngus methods consult it to
    detect that they are being invoked from a callback.
    """
    def __init__(self):
        super(_CallbackLock, self).__init__()
        # number of currently active 'with' blocks
        self.in_callback = 0

    def __enter__(self):
        self.in_callback = self.in_callback + 1
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.in_callback = self.in_callback - 1
        # A non-reentrant method called while this context is held raises
        # RuntimeError().  Returning False lets that exception (or any
        # other) propagate to the caller instead of being swallowed.
        return False


class ConnectionEventHandler(object):
    """Callback interface for events occurring on a Connection.

    Every default implementation merely logs that the event was ignored;
    override the callbacks of interest.  The ``connection`` argument of each
    callback is the Connection on which the event occurred.
    """
    def connection_active(self, connection):
        """Connection handshake has completed."""
        LOG.debug("connection_active (ignored)")

    def connection_failed(self, connection, error):
        """Connection's transport has failed in some way.

        ``error`` describes the failure; it is converted with str() for the
        log message.
        """
        # Logger.warn() is a deprecated alias; use Logger.warning()
        LOG.warning("connection_failed, error=%s (ignored)", str(error))

    def connection_remote_closed(self, connection, pn_condition):
        """Peer has closed its end of the connection."""
        LOG.debug("connection_remote_closed (ignored)")

    def connection_closed(self, connection):
        """The connection has cleanly closed."""
        LOG.debug("connection_closed (ignored)")

    def sender_requested(self, connection, link_handle,
                         name, requested_source,
                         properties):
        """Peer has requested a SenderLink be created."""
        # call accept_sender to accept new link,
        # reject_sender to reject it.
        LOG.debug("sender_requested (ignored)")

    def receiver_requested(self, connection, link_handle,
                           name, requested_target,
                           properties):
        """Peer has requested a ReceiverLink be created."""
        # call accept_receiver to accept new link,
        # reject_receiver to reject it.
        LOG.debug("receiver_requested (ignored)")

    # No longer supported by proton >= 0.10, so this method is deprecated
    def sasl_step(self, connection, pn_sasl):
        """DEPRECATED"""
        LOG.debug("sasl_step (ignored)")

    def sasl_done(self, connection, pn_sasl, result):
        """SASL exchange complete."""
        LOG.debug("sasl_done (ignored)")


class Connection(Endpoint):
    """A Connection to a peer."""
    # Sentinel returned by the I/O methods (needs_input, has_output,
    # process_input) once the corresponding stream direction is finished.
    EOS = -1   # indicates 'I/O stream closed'

    # set of all SASL connection configuration properties; the presence of
    # any of these keys in the connection properties enables SASL setup
    _SASL_PROPS = set(['x-username', 'x-password', 'x-require-auth',
                       'x-sasl-mechs', 'x-sasl-config-dir',
                       'x-sasl-config-name', 'x-force-sasl'])

    # set of all SSL connection configuration properties; SSL is only
    # configured when at least one of these keys is supplied
    _SSL_PROPS = set(['x-ssl', 'x-ssl-identity', 'x-ssl-ca-file',
                      'x-ssl-verify-mode', 'x-ssl-server',
                      'x-ssl-peer-name', 'x-ssl-allow-cleartext'])

    # SSL peer certificate verification: maps the mode names documented for
    # the 'x-ssl-verify-mode' property to proton SSLDomain constants
    _VERIFY_MODES = {'verify-peer': proton.SSLDomain.VERIFY_PEER_NAME,
                     'verify-cert': proton.SSLDomain.VERIFY_PEER,
                     'no-verify': proton.SSLDomain.ANONYMOUS_PEER}

    def _not_reentrant(func):
        """Decorator that prevents callbacks from calling into methods that are
        not reentrant.

        The returned wrapper raises RuntimeError when the instance's
        ``_callback_lock`` is set and reports that a callback is in progress;
        otherwise it forwards all arguments to ``func`` and returns its
        result.  The wrapper keeps the name and docstring of ``func``.
        """
        # imported locally so this block does not depend on a module-level
        # import being present
        import functools

        @functools.wraps(func)
        def wrap(self, *args, **kws):
            # _callback_lock is None once the connection has been destroyed
            if self._callback_lock and self._callback_lock.in_callback:
                m = "Connection %s cannot be invoked from a callback!" % func
                raise RuntimeError(m)
            return func(self, *args, **kws)
        return wrap

    def __init__(self, container, name, event_handler=None, properties=None):
        """Create a new connection from the Container

        container: the owning container; its ``name`` attribute is set as the
        container name of the underlying proton connection.

        name: passed to the Endpoint base class initializer.

        event_handler: optional object whose callbacks (see
        ConnectionEventHandler) are invoked from process().

        properties: map, properties of the new connection. The following keys
        and values are supported:

        idle-time-out: float, time in seconds before an idle link will be
        closed.

        hostname: string, the name of the host to which this connection is
        being made, sent in the Open frame.

        max-frame-size: int, maximum acceptable frame size in bytes.

        properties: map, proton connection properties sent to the peer.

        The following custom connection properties are supported:

        x-server: boolean, set this to True to configure the connection as a
        server side connection.  This should be set True if the connection was
        remotely initiated (e.g. accept on a listening socket).  If the
        connection was locally initiated (e.g. by calling connect()), then this
        value should be set to False.  This setting is used by authentication
        and encryption to configure the connection's role.  The default value
        is False for client mode.

        x-username: string, the client's username to use when authenticating
        with a server.

        x-password: string, the client's password, used for authentication.

        x-require-auth: boolean, reject remotely-initiated client connections
        that fail to provide valid credentials for authentication.

        x-sasl-mechs: string, a space-separated list of mechanisms
        that are allowed for authentication.  Defaults to "ANONYMOUS"

        x-sasl-config-dir: string, path to the directory containing the Cyrus
        SASL server configuration.

        x-sasl-config-name: string, name of the Cyrus SASL configuration file
        contained in the x-sasl-config-dir (without the '.conf' suffix)

        x-force-sasl: by default SASL authentication is disabled.  SASL will be
        enabled if any of the above x-sasl-* options are set. For clients using
        GSSAPI it is likely none of these options will be set.  In order for
        these clients to authenticate this flag must be set true.  The value of
        this property is ignored if any of the other SASL related properties
        are set.

        x-ssl: boolean, Allows clients to connect using SSL setting a minimum
        viable configuration (using the system's CA bundle to validate the
        peer's certificate). This setting is overwritten if subsequent SSL
        settings are found.

        x-ssl-identity: tuple, contains identifying certificate information
        which will be presented to the peer.  The first item in the tuple is
        the path to the certificate file (PEM format).  The second item is the
        path to a file containing the private key used to sign the certificate
        (PEM format, optional if private key is stored in the certificate
        itself). The last item is the password used to encrypt the private key
        (string, not required if private key is not encrypted)

        x-ssl-ca-file: string, path to a file containing the certificates of
        the trusted Certificate Authorities that will be used to check the
        signature of the peer's certificate.  Not used if x-ssl-verify-mode
        is set to 'no-verify'.  To use the system's default CAs instead leave
        this option out and set x-ssl to True.

        x-ssl-verify-mode: string, configure the level of security provided by
        SSL.  Possible values:
            "verify-peer" (default) - most secure, requires peer to supply a
            certificate signed by a valid CA (see x-ssl-ca-file), and check
            the CN or SAN entry in the certificate against the expected
            peer hostname (see hostname and x-ssl-peer-name properties)
            "verify-cert" (default if no x-ssl-peer-name given) - like
            verify-peer, but skips the check of the peer hostname.
             Vulnerable to man-in-the-middle attack.
            "no-verify" - do not require the peer to provide a certificate.
            Results in a weaker encryption stream, and other vulnerabilities.

        x-ssl-peer-name: string, DNS name of peer.  Override the hostname used
        to authenticate peer's certificate (see x-ssl-verify-mode).  The value
        of the 'hostname' property is used if this property is not supplied.

        x-ssl-allow-cleartext: boolean, Allows clients to connect without using
        SSL (eg, plain TCP). Used by a server that will accept clients
        requesting either trusted or untrusted connections.

        x-trace-protocol: boolean, if true, dump sent and received frames to
        stdout.

        Any exception raised while configuring SSL is re-raised after the
        partially constructed connection has been destroyed.
        """
        super(Connection, self).__init__(name)
        self._transport_bound = False
        self._container = container
        self._handler = event_handler
        # NOTE(review): when a non-empty map is passed, this is the caller's
        # own object (no copy is made); the 'x-force-sasl' cleanup further
        # down deletes the key from that same map.
        self._properties = properties or {}
        # 'x-ssl-server' is only consulted as the fallback value when
        # 'x-server' is not supplied
        old_flag = self._properties.get('x-ssl-server', False)
        self._server = self._properties.get('x-server', old_flag)

        self._pn_connection = proton.Connection()
        self._pn_connection.container = container.name
        # for proton older than 0.9 the Transport is created without a mode
        # argument; otherwise the client/server mode is passed explicitly
        if (_PROTON_VERSION < (0, 9)):
            self._pn_transport = proton.Transport()
        else:
            if self._server:
                mode = proton.Transport.SERVER
            else:
                mode = proton.Transport.CLIENT
            self._pn_transport = proton.Transport(mode)
        # proton events for this connection are gathered by the collector
        # and consumed in process()
        self._pn_collector = proton.Collector()
        self._pn_connection.collect(self._pn_collector)

        # apply the optional standard properties; falsy values for
        # idle-time-out and max-frame-size are left at proton's defaults
        if 'hostname' in self._properties:
            self._pn_connection.hostname = self._properties['hostname']
        secs = self._properties.get("idle-time-out")
        if secs:
            self._pn_transport.idle_timeout = secs
        max_frame = self._properties.get("max-frame-size")
        if max_frame:
            self._pn_transport.max_frame_size = max_frame
        if 'properties' in self._properties:
            self._pn_connection.properties = self._properties["properties"]
        if self._properties.get("x-trace-protocol"):
            self._pn_transport.trace(proton.Transport.TRACE_FRM)

        # indexed by link-name
        self._sender_links = {}    # SenderLink
        self._receiver_links = {}  # ReceiverLink

        self._timers = {}  # indexed by expiration date
        self._timers_heap = []  # sorted by expiration date

        self._read_done = False
        self._write_done = False
        self._error = None
        self._next_deadline = 0
        self._user_context = None
        self._remote_session_id = 0
        self._callback_lock = _CallbackLock()

        # the SASL object is created lazily by the pn_sasl property
        self._pn_sasl = None
        self._sasl_done = False

        # if x-force-sasl is false remove it so it does not trigger the SASL
        # configuration logic below
        if not self._properties.get('x-force-sasl', True):
            del self._properties['x-force-sasl']

        if self._SASL_PROPS.intersection(set(self._properties.keys())):
            # SASL config specified, need to enable SASL
            if (_PROTON_VERSION < (0, 10)):
                # best effort map of 0.10 sasl config to pre-0.10 sasl
                if self._server:
                    self.pn_sasl.server()
                    if 'x-require-auth' in self._properties:
                        if not self._properties['x-require-auth']:
                            if _PROTON_VERSION >= (0, 8):
                                self.pn_sasl.allow_skip()
                else:
                    if 'x-username' in self._properties:
                        self.pn_sasl.plain(self._properties['x-username'],
                                           self._properties.get('x-password',
                                                                ''))
                    else:
                        self.pn_sasl.client()
                mechs = self._properties.get('x-sasl-mechs')
                if mechs:
                    self.pn_sasl.mechanisms(mechs)
            else:
                # new Proton SASL configuration:
                # maintain old behavior: allow PLAIN and ANONYMOUS
                # authentication.  Override this using x-sasl-mechs below:
                self.pn_sasl.allow_insecure_mechs = True
                if 'x-require-auth' in self._properties:
                    ra = self._properties['x-require-auth']
                    self._pn_transport.require_auth(ra)
                if 'x-username' in self._properties:
                    self._pn_connection.user = self._properties['x-username']
                if 'x-password' in self._properties:
                    self._pn_connection.password = \
                        self._properties['x-password']
                if 'x-sasl-mechs' in self._properties:
                    # mechanism names are upper-cased before being applied
                    mechs = self._properties['x-sasl-mechs'].upper()
                    self.pn_sasl.allowed_mechs(mechs)
                    # substring test on the whole mechanism list
                    if 'PLAIN' not in mechs and 'ANONYMOUS' not in mechs:
                        self.pn_sasl.allow_insecure_mechs = False
                if 'x-sasl-config-dir' in self._properties:
                    self.pn_sasl.config_path(
                        self._properties['x-sasl-config-dir'])
                if 'x-sasl-config-name' in self._properties:
                    self.pn_sasl.config_name(
                        self._properties['x-sasl-config-name'])

        # intercept any SSL failures and cleanup resources before propagating
        # the exception:
        try:
            self._pn_ssl = self._configure_ssl(properties)
        except Exception:
            self.destroy()
            raise

    @property
    def container(self):
        return self._container

    @property
    # TODO(kgiusti) - hopefully remove
    def pn_transport(self):
        return self._pn_transport

    @property
    # TODO(kgiusti) - hopefully remove
    def pn_connection(self):
        return self._pn_connection

    @property
    def name(self):
        return self._name

    @property
    def remote_container(self):
        """Return the name of the remote container. Should be present once the
        connection is active.
        """
        return self._pn_connection.remote_container

    @property
    def remote_hostname(self):
        """Return the hostname advertised by the remote, if present."""
        if self._pn_connection:
            return self._pn_connection.remote_hostname
        return None

    @property
    def remote_properties(self):
        """Properties provided by the peer."""
        if self._pn_connection:
            return self._pn_connection.remote_properties
        return None

    @property
    def pn_sasl(self):
        if not self._pn_sasl:
            self._pn_sasl = self._pn_transport.sasl()
        return self._pn_sasl

    def pn_ssl(self):
        """Return the Proton SSL context for this Connection."""
        return self._pn_ssl

    def _get_user_context(self):
        """Getter for the user_context property."""
        return self._user_context

    def _set_user_context(self, ctxt):
        """Setter for the user_context property; stores ctxt unchanged."""
        self._user_context = ctxt

    # Read/write property built from the two accessors above.  The stored
    # object is only kept by the Connection and cleared again in destroy().
    _uc_docstr = """Associate an arbitrary user object with this Connection."""
    user_context = property(_get_user_context, _set_user_context,
                            doc=_uc_docstr)

    def open(self):
        """Bind the transport to the proton connection (once only), then open
        the proton connection if its local endpoint is still uninitialized.
        """
        if not self._transport_bound:
            self._pn_transport.bind(self._pn_connection)
            self._transport_bound = True
        pn_conn = self._pn_connection
        local_uninit = pn_conn.state & proton.Endpoint.LOCAL_UNINIT
        if local_uninit:
            pn_conn.open()

    def close(self, pn_condition=None):
        """Close every sender and receiver link, then close the local end of
        the proton connection if it is currently active.

        pn_condition: optional condition handed to each link's close() and,
        when truthy, stored on the proton connection before it is closed.
        """
        # iterate over snapshots: the link tables may change while closing
        for sender in tuple(self._sender_links.values()):
            sender.close(pn_condition)
        for receiver in tuple(self._receiver_links.values()):
            receiver.close(pn_condition)
        pn_conn = self._pn_connection
        if pn_condition:
            pn_conn.condition = pn_condition
        if pn_conn.state & proton.Endpoint.LOCAL_ACTIVE:
            pn_conn.close()

    @property
    def active(self):
        """Return True if both ends of the Connection are open."""
        return self._endpoint_state == self._ACTIVE

    @property
    def closed(self):
        """Return True if the Connection has finished closing."""
        return (self._write_done and self._read_done)

    @_not_reentrant
    def destroy(self):
        """Release all resources held by this Connection.

        Destroys every sender and receiver link, removes the connection from
        its container, unbinds the transport (if bound), frees the proton
        connection and drops all references.  Must not be called from within
        a callback (enforced by the _not_reentrant decorator).
        """
        # if a connection is destroyed without flushing pending output,
        # the remote will see an unclean shutdown (framing error)
        if self.has_output > 0:
            LOG.debug("Connection with buffered output destroyed")
        self._error = "Destroyed by the application"
        self._handler = None
        self._properties = None
        # iterate over copies of the link tables; the asserts below expect
        # each link's destroy() to have removed the link from the table
        # (presumably via _remove_sender/_remove_receiver - verify in link.py)
        tmp = self._sender_links.copy()
        for l in tmp.values():
            l.destroy()
        assert(len(self._sender_links) == 0)
        tmp = self._receiver_links.copy()
        for l in tmp.values():
            l.destroy()
        assert(len(self._receiver_links) == 0)
        self._timers.clear()
        self._timers_heap = None
        self._container.remove_connection(self._name)
        self._container = None
        self._user_context = None
        self._callback_lock = None
        # unbind() is only issued when open() previously bound the transport
        if self._transport_bound:
            self._pn_transport.unbind()
        self._pn_transport = None
        self._pn_connection.free()
        self._pn_connection = None
        if _PROTON_VERSION < (0, 8):
            # memory leak: drain the collector before releasing it
            while self._pn_collector.peek():
                self._pn_collector.pop()
        self._pn_collector = None
        self._pn_sasl = None
        self._pn_ssl = None

    # Endpoint state flag combinations compared against _endpoint_state:
    # both ends closed / both ends active.
    _CLOSED = (proton.Endpoint.LOCAL_CLOSED | proton.Endpoint.REMOTE_CLOSED)
    _ACTIVE = (proton.Endpoint.LOCAL_ACTIVE | proton.Endpoint.REMOTE_ACTIVE)

    @_not_reentrant
    def process(self, now):
        """Perform connection state processing.

        now: current timestamp; used to expire timers, passed to the
        transport's tick(), and used as the next deadline while an error is
        pending.

        Returns the next deadline (see the ``deadline`` property), or 0 when
        the connection has been destroyed or has not been opened yet.  Must
        not be called from within a callback.
        """
        if self._pn_connection is None:
            LOG.error("Connection.process() called on destroyed connection!")
            return 0

        # do nothing until the connection has been opened
        if self._pn_connection.state & proton.Endpoint.LOCAL_UNINIT:
            return 0

        if self._pn_sasl and not self._sasl_done:
            # wait until SASL has authenticated
            if (_PROTON_VERSION < (0, 10)):
                # pre-0.10 proton: poll the SASL state and let the handler
                # step the exchange until it passes or fails
                if self._pn_sasl.state not in (proton.SASL.STATE_PASS,
                                               proton.SASL.STATE_FAIL):
                    LOG.debug("SASL in progress. State=%s",
                              str(self._pn_sasl.state))
                    if self._handler:
                        with self._callback_lock:
                            self._handler.sasl_step(self, self._pn_sasl)
                    return self._next_deadline

                self._sasl_done = True
                if self._handler:
                    with self._callback_lock:
                        self._handler.sasl_done(self, self._pn_sasl,
                                                self._pn_sasl.outcome)
            else:
                # newer proton: SASL is finished once an outcome is
                # available; processing continues either way
                if self._pn_sasl.outcome is not None:
                    self._sasl_done = True
                    if self._handler:
                        with self._callback_lock:
                            self._handler.sasl_done(self, self._pn_sasl,
                                                    self._pn_sasl.outcome)

        # process timer events:
        timer_deadline = self._expire_timers(now)
        transport_deadline = self._pn_transport.tick(now)
        # use the earlier of the two deadlines; if only one is set (truthy),
        # use that one
        if timer_deadline and transport_deadline:
            self._next_deadline = min(timer_deadline, transport_deadline)
        else:
            self._next_deadline = timer_deadline or transport_deadline

        # process events from proton:
        pn_event = self._pn_collector.peek()
        while pn_event:
            # LOG.debug("pn_event: %s received", pn_event.type)
            # each event is offered to the link, connection and session
            # handlers in turn until one reports it as handled
            if _Link._handle_proton_event(pn_event, self):
                pass
            elif self._handle_proton_event(pn_event):
                pass
            elif _SessionProxy._handle_proton_event(pn_event, self):
                pass
            self._pn_collector.pop()
            pn_event = self._pn_collector.peek()

        # check for connection failure after processing all pending
        # engine events:
        if self._error:
            if self._handler:
                # nag application until connection is destroyed
                self._next_deadline = now
                with self._callback_lock:
                    self._handler.connection_failed(self, self._error)
        elif (self._endpoint_state == self._CLOSED and
              self._read_done and self._write_done):
            # invoke closed callback after endpoint has fully closed and
            # all pending I/O has completed:
            if self._handler:
                with self._callback_lock:
                    self._handler.connection_closed(self)

        return self._next_deadline

    @property
    def next_tick(self):
        text = "next_tick deprecated, use deadline instead"
        warnings.warn(DeprecationWarning(text))
        return self.deadline

    @property
    def deadline(self):
        """Must invoke process() on or before this timestamp."""
        return self._next_deadline

    @property
    def needs_input(self):
        if self._read_done:
            LOG.debug("needs_input EOS")
            return self.EOS
        try:
            capacity = self._pn_transport.capacity()
        except Exception as e:
            self._read_done = True
            self._connection_failed(str(e))
            return self.EOS
        if capacity >= 0:
            return capacity
        LOG.debug("needs_input read done")
        self._read_done = True
        return self.EOS

    def process_input(self, in_data):
        c = min(self.needs_input, len(in_data))
        if c <= 0:
            return c
        try:
            rc = self._pn_transport.push(in_data[:c])
        except Exception as e:
            self._read_done = True
            self._connection_failed(str(e))
            return self.EOS
        if rc:  # error?
            LOG.debug("process_input read done")
            self._read_done = True
            return self.EOS
        # hack: check if this was the last input needed by the connection.
        # If so, this will set the _read_done flag and the 'connection closed'
        # callback can be issued on the next call to process()
        self.needs_input
        return c

    def close_input(self, reason=None):
        if not self._read_done:
            try:
                self._pn_transport.close_tail()
            except Exception as e:
                self._connection_failed(str(e))
            LOG.debug("close_input read done")
            self._read_done = True

    @property
    def has_output(self):
        if self._write_done:
            LOG.debug("has output EOS")
            return self.EOS
        try:
            pending = self._pn_transport.pending()
        except Exception as e:
            self._write_done = True
            self._connection_failed(str(e))
            return self.EOS
        if pending >= 0:
            return pending
        LOG.debug("has output write_done")
        self._write_done = True
        return self.EOS

    def output_data(self):
        """Get a buffer of data that needs to be written to the network.
        """
        c = self.has_output
        if c <= 0:
            return None
        try:
            buf = self._pn_transport.peek(c)
        except Exception as e:
            self._connection_failed(str(e))
            return None
        return buf

    def output_written(self, count):
        try:
            self._pn_transport.pop(count)
        except Exception as e:
            self._write_done = True
            self._connection_failed(str(e))
        # hack: check if this was the last output from the connection.  If so,
        # this will set the _write_done flag and the 'connection closed'
        # callback can be issued on the next call to process()
        self.has_output

    def close_output(self, reason=None):
        if not self._write_done:
            try:
                self._pn_transport.close_head()
            except Exception as e:
                self._connection_failed(str(e))
            LOG.debug("close output write done")
            self._write_done = True

    def create_sender(self, source_address, target_address=None,
                      event_handler=None, name=None, properties=None):
        """Factory method for Sender links."""
        ident = name or str(source_address)
        if ident in self._sender_links:
            raise KeyError("Sender %s already exists!" % ident)

        session = _SessionProxy("session-%s" % ident, self)
        session.open()
        sl = session.new_sender(ident)
        sl.configure(target_address, source_address, event_handler, properties)
        self._sender_links[ident] = sl
        return sl

    def accept_sender(self, link_handle, source_override=None,
                      event_handler=None, properties=None):
        link = self._sender_links.get(link_handle)
        if not link:
            raise Exception("Invalid link_handle: %s" % link_handle)
        pn_link = link._pn_link
        if pn_link.remote_source.dynamic and not source_override:
            raise Exception("A source address must be supplied!")
        source_addr = source_override or pn_link.remote_source.address
        link.configure(pn_link.remote_target.address,
                       source_addr,
                       event_handler, properties)
        return link

    def reject_sender(self, link_handle, pn_condition=None):
        """Rejects the SenderLink, and destroys the handle."""
        link = self._sender_links.get(link_handle)
        if not link:
            raise Exception("Invalid link_handle: %s" % link_handle)
        link.reject(pn_condition)
        # note: normally, link.destroy() cannot be called from a callback,
        # but this link was never made available to the application so this
        # link is only referenced by the connection
        link.destroy()

    def create_receiver(self, target_address, source_address=None,
                        event_handler=None, name=None, properties=None):
        """Factory method for creating Receive links."""
        ident = name or str(target_address)
        if ident in self._receiver_links:
            raise KeyError("Receiver %s already exists!" % ident)

        session = _SessionProxy("session-%s" % ident, self)
        session.open()
        rl = session.new_receiver(ident)
        rl.configure(target_address, source_address, event_handler, properties)
        self._receiver_links[ident] = rl
        return rl

    def accept_receiver(self, link_handle, target_override=None,
                        event_handler=None, properties=None):
        """Configure the receiver link registered under link_handle.

        The link's target address is target_override when given, otherwise
        the address of the remote target; its source address is always the
        address of the remote source.  Returns the configured link.

        Raises Exception if link_handle is unknown, or if the remote target
        is flagged dynamic and no target_override was supplied.
        """
        link = self._receiver_links.get(link_handle)
        if not link:
            raise Exception("Invalid link_handle: %s" % link_handle)

        pn_link = link._pn_link
        address_missing = (pn_link.remote_target.dynamic and
                           not target_override)
        if address_missing:
            raise Exception("A target address must be supplied!")

        if target_override:
            target_addr = target_override
        else:
            target_addr = pn_link.remote_target.address
        source_addr = pn_link.remote_source.address
        link.configure(target_addr, source_addr, event_handler, properties)
        return link

    def reject_receiver(self, link_handle, pn_condition=None):
        """Refuse the receiver link registered under link_handle, destroy it.

        The optional pn_condition is handed to the link's reject() call.
        Raises Exception when link_handle does not identify a receiver link.
        """
        receiver = self._receiver_links.get(link_handle)
        if receiver:
            receiver.reject(pn_condition)
            # Destroying a link from inside a callback is normally not
            # allowed.  It is safe here: the application never received this
            # link, so the connection holds the only reference to it.
            receiver.destroy()
            return
        raise Exception("Invalid link_handle: %s" % link_handle)

    @property
    def _endpoint_state(self):
        """The state value reported by the underlying proton connection."""
        pn_connection = self._pn_connection
        return pn_connection.state

    def _remove_sender(self, name):
        """Forget the sender link registered as name; no-op if unknown."""
        self._sender_links.pop(name, None)

    def _remove_receiver(self, name):
        """Forget the receiver link registered as name; no-op if unknown."""
        self._receiver_links.pop(name, None)

    def _connection_failed(self, error="Error not specified!"):
        """Record and log a connection failure.

        Only the first failure is kept: once an error has been stored,
        later calls leave it untouched and log nothing.
        """
        if self._error:
            return
        LOG.error("Connection failed: %s", str(error))
        self._error = error

    def _configure_ssl(self, properties):
        """Build the SSL layer for this connection's transport.

        Returns None when properties is empty or contains none of the keys
        listed in self._SSL_PROPS; otherwise returns the proton.SSL object
        created on self._pn_transport.

        Property keys read here:
          'x-ssl-server' / 'x-server': if true, use server mode (default is
              client mode).
          'x-ssl-identity': indexable value whose first three items are
              passed to SSLDomain.set_credentials().
              NOTE(review): presumably (certificate file, key file,
              password) - confirm against the proton API.
          'x-ssl-ca-file': trusted CA database.  When absent and 'x-ssl' is
              set, the interpreter's default CA file is used (if the ssl
              module provides get_default_verify_paths).
          'x-ssl-peer-name' / 'hostname': expected name of the peer.
          'x-ssl-verify-mode': a key of self._VERIFY_MODES; the default
              depends on whether a CA file and peer name are available.
          'x-ssl-allow-cleartext': server mode only; allow unsecured clients.

        Raises proton.SSLException when the verify mode is unknown or when
        the selected mode lacks the CA file / peer name it requires.
        """
        if (not properties or
                not self._SSL_PROPS.intersection(set(iter(properties)))):
            return None

        mode = proton.SSLDomain.MODE_CLIENT
        if properties.get('x-ssl-server', properties.get('x-server')):
            mode = proton.SSLDomain.MODE_SERVER

        identity = properties.get('x-ssl-identity')
        ca_file = properties.get('x-ssl-ca-file')
        # no explicit CA file: fall back to the interpreter's default one,
        # but only when the caller asked for SSL via 'x-ssl'
        if (not ca_file and properties.get('x-ssl') and
                hasattr(ssl, 'get_default_verify_paths')):
            ca_file = ssl.get_default_verify_paths().cafile
        hostname = properties.get('x-ssl-peer-name',
                                  properties.get('hostname'))
        # default to most secure level of certificate validation
        if not ca_file:
            vdefault = 'no-verify'
        elif not hostname:
            vdefault = 'verify-cert'
        else:
            vdefault = 'verify-peer'

        # translate the mode name into the value stored in _VERIFY_MODES
        vmode = properties.get('x-ssl-verify-mode', vdefault)
        try:
            vmode = self._VERIFY_MODES[vmode]
        except KeyError:
            raise proton.SSLException("bad value for x-ssl-verify-mode: '%s'" %
                                      vmode)
        # an explicitly requested mode must have the inputs it depends on
        if vmode == proton.SSLDomain.VERIFY_PEER_NAME:
            if not hostname or not ca_file:
                raise proton.SSLException("verify-peer needs x-ssl-peer-name"
                                          " and x-ssl-ca-file")
        elif vmode == proton.SSLDomain.VERIFY_PEER:
            if not ca_file:
                raise proton.SSLException("verify-cert needs x-ssl-ca-file")

        # This will throw proton.SSLUnavailable if SSL support is not installed
        domain = proton.SSLDomain(mode)
        if identity:
            # our identity:
            domain.set_credentials(identity[0], identity[1], identity[2])
        if ca_file:
            # how we verify peers:
            domain.set_trusted_ca_db(ca_file)
        domain.set_peer_authentication(vmode, ca_file)
        if mode == proton.SSLDomain.MODE_SERVER:
            if properties.get('x-ssl-allow-cleartext'):
                domain.allow_unsecured_client()
        pn_ssl = proton.SSL(self._pn_transport, domain)
        if hostname:
            pn_ssl.peer_hostname = hostname
        LOG.debug("SSL configured for connection %s", self._name)
        return pn_ssl

    def _add_timer(self, deadline, callback):
        """Schedule callback to be run when deadline expires.

        Callbacks sharing a deadline are kept together in one set, so each
        distinct deadline appears only once in the timer heap.
        """
        pending = self._timers.get(deadline)
        if pending is None:
            # first callback for this deadline: start tracking it
            pending = self._timers[deadline] = set()
            heapq.heappush(self._timers_heap, deadline)
            if self._next_deadline > deadline:
                self._next_deadline = deadline
        pending.add(callback)

    def _cancel_timer(self, deadline, callback):
        """Unschedule callback from deadline; unknown entries are ignored."""
        pending = self._timers.get(deadline)
        if not pending:
            return
        # the deadline itself stays queued even if this empties its set;
        # the next expire will discard empty deadlines
        pending.discard(callback)

    def _expire_timers(self, now):
        """Run every timer callback whose deadline is at or before now.

        Returns the earliest deadline still pending, or 0 if no timers
        remain.

        An exception raised by a callback propagates to the caller, but the
        timer bookkeeping is kept consistent first: callbacks of that
        deadline that have not run yet are re-queued so a later call still
        fires them, and a fully drained deadline is always forgotten.
        Without this, the deadline would be gone from the heap while its
        entry lingered in the table, so timers later added for the same
        deadline would never be pushed onto the heap and never fire.
        """
        while (self._timers_heap and
               self._timers_heap[0] <= now):
            deadline = heapq.heappop(self._timers_heap)
            callbacks = self._timers.get(deadline)
            try:
                # the set is drained in place so callbacks that add or
                # cancel timers for this same deadline take effect at once
                while callbacks:
                    callbacks.pop()()
            finally:
                if callbacks:
                    # a callback raised: keep the rest scheduled
                    heapq.heappush(self._timers_heap, deadline)
                else:
                    self._timers.pop(deadline, None)

        return self._timers_heap[0] if self._timers_heap else 0

    # Proton's event model was changed after 0.7
    if (_PROTON_VERSION >= (0, 8)):
        # proton connection event type -> endpoint state machine event
        _endpoint_event_map = {
            proton.Event.CONNECTION_REMOTE_OPEN: Endpoint.REMOTE_OPENED,
            proton.Event.CONNECTION_REMOTE_CLOSE: Endpoint.REMOTE_CLOSED,
            proton.Event.CONNECTION_LOCAL_OPEN: Endpoint.LOCAL_OPENED,
            proton.Event.CONNECTION_LOCAL_CLOSE: Endpoint.LOCAL_CLOSED}

        def _handle_proton_event(self, pn_event):
            """Process a proton event aimed at this connection.

            Returns True if the event was handled, False if its type is not
            one this connection knows about.
            """
            event_type = pn_event.type
            ep_event = Connection._endpoint_event_map.get(event_type)
            if ep_event is not None:
                # open/close events drive the endpoint state machine
                self._process_endpoint_event(ep_event)
                return True
            if event_type == proton.Event.CONNECTION_INIT:
                LOG.debug("Connection created: %s", pn_event.context)
                return True
            if event_type == proton.Event.CONNECTION_FINAL:
                LOG.debug("Connection finalized: %s", pn_event.context)
                return True
            if event_type == proton.Event.TRANSPORT_ERROR:
                self._connection_failed(str(self._pn_transport.condition))
                return True
            return False  # unknown
    elif hasattr(proton.Event, "CONNECTION_LOCAL_STATE"):
        # 0.7 proton event model
        def _handle_proton_event(self, pn_event):
            """Process a proton 0.7 state-change event for this connection.

            Returns True if the event was handled, False if its type is not
            one this connection knows about.
            """
            event_type = pn_event.type
            if event_type == proton.Event.CONNECTION_LOCAL_STATE:
                self._process_local_state()
                return True
            if event_type == proton.Event.CONNECTION_REMOTE_STATE:
                self._process_remote_state()
                return True
            return False  # unknown
    else:
        raise Exception("The installed version of Proton is not supported.")

    # endpoint state machine actions:

    def _ep_active(self):
        """Both ends of the Endpoint have become active.

        Notifies the handler, if one is set, via connection_active() while
        holding the callback lock.
        """
        LOG.debug("Connection is up")
        if not self._handler:
            return
        with self._callback_lock:
            self._handler.connection_active(self)

    def _ep_need_close(self):
        """The remote has closed its end of the endpoint.

        Notifies the handler, if one is set, via connection_remote_closed(),
        passing along the remote condition of the proton connection.
        """
        LOG.debug("Connection remotely closed")
        if not self._handler:
            return
        # read the condition before taking the callback lock
        remote_condition = self._pn_connection.remote_condition
        with self._callback_lock:
            self._handler.connection_remote_closed(self, remote_condition)

    def _ep_error(self, error):
        """The endpoint state machine failed due to protocol error.

        The error is first passed to the parent class, then the connection
        is marked as failed.
        """
        parent = super(Connection, self)
        parent._ep_error(error)
        failure = "Protocol error occurred."
        self._connection_failed(failure)

    # order by name

    def __lt__(self, other):
        """True when this connection's name sorts before other's name."""
        mine, theirs = self.name, other.name
        return mine < theirs

    def __le__(self, other):
        """True when this connection sorts before other or shares its name."""
        before = self < other
        if before:
            return before
        return self.name == other.name

    def __gt__(self, other):
        """True when this connection's name sorts after other's name."""
        mine, theirs = self.name, other.name
        return mine > theirs

    def __ge__(self, other):
        """True when this connection sorts after other or shares its name."""
        after = self > other
        if after:
            return after
        return self.name == other.name