#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import absolute_import

import os
import gc
from time import time, sleep
from proton import *
from proton.reactor import Container
from . import common
from .common import pump, Skipped


# older versions of gc do not provide the garbage list
if not hasattr(gc, "garbage"):
    gc.garbage = []

# future test areas
#  + different permutations of setup
#   - creating deliveries and calling input/output before opening the session/link
#  + shrinking output_size down to something small? should the engine buffer?
#  + resuming
#    - locally and remotely created deliveries with the same tag


OUTPUT_SIZE = 10 * 1024


class Test(common.Test):

    def __init__(self, *args):
        common.Test.__init__(self, *args)
        self._wires = []

    def connection(self):
        c1 = Connection()
        c2 = Connection()
        t1 = Transport()
        t1.bind(c1)
        t2 = Transport()
        t2.bind(c2)
        self._wires.append((c1, t1, c2, t2))

        mask1 = 0
        mask2 = 0

        for cat in ("TRACE_FRM", "TRACE_RAW"):
            trc = os.environ.get("PN_%s" % cat)
            if trc and trc.lower() in ("1", "2", "yes", "true"):
                mask1 = mask1 | getattr(Transport, cat)
            if trc == "2":
                mask2 = mask2 | getattr(Transport, cat)
        t1.trace(mask1)
        t2.trace(mask2)

        return c1, c2

    def link(self, name, max_frame=None, idle_timeout=None):
        c1, c2 = self.connection()
        if max_frame:
            c1.transport.max_frame_size = max_frame[0]
            c2.transport.max_frame_size = max_frame[1]
        if idle_timeout:
            # idle_timeout in seconds expressed as float
            c1.transport.idle_timeout = idle_timeout[0]
            c2.transport.idle_timeout = idle_timeout[1]
        c1.open()
        c2.open()
        ssn1 = c1.session()
        ssn1.open()
        self.pump()
        ssn2 = c2.session_head(Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE)
        ssn2.open()
        self.pump()
        snd = ssn1.sender(name)
        rcv = ssn2.receiver(name)
        return snd, rcv

    def cleanup(self):
        self._wires = []

    def pump(self, buffer_size=OUTPUT_SIZE):
        for c1, t1, c2, t2 in self._wires:
            pump(t1, t2, buffer_size)


class ConnectionTest(Test):

    def setUp(self):
        gc.enable()
        self.c1, self.c2 = self.connection()

    def cleanup(self):
        # release resources created by this class
        super(ConnectionTest, self).cleanup()
        self.c1 = None
        self.c2 = None

    def tearDown(self):
        self.cleanup()
        gc.collect()
        assert not gc.garbage

    def test_open_close(self):
        assert self.c1.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
        assert self.c2.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT

        self.c1.open()
        self.pump()

        assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
        assert self.c2.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE

        self.c2.open()
        self.pump()

        assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

        self.c1.close()
        self.pump()

        assert self.c1.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
        assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED

        self.c2.close()
        self.pump()

        assert self.c1.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
        assert self.c2.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED

    def test_simultaneous_open_close(self):
        assert self.c1.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
        assert self.c2.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT

        self.c1.open()
        self.c2.open()

        self.pump()

        assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

        self.c1.close()
        self.c2.close()

        self.pump()

        assert self.c1.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
        assert self.c2.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED

    def test_capabilities_array(self):
        self.c1.offered_capabilities = Array(UNDESCRIBED, Data.SYMBOL,
                                             symbol("O_one"),
                                             symbol("O_two"),
                                             symbol("O_three"))

        self.c1.desired_capabilities = Array(UNDESCRIBED, Data.SYMBOL,
                                             symbol("D_one"),
                                             symbol("D_two"),
                                             symbol("D_three"))
        self.c1.open()

        assert self.c2.remote_offered_capabilities is None
        assert self.c2.remote_desired_capabilities is None

        self.pump()

        assert self.c2.remote_offered_capabilities == self.c1.offered_capabilities, \
            (self.c2.remote_offered_capabilities, self.c1.offered_capabilities)
        assert self.c2.remote_desired_capabilities == self.c1.desired_capabilities, \
            (self.c2.remote_desired_capabilities, self.c1.desired_capabilities)

    def test_capabilities_symbol_list(self):
        self.c1.offered_capabilities = SymbolList(['O_one', 'O_two', symbol('O_three')])
        self.c1.desired_capabilities = SymbolList([symbol('D_one'), 'D_two', 'D_three'])
        self.c1.open()

        assert self.c2.remote_offered_capabilities is None
        assert self.c2.remote_desired_capabilities is None

        self.pump()

        assert self.c2.remote_offered_capabilities == self.c1.offered_capabilities, \
            (self.c2.remote_offered_capabilities, self.c1.offered_capabilities)
        assert self.c2.remote_desired_capabilities == self.c1.desired_capabilities, \
            (self.c2.remote_desired_capabilities, self.c1.desired_capabilities)

    def test_condition(self):
        self.c1.open()
        self.c2.open()
        self.pump()
        assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

        cond = Condition("blah:bleh", "this is a description", {symbol("foo"): "bar"})
        self.c1.condition = cond
        self.c1.close()

        self.pump()

        assert self.c1.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
        assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED

        rcond = self.c2.remote_condition
        assert rcond == cond, (rcond, cond)

    def test_properties(self, p1=PropertyDict(key=symbol("value")), p2=None):
        self.c1.properties = p1
        self.c2.properties = p2
        self.c1.open()
        self.c2.open()
        self.pump()

        assert self.c2.remote_properties == p1, (self.c2.remote_properties, p1)
        assert self.c1.remote_properties == p2, (self.c1.remote_properties, p2)

    # The proton implementation limits channel_max to 32767.
    # If I set the application's limit lower than that, I should
    # get my wish.  If I set it higher -- not.
    def test_channel_max_low(self, value=1234):
        self.c1.transport.channel_max = value
        self.c1.open()
        self.pump()
        assert self.c1.transport.channel_max == value, (self.c1.transport.channel_max, value)

    def test_channel_max_high(self, value=65535):
        self.c1.transport.channel_max = value
        self.c1.open()
        self.pump()
        assert self.c1.transport.channel_max == 32767, (self.c1.transport.channel_max, value)

    def test_channel_max_raise_and_lower(self):
        upper_limit = 32767

        # It's OK to lower the max below upper_limit.
        self.c1.transport.channel_max = 12345
        assert self.c1.transport.channel_max == 12345

        # But it won't let us raise the limit above PN_IMPL_CHANNEL_MAX.
        self.c1.transport.channel_max = 65535
        assert self.c1.transport.channel_max == upper_limit

        # send the OPEN frame
        self.c1.open()
        self.pump()

        # Now it's too late to make any change, because
        # we have already sent the OPEN frame.
        try:
            self.c1.transport.channel_max = 666
            assert False, "expected session exception"
        except:
            pass

        assert self.c1.transport.channel_max == upper_limit

    # TODO: Currently failing test - PROTON-1759 - skip

    def test_channel_max_limits_sessions(self):
        raise Skipped('Test fails - PROTON-1759')
        # This is an index -- so max number of channels should be 1.
        self.c1.transport.channel_max = 0
        self.c1.open()
        self.c2.open()
        ssn_0 = self.c2.session()
        assert ssn_0 is not None
        ssn_0.open()
        self.pump()
        try:
            ssn_1 = self.c2.session()
            ssn_1.open()
            self.pump()
            assert False, "expected session exception"
        except SessionException:
            pass

    def test_cleanup(self):
        self.c1.open()
        self.c2.open()
        self.pump()
        assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        t1 = self.c1.transport
        t2 = self.c2.transport
        c2 = self.c2
        self.c1.close()
        # release all references to C1, except that held by the transport
        self.cleanup()
        gc.collect()
        # transport should flush last state from C1:
        pump(t1, t2)
        assert c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED

    def test_user_config(self):
        self.c1.user = "vindaloo"
        self.c1.password = "secret"
        self.c1.open()
        self.pump()

        self.c2.user = "leela"
        self.c2.password = "trustno1"
        self.c2.open()
        self.pump()

        assert self.c1.user == "vindaloo", self.c1.user
        assert self.c1.password is None, self.c1.password
        assert self.c2.user == "leela", self.c2.user
        assert self.c2.password is None, self.c2.password


class SessionTest(Test):

    def setUp(self):
        gc.enable()
        self.c1, self.c2 = self.connection()
        self.ssn = self.c1.session()
        self.c1.open()
        self.c2.open()

    def cleanup(self):
        # release resources created by this class
        super(SessionTest, self).cleanup()
        self.c1 = None
        self.c2 = None
        self.ssn = None

    def tearDown(self):
        self.cleanup()
        gc.collect()
        assert not gc.garbage

    def test_open_close(self):
        assert self.ssn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT

        self.ssn.open()

        assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT

        self.pump()

        assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT

        ssn = self.c2.session_head(Endpoint.REMOTE_ACTIVE | Endpoint.LOCAL_UNINIT)

        assert ssn is not None
        assert ssn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE
        assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT

        ssn.open()

        assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT

        self.pump()

        assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

        ssn.close()

        assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
        assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

        self.pump()

        assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
        assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED

        self.ssn.close()

        assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
        assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED

        self.pump()

        assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
        assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED

    def test_simultaneous_close(self):
        self.ssn.open()
        self.pump()
        ssn = self.c2.session_head(Endpoint.REMOTE_ACTIVE | Endpoint.LOCAL_UNINIT)
        assert ssn is not None
        ssn.open()
        self.pump()

        assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

        self.ssn.close()
        ssn.close()

        assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
        assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE

        self.pump()

        assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
        assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED

    def test_closing_connection(self):
        self.ssn.open()
        self.pump()
        self.c1.close()
        self.pump()
        self.ssn.close()
        self.pump()

    def test_condition(self):
        self.ssn.open()
        self.pump()
        ssn = self.c2.session_head(Endpoint.REMOTE_ACTIVE | Endpoint.LOCAL_UNINIT)
        assert ssn is not None
        ssn.open()
        self.pump()

        assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

        cond = Condition("blah:bleh", "this is a description", {symbol("foo"): "bar"})
        self.ssn.condition = cond
        self.ssn.close()

        self.pump()

        assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
        assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED

        rcond = ssn.remote_condition
        assert rcond == cond, (rcond, cond)

    def test_cleanup(self):
        snd, rcv = self.link("test-link")
        snd.open()
        rcv.open()
        self.pump()
        snd_ssn = snd.session
        rcv_ssn = rcv.session
        assert rcv_ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        self.ssn = None
        snd_ssn.close()
        snd_ssn.free()
        del snd_ssn
        gc.collect()
        self.pump()
        assert rcv_ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED

    def test_reopen_on_same_session_without_free(self):
        """
        confirm that a link is correctly opened when attaching to a previously
        closed link *that has not been freed yet* on the same session
        """
        self.ssn.open()
        self.pump()

        ssn2 = self.c2.session_head(Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE)
        ssn2.open()
        self.pump()
        snd = self.ssn.sender("test-link")
        rcv = ssn2.receiver("test-link")

        assert snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
        assert rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT

        snd.open()
        rcv.open()
        self.pump()

        assert snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        assert rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

        snd.close()
        rcv.close()
        self.pump()

        assert snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
        assert rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED

        snd = self.ssn.sender("test-link")
        rcv = ssn2.receiver("test-link")
        assert snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
        assert rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT

        snd.open()
        rcv.open()
        self.pump()

        assert snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        assert rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

    def test_set_get_outgoing_window(self):
        assert self.ssn.outgoing_window == 2147483647

        self.ssn.outgoing_window = 1024
        assert self.ssn.outgoing_window == 1024


