File: test_session.py

package info (click to toggle)
python-aiobotocore 2.25.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,524 kB
  • sloc: python: 15,437; makefile: 84
file content (56 lines) | stat: -rw-r--r-- 1,548 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import logging

import pytest
from _pytest.logging import LogCaptureFixture

from aiobotocore import __version__, httpsession
from aiobotocore.config import AioConfig
from aiobotocore.session import AioSession


async def test_get_service_data(session):
    """Loading the 's3' service data must fire 'service-data-loaded.s3'.

    A handler is registered for that event on the session, the service data
    is fetched, and the test passes only if the handler ran at least once.
    """
    received = []

    def on_service_data_loaded(**event_kwargs):
        # Only the fact that the event fired matters, not its payload.
        received.append(True)

    session.register('service-data-loaded.s3', on_service_data_loaded)
    await session.get_service_data('s3')

    assert received


async def test_retry(
    session: AioSession, caplog: LogCaptureFixture, monkeypatch
):
    """Check that a failing request is retried in "standard" retry mode.

    An S3 client is pointed at ``http://localhost:7878`` with short timeouts;
    the request is expected to raise ``EndpointConnectionError``, and the
    captured DEBUG log output must contain 'sleeping for'.
    """
    # DEBUG level is required so the log line asserted below is captured.
    caplog.set_level(logging.DEBUG)

    # this goes through a slightly different codepath than regular retries
    retry_settings = {
        "mode": "standard",
        "total_max_attempts": 3,
    }
    client_config = AioConfig(
        connect_timeout=1,
        read_timeout=1,
        retries=retry_settings,
    )
    client_kwargs = {
        "config": client_config,
        "aws_secret_access_key": "xxx",
        "aws_access_key_id": "xxx",
        "endpoint_url": 'http://localhost:7878',
    }

    async with session.create_client('s3', **client_kwargs) as s3_client:
        # this needs the new style exceptions to work
        with pytest.raises(httpsession.EndpointConnectionError):
            await s3_client.get_object(Bucket='foo', Key='bar')

        captured_log = caplog.text
        assert 'sleeping for' in captured_log


async def test_set_user_agent_for_session(session: AioSession):
    """Verify the user-agent fields exposed by the session.

    The name must be "aiobotocore", the version must equal the package's
    ``__version__``, and the extra string must start with "botocore/".
    """
    agent_name = session.user_agent_name
    assert agent_name == "aiobotocore"

    agent_version = session.user_agent_version
    assert agent_version == __version__

    agent_extra = session.user_agent_extra
    assert agent_extra.startswith("botocore/")