File: __init__.py

package info (click to toggle)
python-aioamqp 0.15.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 456 kB
  • sloc: python: 2,741; makefile: 187
file content (103 lines) | stat: -rw-r--r-- 3,717 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import asyncio
import socket
import ssl
from urllib.parse import urlparse

from .exceptions import *  # pylint: disable=wildcard-import
from .protocol import AmqpProtocol

from .version import __version__
from .version import __packagename__


async def connect(host='localhost', port=None, login='guest', password='guest',
            virtualhost='/', ssl=None, login_method='PLAIN', insist=False,  # pylint: disable=redefined-outer-name
            protocol_factory=AmqpProtocol, **kwargs):
    """Convenient method to connect to an AMQP broker

        @host:          the host to connect to
        @port:          broker port
        @login:         login
        @password:      password
        @virtualhost:   AMQP virtualhost to use for this connection
        @ssl:           SSL context used for secure connections, omit for no SSL
                        - see https://docs.python.org/3/library/ssl.html
        @login_method:  AMQP auth method
        @insist:        Insist on connecting to a server
        @protocol_factory:
                        Factory to use, if you need to subclass AmqpProtocol

        @kwargs:        Arguments to be given to the protocol_factory instance

        Returns:        a tuple (transport, protocol) of an AmqpProtocol instance
    """
    factory = lambda: protocol_factory(**kwargs)  # pylint: disable=unnecessary-lambda

    create_connection_kwargs = {}

    if ssl is not None:
        create_connection_kwargs['ssl'] = ssl

    if port is None:
        if ssl:
            port = 5671
        else:
            port = 5672

    transport, protocol = await asyncio.get_running_loop().create_connection(
        factory, host, port, **create_connection_kwargs
    )

    # these 2 flags *may* show up in sock.type. They are only available on linux
    # see https://bugs.python.org/issue21327
    nonblock = getattr(socket, 'SOCK_NONBLOCK', 0)
    cloexec = getattr(socket, 'SOCK_CLOEXEC', 0)
    sock = transport.get_extra_info('socket')
    if sock is not None and (sock.type & ~nonblock & ~cloexec) == socket.SOCK_STREAM:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    try:
        await protocol.start_connection(host, port, login, password, virtualhost, ssl=ssl, login_method=login_method,
                                        insist=insist)
    except Exception:
        await protocol.wait_closed()
        raise

    return transport, protocol


async def from_url(
        url, login_method='PLAIN', insist=False, protocol_factory=AmqpProtocol, **kwargs):
    """ Connect to the AMQP using a single url parameter and return the client.

        For instance:

            amqp://user:password@hostname:port/vhost

        @insist:        Insist on connecting to a server
        @protocol_factory:
                        Factory to use, if you need to subclass AmqpProtocol

        @kwargs:        Arguments to be given to the protocol_factory instance

        Returns:        a tuple (transport, protocol) of an AmqpProtocol instance
    """
    url = urlparse(url)

    if url.scheme not in ('amqp', 'amqps'):
        raise ValueError(f'Invalid protocol {url.scheme}, valid protocols are amqp or amqps')

    if url.scheme == 'amqps' and not kwargs.get('ssl'):
        kwargs['ssl'] = ssl.create_default_context()

    transport, protocol = await connect(
        host=url.hostname or 'localhost',
        port=url.port,
        login=url.username or 'guest',
        password=url.password or 'guest',
        virtualhost=(url.path[1:] if len(url.path) > 1 else '/'),
        login_method=login_method,
        insist=insist,
        protocol_factory=protocol_factory,
        **kwargs)
    return transport, protocol