File: protocol.py

package info (click to toggle)
python-aioxmpp 0.12.2-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 6,152 kB
  • sloc: python: 96,969; xml: 215; makefile: 155; sh: 72
file content (932 lines) | stat: -rw-r--r-- 31,550 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
########################################################################
# File name: protocol.py
# This file is part of: aioxmpp
#
# LICENSE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program.  If not, see
# <http://www.gnu.org/licenses/>.
#
########################################################################
"""
:mod:`~aioxmpp.protocol` --- XML Stream implementation
######################################################

This module contains the :class:`XMLStream` class, which implements the XML
stream protocol used by XMPP. It makes extensive use of the :mod:`aioxmpp.xml`
module and the :mod:`aioxmpp.xso` subpackage to parse and serialize XSOs
received and sent on the stream.

In addition, helper functions to work with :class:`XMLStream` instances are
provided; these are not included in the class itself because they provide
additional functionality solely based on the public interface of the
class. Separating them helps with testing.

.. autoclass:: XMLStream

Utilities for XML streams
=========================

.. autofunction:: send_and_wait_for

.. autofunction:: reset_stream_and_get_features

Enumerations
============

.. autoclass:: Mode

.. autoclass:: State

"""

import asyncio
import contextlib
import functools
import logging

from enum import Enum

import xml.sax as sax
import xml.parsers.expat as pyexpat

from . import xml, errors, xso, nonza, stanza, callbacks, statemachine, utils
from .utils import namespaces

logger = logging.getLogger(__name__)


class Mode(Enum):
    """
    Possible modes of connection for an XML stream. These define the namespaces
    used.

    .. attribute:: C2S

       A client stream connected to a server. This is the default mode and,
       currently, the only available mode.

    """
    C2S = namespaces.client


@functools.total_ordering
class State(Enum):
    """
    The possible states of a :class:`XMLStream`:

    .. attribute:: READY

       The initial state; this is the case when no underlying transport is
       connected.

    .. attribute:: STREAM_HEADER_SENT

       After a :class:`asyncio.Transport` calls
       :meth:`XMLStream.connection_made` on the xml stream, it sends the stream
       header and enters this state.

    .. attribute:: OPEN

       When the stream header of the peer is received, this state is entered
       and the XML stream can be used for sending and receiving XSOs.

    .. attribute:: CLOSING

       After :meth:`XMLStream.close` is called, this state is entered. We sent
       a stream footer and an EOF, if the underlying transport supports
       this. We still have to wait for the peer to close the stream.

       In this state and all following states, :class:`ConnectionError`
       instances are raised whenever an attempt is made to write to the
       stream. The exact instance depends on the reason of the closure.

       In this state, the stream waits for the remote to send a stream footer
       and the connection to shut down. For application purposes, the stream is
       already closed.

    .. attribute:: CLOSING_STREAM_FOOTER_RECEIVED

       At this point, the stream is properly closed on the XML stream
       level. This is the point where :meth:`XMLStream.close_and_wait`
       returns.

    .. attribute:: CLOSED

       This state is entered when the connection is lost in any way. This is
       the final state.

    """

    def __lt__(self, other):
        return self.value < other.value

    READY = 0
    STREAM_HEADER_SENT = 1
    OPEN = 2
    CLOSING = 3
    CLOSING_STREAM_FOOTER_RECEIVED = 4
    CLOSED = 6


class DebugWrapper:
    def __init__(self, dest, logger):
        self.dest = dest
        self.logger = logger
        if hasattr(dest, "flush"):
            self._flush = dest.flush
        else:
            self._flush = lambda: None
        self._pieces = []
        self._total_len = 0
        self._muted = False
        self._written_mute_marker = False

    def _emit(self):
        self.logger.debug("SENT %r", b"".join(self._pieces))
        self._pieces = []
        self._total_len = 0

    def write(self, data):
        if self._muted:
            if not self._written_mute_marker:
                self._pieces.append(b"<!-- some bytes omitted -->")
                self._written_mute_marker = True
        else:
            self._pieces.append(data)
            self._total_len += len(data)
        result = self.dest.write(data)
        if self._total_len >= 4096:
            self._emit()
        return result

    def flush(self):
        self._emit()
        self._flush()

    @contextlib.contextmanager
    def mute(self):
        self._muted = True
        self._written_mute_marker = False
        try:
            yield
        finally:
            self._muted = False