class LinkTest(Test):

    def setUp(self):
        gc.enable()
        self.snd, self.rcv = self.link("test-link")

    def cleanup(self):
        # release resources created by this class
        super(LinkTest, self).cleanup()
        self.snd = None
        self.rcv = None

    def tearDown(self):
        self.cleanup()
        gc.collect()
        assert not gc.garbage, gc.garbage

    def test_open_close(self):
        assert self.snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
        assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT

        self.snd.open()

        assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
        assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT

        self.pump()

        assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
        assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE

        self.rcv.open()

        assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
        assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

        self.pump()

        assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

        self.snd.close()

        assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
        assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

        self.pump()

        assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
        assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED

        self.rcv.close()

        assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
        assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED

        self.pump()

        assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
        assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED

    def test_simultaneous_open_close(self):
        assert self.snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT
        assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT

        self.snd.open()
        self.rcv.open()

        assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT
        assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT

        self.pump()

        assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

        self.snd.close()
        self.rcv.close()

        assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
        assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE

        self.pump()

        assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED
        assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED

    def test_multiple(self):
        rcv = self.snd.session.receiver("second-rcv")
        assert rcv.name == "second-rcv"
        self.snd.open()
        rcv.open()
        self.pump()
        c2 = self.rcv.session.connection
        l = c2.link_head(Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE)
        while l:
            l.open()
            l = l.next(Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE)
        self.pump()

        assert self.snd
        assert rcv
        self.snd.close()
        rcv.close()
        ssn = rcv.session
        conn = ssn.connection
        ssn.close()
        conn.close()
        self.pump()

    def test_closing_session(self):
        self.snd.open()
        self.rcv.open()
        ssn1 = self.snd.session
        self.pump()
        ssn1.close()
        self.pump()
        self.snd.close()
        self.pump()

    def test_closing_connection(self):
        self.snd.open()
        self.rcv.open()
        ssn1 = self.snd.session
        c1 = ssn1.connection
        self.pump()
        c1.close()
        self.pump()
        self.snd.close()
        self.pump()

    def assertEqualTermini(self, t1, t2):
        assert t1.type == t2.type, (t1.type, t2.type)
        assert t1.address == t2.address, (t1.address, t2.address)
        assert t1.durability == t2.durability, (t1.durability, t2.durability)
        assert t1.expiry_policy == t2.expiry_policy, (t1.expiry_policy, t2.expiry_policy)
        assert t1.timeout == t2.timeout, (t1.timeout, t2.timeout)
        assert t1.dynamic == t2.dynamic, (t1.dynamic, t2.dynamic)
        for attr in ["properties", "capabilities", "outcomes", "filter"]:
            d1 = getattr(t1, attr)
            d2 = getattr(t2, attr)
            assert d1.format() == d2.format(), (attr, d1.format(), d2.format())

    def _test_source_target(self, config_source, config_target):
        if config_source is None:
            self.snd.source.type = Terminus.UNSPECIFIED
        else:
            config_source(self.snd.source)
        if config_target is None:
            self.snd.target.type = Terminus.UNSPECIFIED
        else:
            config_target(self.snd.target)
        self.snd.open()
        self.pump()
        self.assertEqualTermini(self.rcv.remote_source, self.snd.source)
        self.assertEqualTermini(self.rcv.remote_target, self.snd.target)
        self.rcv.target.copy(self.rcv.remote_target)
        self.rcv.source.copy(self.rcv.remote_source)
        self.rcv.open()
        self.pump()
        self.assertEqualTermini(self.snd.remote_target, self.snd.target)
        self.assertEqualTermini(self.snd.remote_source, self.snd.source)

    def test_source_target(self):
        self._test_source_target(TerminusConfig(address="source"),
                                 TerminusConfig(address="target"))

    def test_source(self):
        self._test_source_target(TerminusConfig(address="source"), None)

    def test_target(self):
        self._test_source_target(None, TerminusConfig(address="target"))

    def test_coordinator(self):
        caps = Array(UNDESCRIBED, Data.SYMBOL, symbol("amqp:local-transactions"))
        self._test_source_target(None, TerminusConfig(type=Terminus.COORDINATOR,
                                                      capabilities=caps))

    def test_source_target_full(self):
        self._test_source_target(TerminusConfig(address="source",
                                                timeout=3,
                                                dist_mode=Terminus.DIST_MODE_MOVE,
                                                filter=[("int", 1), ("symbol", "two"), ("string", "three")],
                                                capabilities=["one", "two", "three"]),
                                 TerminusConfig(address="source",
                                                timeout=7,
                                                capabilities=None))

    def test_distribution_mode(self):
        self._test_source_target(TerminusConfig(address="source",
                                                dist_mode=Terminus.DIST_MODE_COPY),
                                 TerminusConfig(address="target"))
        assert self.rcv.remote_source.distribution_mode == Terminus.DIST_MODE_COPY
        assert self.rcv.remote_target.distribution_mode == Terminus.DIST_MODE_UNSPECIFIED

    def test_dynamic_link(self):
        self._test_source_target(TerminusConfig(address=None, dynamic=True), None)
        assert self.rcv.remote_source.dynamic
        assert self.rcv.remote_source.address is None

    def test_condition(self):
        self.snd.open()
        self.rcv.open()
        self.pump()

        assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

        cond = Condition("blah:bleh", "this is a description", {symbol("foo"): "bar"})
        self.snd.condition = cond
        self.snd.close()

        self.pump()

        assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE
        assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED

        rcond = self.rcv.remote_condition
        assert rcond == cond, (rcond, cond)

    def test_settle_mode(self):
        self.snd.snd_settle_mode = Link.SND_UNSETTLED
        assert self.snd.snd_settle_mode == Link.SND_UNSETTLED
        self.rcv.rcv_settle_mode = Link.RCV_SECOND
        assert self.rcv.rcv_settle_mode == Link.RCV_SECOND

        assert self.snd.remote_rcv_settle_mode != Link.RCV_SECOND
        assert self.rcv.remote_snd_settle_mode != Link.SND_UNSETTLED

        self.snd.open()
        self.rcv.open()
        self.pump()

        assert self.snd.remote_rcv_settle_mode == Link.RCV_SECOND
        assert self.rcv.remote_snd_settle_mode == Link.SND_UNSETTLED

    def test_max_message_size(self):
        assert self.snd.max_message_size == 0
        assert self.rcv.remote_max_message_size == 0
        self.snd.max_message_size = 13579
        self.snd.open()
        self.rcv.open()
        self.pump()
        assert self.rcv.remote_max_message_size == 13579

    def test_properties(self):
        sender_props = {symbol('key1'): 'value1',
                        symbol('key2'): 'value2'}
        self.snd.properties = sender_props
        self.snd.open()
        self.rcv.open()
        self.pump()

        assert self.rcv.remote_properties == sender_props, (self.rcv.remote_properties, sender_props)
        assert self.snd.remote_properties is None, (self.snd.remote_properties, None)

    def test_cleanup(self):
        snd, rcv = self.link("test-link")
        snd.open()
        rcv.open()
        self.pump()
        assert rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        snd.close()
        snd.free()
        del snd
        gc.collect()
        self.pump()
        assert rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED


class TerminusConfig:

    def __init__(self, type=None, address=None, timeout=None, durability=None,
                 filter=None, capabilities=None, dynamic=False, dist_mode=None):
        self.address = address
        self.timeout = timeout
        self.durability = durability
        self.filter = filter
        self.capabilities = capabilities
        self.dynamic = dynamic
        self.dist_mode = dist_mode
        self.type = type

    def __call__(self, terminus):
        if self.type is not None:
            terminus.type = self.type
        if self.address is not None:
            terminus.address = self.address
        if self.timeout is not None:
            terminus.timeout = self.timeout
        if self.durability is not None:
            terminus.durability = self.durability
        if self.capabilities is not None:
            terminus.capabilities.put_array(False, Data.SYMBOL)
            terminus.capabilities.enter()
            for c in self.capabilities:
                terminus.capabilities.put_symbol(c)
        if self.filter is not None:
            terminus.filter.put_map()
            terminus.filter.enter()
            for (t, v) in self.filter:
                setter = getattr(terminus.filter, "put_%s" % t)
                setter(v)
        if self.dynamic:
            terminus.dynamic = True
        if self.dist_mode is not None:
            terminus.distribution_mode = self.dist_mode


