File: test_async_transport.py

package info (click to toggle)
python-zeep 4.3.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,604 kB
  • sloc: python: 15,562; makefile: 13
file content (75 lines) | stat: -rw-r--r-- 2,219 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import pytest
from lxml import etree
from pretend import stub
from pytest_httpx import HTTPXMock

from zeep import exceptions
from zeep.cache import InMemoryCache
from zeep.transports import AsyncTransport


@pytest.mark.requests
def test_no_cache(event_loop):
    """A transport created without arguments exposes ``None`` as its cache."""
    default_transport = AsyncTransport()
    cache_in_use = default_transport.cache
    assert cache_in_use is None


@pytest.mark.requests
def test_load(httpx_mock):
    """Loading a mocked URL via a transport with a stub cache returns its bytes."""
    target_url = "http://tests.python-zeep.org/test.xml"
    # Cache stand-in: ``get`` always returns None and ``add`` does nothing.
    noop_cache = stub(get=lambda url: None, add=lambda url, content: None)
    stubbed_transport = AsyncTransport(cache=noop_cache)

    httpx_mock.add_response(url=target_url, content="x")
    loaded = stubbed_transport.load(target_url)
    assert loaded == b"x"


@pytest.mark.requests
@pytest.mark.asyncio
def test_load_cache(httpx_mock):
    """Loading through a transport with an ``InMemoryCache`` fills the cache."""
    document_url = "http://tests.python-zeep.org/test.xml"
    memory_cache = InMemoryCache()
    caching_transport = AsyncTransport(cache=memory_cache)

    httpx_mock.add_response(url=document_url, content="x")
    assert caching_transport.load(document_url) == b"x"

    # The same bytes must now be retrievable from the cache under that URL.
    cached_copy = memory_cache.get(document_url)
    assert cached_copy == b"x"


@pytest.mark.requests
@pytest.mark.asyncio
async def test_post(httpx_mock: HTTPXMock):
    """Posting an XML envelope yields a response carrying the mocked body."""
    endpoint = "http://tests.python-zeep.org/test.xml"
    # Cache stand-in: ``get`` always returns None and ``add`` does nothing.
    noop_cache = stub(get=lambda url: None, add=lambda url, content: None)
    transport = AsyncTransport(cache=noop_cache)

    soap_envelope = etree.Element("Envelope")

    httpx_mock.add_response(url=endpoint, content="x")
    response = await transport.post_xml(endpoint, envelope=soap_envelope, headers={})

    assert response.content == b"x"
    await transport.aclose()


@pytest.mark.requests
@pytest.mark.asyncio
async def test_session_close(httpx_mock: HTTPXMock):
    """Awaiting ``aclose`` on a freshly created transport must not raise."""
    fresh_transport = AsyncTransport()
    close_result = await fresh_transport.aclose()
    return close_result


@pytest.mark.requests
@pytest.mark.asyncio
async def test_http_error(httpx_mock: HTTPXMock):
    """An HTTP 500 reply while loading a document raises ``TransportError``.

    The raised exception is then inspected for the status code it carries.
    """
    transport = AsyncTransport()

    httpx_mock.add_response(
        url="http://tests.python-zeep.org/test.xml", content="x", status_code=500
    )
    # Only the raising call belongs inside the context manager: statements
    # placed after it in the ``with`` body are skipped once the exception
    # propagates, so the checks on the exception must come after the block.
    with pytest.raises(exceptions.TransportError) as exc:
        transport.load("http://tests.python-zeep.org/test.xml")

    assert exc.value.status_code == 500
    # NOTE(review): the previous ``message is None`` check sat inside the
    # ``with`` body and therefore never ran, so the exact "empty" value was
    # never verified. Accept any falsy message (None or "") here; tighten
    # once confirmed against TransportError's default message.
    assert not exc.value.message