File: test_errors.py

package info (click to toggle)
python-azure 20230112%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 749,544 kB
  • sloc: python: 6,815,827; javascript: 287; makefile: 195; xml: 109; sh: 105
file content (44 lines) | stat: -rw-r--r-- 1,805 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import logging

from uamqp import errors as AMQPErrors, constants as AMQPConstants
from azure.servicebus.exceptions import (
    _create_servicebus_exception,
    ServiceBusConnectionError,
    ServiceBusError
)


def test_link_idle_timeout():
    logger = logging.getLogger("testlogger")
    amqp_error = AMQPErrors.LinkDetach(AMQPConstants.ErrorCodes.LinkDetachForced, description="Details: AmqpMessageConsumer.IdleTimerExpired: Idle timeout: 00:10:00.")
    sb_error = _create_servicebus_exception(logger, amqp_error)
    assert isinstance(sb_error, ServiceBusConnectionError)
    assert sb_error._retryable
    assert sb_error._shutdown_handler


def test_unknown_connection_error():
    logger = logging.getLogger("testlogger")
    amqp_error = AMQPErrors.AMQPConnectionError(AMQPConstants.ErrorCodes.UnknownError)
    sb_error = _create_servicebus_exception(logger, amqp_error)
    assert isinstance(sb_error,ServiceBusConnectionError)
    assert sb_error._retryable
    assert sb_error._shutdown_handler

    amqp_error = AMQPErrors.AMQPError(AMQPConstants.ErrorCodes.UnknownError)
    sb_error = _create_servicebus_exception(logger, amqp_error)
    assert not isinstance(sb_error,ServiceBusConnectionError)
    assert isinstance(sb_error,ServiceBusError)
    assert not sb_error._retryable
    assert sb_error._shutdown_handler

def test_internal_server_error():
    logger = logging.getLogger("testlogger")
    amqp_error = AMQPErrors.LinkDetach(
        description="The service was unable to process the request; please retry the operation.",
        condition=AMQPConstants.ErrorCodes.InternalServerError
    )
    sb_error = _create_servicebus_exception(logger, amqp_error)
    assert isinstance(sb_error, ServiceBusError)
    assert sb_error._retryable
    assert sb_error._shutdown_handler