class TransferTest(Test):

    def setUp(self):
        gc.enable()
        self.snd, self.rcv = self.link("test-link")
        self.c1 = self.snd.session.connection
        self.c2 = self.rcv.session.connection
        self.snd.open()
        self.rcv.open()
        self.pump()

    def cleanup(self):
        # release resources created by this class
        super(TransferTest, self).cleanup()
        self.c1 = None
        self.c2 = None
        self.snd = None
        self.rcv = None

    def tearDown(self):
        self.cleanup()
        gc.collect()
        assert not gc.garbage

    def test_work_queue(self):
        assert self.c1.work_head is None
        self.snd.delivery("tag")
        assert self.c1.work_head is None
        self.rcv.flow(1)
        self.pump()
        d = self.c1.work_head
        assert d is not None
        tag = d.tag
        assert tag == "tag", tag
        assert d.writable

        n = self.snd.send(b"this is a test")
        assert self.snd.advance()
        assert self.c1.work_head is None

        self.pump()

        d = self.c2.work_head
        assert d.tag == "tag"
        assert d.readable

    def test_multiframe(self):
        self.rcv.flow(1)
        self.snd.delivery("tag")
        msg = b"this is a test"
        n = self.snd.send(msg)
        assert n == len(msg)

        self.pump()

        d = self.rcv.current
        assert d
        assert d.tag == "tag", repr(d.tag)
        assert d.readable

        binary = self.rcv.recv(1024)
        assert binary == msg, (binary, msg)

        binary = self.rcv.recv(1024)
        assert binary == b""

        msg = b"this is more"
        n = self.snd.send(msg)
        assert n == len(msg)
        assert self.snd.advance()

        self.pump()

        binary = self.rcv.recv(1024)
        assert binary == msg, (binary, msg)

        binary = self.rcv.recv(1024)
        assert binary is None

    def test_multiframe_abort(self):
        self.rcv.flow(1)
        sd = self.snd.delivery("tag")
        msg = b"this is a test"
        n = self.snd.send(msg)
        assert n == len(msg)

        self.pump()

        binary = self.rcv.recv(1024)
        assert binary == msg, (binary, msg)

        msg = b"this is more.  Error if not discarded."
        n = self.snd.send(msg)
        assert n == len(msg)
        sd.abort()
        assert sd.aborted

        # Confirm abort discards the sender's buffered content, i.e. no data in generated transfer frame.
        # We want:
        # @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"tag", message-format=0, settled=true, aborted=true]
        # wanted = b"\x00\x00\x00%\x02\x00\x00\x00\x00S\x14\xd0\x00\x00\x00\x15\x00\x00\x00\nR\x00R\x00\xa0\x03tagR\x00A@@@@A"
        # wanted = b"\x00\x00\x00\x26\x02\x00\x00\x00\x00S\x14\xd0\x00\x00\x00\x16\x00\x00\x00\x0bR\x00R\x00\xa0\x03tagR\x00A@@@@A@"
        # wanted = b'\x00\x00\x00\x20\x02\x00\x00\x00\x00S\x14\xc0\x13\x0bR\x00R\x00\xa0\x03tagR\x00A@@@@A@'
        # wanted = b'\x00\x00\x00"\x02\x00\x00\x00\x00S\x14\xd0\x00\x00\x00\x12\x00\x00\x00\nCC\xa0\x03tagCA@@@@A'
        # wanted = b'\x00\x00\x00\x1d\x02\x00\x00\x00\x00S\x14\xc0\x10\x0bCC\xa0\x03tagCA@@@@A@'
        wanted = b'\x00\x00\x00\x1c\x02\x00\x00\x00\x00S\x14\xc0\x0f\x0aCC\xa0\x03tagCA@@@@A'
        t = self.snd.transport
        wire_bytes = t.peek(1024)
        assert wanted == wire_bytes, wire_bytes

        self.pump()
        assert self.rcv.current.aborted

    def test_multiframe_last_empty(self):
        self.rcv.flow(1)
        sd = self.snd.delivery("tag")
        msg_p1 = b"this is a test"
        n = self.snd.send(msg_p1)
        assert n == len(msg_p1)

        self.pump()

        assert len(msg_p1) == self.rcv.current.pending
        assert self.rcv.current.partial
        msg_p2 = b"this is more."
        n = self.snd.send(msg_p2)
        assert n == len(msg_p2)

        self.pump()

        msg = msg_p1 + msg_p2
        assert len(msg) == self.rcv.current.pending
        assert self.rcv.current.partial
        # Advance.  Should send empty xfer frame with more flag false.
        assert self.snd.advance()

        self.pump()

        assert len(msg) == self.rcv.current.pending
        assert not self.rcv.current.partial
        binary = self.rcv.recv(self.rcv.current.pending)
        assert binary == msg

    def test_disposition(self):
        self.rcv.flow(1)

        self.pump()

        sd = self.snd.delivery("tag")
        msg = b"this is a test"
        n = self.snd.send(msg)
        assert n == len(msg)
        assert self.snd.advance()

        self.pump()

        rd = self.rcv.current
        assert rd is not None
        assert rd.tag == sd.tag
        rmsg = self.rcv.recv(1024)
        assert rmsg == msg
        rd.update(Delivery.ACCEPTED)

        self.pump()

        rdisp = sd.remote_state
        ldisp = rd.local_state
        assert rdisp == ldisp == Delivery.ACCEPTED, (rdisp, ldisp)
        assert sd.updated

        sd.update(Delivery.ACCEPTED)

        self.pump()

        assert sd.local_state == rd.remote_state == Delivery.ACCEPTED
        sd.settle()

    def test_delivery_id_ordering(self):
        self.rcv.flow(1024)
        self.pump(buffer_size=64 * 1024)

        # fill up delivery buffer on sender
        for m in range(1024):
            sd = self.snd.delivery("tag%s" % m)
            msg = ("message %s" % m).encode('ascii')
            n = self.snd.send(msg)
            assert n == len(msg)
            assert self.snd.advance()

        self.pump(buffer_size=64 * 1024)

        # receive a session-windows worth of messages and accept them
        for m in range(1024):
            rd = self.rcv.current
            assert rd is not None, m
            assert rd.tag == ("tag%s" % m), (rd.tag, m)
            msg = self.rcv.recv(1024)
            assert msg == ("message %s" % m).encode('ascii'), (msg, m)
            rd.update(Delivery.ACCEPTED)
            rd.settle()

        self.pump(buffer_size=64 * 1024)

        # add some new deliveries
        for m in range(1024, 1450):
            sd = self.snd.delivery("tag%s" % m)
            msg = ("message %s" % m).encode('ascii')
            n = self.snd.send(msg)
            assert n == len(msg)
            assert self.snd.advance()

        # handle all disposition changes to sent messages
        d = self.c1.work_head
        while d:
            next_d = d.work_next
            if d.updated:
                d.update(Delivery.ACCEPTED)
                d.settle()
            d = next_d

        # submit some more deliveries
        for m in range(1450, 1500):
            sd = self.snd.delivery("tag%s" % m)
            msg = ("message %s" % m).encode('ascii')
            n = self.snd.send(msg)
            assert n == len(msg)
            assert self.snd.advance()

        self.pump(buffer_size=64 * 1024)
        self.rcv.flow(1024)
        self.pump(buffer_size=64 * 1024)

        # verify remaining messages can be received and accepted
        for m in range(1024, 1500):
            rd = self.rcv.current
            assert rd is not None, m
            assert rd.tag == ("tag%s" % m), (rd.tag, m)
            msg = self.rcv.recv(1024)
            assert msg == ("message %s" % m).encode('ascii'), (msg, m)
            rd.update(Delivery.ACCEPTED)
            rd.settle()

    def test_cleanup(self):
        self.rcv.flow(10)
        self.pump()

        for x in range(10):
            self.snd.delivery("tag%d" % x)
            msg = b"this is a test"
            n = self.snd.send(msg)
            assert n == len(msg)
            assert self.snd.advance()
        self.snd.close()
        self.snd.free()
        self.snd = None
        gc.collect()

        self.pump()

        for x in range(10):
            rd = self.rcv.current
            assert rd is not None
            assert rd.tag == "tag%d" % x
            rmsg = self.rcv.recv(1024)
            assert self.rcv.advance()
            assert rmsg == msg
            # close of snd should've settled:
            assert rd.settled
            rd.settle()


