File: responses.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (84 lines) | stat: -rw-r--r-- 3,198 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import json

from moto.core.responses import BaseResponse

from .models import MediaStoreBackend, mediastore_backends


class MediaStoreResponse(BaseResponse):
    """Request handlers for the mocked MediaStore service.

    Each handler reads its inputs with ``self._get_param``, delegates the
    work to the backend selected by ``mediastore_backend`` and returns the
    response body as a JSON string.
    """

    def __init__(self) -> None:
        super().__init__(service_name="mediastore")

    @property
    def mediastore_backend(self) -> MediaStoreBackend:
        """Return the backend for the current account and region."""
        return mediastore_backends[self.current_account][self.region]

    def create_container(self) -> str:
        """Create a container from ``ContainerName`` and ``Tags``.

        Returns the new container's ``to_dict()`` under ``"Container"``.
        """
        name = self._get_param("ContainerName")
        tags = self._get_param("Tags")
        container = self.mediastore_backend.create_container(name=name, tags=tags)
        return json.dumps({"Container": container.to_dict()})

    def delete_container(self) -> str:
        """Delete the container named by ``ContainerName``; the body is empty."""
        name = self._get_param("ContainerName")
        self.mediastore_backend.delete_container(name=name)
        return "{}"

    def describe_container(self) -> str:
        """Return the ``to_dict()`` of the container named by ``ContainerName``."""
        name = self._get_param("ContainerName")
        container = self.mediastore_backend.describe_container(name=name)
        return json.dumps({"Container": container.to_dict()})

    def list_containers(self) -> str:
        """Return whatever the backend lists under ``"Containers"``.

        No pagination parameters are read here and ``NextToken`` is always
        serialized as ``null``.
        """
        containers = self.mediastore_backend.list_containers()
        return json.dumps({"Containers": containers, "NextToken": None})

    def list_tags_for_resource(self) -> str:
        """Return the tags the backend holds for the ``Resource`` parameter."""
        name = self._get_param("Resource")
        tags = self.mediastore_backend.list_tags_for_resource(name)
        return json.dumps({"Tags": tags})

    def put_lifecycle_policy(self) -> str:
        """Store ``LifecyclePolicy`` for ``ContainerName``; the body is empty."""
        container_name = self._get_param("ContainerName")
        lifecycle_policy = self._get_param("LifecyclePolicy")
        self.mediastore_backend.put_lifecycle_policy(
            container_name=container_name, lifecycle_policy=lifecycle_policy
        )
        return "{}"

    def get_lifecycle_policy(self) -> str:
        """Return the backend's lifecycle policy for ``ContainerName``."""
        container_name = self._get_param("ContainerName")
        lifecycle_policy = self.mediastore_backend.get_lifecycle_policy(
            container_name=container_name
        )
        return json.dumps({"LifecyclePolicy": lifecycle_policy})

    def put_container_policy(self) -> str:
        """Store ``Policy`` for ``ContainerName``; the body is empty."""
        container_name = self._get_param("ContainerName")
        policy = self._get_param("Policy")
        self.mediastore_backend.put_container_policy(
            container_name=container_name, policy=policy
        )
        return "{}"

    def get_container_policy(self) -> str:
        """Return the backend's container policy for ``ContainerName``."""
        container_name = self._get_param("ContainerName")
        policy = self.mediastore_backend.get_container_policy(
            container_name=container_name
        )
        return json.dumps({"Policy": policy})

    def put_metric_policy(self) -> str:
        """Store ``MetricPolicy`` for ``ContainerName``; the body is empty."""
        container_name = self._get_param("ContainerName")
        metric_policy = self._get_param("MetricPolicy")
        self.mediastore_backend.put_metric_policy(
            container_name=container_name, metric_policy=metric_policy
        )
        # Return an empty object like the other mutating handlers instead of
        # echoing the request payload (which serialized to "null" when the
        # parameter was absent).
        return "{}"

    def get_metric_policy(self) -> str:
        """Return the backend's metric policy for ``ContainerName``."""
        container_name = self._get_param("ContainerName")
        metric_policy = self.mediastore_backend.get_metric_policy(
            container_name=container_name
        )
        return json.dumps({"MetricPolicy": metric_policy})