File: responses.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (130 lines) | stat: -rw-r--r-- 5,453 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""Handles Firehose API requests, invokes method and returns response."""

import json

from moto.core.responses import BaseResponse

from .models import FirehoseBackend, firehose_backends


class FirehoseResponse(BaseResponse):
    """Handler for Firehose requests and responses."""

    def __init__(self) -> None:
        super().__init__(service_name="firehose")

    @property
    def firehose_backend(self) -> FirehoseBackend:
        """Return backend instance specific to this region."""
        return firehose_backends[self.current_account][self.region]

    def create_delivery_stream(self) -> str:
        """Prepare arguments and respond to CreateDeliveryStream request."""
        delivery_stream_arn = self.firehose_backend.create_delivery_stream(
            self.region,
            self._get_param("DeliveryStreamName"),
            self._get_param("DeliveryStreamType"),
            self._get_param("KinesisStreamSourceConfiguration"),
            self._get_param("DeliveryStreamEncryptionConfigurationInput"),
            self._get_param("S3DestinationConfiguration"),
            self._get_param("ExtendedS3DestinationConfiguration"),
            self._get_param("RedshiftDestinationConfiguration"),
            self._get_param("ElasticsearchDestinationConfiguration"),
            self._get_param("SplunkDestinationConfiguration"),
            self._get_param("HttpEndpointDestinationConfiguration"),
            self._get_param("SnowflakeDestinationConfiguration"),
            self._get_param("Tags"),
        )
        return json.dumps({"DeliveryStreamARN": delivery_stream_arn})

    def delete_delivery_stream(self) -> str:
        """Prepare arguments and respond to DeleteDeliveryStream request."""
        self.firehose_backend.delete_delivery_stream(
            self._get_param("DeliveryStreamName")
        )
        return json.dumps({})

    def describe_delivery_stream(self) -> str:
        """Prepare arguments and respond to DescribeDeliveryStream request."""
        result = self.firehose_backend.describe_delivery_stream(
            self._get_param("DeliveryStreamName")
        )
        return json.dumps(result)

    def list_delivery_streams(self) -> str:
        """Prepare arguments and respond to ListDeliveryStreams request."""
        stream_list = self.firehose_backend.list_delivery_streams(
            self._get_param("Limit"),
            self._get_param("DeliveryStreamType"),
            self._get_param("ExclusiveStartDeliveryStreamName"),
        )
        return json.dumps(stream_list)

    def list_tags_for_delivery_stream(self) -> str:
        """Prepare arguments and respond to ListTagsForDeliveryStream()."""
        result = self.firehose_backend.list_tags_for_delivery_stream(
            self._get_param("DeliveryStreamName"),
            self._get_param("ExclusiveStartTagKey"),
            self._get_param("Limit"),
        )
        return json.dumps(result)

    def put_record(self) -> str:
        """Prepare arguments and response to PutRecord()."""
        result = self.firehose_backend.put_record(
            self._get_param("DeliveryStreamName"), self._get_param("Record")
        )
        return json.dumps(result)

    def put_record_batch(self) -> str:
        """Prepare arguments and response to PutRecordBatch()."""
        result = self.firehose_backend.put_record_batch(
            self._get_param("DeliveryStreamName"), self._get_param("Records")
        )
        return json.dumps(result)

    def tag_delivery_stream(self) -> str:
        """Prepare arguments and respond to TagDeliveryStream request."""
        self.firehose_backend.tag_delivery_stream(
            self._get_param("DeliveryStreamName"), self._get_param("Tags")
        )
        return json.dumps({})

    def untag_delivery_stream(self) -> str:
        """Prepare arguments and respond to UntagDeliveryStream()."""
        self.firehose_backend.untag_delivery_stream(
            self._get_param("DeliveryStreamName"), self._get_param("TagKeys")
        )
        return json.dumps({})

    def update_destination(self) -> str:
        """Prepare arguments and respond to UpdateDestination()."""
        self.firehose_backend.update_destination(
            self._get_param("DeliveryStreamName"),
            self._get_param("CurrentDeliveryStreamVersionId"),
            self._get_param("DestinationId"),
            self._get_param("S3DestinationUpdate"),
            self._get_param("ExtendedS3DestinationUpdate"),
            self._get_param("S3BackupMode"),
            self._get_param("RedshiftDestinationUpdate"),
            self._get_param("ElasticsearchDestinationUpdate"),
            self._get_param("SplunkDestinationUpdate"),
            self._get_param("HttpEndpointDestinationUpdate"),
            self._get_param("SnowflakeDestinationConfiguration"),
        )
        return json.dumps({})

    def start_delivery_stream_encryption(self) -> str:
        stream_name = self._get_param("DeliveryStreamName")
        encryption_config = self._get_param(
            "DeliveryStreamEncryptionConfigurationInput"
        )
        self.firehose_backend.start_delivery_stream_encryption(
            stream_name, encryption_config
        )
        return "{}"

    def stop_delivery_stream_encryption(self) -> str:
        stream_name = self._get_param("DeliveryStreamName")
        self.firehose_backend.stop_delivery_stream_encryption(stream_name)
        return "{}"