class MaxFrameTransferTest(Test):

    def setUp(self):
        pass

    def cleanup(self):
        # release resources created by this class
        super(MaxFrameTransferTest, self).cleanup()
        self.c1 = None
        self.c2 = None
        self.snd = None
        self.rcv = None

    def tearDown(self):
        self.cleanup()

    def message(self, size):
        parts = []
        for i in range(size):
            parts.append(str(i))
        return "/".join(parts)[:size].encode("utf-8")

    def testMinFrame(self):
        """
        Configure receiver to support minimum max-frame as defined by AMQP-1.0.
        Verify transfer of messages larger than 512.
        """
        self.snd, self.rcv = self.link("test-link", max_frame=[0, 512])
        self.c1 = self.snd.session.connection
        self.c2 = self.rcv.session.connection
        self.snd.open()
        self.rcv.open()
        self.pump()
        assert self.rcv.session.connection.transport.max_frame_size == 512
        assert self.snd.session.connection.transport.remote_max_frame_size == 512

        self.rcv.flow(1)
        self.snd.delivery("tag")
        msg = self.message(513)
        n = self.snd.send(msg)
        assert n == len(msg)
        assert self.snd.advance()

        self.pump()

        binary = self.rcv.recv(513)
        assert binary == msg

        binary = self.rcv.recv(1024)
        assert binary is None

    def testOddFrame(self):
        """
        Test an odd sized max limit with data that will require multiple frames to
        be transfered.
        """
        self.snd, self.rcv = self.link("test-link", max_frame=[0, 521])
        self.c1 = self.snd.session.connection
        self.c2 = self.rcv.session.connection
        self.snd.open()
        self.rcv.open()
        self.pump()
        assert self.rcv.session.connection.transport.max_frame_size == 521
        assert self.snd.session.connection.transport.remote_max_frame_size == 521

        self.rcv.flow(2)
        self.snd.delivery("tag")
        msg = ("X" * 1699).encode('utf-8')
        n = self.snd.send(msg)
        assert n == len(msg)
        assert self.snd.advance()

        self.pump()

        binary = self.rcv.recv(1699)
        assert binary == msg

        binary = self.rcv.recv(1024)
        assert binary is None

        self.rcv.advance()

        self.snd.delivery("gat")
        msg = self.message(1426)
        n = self.snd.send(msg)
        assert n == len(msg)
        assert self.snd.advance()

        self.pump()

        binary = self.rcv.recv(1426)
        assert binary == msg

        self.pump()

        binary = self.rcv.recv(1024)
        assert binary is None

    def testSendQueuedMultiFrameMessages(self, sendSingleFrameMsg=False):
        """
        Test that multiple queued messages on the same link
        with multi-frame content are sent correctly. Use an
        odd max frame size, send enough data to use many.
        """
        self.snd, self.rcv = self.link("test-link", max_frame=[0, 517])
        self.c1 = self.snd.session.connection
        self.c2 = self.rcv.session.connection
        self.snd.open()
        self.rcv.open()
        self.pump()
        assert self.rcv.session.connection.transport.max_frame_size == 517
        assert self.snd.session.connection.transport.remote_max_frame_size == 517

        self.rcv.flow(5)

        self.pump()

        # Send a delivery with 5 frames, all bytes as X1234
        self.snd.delivery("tag")
        msg = ("X1234" * 425).encode('utf-8')
        assert 2125 == len(msg)
        n = self.snd.send(msg)
        assert n == len(msg)
        assert self.snd.advance()

        # Send a delivery with 5 frames, all bytes as Y5678
        self.snd.delivery("tag2")
        msg2 = ("Y5678" * 425).encode('utf-8')
        assert 2125 == len(msg2)
        n = self.snd.send(msg2)
        assert n == len(msg2)
        assert self.snd.advance()

        self.pump()

        if sendSingleFrameMsg:
            # Send a delivery with 1 frame
            self.snd.delivery("tag3")
            msg3 = ("Z").encode('utf-8')
            assert 1 == len(msg3)
            n = self.snd.send(msg3)
            assert n == len(msg3)
            assert self.snd.advance()
            self.pump()

        binary = self.rcv.recv(5000)
        self.assertEqual(binary, msg)

        self.rcv.advance()

        binary2 = self.rcv.recv(5000)
        self.assertEqual(binary2, msg2)

        self.rcv.advance()

        if sendSingleFrameMsg:
            binary3 = self.rcv.recv(5000)
            self.assertEqual(binary3, msg3)
            self.rcv.advance()

        self.pump()

    def testSendQueuedMultiFrameMessagesThenSingleFrameMessage(self):
        self.testSendQueuedMultiFrameMessages(sendSingleFrameMsg=True)

    def testBigMessage(self):
        """
        Test transferring a big message.
        """
        self.snd, self.rcv = self.link("test-link")
        self.c1 = self.snd.session.connection
        self.c2 = self.rcv.session.connection
        self.snd.open()
        self.rcv.open()
        self.pump()

        self.rcv.flow(2)
        self.snd.delivery("tag")
        msg = self.message(1024 * 256)
        n = self.snd.send(msg)
        assert n == len(msg)
        assert self.snd.advance()

        self.pump()

        binary = self.rcv.recv(1024 * 256)
        assert binary == msg

        binary = self.rcv.recv(1024)
        assert binary is None

    def testMaxFrameAbort(self):
        self.snd, self.rcv = self.link("test-link", max_frame=[0, 512])
        self.c1 = self.snd.session.connection
        self.c2 = self.rcv.session.connection
        self.snd.open()
        self.rcv.open()
        self.rcv.flow(1)
        sndt = self.snd.transport
        dblfrmbytes = ("0123456789" * 52).encode('utf-8')
        assert 520 == len(dblfrmbytes)
        self.pump()

        # Part 1: abort a delivery that would generate two frames - before they are generated.
        # Expect that no output is generated and credit is unaffected.
        assert self.snd.credit == 1
        sd = self.snd.delivery("tag_0")
        n = self.snd.send(dblfrmbytes)
        assert n == len(dblfrmbytes)
        sd.abort()
        generated = sndt.peek(2048)
        assert generated == b""
        assert self.snd.credit == 1

        # Part 2: abort a streaming delivery whose last content would generate two frames.
        # First send some un-aborted data across the wire.
        sd = self.snd.delivery("tag_1")
        n = self.snd.send(dblfrmbytes)
        assert n == len(dblfrmbytes)
        self.pump()
        binary = self.rcv.recv(2048)
        assert binary == dblfrmbytes, (binary, dblfrmbytes)
        # Now send more data spanning two frames and immediately abort.
        # Unlike part 1, an abort frame is required to sync with peer.
        n = self.snd.send(dblfrmbytes)
        assert n == len(dblfrmbytes)
        sd.abort()
        assert sd.aborted
        # Expect a single abort transfer frame with no content.  One credit is consumed.
        # @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"tag_1", message-format=0, settled=true, aborted=true]
        # wanted = b"\x00\x00\x00\x27\x02\x00\x00\x00\x00S\x14\xd0\x00\x00\x00\x17\x00\x00\x00\nR\x00R\x00\xa0\x05tag_1R\x00A@@@@A"
        # wanted = b"\x00\x00\x00\x28\x02\x00\x00\x00\x00S\x14\xd0\x00\x00\x00\x18\x00\x00\x00\x0bR\x00R\x00\xa0\x05tag_1R\x00A@@@@A@"
        # wanted = b'\x00\x00\x00\x22\x02\x00\x00\x00\x00S\x14\xc0\x15\x0bR\x00R\x00\xa0\x05tag_1R\x00A@@@@A@'
        # wanted = b'\x00\x00\x00\x24\x02\x00\x00\x00\x00S\x14\xd0\x00\x00\x00\x14\x00\x00\x00\nCC\xa0\x05tag_1CA@@@@A'
        # wanted = b'\x00\x00\x00\x1f\x02\x00\x00\x00\x00S\x14\xc0\x12\x0bCC\xa0\x05tag_1CA@@@@A@'
        wanted = b'\x00\x00\x00\x1e\x02\x00\x00\x00\x00S\x14\xc0\x11\x0aCC\xa0\x05tag_1CA@@@@A'
        t = self.snd.transport
        wire_bytes = t.peek(2048)
        assert wanted == wire_bytes, wire_bytes
        assert self.snd.credit == 0
        self.pump()
        assert self.rcv.current.aborted
        # Confirm no lingering transfers by closing the link.
        self.snd.close()
        # Expect just the detach frame.
        # @detach(22) [handle=0, closed=true]
        # wanted = b"\x00\x00\x00\x17\x02\x00\x00\x00\x00S\x16\xd0\x00\x00\x00\x07\x00\x00\x00\x02R\x00A"
        # wanted = b"\x00\x00\x00\x18\x02\x00\x00\x00\x00S\x16\xd0\x00\x00\x00\x08\x00\x00\x00\x03R\x00A@"
        # wanted = b'\x00\x00\x00\x12\x02\x00\x00\x00\x00S\x16\xc0\x05\x03R\x00A@'
        # wanted = b"\x00\x00\x00\x16\x02\x00\x00\x00\x00S\x16\xd0\x00\x00\x00\x06\x00\x00\x00\x02CA"
        # wanted = b'\x00\x00\x00\x11\x02\x00\x00\x00\x00S\x16\xc0\x04\x03CA@'
        wanted = b'\x00\x00\x00\x10\x02\x00\x00\x00\x00S\x16\xc0\x03\x02CA'
        wire_bytes = t.peek(2048)
        assert wanted == wire_bytes, wire_bytes


class IdleTimeoutTest(Test):

    def setUp(self):
        pass

    def cleanup(self):
        # release resources created by this class
        super(IdleTimeoutTest, self).cleanup()
        self.snd = None
        self.rcv = None
        self.c1 = None
        self.c2 = None

    def tearDown(self):
        self.cleanup()

    def message(self, size):
        parts = []
        for i in range(size):
            parts.append(str(i))
        return "/".join(parts)[:size]

    def testGetSet(self):
        """
        Verify the configuration and negotiation of the idle timeout.
        """

        self.snd, self.rcv = self.link("test-link", idle_timeout=[1.0, 2.0])
        self.c1 = self.snd.session.connection
        self.c2 = self.rcv.session.connection
        self.snd.open()
        self.rcv.open()
        self.pump()
        # proton advertises 1/2 the configured timeout to the peer:
        assert self.rcv.session.connection.transport.idle_timeout == 2.0
        assert self.rcv.session.connection.transport.remote_idle_timeout == 0.5
        assert self.snd.session.connection.transport.idle_timeout == 1.0
        assert self.snd.session.connection.transport.remote_idle_timeout == 1.0

    def testTimeout(self):
        """
        Verify the AMQP Connection idle timeout.
        """

        # snd will timeout the Connection if no frame is received within 1000 ticks
        self.snd, self.rcv = self.link("test-link", idle_timeout=[1.0, 0])
        self.c1 = self.snd.session.connection
        self.c2 = self.rcv.session.connection
        self.snd.open()
        self.rcv.open()
        self.pump()

        t_snd = self.snd.session.connection.transport
        t_rcv = self.rcv.session.connection.transport
        assert t_rcv.idle_timeout == 0.0
        # proton advertises 1/2 the timeout (see spec)
        assert t_rcv.remote_idle_timeout == 0.5
        assert t_snd.idle_timeout == 1.0
        assert t_snd.remote_idle_timeout == 0.0

        sndr_frames_in = t_snd.frames_input
        rcvr_frames_out = t_rcv.frames_output

        # at t+1msec, nothing should happen:
        clock = 0.001
        assert t_snd.tick(clock) == 1.001, "deadline for remote timeout"
        assert t_rcv.tick(clock) == 0.251, "deadline to send keepalive"
        self.pump()
        assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"

        # at one tick from expected idle frame send, nothing should happen:
        clock = 0.250
        assert t_snd.tick(clock) == 1.001, "deadline for remote timeout"
        assert t_rcv.tick(clock) == 0.251, "deadline to send keepalive"
        self.pump()
        assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"

        # this should cause rcvr to expire and send a keepalive
        clock = 0.251
        assert t_snd.tick(clock) == 1.001, "deadline for remote timeout"
        assert t_rcv.tick(clock) == 0.501, "deadline to send keepalive"
        self.pump()
        sndr_frames_in += 1
        rcvr_frames_out += 1
        assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"
        assert rcvr_frames_out == t_rcv.frames_output, "unexpected frame"

        # since a keepalive was received, sndr will rebase its clock against this tick:
        # and the receiver should not change its deadline
        clock = 0.498
        assert t_snd.tick(clock) == 1.498, "deadline for remote timeout"
        assert t_rcv.tick(clock) == 0.501, "deadline to send keepalive"
        self.pump()
        assert sndr_frames_in == t_snd.frames_input, "unexpected received frame"

        # now expire sndr
        clock = 1.499
        t_snd.tick(clock)
        self.pump()
        assert self.c2.state & Endpoint.REMOTE_CLOSED
        assert self.c2.remote_condition.name == "amqp:resource-limit-exceeded"