class XMLStream(asyncio.Protocol):
    """
    XML stream implementation. This is an streaming :class:`asyncio.Protocol`
    which translates the received bytes into XSOs.

    :param to: Domain of the server the stream connects to.
    :type to: :class:`~aioxmpp.JID`
    :param features_future: Use :meth:`features_future` instead.
    :type features_future: :class:`asyncio.Future`
    :param sorted_attributes: Sort attributes deterministically on output
        (debug option; not part of the public interface)
    :type sorted_attributes: :class:`bool`
    :param base_logger: Parent logger for this stream
    :type base_logger: :class:`logging.Logger`

    `to` must identify the remote server to connect to. This is used as the
    ``to`` attribute on the stream header.

    `features_future` may be a future. The XML stream will set the first
    :class:`~aioxmpp.nonza.StreamFeatures` node it receives as the result of
    the future. The future will also receive any pre-stream-features
    exception.

    `sorted_attributes` is a testing/debugging option to enable sorted output
    of the XML attributes emitted on the stream. See
    :class:`~aioxmpp.xml.XMPPXMLGenerator` for details. Do not use outside of
    unit testing code, as it has a negative performance impact.

    `base_logger` may be a :class:`logging.Logger` instance to use. The XML
    stream will create a child called ``XMLStream`` at that logger and use that
    child for logging purposes. This eases debugging and allows for
    connection-specific loggers.

    .. deprecated:: 0.12

        Using `features_future` as positional or keyword argument is
        deprecated and will be removed in version 1.0. Use
        :meth:`features_future` to obtain a future instead.

    Receiving XSOs:

    .. attribute:: stanza_parser

       A :class:`~aioxmpp.xso.XSOParser` instance which is wired to a
       :class:`~aioxmpp.xml.XMPPXMLProcessor` which processes the received
       bytes.

       To receive XSOs over the XML stream, use :attr:`stanza_parser` and
       register class callbacks on it using
       :meth:`~aioxmpp.xso.XSOParser.add_class`.

    .. attribute:: error_handler

       This should be assigned a callable, taking two arguments: a
       :class:`xso.XSO` instance, which is the partial(!) top-level stream
       element and an exception indicating the failure.

       Partial here means that it is not guaranteed that anything but the
       attributes on the partial XSO itself are there. Any children or text
       payload is most likely missing, as it probably caused the error.

       .. versionadded:: 0.4

    Sending XSOs:

    .. automethod:: send_xso

    Manipulating stream state:

    .. automethod:: starttls

    .. automethod:: reset

    .. automethod:: close

    .. automethod:: abort

    Controlling debug output:

    .. automethod:: mute

    Waiting for stream state changes:

    .. automethod:: error_future

    .. automethod:: features_future

    Monitoring stream aliveness:

    .. autoattribute:: deadtime_soft_limit

    .. autoattribute:: deadtime_hard_limit

    Signals:

    .. signal:: on_closing(reason)

       A :class:`~aioxmpp.callbacks.Signal` which fires when the underlying
       transport of the stream reports an error or when a stream error is
       received. The signal is fired with the corresponding exception as the
       only argument.

       If the stream gets closed by the application without any error, the
       argument is :data:`None`.

       By the time the callback fires, the stream is already unusable for
       sending stanzas. It *may* however still receive stanzas, if the stream
       shutdown was initiated by the application and the peer has not yet send
       its stream footer.

       If the application is not able to handle these stanzas, it is legitimate
       to disconnect their handlers from the :attr:`stanza_parser`; the stream
       will be able to deal with unhandled top level stanzas correctly at this
       point (by ignoring them).

    .. signal:: on_deadtime_soft_limit_tripped

        Emits when the soft limit dead time has been exceeded.

        See :attr:`deadtime_soft_limit` for general information on the timeout
        handling.

        .. versionadded:: 0.10

    Timeouts:

    .. attribute:: shutdown_timeout

       The maximum time to wait for the peer ``</stream:stream>`` before
       forcing to close the transport and considering the stream closed.

    """

    on_closing = callbacks.Signal()
    on_deadtime_soft_limit_tripped = callbacks.Signal()

    shutdown_timeout = 15

    def __init__(self, to,
                 features_future=None,
                 sorted_attributes=False,
                 base_logger=logging.getLogger("aioxmpp"),
                 loop=None):
        self._to = to
        self._sorted_attributes = sorted_attributes
        self._logger = base_logger.getChild("XMLStream")
        self._transport = None
        self._exception = None
        self._loop = loop or asyncio.get_event_loop()
        self._features_futures = []
        self._error_futures = []
        if features_future is not None:
            self._features_futures.append(features_future)
            self._error_futures.append(features_future)
        self._smachine = statemachine.OrderedStateMachine(State.READY)
        self._transport_closing = False
        self._monitor = utils.AlivenessMonitor(self._loop)
        self._monitor.on_deadtime_hard_limit_tripped.connect(
            self._deadtime_hard_limit_triggered
        )
        self._monitor.on_deadtime_soft_limit_tripped.connect(
            self.on_deadtime_soft_limit_tripped
        )

        self._closing_future = asyncio.ensure_future(
            self._smachine.wait_for(
                State.CLOSING
            ),
            loop=loop
        )
        self._closing_future.add_done_callback(
            self._stream_starts_closing
        )

        self.stanza_parser = xso.XSOParser()
        self.stanza_parser.add_class(nonza.StreamError,
                                     self._rx_stream_error)
        self.stanza_parser.add_class(nonza.StreamFeatures,
                                     self._rx_stream_features)
        self.error_handler = None

    def _invalid_transition(self, to, via=None):
        text = "invalid state transition: from={} to={}".format(
            self._smachine.state,
            to)
        if via:
            text += " (via: {})".format(via)
        return RuntimeError(text)

    def _invalid_state(self, at=None):
        text = "invalid state: {}".format(self._smachine.state)
        if at:
            text += " (at: {})".format(at)
        return RuntimeError(text)

    def _close_transport(self):
        if self._transport_closing:
            return
        self._transport_closing = True
        self._transport.close()

    def _stream_starts_closing(self, task):
        exc = self._exception
        if exc is None:
            exc = ConnectionError("stream shut down")

        self.on_closing(self._exception)
        for fut in self._error_futures:
            if not fut.done():
                fut.set_exception(exc)
        self._error_futures.clear()

        if task.cancelled():
            return
        if task.exception() is not None:
            return
        task.result()

    def _fail(self, err):
        self._exception = err
        self.close()

    def _require_connection(self, accept_partial=False):
        if (self._smachine.state == State.OPEN or
                (accept_partial and
                 self._smachine.state == State.STREAM_HEADER_SENT)):
            return

        if self._exception:
            raise self._exception

        raise ConnectionError("xmlstream not connected")

    def _rx_exception(self, exc):
        if isinstance(exc, stanza.StanzaError):
            if self.error_handler:
                self.error_handler(exc.partial_obj, exc)
        elif isinstance(exc, xso.UnknownTopLevelTag):
            if self._smachine.state >= State.CLOSING:
                self._logger.info("ignoring unknown top-level tag, "
                                  "we’re closing")
                return

            raise errors.StreamError(
                condition=errors.StreamErrorCondition.UNSUPPORTED_STANZA_TYPE,
                text="unsupported stanza: {}".format(
                    xso.tag_to_str((exc.ev_args[0], exc.ev_args[1]))
                )) from None
        else:
            context = exc.__context__ or exc.__cause__
            raise exc from context

    def _rx_stream_header(self):
        if self._processor.remote_version != (1, 0):
            raise errors.StreamError(
                errors.StreamErrorCondition.UNSUPPORTED_VERSION,
                text="unsupported version"
            )
        self._smachine.state = State.OPEN

    def _rx_stream_error(self, err):
        self._fail(err.to_exception())

    def _rx_stream_footer(self):
        if self._smachine.state < State.CLOSING:
            # any other state, this is an issue
            if self._exception is None:
                self._fail(ConnectionError("stream closed by peer"))
            self.close()
        elif self._smachine.state >= State.CLOSING_STREAM_FOOTER_RECEIVED:
            self._logger.info("late stream footer received")
            return

        self._close_transport()
        self._smachine.state = State.CLOSING_STREAM_FOOTER_RECEIVED

    def _rx_stream_features(self, features):
        for fut in self._features_futures:
            if fut.done():
                continue
            fut.set_result(features)
            try:
                self._error_futures.remove(fut)
            except ValueError:
                pass
        self._features_futures.clear()

    def _rx_feed(self, blob):
        try:
            self._parser.feed(blob)
        except sax.SAXParseException as exc:
            if (exc.getException().args[0].startswith(
                    pyexpat.errors.XML_ERROR_UNDEFINED_ENTITY)):
                # this will raise an appropriate stream error
                xml.XMPPLexicalHandler.startEntity("foo")
            raise errors.StreamError(
                condition=errors.StreamErrorCondition.BAD_FORMAT,
                text=str(exc)
            )
        except errors.StreamError:
            raise
        except Exception:
            self._logger.exception(
                "unexpected exception while parsing stanza"
                " bubbled up through parser. stream so ded.")
            raise errors.StreamError(
                condition=errors.StreamErrorCondition.INTERNAL_SERVER_ERROR,
                text="Internal error while parsing XML. Client logs have more"
                     " details."
            )

    def _deadtime_hard_limit_triggered(self):
        self._logger.debug("dead time hard limit exceeded")
        # pretend full shut-down handshake has happened
        if self._smachine.state != State.CLOSED:
            self._smachine.state = State.CLOSING_STREAM_FOOTER_RECEIVED
        self._transport_closing = True
        if self._transport is not None:
            self._transport.abort()
        self._exception = self._exception or ConnectionError(
            "connection timeout (dead time hard limit exceeded)"
        )

    def connection_made(self, transport):
        if self._smachine.state != State.READY:
            raise self._invalid_state("connection_made")

        assert self._transport is None
        self._transport = transport
        self._writer = None
        self._exception = None
        # we need to set the state before we call reset()
        self._smachine.state = State.STREAM_HEADER_SENT
        self.reset()

    def connection_lost(self, exc):
        # in connection_lost, we really cannot do anything except shutting down
        # the stream without sending any more data
        if self._smachine.state == State.CLOSED:
            return
        self._smachine.state = State.CLOSED
        self._exception = self._exception or exc
        self._kill_state()
        self._writer = None
        self._transport = None
        self._monitor.deadtime_hard_limit = None
        self._monitor.deadtime_soft_limit = None
        self._closing_future.cancel()

    def data_received(self, blob):
        self._logger.debug("RECV %r", blob)
        self._monitor.notify_received()
        try:
            self._rx_feed(blob)
        except errors.StreamError as exc:
            stanza_obj = nonza.StreamError.from_exception(exc)
            if not self._writer.closed:
                self._writer.send(stanza_obj)
            self._fail(exc)
            # shutdown, we do not really care about </stream:stream> by the
            # server at this point
            self._close_transport()

    def eof_received(self):
        if self._smachine.state == State.OPEN:
            # close and set to EOF received
            self.close()
            # common actions below
        elif (self._smachine.state == State.CLOSING or
              self._smachine.state == State.CLOSING_STREAM_FOOTER_RECEIVED):
            # these states are fine, common actions below
            pass
        else:
            self._logger.warn("unexpected eof_received (in %s state)",
                              self._smachine.state)
            # common actions below

        self._smachine.state = State.CLOSING_STREAM_FOOTER_RECEIVED
        self._close_transport()

    def close(self):
        """
        Close the XML stream and the underlying transport.

        This gracefully shuts down the XML stream and the transport, if
        possible by writing the eof using :meth:`asyncio.Transport.write_eof`
        after sending the stream footer.

        After a call to :meth:`close`, no other stream manipulating or sending
        method can be called; doing so will result in a
        :class:`ConnectionError` exception or any exception caused by the
        transport during shutdown.

        Calling :meth:`close` while the stream is closing or closed is a
        no-op.
        """
        if (self._smachine.state == State.CLOSING or
                self._smachine.state == State.CLOSED):
            return
        self._writer.close()
        if self._transport.can_write_eof():
            self._transport.write_eof()
        if self._smachine.state == State.STREAM_HEADER_SENT:
            # at this point, we cannot wait for the peer to send
            # </stream:stream>
            self._close_transport()
        self._smachine.state = State.CLOSING

    async def close_and_wait(self):
        """
        Close the XML stream and the underlying transport and wait for for the
        XML stream to be properly terminated.

        The underlying transport may still be open when this coroutine returns,
        but closing has already been initiated.

        The other remarks about :meth:`close` hold.
        """
        self.close()
        await self._smachine.wait_for_at_least(
            State.CLOSING_STREAM_FOOTER_RECEIVED
        )

    def _kill_state(self):
        if self._writer:
            self._writer.abort()

        self._processor = None
        self._parser = None

    def _reset_state(self):
        self._kill_state()

        self._processor = xml.XMPPXMLProcessor()
        self._processor.stanza_parser = self.stanza_parser
        self._processor.on_stream_header = self._rx_stream_header
        self._processor.on_stream_footer = self._rx_stream_footer
        self._processor.on_exception = self._rx_exception
        self._parser = xml.make_parser()
        self._parser.setContentHandler(self._processor)
        self._debug_wrapper = None

        if self._logger.getEffectiveLevel() <= logging.DEBUG:
            dest = DebugWrapper(self._transport, self._logger)
            self._debug_wrapper = dest
        else:
            dest = self._transport
        self._writer = xml.XMLStreamWriter(
            dest,
            self._to,
            nsmap={None: "jabber:client"},
            sorted_attributes=self._sorted_attributes)

    def reset(self):
        """
        Reset the stream by discarding all state and re-sending the stream
        header.

        Calling :meth:`reset` when the stream is disconnected or currently
        disconnecting results in either :class:`ConnectionError` being raised
        or the exception which caused the stream to die (possibly a received
        stream error or a transport error) to be reraised.

        :meth:`reset` puts the stream into :attr:`~State.STREAM_HEADER_SENT`
        state and it cannot be used for sending XSOs until the peer stream
        header has been received. Usually, this is not a problem as stream
        resets only occur during stream negotiation and stream negotiation
        typically waits for the peers feature node to arrive first.
        """
        self._require_connection(accept_partial=True)
        self._reset_state()
        self._writer.start()
        self._smachine.rewind(State.STREAM_HEADER_SENT)

    def abort(self):
        """
        Abort the stream by writing an EOF if possible and closing the
        transport.

        The transport is closed using :meth:`asyncio.BaseTransport.close`, so
        buffered data is sent, but no more data will be received. The stream is
        in :attr:`State.CLOSED` state afterwards.

        This also works if the stream is currently closing, that is, waiting
        for the peer to send a stream footer. In that case, the stream will be
        closed locally as if the stream footer had been received.

        .. versionadded:: 0.5
        """
        if self._smachine.state == State.CLOSED:
            return
        if self._smachine.state == State.READY:
            self._smachine.state = State.CLOSED
            return
        if (self._smachine.state != State.CLOSING and
                self._transport.can_write_eof()):
            self._transport.write_eof()
        self._close_transport()

    def send_xso(self, obj):
        """
        Send an XSO over the stream.

        :param obj: The object to send.
        :type obj: :class:`~.XSO`
        :raises ConnectionError: if the connection is not fully established
                                 yet.
        :raises aioxmpp.errors.StreamError: if a stream error was received or
                                            sent.
        :raises OSError: if the stream got disconnected due to a another
                         permanent transport error
        :raises Exception: if serialisation of `obj` failed

        Calling :meth:`send_xso` while the stream is disconnected,
        disconnecting or still waiting for the remote to send a stream header
        causes :class:`ConnectionError` to be raised. If the stream got
        disconnected due to a transport or stream error, that exception is
        re-raised instead of the :class:`ConnectionError`.

        .. versionchanged:: 0.9

           Exceptions occuring during serialisation of `obj` are re-raised and
           *no* content is sent over the stream. The stream is still valid and
           usable afterwards.

        """
        self._require_connection()
        self._writer.send(obj)

    def can_starttls(self):
        """
        Return true if the transport supports STARTTLS and false otherwise.

        If the stream is currently not connected, this returns false.
        """
        return (hasattr(self._transport, "can_starttls") and
                self._transport.can_starttls())

    async def starttls(self, ssl_context, post_handshake_callback=None):
        """
        Start TLS on the transport and wait for it to complete.

        The `ssl_context` and `post_handshake_callback` arguments are forwarded
        to the transports
        :meth:`aioopenssl.STARTTLSTransport.starttls` coroutine method.

        If the transport does not support starttls, :class:`RuntimeError` is
        raised; support for starttls can be discovered by querying
        :meth:`can_starttls`.

        After :meth:`starttls` returns, you must call :meth:`reset`. Any other
        method may fail in interesting ways as the internal state is discarded
        when starttls succeeds, for security reasons. :meth:`reset` re-creates
        the internal structures.
        """
        self._require_connection()
        if not self.can_starttls():
            raise RuntimeError("starttls not available on transport")

        await self._transport.starttls(ssl_context, post_handshake_callback)
        self._reset_state()

    def error_future(self):
        """
        Return a future which will receive the next XML stream error as
        exception.

        It is safe to cancel the future at any time.
        """
        fut = asyncio.Future(loop=self._loop)
        self._error_futures.append(fut)
        return fut

    def features_future(self):
        """
        Return a future which will recieve the next XML stream features (as
        return value) or the next XML stream error (as exception), whichever
        happens first.

        It is safe to cancel this future at any time.
        """
        fut = self.error_future()
        self._features_futures.append(fut)
        return fut

    @property
    def transport(self):
        """
        The underlying :class:`asyncio.Transport` instance. This attribute is
        :data:`None` if the :class:`XMLStream` is currently not connected.

        This attribute cannot be set.
        """
        return self._transport

    @property
    def state(self):
        """
        The current :class:`State` of the XML stream.

        This attribute cannot be set.
        """
        return self._smachine.state

    @contextlib.contextmanager
    def mute(self):
        """
        A context-manager which prohibits logging of data sent over the stream.

        Data sent over the stream is replaced with
        ``<!-- some bytes omitted -->``. This is mainly useful during
        authentication.
        """
        if self._debug_wrapper is None:
            yield
        else:
            with self._debug_wrapper.mute():
                yield

    @property
    def deadtime_soft_limit(self):
        """
        This is part of the timeout handling of :class:`XMLStream` objects. The
        timeout handling works like this:

        * There exist two timers, *soft* and *hard* limit.
        * Reception of *any* data resets both timers.
        * When the *soft* limit timer is triggered, the
          :meth:`on_deadtime_soft_limit_tripped` signal is emitted. Nothing
          else happens. The user is expected to do something which would cause
          the server to send data to prevent the *hard* limit from tripping.
        * When the *hard* limit timer is triggered, the stream is considered
          dead and it is aborted and closed with an appropriate
          :class:`ConnectionError`.

        This attribute controls the timeout for the *soft* limit timer, as
        :class:`datetime.timedelta`. The default is :data:`None`, which
        disables the timer altogether.

        .. versionadded:: 0.10
        """
        return self._monitor.deadtime_soft_limit

    @deadtime_soft_limit.setter
    def deadtime_soft_limit(self, value):
        self._monitor.deadtime_soft_limit = value

    @property
    def deadtime_hard_limit(self):
        """
        This is part of the timeout handling of :class:`XMLStream` objects.
        See :attr:`deadtime_soft_limit` for details.

        This attribute controls the timeout for the *hard* limit timer, as
        :class:`datetime.timedelta`. The default is :data:`None`, which
        disables the timer altogether.

        Setting the *hard* limit timer to :data:`None` means that the
        :class:`XMLStream` will never timeout by itself.

        .. versionadded:: 0.10
        """
        return self._monitor.deadtime_hard_limit

    @deadtime_hard_limit.setter
    def deadtime_hard_limit(self, value):
        self._monitor.deadtime_hard_limit = value


