File: responses.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (114 lines) | stat: -rw-r--r-- 4,393 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import json
import re
from typing import Optional, Union

from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse
from moto.utilities.utils import ARN_PARTITION_REGEX

from .models import ElasticTranscoderBackend, elastictranscoder_backends


class ElasticTranscoderResponse(BaseResponse):
    """Request handlers for the Elastic Transcoder pipeline operations.

    Each handler reads its inputs through ``self._get_param`` or ``self.path``,
    delegates to the backend selected by the current account and region, and
    returns either a ``(status, headers, body)`` tuple or a JSON body string.
    """

    def __init__(self) -> None:
        super().__init__(service_name="elastictranscoder")

    @property
    def elastictranscoder_backend(self) -> ElasticTranscoderBackend:
        """Backend registered for the current account and region."""
        return elastictranscoder_backends[self.current_account][self.region]

    def create_pipeline(self) -> TYPE_RESPONSE:
        """Validate the request parameters and create a pipeline.

        Returns:
            A 201 response whose body holds the new pipeline and the backend's
            warnings, or a 400 ``ValidationException`` response when the role
            or the bucket/config combination is invalid.
        """
        name = self._get_param("Name")
        input_bucket = self._get_param("InputBucket")
        output_bucket = self._get_param("OutputBucket")
        role = self._get_param("Role")
        content_config = self._get_param("ContentConfig")
        thumbnail_config = self._get_param("ThumbnailConfig")

        # Checks run in a fixed order; the first failing one decides the message.
        if not role:
            return self.err("Role cannot be blank")
        if not re.match(f"{ARN_PARTITION_REGEX}:iam::[0-9]+:role/.+", role):
            return self.err(f"Role ARN is invalid: {role}")
        if not output_bucket and not content_config:
            return self.err(
                "[OutputBucket and ContentConfig:Bucket are not allowed to both be null.]"
            )
        if output_bucket and content_config:
            return self.err("[OutputBucket and ContentConfig are mutually exclusive.]")
        if content_config and not thumbnail_config:
            return self.err(
                "[ThumbnailConfig:Bucket is not allowed to be null if ContentConfig is specified.]"
            )

        pipeline, warnings = self.elastictranscoder_backend.create_pipeline(
            name=name,
            input_bucket=input_bucket,
            output_bucket=output_bucket,
            role=role,
            content_config=content_config,
            thumbnail_config=thumbnail_config,
        )
        return (
            201,
            {"status": 201},
            json.dumps({"Pipeline": pipeline.to_dict(), "Warnings": warnings}),
        )

    def list_pipelines(self) -> TYPE_RESPONSE:
        """Return a 200 response listing the backend's pipelines."""
        return (
            200,
            {},
            json.dumps({"Pipelines": self.elastictranscoder_backend.list_pipelines()}),
        )

    def validate_pipeline_id(self, pipeline_id: str) -> Optional[TYPE_RESPONSE]:
        """Check that ``pipeline_id`` is well-formed and known to the backend.

        Args:
            pipeline_id: The id taken from the request path.

        Returns:
            ``None`` when the id is valid and the backend lookup succeeds;
            a 400 ``ValidationException`` response when the id does not match
            the expected pattern; a 404 ``ResourceNotFoundException`` response
            when the backend lookup raises ``KeyError``.
        """
        # The pattern text is echoed verbatim in the error message below.
        r = r"^\d{13}-\w{6}$"
        if not re.match(r, pipeline_id):
            msg = f"1 validation error detected: Value '{pipeline_id}' at 'id' failed to satisfy constraint: Member must satisfy regular expression pattern: {r}"
            return self.err(msg)
        try:
            # Only the lookup can raise KeyError, so keep the try body minimal.
            self.elastictranscoder_backend.read_pipeline(pipeline_id)
        except KeyError:
            msg = f"The specified pipeline was not found: account={self.current_account}, pipelineId={pipeline_id}."
            return (
                404,
                {"status": 404, "x-amzn-ErrorType": "ResourceNotFoundException"},
                json.dumps({"message": msg}),
            )
        return None

    def read_pipeline(self) -> TYPE_RESPONSE:
        """Return the pipeline whose id is the last segment of the request path.

        Returns:
            A 200 response with the pipeline, or the error response produced
            by ``validate_pipeline_id``.
        """
        _id = self.path.rsplit("/", 1)[-1]
        err = self.validate_pipeline_id(_id)
        if err:
            return err
        pipeline = self.elastictranscoder_backend.read_pipeline(_id)
        return 200, {}, json.dumps({"Pipeline": pipeline.to_dict()})

    def update_pipeline(self) -> TYPE_RESPONSE:
        """Update name, input bucket and role of the pipeline named in the path.

        Returns:
            A 200 response with the updated pipeline and the backend's
            warnings, or the error response produced by
            ``validate_pipeline_id``.
        """
        _id = self.path.rsplit("/", 1)[-1]
        name = self._get_param("Name")
        input_bucket = self._get_param("InputBucket")
        role = self._get_param("Role")
        err = self.validate_pipeline_id(_id)
        if err:
            return err
        pipeline, warnings = self.elastictranscoder_backend.update_pipeline(
            pipeline_id=_id, name=name, input_bucket=input_bucket, role=role
        )

        return (
            200,
            {},
            json.dumps({"Pipeline": pipeline.to_dict(), "Warnings": warnings}),
        )

    def delete_pipeline(self) -> Union[str, TYPE_RESPONSE]:
        """Delete the pipeline whose id is the last segment of the request path.

        The id is validated first, in the same way as ``read_pipeline`` and
        ``update_pipeline``, so a malformed id yields a 400 response and an
        unknown id a 404 response instead of reaching the backend unchecked.

        Returns:
            ``"{}"`` on success, or the error response produced by
            ``validate_pipeline_id``.
        """
        _id = self.path.rsplit("/", 1)[-1]
        err = self.validate_pipeline_id(_id)
        if err:
            return err
        self.elastictranscoder_backend.delete_pipeline(pipeline_id=_id)
        return "{}"

    def err(self, msg: str) -> TYPE_RESPONSE:
        """Build a 400 ``ValidationException`` response carrying ``msg``."""
        return (
            400,
            {"status": 400, "x-amzn-ErrorType": "ValidationException"},
            json.dumps({"message": msg}),
        )