class CreditTest(Test):

    def setUp(self):
        self.snd, self.rcv = self.link("test-link", max_frame=(16 * 1024, 16 * 1024))
        self.c1 = self.snd.session.connection
        self.c2 = self.rcv.session.connection
        self.snd.open()
        self.rcv.open()
        self.pump()

    def cleanup(self):
        # release resources created by this class
        super(CreditTest, self).cleanup()
        self.c1 = None
        self.snd = None
        self.c2 = None
        self.rcv2 = None
        self.snd2 = None

    def tearDown(self):
        self.cleanup()

    def testCreditSender(self, count=1024):
        credit = self.snd.credit
        assert credit == 0, credit
        self.rcv.flow(10)
        self.pump()
        credit = self.snd.credit
        assert credit == 10, credit

        self.rcv.flow(count)
        self.pump()
        credit = self.snd.credit
        assert credit == 10 + count, credit

    def testCreditReceiver(self):
        self.rcv.flow(10)
        self.pump()
        assert self.rcv.credit == 10, self.rcv.credit

        d = self.snd.delivery("tag")
        assert d
        assert self.snd.advance()
        self.pump()
        assert self.rcv.credit == 10, self.rcv.credit
        assert self.rcv.queued == 1, self.rcv.queued
        c = self.rcv.current
        assert c.tag == "tag", c.tag
        assert self.rcv.advance()
        assert self.rcv.credit == 9, self.rcv.credit
        assert self.rcv.queued == 0, self.rcv.queued

    def _testBufferingOnClose(self, a, b):
        for i in range(10):
            d = self.snd.delivery("tag-%s" % i)
            assert d
            d.settle()
        self.pump()
        assert self.snd.queued == 10

        endpoints = {"connection": (self.c1, self.c2),
                     "session": (self.snd.session, self.rcv.session),
                     "link": (self.snd, self.rcv)}

        local_a, remote_a = endpoints[a]
        local_b, remote_b = endpoints[b]

        remote_b.close()
        self.pump()
        assert local_b.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED
        local_a.close()
        self.pump()
        assert remote_a.state & Endpoint.REMOTE_CLOSED
        assert self.snd.queued == 10

    def testBufferingOnCloseLinkLink(self):
        self._testBufferingOnClose("link", "link")

    def testBufferingOnCloseLinkSession(self):
        self._testBufferingOnClose("link", "session")

    def testBufferingOnCloseLinkConnection(self):
        self._testBufferingOnClose("link", "connection")

    def testBufferingOnCloseSessionLink(self):
        self._testBufferingOnClose("session", "link")

    def testBufferingOnCloseSessionSession(self):
        self._testBufferingOnClose("session", "session")

    def testBufferingOnCloseSessionConnection(self):
        self._testBufferingOnClose("session", "connection")

    def testBufferingOnCloseConnectionLink(self):
        self._testBufferingOnClose("connection", "link")

    def testBufferingOnCloseConnectionSession(self):
        self._testBufferingOnClose("connection", "session")

    def testBufferingOnCloseConnectionConnection(self):
        self._testBufferingOnClose("connection", "connection")

    def testFullDrain(self):
        assert self.rcv.credit == 0
        assert self.snd.credit == 0
        self.rcv.drain(10)
        assert self.rcv.draining()
        assert self.rcv.credit == 10
        assert self.snd.credit == 0
        self.pump()
        assert self.rcv.credit == 10
        assert self.snd.credit == 10
        assert self.rcv.draining()
        self.snd.drained()
        assert self.rcv.credit == 10
        assert self.snd.credit == 0
        assert self.rcv.draining()
        self.pump()
        assert self.rcv.credit == 0
        assert self.snd.credit == 0
        assert not self.rcv.draining()
        drained = self.rcv.drained()
        assert drained == 10, drained

    def testPartialDrain(self):
        self.rcv.drain(2)
        assert self.rcv.draining()
        self.pump()

        d = self.snd.delivery("tag")
        assert d
        assert self.snd.advance()
        self.snd.drained()
        assert self.rcv.draining()
        self.pump()
        assert not self.rcv.draining()

        c = self.rcv.current
        assert self.rcv.queued == 1, self.rcv.queued
        assert c.tag == d.tag, c.tag
        assert self.rcv.advance()
        assert not self.rcv.current
        assert self.rcv.credit == 0, self.rcv.credit
        assert not self.rcv.draining()
        drained = self.rcv.drained()
        assert drained == 1, drained

    def testDrainFlow(self):
        assert self.rcv.credit == 0
        assert self.snd.credit == 0
        self.rcv.drain(10)
        assert self.rcv.credit == 10
        assert self.snd.credit == 0
        self.pump()
        assert self.rcv.credit == 10
        assert self.snd.credit == 10
        self.snd.drained()
        assert self.rcv.credit == 10
        assert self.snd.credit == 0
        self.pump()
        assert self.rcv.credit == 0
        assert self.snd.credit == 0
        self.rcv.flow(10)
        assert self.rcv.credit == 10
        assert self.snd.credit == 0
        self.pump()
        assert self.rcv.credit == 10
        assert self.snd.credit == 10
        self.snd.drained()
        assert self.rcv.credit == 10
        assert self.snd.credit == 10
        self.pump()
        assert self.rcv.credit == 10
        assert self.snd.credit == 10
        drained = self.rcv.drained()
        assert drained == 10, drained

    def testNegative(self):
        assert self.snd.credit == 0
        d = self.snd.delivery("tag")
        assert d
        assert self.snd.advance()
        self.pump()

        assert self.rcv.credit == 0
        assert self.rcv.queued == 0

        self.rcv.flow(1)
        assert self.rcv.credit == 1
        assert self.rcv.queued == 0
        self.pump()
        assert self.rcv.credit == 1
        assert self.rcv.queued == 1, self.rcv.queued

        c = self.rcv.current
        assert c
        assert c.tag == "tag"
        assert self.rcv.advance()
        assert self.rcv.credit == 0
        assert self.rcv.queued == 0

    def testDrainZero(self):
        assert self.snd.credit == 0
        assert self.rcv.credit == 0
        assert self.rcv.queued == 0
        drained = self.rcv.drained()
        assert drained == 0

        self.rcv.flow(10)
        self.pump()
        assert self.snd.credit == 10
        assert self.rcv.credit == 10
        assert self.rcv.queued == 0

        self.snd.drained()
        self.pump()
        assert self.snd.credit == 10
        assert self.rcv.credit == 10
        assert self.rcv.queued == 0
        drained = self.rcv.drained()
        assert drained == 0

        self.rcv.drain(0)
        assert self.snd.credit == 10
        assert self.rcv.credit == 10
        assert self.rcv.queued == 0

        self.pump()

        assert self.snd.credit == 10
        assert self.rcv.credit == 10
        assert self.rcv.queued == 0

        self.snd.drained()
        assert self.snd.credit == 0
        assert self.rcv.credit == 10
        assert self.rcv.queued == 0
        drained = self.rcv.drained()
        assert drained == 0
        self.pump()

        assert self.snd.credit == 0
        assert self.rcv.credit == 0
        assert self.rcv.queued == 0
        drained = self.rcv.drained()
        assert drained == 10

    def testDrainOrder(self):
        """ Verify drain/drained works regardless of ordering.  See PROTON-401
        """
        assert self.snd.credit == 0
        assert self.rcv.credit == 0
        assert self.rcv.queued == 0

        # self.rcv.session.connection.transport.trace(Transport.TRACE_FRM)
        # self.snd.session.connection.transport.trace(Transport.TRACE_FRM)

        # verify that a sender that has reached the drain state will respond
        # promptly to a drain issued by the peer.
        self.rcv.flow(10)
        self.pump()
        assert self.snd.credit == 10, self.snd.credit
        assert self.rcv.credit == 10, self.rcv.credit

        sd = self.snd.delivery("tagA")
        assert sd
        n = self.snd.send(b"A")
        assert n == 1
        self.pump()
        self.snd.advance()

        # done sending, so signal that we are drained:
        self.snd.drained()
        self.pump()
        assert self.snd.credit == 9, self.snd.credit
        assert self.rcv.credit == 10, self.rcv.credit

        self.rcv.drain(0)
        self.pump()
        assert self.snd.credit == 9, self.snd.credit
        assert self.rcv.credit == 10, self.rcv.credit

        data = self.rcv.recv(10)
        assert data == b"A", data
        self.rcv.advance()
        self.pump()
        assert self.snd.credit == 9, self.snd.credit
        assert self.rcv.credit == 9, self.rcv.credit

        self.snd.drained()
        self.pump()
        assert self.snd.credit == 0, self.snd.credit
        assert self.rcv.credit == 0, self.rcv.credit

        # verify that a drain requested by the peer is not "acknowledged" until
        # after the sender has completed sending its pending messages

        self.rcv.flow(10)
        self.pump()
        assert self.snd.credit == 10, self.snd.credit
        assert self.rcv.credit == 10, self.rcv.credit

        sd = self.snd.delivery("tagB")
        assert sd
        n = self.snd.send(b"B")
        assert n == 1
        self.snd.advance()
        self.pump()
        assert self.snd.credit == 9, self.snd.credit
        assert self.rcv.credit == 10, self.rcv.credit

        self.rcv.drain(0)
        self.pump()
        assert self.snd.credit == 9, self.snd.credit
        assert self.rcv.credit == 10, self.rcv.credit

        sd = self.snd.delivery("tagC")
        assert sd
        n = self.snd.send(b"C")
        assert n == 1
        self.snd.advance()
        self.pump()
        assert self.snd.credit == 8, self.snd.credit
        assert self.rcv.credit == 10, self.rcv.credit

        # now that the sender has finished sending everything, it can signal
        # drained
        self.snd.drained()
        self.pump()
        assert self.snd.credit == 0, self.snd.credit
        assert self.rcv.credit == 2, self.rcv.credit

        data = self.rcv.recv(10)
        assert data == b"B", data
        self.rcv.advance()
        data = self.rcv.recv(10)
        assert data == b"C", data
        self.rcv.advance()
        self.pump()
        assert self.snd.credit == 0, self.snd.credit
        assert self.rcv.credit == 0, self.rcv.credit

    def testPushback(self, count=10):
        assert self.snd.credit == 0
        assert self.rcv.credit == 0

        self.rcv.flow(count)
        self.pump()

        for i in range(count):
            d = self.snd.delivery("tag%s" % i)
            assert d
            self.snd.advance()

        assert self.snd.queued == count
        assert self.rcv.queued == 0
        self.pump()
        assert self.snd.queued == 0
        assert self.rcv.queued == count

        d = self.snd.delivery("extra")
        self.snd.advance()

        assert self.snd.queued == 1
        assert self.rcv.queued == count
        self.pump()
        assert self.snd.queued == 1
        assert self.rcv.queued == count

    def testHeadOfLineBlocking(self):
        self.snd2 = self.snd.session.sender("link-2")
        self.rcv2 = self.rcv.session.receiver("link-2")
        self.snd2.open()
        self.rcv2.open()
        self.pump()
        assert self.snd2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
        assert self.rcv2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE

        self.rcv.flow(5)
        self.rcv2.flow(10)
        self.pump()

        assert self.snd.credit == 5
        assert self.snd2.credit == 10

        for i in range(10):
            tag = "test %d" % i
            self.snd.delivery(tag)
            self.snd.send(tag.encode("ascii"))
            assert self.snd.advance()
            self.snd2.delivery(tag)
            self.snd2.send(tag.encode("ascii"))
            assert self.snd2.advance()

        self.pump()

        for i in range(5):
            b = self.rcv.recv(512)
            assert self.rcv.advance()
            b = self.rcv2.recv(512)
            assert self.rcv2.advance()

        for i in range(5):
            b = self.rcv2.recv(512)
            assert self.rcv2.advance()


