File: test_rabbitmq_send.py

package info (click to toggle)
azure-uamqp-python 1.6.11-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 35,564 kB
  • sloc: ansic: 184,383; cpp: 7,738; python: 7,731; cs: 5,767; sh: 983; xml: 298; makefile: 34
file content (49 lines) | stat: -rw-r--r-- 1,667 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import os
import logging
import sys
import datetime
import uuid
import pytest

import uamqp
from uamqp import authentication


def get_logger(level):
    uamqp_logger = logging.getLogger("uamqp")
    if not uamqp_logger.handlers:
        handler = logging.StreamHandler(stream=sys.stdout)
        handler.setFormatter(logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s'))
        uamqp_logger.addHandler(handler)
    uamqp_logger.setLevel(level)
    return uamqp_logger


log = get_logger(logging.INFO)


def test_rabbitmq_client_send_sync(rabbit_mq_config):
    pytest.skip("Not working yet")
    uri = "amqp://{}/{}/".format(rabbit_mq_config['hostname'], rabbit_mq_config['path'])
    sas_auth = uamqp.authentication.SASLAnonymous(hostname=rabbit_mq_config['hostname'], port=5672)
    send_client = uamqp.SendClient(uri, auth=sas_auth, debug=False)
    try:
        message = uamqp.Message("content")
        send_client.queue_message(message)
        results = send_client.send_all_messages(close_on_done=False)
        assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]
    finally:
        send_client.close()

if __name__ == '__main__':
    config = {}
    config['hostname'] = os.environ['RABBITMQ_HOSTNAME']
    config['path'] = os.environ['RABBITMQ_PATH']

    test_rabbitmq_client_send_sync(config)