async def send_and_wait_for(xmlstream, send, wait_for,
                            timeout=None,
                            cb=None):
    fut = asyncio.Future()
    wait_for = list(wait_for)

    def receive(obj):
        nonlocal fut, stack
        if cb is not None:
            cb(obj)
        fut.set_result(obj)
        stack.close()

    failure_future = xmlstream.error_future()

    with contextlib.ExitStack() as stack:
        for anticipated_cls in wait_for:
            xmlstream.stanza_parser.add_class(
                anticipated_cls,
                receive)
            stack.callback(
                xmlstream.stanza_parser.remove_class,
                anticipated_cls,
            )

        for to_send in send:
            xmlstream.send_xso(to_send)

        done, pending = await asyncio.wait(
            [
                fut,
                failure_future,
            ],
            timeout=timeout,
            return_when=asyncio.FIRST_COMPLETED,
            loop=xmlstream._loop)

        for other_fut in pending:
            other_fut.cancel()

        if fut in done:
            return fut.result()

        if failure_future in done:
            failure_future.result()
        else:
            failure_future.cancel()

        raise TimeoutError()


async def reset_stream_and_get_features(xmlstream, timeout=None):
    xmlstream.reset()
    fut = xmlstream.features_future()
    if timeout is not None:
        try:
            result = await asyncio.wait_for(fut, timeout=timeout)
        except asyncio.TimeoutError:
            raise TimeoutError from None
    else:
        result = await xmlstream.features_future()
    return result


def send_stream_error_and_close(
        xmlstream,
        condition,
        text,
        custom_condition=None):
    xmlstream.send_xso(nonza.StreamError(
        condition=condition,
        text=text))
    if custom_condition is not None:
        logger.warn("custom_condition argument to send_stream_error_and_close"
                    " not implemented")
    xmlstream.close()