class SessionCreditTest(Test):

    def tearDown(self):
        self.cleanup()

    def testBuffering(self, count=32, size=1024, capacity=16 * 1024, max_frame=1024):
        snd, rcv = self.link("test-link", max_frame=(max_frame, max_frame))
        rcv.session.incoming_capacity = capacity
        snd.open()
        rcv.open()
        rcv.flow(count)
        self.pump()

        assert count > 0

        total_bytes = count * size

        assert snd.session.outgoing_bytes == 0, snd.session.outgoing_bytes
        assert rcv.session.incoming_bytes == 0, rcv.session.incoming_bytes
        assert snd.queued == 0, snd.queued
        assert rcv.queued == 0, rcv.queued

        data = bytes(bytearray(size))
        idx = 0
        while snd.credit:
            d = snd.delivery("tag%s" % idx)
            assert d
            n = snd.send(data)
            assert n == size, (n, size)
            assert snd.advance()
            self.pump()
            idx += 1

        assert idx == count, (idx, count)

        assert snd.session.outgoing_bytes < total_bytes, (snd.session.outgoing_bytes, total_bytes)
        assert rcv.session.incoming_bytes < capacity, (rcv.session.incoming_bytes, capacity)
        assert snd.session.outgoing_bytes + rcv.session.incoming_bytes == total_bytes, \
            (snd.session.outgoing_bytes, rcv.session.incoming_bytes, total_bytes)
        if snd.session.outgoing_bytes > 0:
            available = rcv.session.incoming_capacity - rcv.session.incoming_bytes
            assert available < max_frame, (available, max_frame)

        for i in range(count):
            d = rcv.current
            assert d, i
            pending = d.pending
            before = rcv.session.incoming_bytes
            assert rcv.advance()
            after = rcv.session.incoming_bytes
            assert before - after == pending, (before, after, pending)
            snd_before = snd.session.incoming_bytes
            self.pump()
            snd_after = snd.session.incoming_bytes

            assert rcv.session.incoming_bytes < capacity
            if snd_before > 0:
                assert capacity - after <= max_frame
                assert snd_before > snd_after
            if snd_after > 0:
                available = rcv.session.incoming_capacity - rcv.session.incoming_bytes
                assert available < max_frame, available

    def testBufferingSize16(self):
        self.testBuffering(size=16)

    def testBufferingSize256(self):
        self.testBuffering(size=256)

    def testBufferingSize512(self):
        self.testBuffering(size=512)

    def testBufferingSize2048(self):
        self.testBuffering(size=2048)

    def testBufferingSize1025(self):
        self.testBuffering(size=1025)

    def testBufferingSize1023(self):
        self.testBuffering(size=1023)

    def testBufferingSize989(self):
        self.testBuffering(size=989)

    def testBufferingSize1059(self):
        self.testBuffering(size=1059)

    def testCreditWithBuffering(self):
        snd, rcv = self.link("test-link", max_frame=(1024, 1024))
        rcv.session.incoming_capacity = 64 * 1024
        snd.open()
        rcv.open()
        rcv.flow(128)
        self.pump()

        assert snd.credit == 128, snd.credit
        assert rcv.queued == 0, rcv.queued

        idx = 0
        while snd.credit:
            d = snd.delivery("tag%s" % idx)
            snd.send(("x" * 1024).encode('ascii'))
            assert d
            assert snd.advance()
            self.pump()
            idx += 1

        assert idx == 128, idx
        assert rcv.queued < 128, rcv.queued

        rcv.flow(1)
        self.pump()
        assert snd.credit == 1, snd.credit


class SettlementTest(Test):

    def setUp(self):
        self.snd, self.rcv = self.link("test-link")
        self.c1 = self.snd.session.connection
        self.c2 = self.rcv.session.connection
        self.snd.open()
        self.rcv.open()
        self.pump()

    def cleanup(self):
        # release resources created by this class
        super(SettlementTest, self).cleanup()
        self.c1 = None
        self.snd = None
        self.c2 = None
        self.rcv2 = None
        self.snd2 = None

    def tearDown(self):
        self.cleanup()

    def testSettleCurrent(self):
        self.rcv.flow(10)
        self.pump()

        assert self.snd.credit == 10, self.snd.credit
        d = self.snd.delivery("tag")
        e = self.snd.delivery("tag2")
        assert d
        assert e
        c = self.snd.current
        assert c.tag == "tag", c.tag
        c.settle()
        c = self.snd.current
        assert c.tag == "tag2", c.tag
        c.settle()
        c = self.snd.current
        assert not c
        self.pump()

        c = self.rcv.current
        assert c
        assert c.tag == "tag", c.tag
        assert c.settled
        c.settle()
        c = self.rcv.current
        assert c
        assert c.tag == "tag2", c.tag
        assert c.settled
        c.settle()
        c = self.rcv.current
        assert not c

    def testUnsettled(self):
        self.rcv.flow(10)
        self.pump()

        assert self.snd.unsettled == 0, self.snd.unsettled
        assert self.rcv.unsettled == 0, self.rcv.unsettled

        d = self.snd.delivery("tag")
        assert d
        assert self.snd.unsettled == 1, self.snd.unsettled
        assert self.rcv.unsettled == 0, self.rcv.unsettled
        assert self.snd.advance()
        self.pump()

        assert self.snd.unsettled == 1, self.snd.unsettled
        assert self.rcv.unsettled == 1, self.rcv.unsettled

        c = self.rcv.current
        assert c
        c.settle()

        assert self.snd.unsettled == 1, self.snd.unsettled
        assert self.rcv.unsettled == 0, self.rcv.unsettled

    def testMultipleUnsettled(self, count=1024, size=1024):
        self.rcv.flow(count)
        self.pump()

        assert self.snd.unsettled == 0, self.snd.unsettled
        assert self.rcv.unsettled == 0, self.rcv.unsettled

        unsettled = []

        for i in range(count):
            sd = self.snd.delivery("tag%s" % i)
            assert sd
            n = self.snd.send(("x" * size).encode('ascii'))
            assert n == size, n
            assert self.snd.advance()
            self.pump()

            rd = self.rcv.current
            assert rd, "did not receive delivery %s" % i
            n = rd.pending
            b = self.rcv.recv(n)
            assert len(b) == n, (b, n)
            rd.update(Delivery.ACCEPTED)
            assert self.rcv.advance()
            self.pump()
            unsettled.append(rd)

        assert self.rcv.unsettled == count

        for rd in unsettled:
            rd.settle()

    def testMultipleUnsettled2K1K(self):
        self.testMultipleUnsettled(2048, 1024)

    def testMultipleUnsettled4K1K(self):
        self.testMultipleUnsettled(4096, 1024)

    def testMultipleUnsettled1K2K(self):
        self.testMultipleUnsettled(1024, 2048)

    def testMultipleUnsettled2K2K(self):
        self.testMultipleUnsettled(2048, 2048)

    def testMultipleUnsettled4K2K(self):
        self.testMultipleUnsettled(4096, 2048)


class PipelineTest(Test):

    def setUp(self):
        self.c1, self.c2 = self.connection()

    def cleanup(self):
        # release resources created by this class
        super(PipelineTest, self).cleanup()
        self.c1 = None
        self.c2 = None

    def tearDown(self):
        self.cleanup()

    def test(self):
        ssn = self.c1.session()
        snd = ssn.sender("sender")
        self.c1.open()
        ssn.open()
        snd.open()

        for i in range(10):
            t = "delivery-%s" % i
            d = snd.delivery(t)
            snd.send(t.encode('ascii'))
            d.settle()

        snd.close()
        ssn.close()
        self.c1.close()

        self.pump()

        state = self.c2.state
        assert state == (Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE), "%x" % state
        ssn2 = self.c2.session_head(Endpoint.LOCAL_UNINIT)
        assert ssn2
        state == ssn2.state
        assert state == (Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE), "%x" % state
        rcv = self.c2.link_head(Endpoint.LOCAL_UNINIT)
        assert rcv
        state = rcv.state
        assert state == (Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE), "%x" % state

        self.c2.open()
        ssn2.open()
        rcv.open()
        rcv.flow(10)
        assert rcv.queued == 0, rcv.queued

        self.pump()

        assert rcv.queued == 10, rcv.queued
        state = rcv.state
        assert state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED), "%x" % state
        state = ssn2.state
        assert state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED), "%x" % state
        state = self.c2.state
        assert state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED), "%x" % state

        for i in range(rcv.queued):
            d = rcv.current
            assert d
            assert d.tag == "delivery-%s" % i
            d.settle()

        assert rcv.queued == 0, rcv.queued


