File: models.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (123 lines) | stat: -rw-r--r-- 4,412 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from typing import Any

from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.core.utils import utcnow
from moto.moto_api._internal import mock_random as random
from moto.utilities.utils import get_partition

from .exceptions import ResourceInUseException, ResourceNotFoundException


class Stream(BaseModel):
    """In-memory model of a single Kinesis Video stream.

    Stores the settings passed in at creation time together with the values
    generated here: status, version, creation time, ARN and the identifier
    used to build data endpoint URLs.
    """

    def __init__(
        self,
        account_id: str,
        region_name: str,
        device_name: str,
        stream_name: str,
        media_type: str,
        kms_key_id: str,
        data_retention_in_hours: int,
        tags: dict[str, str],
    ):
        # Caller-supplied settings, stored exactly as given.
        self.region_name = region_name
        self.stream_name = stream_name
        self.device_name = device_name
        self.media_type = media_type
        self.kms_key_id = kms_key_id
        self.data_retention_in_hours = data_retention_in_hours
        self.tags = tags

        # Generated values. The version string is drawn from mock_random
        # before the endpoint number; keep that order so the sequence of
        # random draws is unchanged.
        self.status = "ACTIVE"
        self.version = random.get_random_string(include_digits=False, lower_case=True)
        self.creation_time = utcnow()

        # NOTE: the trailing ARN component is a fixed constant, not derived
        # from creation_time.
        partition = get_partition(region_name)
        arn = (
            f"arn:{partition}:kinesisvideo:{region_name}:{account_id}"
            f":stream/{stream_name}/1598784211076"
        )
        self.data_endpoint_number = random.get_random_hex()
        self.arn = arn

    def get_data_endpoint(self, api_name: str) -> str:
        """Return the data endpoint URL of this stream for ``api_name``.

        ``PUT_MEDIA`` and ``GET_MEDIA`` use an ``s-`` host prefix; every other
        API name uses ``b-``.
        """
        if api_name in ("PUT_MEDIA", "GET_MEDIA"):
            prefix = "s-"
        else:
            prefix = "b-"
        host = f"{prefix}{self.data_endpoint_number}.kinesisvideo.{self.region_name}.amazonaws.com"
        return f"https://{host}"

    def to_dict(self) -> dict[str, Any]:
        """Return the stream description as a dict with API-style keys.

        ``CreationTime`` is rendered as an ISO-8601 string; tags are not
        included.
        """
        description: dict[str, Any] = {
            "DeviceName": self.device_name,
            "StreamName": self.stream_name,
            "StreamARN": self.arn,
            "MediaType": self.media_type,
            "KmsKeyId": self.kms_key_id,
            "Version": self.version,
            "Status": self.status,
            "CreationTime": self.creation_time.isoformat(),
            "DataRetentionInHours": self.data_retention_in_hours,
        }
        return description


class KinesisVideoBackend(BaseBackend):
    """Backend holding Kinesis Video streams in memory.

    Constructed with a region name and an account ID, both of which are
    forwarded to ``BaseBackend``.
    """

    def __init__(self, region_name: str, account_id: str):
        super().__init__(region_name, account_id)
        # Streams keyed by their ARN (see create_stream).
        self.streams: dict[str, Stream] = {}

    def create_stream(
        self,
        device_name: str,
        stream_name: str,
        media_type: str,
        kms_key_id: str,
        data_retention_in_hours: int,
        tags: dict[str, str],
    ) -> str:
        """Create a stream, register it under its ARN and return that ARN.

        Raises ResourceInUseException when a stream with the same name is
        already registered.
        """
        name_taken = any(
            existing.stream_name == stream_name for existing in self.streams.values()
        )
        if name_taken:
            raise ResourceInUseException(f"The stream {stream_name} already exists.")

        new_stream = Stream(
            account_id=self.account_id,
            region_name=self.region_name,
            device_name=device_name,
            stream_name=stream_name,
            media_type=media_type,
            kms_key_id=kms_key_id,
            data_retention_in_hours=data_retention_in_hours,
            tags=tags,
        )
        self.streams[new_stream.arn] = new_stream
        return new_stream.arn

    def _get_stream(self, stream_name: str, stream_arn: str) -> Stream:
        """Look a stream up by name, or by ARN when no name is given.

        A non-empty ``stream_name`` takes precedence; ``stream_arn`` is only
        consulted when ``stream_name`` is falsy. Raises
        ResourceNotFoundException when nothing matches.
        """
        if not stream_name:
            # Assume stream_arn is supplied instead
            by_arn = self.streams.get(stream_arn)
            if by_arn:
                return by_arn
            raise ResourceNotFoundException()

        # First registered stream carrying the requested name wins.
        for candidate in self.streams.values():
            if candidate.stream_name == stream_name:
                return candidate
        raise ResourceNotFoundException()

    def describe_stream(self, stream_name: str, stream_arn: str) -> dict[str, Any]:
        """Return the dict description of the stream found by name or ARN."""
        return self._get_stream(stream_name, stream_arn).to_dict()

    def list_streams(self) -> list[dict[str, Any]]:
        """
        Pagination and the StreamNameCondition-parameter are not yet implemented
        """
        return [registered.to_dict() for registered in self.streams.values()]

    def delete_stream(self, stream_arn: str) -> None:
        """
        The CurrentVersion-parameter is not yet implemented
        """
        if stream_arn not in self.streams:
            raise ResourceNotFoundException()
        del self.streams[stream_arn]

    def get_data_endpoint(
        self, stream_name: str, stream_arn: str, api_name: str
    ) -> str:
        """Return the data endpoint URL of the matching stream for ``api_name``."""
        target = self._get_stream(stream_name, stream_arn)
        return target.get_data_endpoint(api_name)


# Module-level registry of KinesisVideoBackend instances for the
# "kinesisvideo" service.
# NOTE(review): presumably BackendDict hands out one backend per account and
# region — confirm against moto.core.base_backend.
kinesisvideo_backends = BackendDict(KinesisVideoBackend, "kinesisvideo")