File: models.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (194 lines) | stat: -rw-r--r-- 7,929 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""TimestreamQueryBackend class with methods for supported APIs."""

from typing import Any, Optional, Union
from uuid import uuid4

from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.core.utils import unix_time
from moto.utilities.utils import get_partition

from .exceptions import ResourceNotFound


class ScheduledQuery(BaseModel):
    """In-memory record of one scheduled query and the settings it was created with."""

    def __init__(
        self,
        account_id: str,
        region_name: str,
        name: str,
        query_string: str,
        schedule_configuration: dict[str, str],
        notification_configuration: dict[str, dict[str, str]],
        target_configuration: Optional[dict[str, Any]],
        scheduled_query_execution_role_arn: str,
        tags: Optional[list[dict[str, str]]],
        kms_key_id: Optional[str],
        error_report_configuration: Optional[dict[str, dict[str, str]]],
    ):
        """
        Keep every supplied argument as an attribute of the same name, then
        derive the creation/update timestamps, the ARN and the initial state.
        """
        # Where the query lives and what it is called
        self.account_id, self.region_name, self.name = account_id, region_name, name

        # What to run, and when
        self.query_string = query_string
        self.schedule_configuration = schedule_configuration

        # Where results, notifications and error reports go
        self.notification_configuration = notification_configuration
        self.target_configuration = target_configuration
        self.error_report_configuration = error_report_configuration

        # Permissions, encryption and tagging
        self.scheduled_query_execution_role_arn = scheduled_query_execution_role_arn
        self.kms_key_id = kms_key_id
        self.tags = tags

        # Each timestamp comes from its own unix_time() call
        self.created_on = unix_time()
        self.updated_on = unix_time()

        # A freshly created query always starts out enabled
        self.state = "ENABLED"
        self.arn = f"arn:{get_partition(region_name)}:timestream:{region_name}:{account_id}:scheduled-query/{name}"

    def description(self) -> dict[str, Any]:
        """
        Return the query's attributes as a dict with CamelCase keys.

        `tags` and `updated_on` are stored on the object but are not part of
        the returned dict.
        """
        # Listed as ordered pairs so the key order of the result is explicit
        described = (
            ("Arn", self.arn),
            ("Name", self.name),
            ("CreationTime", self.created_on),
            ("State", self.state),
            ("QueryString", self.query_string),
            ("ScheduleConfiguration", self.schedule_configuration),
            ("NotificationConfiguration", self.notification_configuration),
            ("TargetConfiguration", self.target_configuration),
            (
                "ScheduledQueryExecutionRoleArn",
                self.scheduled_query_execution_role_arn,
            ),
            ("KmsKeyId", self.kms_key_id),
            ("ErrorReportConfiguration", self.error_report_configuration),
        )
        return dict(described)


class TimestreamQueryBackend(BaseBackend):
    """Implementation of TimestreamQuery APIs."""

    def __init__(self, region_name: str, account_id: str):
        super().__init__(region_name, account_id)
        # Scheduled queries, keyed by their ARN
        self.scheduled_queries: dict[str, ScheduledQuery] = {}

        # Pre-configured results that `query` hands out one at a time.
        # Keyed by the exact query string, or None for "any other query".
        # Nothing in this class fills the queue; see the `query` docstring.
        self.query_result_queue: dict[Optional[str], list[dict[str, Any]]] = {}
        # Results taken from the None-queue, remembered per query string so
        # the same query keeps receiving the same result
        self.query_results: dict[str, dict[str, Any]] = {}

    def create_scheduled_query(
        self,
        name: str,
        query_string: str,
        schedule_configuration: dict[str, str],
        notification_configuration: dict[str, dict[str, str]],
        target_configuration: Optional[dict[str, Any]],
        scheduled_query_execution_role_arn: str,
        tags: Optional[list[dict[str, str]]],
        kms_key_id: Optional[str],
        error_report_configuration: dict[str, dict[str, str]],
    ) -> ScheduledQuery:
        """
        Create a ScheduledQuery for this backend's account and region, store it under its ARN, and return it.

        An existing query with the same ARN is overwritten.
        """
        query = ScheduledQuery(
            account_id=self.account_id,
            region_name=self.region_name,
            name=name,
            query_string=query_string,
            schedule_configuration=schedule_configuration,
            notification_configuration=notification_configuration,
            target_configuration=target_configuration,
            scheduled_query_execution_role_arn=scheduled_query_execution_role_arn,
            tags=tags,
            kms_key_id=kms_key_id,
            error_report_configuration=error_report_configuration,
        )
        self.scheduled_queries[query.arn] = query
        return query

    def delete_scheduled_query(self, scheduled_query_arn: str) -> None:
        """Remove the scheduled query with the given ARN. Unknown ARNs are ignored silently."""
        self.scheduled_queries.pop(scheduled_query_arn, None)

    def describe_scheduled_query(self, scheduled_query_arn: str) -> ScheduledQuery:
        """
        Return the scheduled query stored under the given ARN.

        Raises ResourceNotFound if no query with that ARN exists.
        """
        query = self.scheduled_queries.get(scheduled_query_arn)
        if query is None:
            raise ResourceNotFound(scheduled_query_arn)
        return query

    def update_scheduled_query(self, scheduled_query_arn: str, state: str) -> None:
        """
        Set the state of the scheduled query stored under the given ARN.

        Raises ResourceNotFound if no query with that ARN exists.
        """
        # Go through describe_scheduled_query so that an unknown ARN results in
        # ResourceNotFound, rather than a bare KeyError from the dict lookup
        query = self.describe_scheduled_query(scheduled_query_arn)
        query.state = state

    def query(self, query_string: str) -> dict[str, Any]:
        """
        Moto does not have a builtin time-series Database, so calling this endpoint will return zero results by default.

        You can use a dedicated API to configure a queue of expected results.

        An example invocation looks like this:

        .. sourcecode:: python

            first_result = {
                'QueryId': 'some_id',
                'Rows': [...],
                'ColumnInfo': [...],
                'QueryStatus': ...
            }
            result_for_unknown_query_string = {
                'QueryId': 'unknown',
                'Rows': [...],
                'ColumnInfo': [...],
                'QueryStatus': ...
            }
            expected_results = {
                "account_id": "123456789012",  # This is the default - can be omitted
                "region": "us-east-1",  # This is the default - can be omitted
                "results": {
                    # Use the exact querystring, and a list of results for it
                    # For example
                    "SELECT data FROM mytable": [first_result, ...],
                    # Use None if the exact querystring is unknown/irrelevant
                    None: [result_for_unknown_query_string, ...],
                }
            }
            requests.post(
                "http://motoapi.amazonaws.com/moto-api/static/timestream/query-results",
                json=expected_results,
            )

        When calling `query(QueryString='SELECT data FROM mytable')`, the `first_result` will be returned.
        Call the query again for the second result, and so on.

        If you don't know the exact query strings, use the `None`-key. In the above example, when calling `SELECT something FROM unknown`, there are no results for that specific query, so `result_for_unknown_query_string` will be returned.

        Results for unknown queries are cached, so calling `SELECT something FROM unknown` again will return the same result.

        """
        # 1. Results queued for this exact query string are consumed in order
        if self.query_result_queue.get(query_string):
            return self.query_result_queue[query_string].pop(0)
        # 2. A result previously taken from the None-queue for this query string
        if result := self.query_results.get(query_string):
            return result
        # 3. Take the next generic result, and remember it for this query string
        if self.query_result_queue.get(None):
            self.query_results[query_string] = self.query_result_queue[None].pop(0)
            return self.query_results[query_string]
        # 4. Nothing configured: an empty result with a fresh random QueryId
        return {"QueryId": str(uuid4()), "Rows": [], "ColumnInfo": []}

    def describe_endpoints(self) -> list[dict[str, Union[str, int]]]:
        """Return a single endpoint for this backend's region, with a cache period of 1440 minutes."""
        # https://docs.aws.amazon.com/timestream/latest/developerguide/Using-API.endpoint-discovery.how-it-works.html
        # Usually, the address looks like this:
        # query-cell1.timestream.us-east-1.amazonaws.com
        # Where 'cell1' can be any number, 'cell2', 'cell3', etc - whichever endpoint happens to be available for that particular account
        # We don't implement a cellular architecture in Moto though, so let's keep it simple
        return [
            {
                "Address": f"query.timestream.{self.region_name}.amazonaws.com",
                "CachePeriodInMinutes": 1440,
            }
        ]


# Module-level BackendDict for the "timestream-query" service, built around
# TimestreamQueryBackend.
# NOTE(review): the regions are passed explicitly via `additional_regions`,
# presumably because they are not discovered automatically for this service -
# confirm against BackendDict before changing the list.
timestreamquery_backends = BackendDict(
    TimestreamQueryBackend,
    "timestream-query",
    additional_regions=[
        "us-east-1",
        "us-east-2",
        "us-west-2",
        "eu-central-1",
        "eu-west-1",
        "ap-southeast-2",
        "ap-northeast-1",
    ],
)