1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
from collections.abc import Iterable
from datetime import datetime
from typing import Any, Optional
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.core.utils import unix_time
from moto.moto_api._internal import mock_random
from moto.utilities.tagging_service import TaggingService
from moto.utilities.utils import get_partition
from .exceptions import ApplicationNotFound, EventStreamNotFound
class App(BaseModel):
def __init__(self, account_id: str, region_name: str, name: str):
self.application_id = str(mock_random.uuid4()).replace("-", "")
self.arn = f"arn:{get_partition(region_name)}:mobiletargeting:{region_name}:{account_id}:apps/{self.application_id}"
self.name = name
self.created = unix_time()
self.settings = AppSettings()
self.event_stream: Optional[EventStream] = None
def get_settings(self) -> "AppSettings":
return self.settings
def update_settings(self, settings: dict[str, Any]) -> "AppSettings":
self.settings.update(settings)
return self.settings
def delete_event_stream(self) -> "EventStream":
stream = self.event_stream
self.event_stream = None
return stream # type: ignore
def get_event_stream(self) -> "EventStream":
if self.event_stream is None:
raise EventStreamNotFound()
return self.event_stream
def put_event_stream(self, stream_arn: str, role_arn: str) -> "EventStream":
self.event_stream = EventStream(stream_arn, role_arn)
return self.event_stream
def to_json(self) -> dict[str, Any]:
return {
"Arn": self.arn,
"Id": self.application_id,
"Name": self.name,
"CreationDate": self.created,
}
class AppSettings(BaseModel):
def __init__(self) -> None:
self.settings: dict[str, Any] = {}
self.last_modified = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
def update(self, settings: dict[str, Any]) -> None:
self.settings = settings
self.last_modified = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
def to_json(self) -> dict[str, Any]:
return {
"CampaignHook": self.settings.get("CampaignHook", {}),
"CloudWatchMetricsEnabled": self.settings.get(
"CloudWatchMetricsEnabled", False
),
"LastModifiedDate": self.last_modified,
"Limits": self.settings.get("Limits", {}),
"QuietTime": self.settings.get("QuietTime", {}),
}
class EventStream(BaseModel):
def __init__(self, stream_arn: str, role_arn: str):
self.stream_arn = stream_arn
self.role_arn = role_arn
self.last_modified = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
def to_json(self) -> dict[str, Any]:
return {
"DestinationStreamArn": self.stream_arn,
"RoleArn": self.role_arn,
"LastModifiedDate": self.last_modified,
}
class PinpointBackend(BaseBackend):
"""Implementation of Pinpoint APIs."""
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.apps: dict[str, App] = {}
self.tagger = TaggingService()
def create_app(self, name: str, tags: dict[str, str]) -> App:
app = App(self.account_id, self.region_name, name)
self.apps[app.application_id] = app
tag_list = self.tagger.convert_dict_to_tags_input(tags)
self.tagger.tag_resource(app.arn, tag_list)
return app
def delete_app(self, application_id: str) -> App:
self.get_app(application_id)
return self.apps.pop(application_id)
def get_app(self, application_id: str) -> App:
if application_id not in self.apps:
raise ApplicationNotFound()
return self.apps[application_id]
def get_apps(self) -> Iterable[App]:
"""
Pagination is not yet implemented
"""
return self.apps.values()
def update_application_settings(
self, application_id: str, settings: dict[str, Any]
) -> AppSettings:
app = self.get_app(application_id)
return app.update_settings(settings)
def get_application_settings(self, application_id: str) -> AppSettings:
app = self.get_app(application_id)
return app.get_settings()
def list_tags_for_resource(self, resource_arn: str) -> dict[str, dict[str, str]]:
tags = self.tagger.get_tag_dict_for_resource(resource_arn)
return {"tags": tags}
def tag_resource(self, resource_arn: str, tags: dict[str, str]) -> None:
tag_list = TaggingService.convert_dict_to_tags_input(tags)
self.tagger.tag_resource(resource_arn, tag_list)
def untag_resource(self, resource_arn: str, tag_keys: list[str]) -> None:
self.tagger.untag_resource_using_names(resource_arn, tag_keys)
def put_event_stream(
self, application_id: str, stream_arn: str, role_arn: str
) -> EventStream:
app = self.get_app(application_id)
return app.put_event_stream(stream_arn, role_arn)
def get_event_stream(self, application_id: str) -> EventStream:
app = self.get_app(application_id)
return app.get_event_stream()
def delete_event_stream(self, application_id: str) -> EventStream:
app = self.get_app(application_id)
return app.delete_event_stream()
pinpoint_backends = BackendDict(PinpointBackend, "pinpoint")
|