File: test_events.py

package info (click to toggle)
python-podman 5.4.0.1-2~bpo12%2B1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm-backports
  • size: 1,140 kB
  • sloc: python: 7,532; makefile: 82; sh: 75
file content (67 lines) | stat: -rw-r--r-- 1,816 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import io
import json
import unittest
from types import GeneratorType

import requests_mock

from podman import PodmanClient, tests
from podman.domain.events import EventsManager


class EventsManagerTestCase(unittest.TestCase):
    """Exercise EventsManager.list() against a mocked ``/events`` endpoint."""

    def setUp(self) -> None:
        super().setUp()

        self.client = PodmanClient(base_url=tests.BASE_SOCK)

    def tearDown(self) -> None:
        super().tearDown()

        self.client.close()

    @requests_mock.Mocker()
    def test_list(self, mock):
        """list() returns a generator of events, decoded or raw.

        The mocked endpoint serves one JSON document per line. With
        ``decode=True`` each yielded item must be a dict; with
        ``decode=False`` each item must be bytes that parse as JSON. In
        both modes the number of yielded events must match the number of
        events served, so an empty stream cannot pass unnoticed.
        """
        stream = [
            {
                "Type": "pod",
                "Action": "create",
                "Actor": {
                    "ID": "",
                    "Attributes": {
                        "image": "",
                        "name": "",
                        "containerExitCode": 0,
                    },
                },
                "Scope": "local",
                "Time": 1615845480,
                "TimeNano": 1615845480,
            }
        ]
        # Newline-delimited JSON: one encoded event per line.
        buffer = io.StringIO()
        buffer.writelines(json.dumps(item) + "\n" for item in stream)

        mock.get(tests.LIBPOD_URL + "/events", text=buffer.getvalue())

        manager = EventsManager(client=self.client.api)

        actual = manager.list(decode=True)
        self.assertIsInstance(actual, GeneratorType)

        # Materialise the generator so the count can be checked; iterating
        # it directly would silently succeed when nothing is yielded.
        decoded_events = list(actual)
        self.assertEqual(len(decoded_events), len(stream))
        for item in decoded_events:
            self.assertIsInstance(item, dict)
            self.assertEqual(item["Type"], "pod")

        actual = manager.list(decode=False)
        self.assertIsInstance(actual, GeneratorType)

        raw_events = list(actual)
        self.assertEqual(len(raw_events), len(stream))
        for item in raw_events:
            self.assertIsInstance(item, bytes)
            event = json.loads(item)
            self.assertEqual(event["Type"], "pod")


# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()