class ServerTest(Test):

    def testKeepalive(self):
        """ Verify that idle frames are sent to keep a Connection alive
        """
        idle_timeout = self.delay
        server = common.TestServer()
        server.start()

        class Program:

            def on_reactor_init(self, event):
                self.conn = event.reactor.connection()
                self.conn.hostname = "%s:%s" % (server.host, server.port)
                self.conn.open()
                self.old_count = None
                event.reactor.schedule(3 * idle_timeout, self)

            def on_connection_bound(self, event):
                event.transport.idle_timeout = idle_timeout

            def on_connection_remote_open(self, event):
                self.old_count = event.transport.frames_input

            def on_timer_task(self, event):
                assert self.conn.state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE), "Connection terminated"
                assert self.conn.transport.frames_input > self.old_count, "No idle frames received"
                self.conn.close()

        Container(Program()).run()
        server.stop()

    def testIdleTimeout(self):
        """ Verify that a Connection is terminated properly when Idle frames do not
        arrive in a timely manner.
        """
        idle_timeout = self.delay
        server = common.TestServer(idle_timeout=idle_timeout)
        server.start()

        class Program:

            def on_reactor_init(self, event):
                self.conn = event.reactor.connection()
                self.conn.hostname = "%s:%s" % (server.host, server.port)
                self.conn.open()
                self.remote_condition = None
                self.old_count = None
                # verify the connection stays up even if we don't explicitly send stuff
                # wait up to 3x the idle timeout
                event.reactor.schedule(3 * idle_timeout, self)

            def on_connection_bound(self, event):
                self.transport = event.transport

            def on_connection_remote_open(self, event):
                self.old_count = event.transport.frames_output

            def on_connection_remote_close(self, event):
                assert self.conn.remote_condition
                assert self.conn.remote_condition.name == "amqp:resource-limit-exceeded"
                self.remote_condition = self.conn.remote_condition

            def on_timer_task(self, event):
                assert self.conn.state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE), "Connection terminated"
                assert self.conn.transport.frames_output > self.old_count, "No idle frames sent"

                # now wait to explicitly cause the other side to expire:
                suspend_time = 3 * idle_timeout
                if os.name == "nt":
                    # On windows, the full delay gets too close to the graceful/hard close tipping point
                    suspend_time = 2.5 * idle_timeout
                sleep(suspend_time)

        p = Program()
        Container(p).run()
        assert p.remote_condition
        assert p.remote_condition.name == "amqp:resource-limit-exceeded"
        server.stop()


class NoValue:

    def __init__(self):
        pass

    def apply(self, dlv):
        pass

    def check(self, dlv):
        assert dlv.data is None
        assert dlv.section_number == 0
        assert dlv.section_offset == 0
        assert dlv.condition is None
        assert dlv.failed == False
        assert dlv.undeliverable == False
        assert dlv.annotations is None


class RejectValue:
    def __init__(self, condition):
        self.condition = condition

    def apply(self, dlv):
        dlv.condition = self.condition

    def check(self, dlv):
        assert dlv.data is None, dlv.data
        assert dlv.section_number == 0
        assert dlv.section_offset == 0
        assert dlv.condition == self.condition, (dlv.condition, self.condition)
        assert dlv.failed == False
        assert dlv.undeliverable == False
        assert dlv.annotations is None


class ReceivedValue:
    def __init__(self, section_number, section_offset):
        self.section_number = section_number
        self.section_offset = section_offset

    def apply(self, dlv):
        dlv.section_number = self.section_number
        dlv.section_offset = self.section_offset

    def check(self, dlv):
        assert dlv.data is None, dlv.data
        assert dlv.section_number == self.section_number, (dlv.section_number, self.section_number)
        assert dlv.section_offset == self.section_offset
        assert dlv.condition is None
        assert dlv.failed == False
        assert dlv.undeliverable == False
        assert dlv.annotations is None


class ModifiedValue:
    def __init__(self, failed, undeliverable, annotations):
        self.failed = failed
        self.undeliverable = undeliverable
        self.annotations = annotations

    def apply(self, dlv):
        dlv.failed = self.failed
        dlv.undeliverable = self.undeliverable
        dlv.annotations = self.annotations

    def check(self, dlv):
        assert dlv.data is None, dlv.data
        assert dlv.section_number == 0
        assert dlv.section_offset == 0
        assert dlv.condition is None
        assert dlv.failed == self.failed
        assert dlv.undeliverable == self.undeliverable
        assert dlv.annotations == self.annotations, (dlv.annotations, self.annotations)


class CustomValue:
    def __init__(self, data):
        self.data = data

    def apply(self, dlv):
        dlv.data = self.data

    def check(self, dlv):
        assert dlv.data == self.data, (dlv.data, self.data)
        assert dlv.section_number == 0
        assert dlv.section_offset == 0
        assert dlv.condition is None
        assert dlv.failed == False
        assert dlv.undeliverable == False
        assert dlv.annotations is None


class DeliveryTest(Test):

    def tearDown(self):
        self.cleanup()

    def testDisposition(self, count=1, tag="tag%i", type=Delivery.ACCEPTED, value=NoValue()):
        snd, rcv = self.link("test-link")
        snd.open()
        rcv.open()

        snd_deliveries = []
        for i in range(count):
            d = snd.delivery(tag % i)
            snd_deliveries.append(d)
            snd.advance()

        rcv.flow(count)
        self.pump()

        rcv_deliveries = []
        for i in range(count):
            d = rcv.current
            assert d.tag == (tag % i)
            rcv_deliveries.append(d)
            rcv.advance()

        for d in rcv_deliveries:
            value.apply(d.local)
            d.update(type)

        self.pump()

        for d in snd_deliveries:
            assert d.remote_state == type
            assert d.remote.type == type
            value.check(d.remote)
            value.apply(d.local)
            d.update(type)

        self.pump()

        for d in rcv_deliveries:
            assert d.remote_state == type
            assert d.remote.type == type
            value.check(d.remote)

        for d in snd_deliveries:
            d.settle()

        self.pump()

        for d in rcv_deliveries:
            assert d.settled, d.settled
            d.settle()

    def testReceived(self):
        self.testDisposition(type=Disposition.RECEIVED, value=ReceivedValue(1, 2))

    def testRejected(self):
        self.testDisposition(type=Disposition.REJECTED, value=RejectValue(Condition(symbol("foo"))))

    def testReleased(self):
        self.testDisposition(type=Disposition.RELEASED)

    def testModified(self):
        self.testDisposition(type=Disposition.MODIFIED,
                             value=ModifiedValue(failed=True, undeliverable=True,
                                                 annotations={"key": "value"}))

    def testCustom(self):
        self.testDisposition(type=0x12345, value=CustomValue([1, 2, 3]))


class CollectorTest(Test):

    def setUp(self):
        self.collector = Collector()

    def drain(self):
        result = []
        while True:
            e = self.collector.peek()
            if e:
                result.append(e)
                self.collector.pop()
            else:
                break
        return result

    def expect(self, *types):
        return self.expect_oneof(types)

    def expect_oneof(self, *sequences):
        events = self.drain()
        types = tuple([e.type for e in events])

        for alternative in sequences:
            if types == alternative:
                if len(events) == 1:
                    return events[0]
                elif len(events) > 1:
                    return events
                else:
                    return

        assert False, "actual events %s did not match any of the expected sequences: %s" % (events, sequences)

    def expect_until(self, *types):
        events = self.drain()
        etypes = tuple([e.type for e in events[-len(types):]])
        assert etypes == types, "actual events %s did not end in expect sequence: %s" % (events, types)


