File: test_monitor.py

package info (click to toggle)
python-aiobotocore 2.13.1-1.1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 832 kB
  • sloc: python: 10,572; makefile: 71
file content (23 lines) | stat: -rw-r--r-- 791 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import pytest

from aiobotocore.session import AioSession


@pytest.mark.moto
@pytest.mark.asyncio
async def test_monitor_response_received(session: AioSession, s3_client):
    # Basic smoke test to ensure we can talk to s3.
    handler_kwargs = {}

    def handler(**kwargs):
        nonlocal handler_kwargs
        handler_kwargs = kwargs

    s3_client.meta.events.register('response-received.s3.ListBuckets', handler)
    result = await s3_client.list_buckets()
    # Can't really assume anything about whether or not they have buckets,
    # but we can assume something about the structure of the response.
    actual_keys = sorted(list(result.keys()))
    assert actual_keys == ['Buckets', 'Owner', 'ResponseMetadata']

    assert handler_kwargs['response_dict']['status_code'] == 200