File: test_client.py

package info (click to toggle)
azure-uamqp-python 1.6.11-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 35,564 kB
  • sloc: ansic: 184,383; cpp: 7,738; python: 7,731; cs: 5,767; sh: 983; xml: 298; makefile: 34
file content (35 lines) | stat: -rw-r--r-- 998 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import types
from uamqp import AMQPClient
from uamqp import authentication


def test_keep_alive_thread_fail_to_start():

    class MockThread:
        def __init__(self):
            pass

        def start(self):
            raise RuntimeError("Fail to start")

        def join(self):
            raise RuntimeError("Fail to join")

    def hack_open(ins):
        ins._keep_alive_thread = MockThread()
        ins._keep_alive_thread.start()

    sas_auth = authentication.SASTokenAuth.from_shared_access_key(
        "sb://fake/fake", "fake", "fake")
    target = "amqps://{}/{}".format("fake", "fake")
    client = AMQPClient(target, auth=sas_auth, keep_alive_interval=10)
    client.open = types.MethodType(hack_open, client)
    try:
        client.open()
    except Exception as exc:
        assert type(exc) == RuntimeError
        client.close()
        assert not client._keep_alive_thread

    # check that kwargs can be passed to client.do_work
    client.do_work(fake_kwarg="ignore")