class EventTest(CollectorTest):

    def tearDown(self):
        self.cleanup()

    def testEndpointEvents(self):
        c1, c2 = self.connection()
        c1.collect(self.collector)
        self.expect(Event.CONNECTION_INIT)
        self.pump()
        self.expect()
        c2.open()
        self.pump()
        self.expect(Event.CONNECTION_REMOTE_OPEN)
        self.pump()
        self.expect()

        ssn = c2.session()
        snd = ssn.sender("sender")
        ssn.open()
        snd.open()

        self.expect()
        self.pump()
        self.expect(Event.SESSION_INIT, Event.SESSION_REMOTE_OPEN,
                    Event.LINK_INIT, Event.LINK_REMOTE_OPEN)

        c1.open()
        ssn2 = c1.session()
        ssn2.open()
        rcv = ssn2.receiver("receiver")
        rcv.open()
        self.pump()
        self.expect(Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
                    Event.SESSION_INIT, Event.SESSION_LOCAL_OPEN,
                    Event.TRANSPORT, Event.LINK_INIT, Event.LINK_LOCAL_OPEN,
                    Event.TRANSPORT)

        rcv.close()
        self.expect(Event.LINK_LOCAL_CLOSE, Event.TRANSPORT)
        self.pump()
        rcv.free()
        del rcv
        self.expect(Event.LINK_FINAL)
        ssn2.free()
        del ssn2
        self.pump()
        c1.free()
        c1.transport.unbind()
        self.expect_oneof((Event.SESSION_FINAL, Event.LINK_FINAL, Event.SESSION_FINAL,
                           Event.CONNECTION_UNBOUND, Event.CONNECTION_FINAL),
                          (Event.CONNECTION_UNBOUND, Event.SESSION_FINAL, Event.LINK_FINAL,
                           Event.SESSION_FINAL, Event.CONNECTION_FINAL))

    def testConnectionINIT_FINAL(self):
        c = Connection()
        c.collect(self.collector)
        self.expect(Event.CONNECTION_INIT)
        c.free()
        self.expect(Event.CONNECTION_FINAL)

    def testSessionINIT_FINAL(self):
        c = Connection()
        c.collect(self.collector)
        self.expect(Event.CONNECTION_INIT)
        s = c.session()
        self.expect(Event.SESSION_INIT)
        s.free()
        self.expect(Event.SESSION_FINAL)
        c.free()
        self.expect(Event.CONNECTION_FINAL)

    def testLinkINIT_FINAL(self):
        c = Connection()
        c.collect(self.collector)
        self.expect(Event.CONNECTION_INIT)
        s = c.session()
        self.expect(Event.SESSION_INIT)
        r = s.receiver("asdf")
        self.expect(Event.LINK_INIT)
        r.free()
        self.expect(Event.LINK_FINAL)
        c.free()
        self.expect(Event.SESSION_FINAL, Event.CONNECTION_FINAL)

    def testFlowEvents(self):
        snd, rcv = self.link("test-link")
        snd.session.connection.collect(self.collector)
        rcv.open()
        rcv.flow(10)
        self.pump()
        self.expect(Event.CONNECTION_INIT, Event.SESSION_INIT,
                    Event.LINK_INIT, Event.LINK_REMOTE_OPEN, Event.LINK_FLOW)
        rcv.flow(10)
        self.pump()
        self.expect(Event.LINK_FLOW)
        return snd, rcv

    def testDeliveryEvents(self):
        snd, rcv = self.link("test-link")
        rcv.session.connection.collect(self.collector)
        rcv.open()
        rcv.flow(10)
        self.pump()
        self.expect(Event.CONNECTION_INIT, Event.SESSION_INIT,
                    Event.LINK_INIT, Event.LINK_LOCAL_OPEN, Event.TRANSPORT)
        snd.delivery("delivery")
        snd.send(b"Hello World!")
        snd.advance()
        self.pump()
        self.expect()
        snd.open()
        self.pump()
        self.expect(Event.LINK_REMOTE_OPEN, Event.DELIVERY)
        rcv.session.connection.transport.unbind()
        rcv.session.connection.free()
        self.expect(Event.CONNECTION_UNBOUND, Event.TRANSPORT, Event.LINK_FINAL,
                    Event.SESSION_FINAL, Event.CONNECTION_FINAL)

    def testDeliveryEventsDisp(self):
        snd, rcv = self.testFlowEvents()
        snd.open()
        dlv = snd.delivery("delivery")
        snd.send(b"Hello World!")
        assert snd.advance()
        self.expect(Event.LINK_LOCAL_OPEN, Event.TRANSPORT)
        self.pump()
        self.expect(Event.LINK_FLOW)
        rdlv = rcv.current
        assert rdlv is not None
        assert rdlv.tag == "delivery"
        rdlv.update(Delivery.ACCEPTED)
        self.pump()
        event = self.expect(Event.DELIVERY)
        assert event.context == dlv, (dlv, event.context)

    def testConnectionBOUND_UNBOUND(self):
        c = Connection()
        c.collect(self.collector)
        self.expect(Event.CONNECTION_INIT)
        t = Transport()
        t.bind(c)
        self.expect(Event.CONNECTION_BOUND)
        t.unbind()
        self.expect(Event.CONNECTION_UNBOUND, Event.TRANSPORT)

    def testTransportERROR_CLOSE(self):
        c = Connection()
        c.collect(self.collector)
        self.expect(Event.CONNECTION_INIT)
        t = Transport()
        t.bind(c)
        self.expect(Event.CONNECTION_BOUND)
        assert t.condition is None
        t.push(b"asdf")
        self.expect(Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED)
        assert t.condition is not None
        assert t.condition.name == "amqp:connection:framing-error"
        assert "AMQP protocol header" in t.condition.description
        p = t.pending()
        assert p > 0
        t.pop(p)
        self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)

    def testTransportCLOSED(self):
        c = Connection()
        c.collect(self.collector)
        self.expect(Event.CONNECTION_INIT)
        t = Transport()
        t.bind(c)
        c.open()

        self.expect(Event.CONNECTION_BOUND, Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT)

        c2 = Connection()
        t2 = Transport()
        t2.bind(c2)
        c2.open()
        c2.close()

        pump(t, t2)

        self.expect(Event.CONNECTION_REMOTE_OPEN, Event.CONNECTION_REMOTE_CLOSE,
                    Event.TRANSPORT_TAIL_CLOSED)

        c.close()

        pump(t, t2)

        self.expect(Event.CONNECTION_LOCAL_CLOSE, Event.TRANSPORT,
                    Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)

    def testLinkDetach(self):
        c1 = Connection()
        c1.collect(self.collector)
        t1 = Transport()
        t1.bind(c1)
        c1.open()
        s1 = c1.session()
        s1.open()
        l1 = s1.sender("asdf")
        l1.open()
        l1.detach()
        self.expect_until(Event.LINK_LOCAL_DETACH, Event.TRANSPORT)

        c2 = Connection()
        c2.collect(self.collector)
        t2 = Transport()
        t2.bind(c2)

        pump(t1, t2)

        self.expect_until(Event.LINK_REMOTE_DETACH)


class PeerTest(CollectorTest):

    def setUp(self):
        CollectorTest.setUp(self)
        self.connection = Connection()
        self.connection.collect(self.collector)
        self.transport = Transport()
        self.transport.bind(self.connection)
        self.peer = Connection()
        self.peer_transport = Transport()
        self.peer_transport.bind(self.peer)
        self.peer_transport.trace(Transport.TRACE_OFF)

    def pump(self):
        pump(self.transport, self.peer_transport)


class TeardownLeakTest(PeerTest):

    def doLeak(self, local, remote):
        self.connection.open()
        self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
                    Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT)

        ssn = self.connection.session()
        ssn.open()
        self.expect(Event.SESSION_INIT, Event.SESSION_LOCAL_OPEN, Event.TRANSPORT)

        snd = ssn.sender("sender")
        snd.open()
        self.expect(Event.LINK_INIT, Event.LINK_LOCAL_OPEN, Event.TRANSPORT)

        self.pump()

        self.peer.open()
        self.peer.session_head(0).open()
        self.peer.link_head(0).open()

        self.pump()
        self.expect_oneof((Event.CONNECTION_REMOTE_OPEN, Event.SESSION_REMOTE_OPEN,
                           Event.LINK_REMOTE_OPEN, Event.LINK_FLOW),
                          (Event.CONNECTION_REMOTE_OPEN, Event.SESSION_REMOTE_OPEN,
                           Event.LINK_REMOTE_OPEN))

        if local:
            snd.close()  # ha!!
            self.expect(Event.LINK_LOCAL_CLOSE, Event.TRANSPORT)
        ssn.close()
        self.expect(Event.SESSION_LOCAL_CLOSE, Event.TRANSPORT)
        self.connection.close()
        self.expect(Event.CONNECTION_LOCAL_CLOSE, Event.TRANSPORT)

        if remote:
            self.peer.link_head(0).close()  # ha!!
        self.peer.session_head(0).close()
        self.peer.close()

        self.pump()

        if remote:
            self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.LINK_REMOTE_CLOSE,
                        Event.SESSION_REMOTE_CLOSE, Event.CONNECTION_REMOTE_CLOSE,
                        Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_CLOSED)
        else:
            self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.SESSION_REMOTE_CLOSE,
                        Event.CONNECTION_REMOTE_CLOSE, Event.TRANSPORT_TAIL_CLOSED,
                        Event.TRANSPORT_CLOSED)

        self.connection.free()
        self.expect(Event.LINK_FINAL, Event.SESSION_FINAL)
        self.transport.unbind()

        self.expect(Event.CONNECTION_UNBOUND, Event.CONNECTION_FINAL)

    def testLocalRemoteLeak(self):
        self.doLeak(True, True)

    def testLocalLeak(self):
        self.doLeak(True, False)

    def testRemoteLeak(self):
        self.doLeak(False, True)

    def testLeak(self):
        self.doLeak(False, False)


class IdleTimeoutEventTest(PeerTest):

    def half_pump(self):
        p = self.transport.pending()
        if p > 0:
            self.transport.pop(p)

    def testTimeoutWithZombieServer(self, expectOpenCloseFrames=True):
        self.transport.idle_timeout = self.delay
        self.connection.open()
        self.half_pump()
        t = time()
        self.transport.tick(t)
        self.transport.tick(t + self.delay * 4)
        self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND,
                    Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT,
                    Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED)
        assert self.transport.capacity() < 0
        if expectOpenCloseFrames:
            assert self.transport.pending() > 0
        self.half_pump()
        self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED)
        assert self.transport.pending() < 0

    def testTimeoutWithZombieServerAndSASL(self):
        sasl = self.transport.sasl()
        self.testTimeoutWithZombieServer(expectOpenCloseFrames=False)


class DeliverySegFaultTest(Test):

    def testDeliveryAfterUnbind(self):
        conn = Connection()
        t = Transport()
        ssn = conn.session()
        snd = ssn.sender("sender")
        dlv = snd.delivery("tag")
        dlv.settle()
        del dlv
        t.bind(conn)
        t.unbind()
        dlv = snd.delivery("tag")


class SaslEventTest(CollectorTest):

    def testAnonymousNoInitialResponse(self):
        conn = Connection()
        conn.collect(self.collector)
        transport = Transport(Transport.SERVER)
        transport.bind(conn)
        self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND)

        transport.push(b'AMQP\x03\x01\x00\x00\x00\x00\x00 \x02\x01\x00\x00\x00SA'
                       b'\xd0\x00\x00\x00\x10\x00\x00\x00\x02\xa3\tANONYMOUS@'
                       b'AMQP\x00\x01\x00\x00')
        self.expect(Event.TRANSPORT)
        for i in range(1024):
            p = transport.pending()
            self.drain()
        p = transport.pending()
        self.expect()

    def testPipelinedServerReadFirst(self):
        conn = Connection()
        conn.collect(self.collector)
        transport = Transport(Transport.CLIENT)
        s = transport.sasl()
        s.allowed_mechs("ANONYMOUS PLAIN")
        transport.bind(conn)
        self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND)
        transport.push(
            # SASL
            b'AMQP\x03\x01\x00\x00'
            # @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS]]
            b'\x00\x00\x00\x1c\x02\x01\x00\x00\x00S@\xc0\x0f\x01\xe0\x0c\x01\xa3\tANONYMOUS'
            # @sasl-outcome(68) [code=0]
            b'\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00'
            # AMQP
            b'AMQP\x00\x01\x00\x00'
        )
        self.expect(Event.TRANSPORT)
        p = transport.pending()
        bytes = transport.peek(p)
        transport.pop(p)

        server = Transport(Transport.SERVER)
        server.push(bytes)
        assert s.outcome == SASL.OK
        assert server.sasl().outcome == SASL.OK

    def testPipelinedServerWriteFirst(self):
        conn = Connection()
        conn.collect(self.collector)
        transport = Transport(Transport.CLIENT)
        s = transport.sasl()
        s.allowed_mechs("ANONYMOUS")
        transport.bind(conn)
        p = transport.pending()
        bytes = transport.peek(p)
        transport.pop(p)
        self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND)
        transport.push(
            # SASL
            b'AMQP\x03\x01\x00\x00'
            # @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS]]
            b'\x00\x00\x00\x1c\x02\x01\x00\x00\x00S@\xc0\x0f\x01\xe0\x0c\x01\xa3\tANONYMOUS'
            # @sasl-outcome(68) [code=0]
            b'\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00'
            # AMQP
            b'AMQP\x00\x01\x00\x00'
        )
        self.expect(Event.TRANSPORT)
        p = transport.pending()
        bytes = transport.peek(p)
        transport.pop(p)
        assert s.outcome == SASL.OK
        # XXX: the bytes above appear to be correct, but we don't get any
        # sort of event indicating that the transport is authenticated
