1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
"""Handles Firehose API requests, invokes method and returns response."""
import json
from moto.core.responses import BaseResponse
from .models import FirehoseBackend, firehose_backends
class FirehoseResponse(BaseResponse):
"""Handler for Firehose requests and responses."""
def __init__(self) -> None:
super().__init__(service_name="firehose")
@property
def firehose_backend(self) -> FirehoseBackend:
"""Return backend instance specific to this region."""
return firehose_backends[self.current_account][self.region]
def create_delivery_stream(self) -> str:
"""Prepare arguments and respond to CreateDeliveryStream request."""
delivery_stream_arn = self.firehose_backend.create_delivery_stream(
self.region,
self._get_param("DeliveryStreamName"),
self._get_param("DeliveryStreamType"),
self._get_param("KinesisStreamSourceConfiguration"),
self._get_param("DeliveryStreamEncryptionConfigurationInput"),
self._get_param("S3DestinationConfiguration"),
self._get_param("ExtendedS3DestinationConfiguration"),
self._get_param("RedshiftDestinationConfiguration"),
self._get_param("ElasticsearchDestinationConfiguration"),
self._get_param("SplunkDestinationConfiguration"),
self._get_param("HttpEndpointDestinationConfiguration"),
self._get_param("SnowflakeDestinationConfiguration"),
self._get_param("Tags"),
)
return json.dumps({"DeliveryStreamARN": delivery_stream_arn})
def delete_delivery_stream(self) -> str:
"""Prepare arguments and respond to DeleteDeliveryStream request."""
self.firehose_backend.delete_delivery_stream(
self._get_param("DeliveryStreamName")
)
return json.dumps({})
def describe_delivery_stream(self) -> str:
"""Prepare arguments and respond to DescribeDeliveryStream request."""
result = self.firehose_backend.describe_delivery_stream(
self._get_param("DeliveryStreamName")
)
return json.dumps(result)
def list_delivery_streams(self) -> str:
"""Prepare arguments and respond to ListDeliveryStreams request."""
stream_list = self.firehose_backend.list_delivery_streams(
self._get_param("Limit"),
self._get_param("DeliveryStreamType"),
self._get_param("ExclusiveStartDeliveryStreamName"),
)
return json.dumps(stream_list)
def list_tags_for_delivery_stream(self) -> str:
"""Prepare arguments and respond to ListTagsForDeliveryStream()."""
result = self.firehose_backend.list_tags_for_delivery_stream(
self._get_param("DeliveryStreamName"),
self._get_param("ExclusiveStartTagKey"),
self._get_param("Limit"),
)
return json.dumps(result)
def put_record(self) -> str:
"""Prepare arguments and response to PutRecord()."""
result = self.firehose_backend.put_record(
self._get_param("DeliveryStreamName"), self._get_param("Record")
)
return json.dumps(result)
def put_record_batch(self) -> str:
"""Prepare arguments and response to PutRecordBatch()."""
result = self.firehose_backend.put_record_batch(
self._get_param("DeliveryStreamName"), self._get_param("Records")
)
return json.dumps(result)
def tag_delivery_stream(self) -> str:
"""Prepare arguments and respond to TagDeliveryStream request."""
self.firehose_backend.tag_delivery_stream(
self._get_param("DeliveryStreamName"), self._get_param("Tags")
)
return json.dumps({})
def untag_delivery_stream(self) -> str:
"""Prepare arguments and respond to UntagDeliveryStream()."""
self.firehose_backend.untag_delivery_stream(
self._get_param("DeliveryStreamName"), self._get_param("TagKeys")
)
return json.dumps({})
def update_destination(self) -> str:
"""Prepare arguments and respond to UpdateDestination()."""
self.firehose_backend.update_destination(
self._get_param("DeliveryStreamName"),
self._get_param("CurrentDeliveryStreamVersionId"),
self._get_param("DestinationId"),
self._get_param("S3DestinationUpdate"),
self._get_param("ExtendedS3DestinationUpdate"),
self._get_param("S3BackupMode"),
self._get_param("RedshiftDestinationUpdate"),
self._get_param("ElasticsearchDestinationUpdate"),
self._get_param("SplunkDestinationUpdate"),
self._get_param("HttpEndpointDestinationUpdate"),
self._get_param("SnowflakeDestinationConfiguration"),
)
return json.dumps({})
def start_delivery_stream_encryption(self) -> str:
stream_name = self._get_param("DeliveryStreamName")
encryption_config = self._get_param(
"DeliveryStreamEncryptionConfigurationInput"
)
self.firehose_backend.start_delivery_stream_encryption(
stream_name, encryption_config
)
return "{}"
def stop_delivery_stream_encryption(self) -> str:
stream_name = self._get_param("DeliveryStreamName")
self.firehose_backend.stop_delivery_stream_encryption(stream_name)
return "{}"
|