1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
|
from typing import Any, Optional
from moto.core import DEFAULT_ACCOUNT_ID
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.config import default_user_config
from moto.core.model_instances import reset_model_data
class MotoAPIBackend(BaseBackend):
    """Backend behind Moto's own control API.

    It keeps the proxy passthrough URL/host sets, forwards state-transition
    calls to ``state_manager``, appends caller-supplied results to queues on
    other service backends, and reads/writes entries of
    ``default_user_config``.

    Service modules are imported inside the methods that need them rather
    than at module level (presumably to avoid import cycles and eager
    loading of every service -- confirm before hoisting them).
    """

    def __init__(self, region_name: str, account_id: str):
        """Initialise the base backend and start with empty passthrough sets."""
        super().__init__(region_name, account_id)
        self.proxy_urls_to_passthrough: set[str] = set()
        self.proxy_hosts_to_passthrough: set[str] = set()

    def reset(self) -> None:
        """Reset all backends and model data, then re-initialise this instance.

        The region and account are captured first so the instance can be
        rebuilt with the same identity after the global reset.
        """
        region, account = self.region_name, self.account_id
        BackendDict.reset()
        reset_model_data()
        self.__init__(region, account)  # type: ignore[misc]

    def get_transition(self, model_name: str) -> dict[str, Any]:
        """Return the transition that ``state_manager`` holds for *model_name*."""
        from moto.moto_api import state_manager as manager

        return manager.get_transition(model_name)

    def set_transition(self, model_name: str, transition: dict[str, Any]) -> None:
        """Hand *transition* for *model_name* to ``state_manager``."""
        from moto.moto_api import state_manager as manager

        manager.set_transition(model_name, transition)

    def unset_transition(self, model_name: str) -> None:
        """Ask ``state_manager`` to unset the transition for *model_name*."""
        from moto.moto_api import state_manager as manager

        manager.unset_transition(model_name)

    def set_athena_result(
        self,
        rows: list[dict[str, Any]],
        column_info: list[dict[str, str]],
        account_id: str,
        region: str,
    ) -> None:
        """Append a ``QueryResults`` to the Athena backend's result queue.

        The backend is selected by *account_id* and *region*.
        """
        from moto.athena.models import QueryResults, athena_backends

        athena = athena_backends[account_id][region]
        queued = QueryResults(rows=rows, column_info=column_info)
        athena.query_results_queue.append(queued)

    def set_ce_cost_usage(self, result: dict[str, Any], account_id: str) -> None:
        """Append *result* to the Cost Explorer backend's cost/usage queue.

        The ``"global"`` backend of *account_id* is always used.
        """
        from moto.ce.models import ce_backends

        ce_backends[account_id]["global"].cost_usage_results_queue.append(result)

    def set_lambda_simple_result(
        self, result: str, account_id: str, region: str
    ) -> None:
        """Append *result* to the simple-Lambda backend's result queue."""
        from moto.awslambda_simple.models import lambda_simple_backends

        target = lambda_simple_backends[account_id][region]
        target.lambda_simple_results_queue.append(result)

    def set_resilience_result(
        self, result: list[dict[str, Any]], account_id: str, region: str
    ) -> None:
        """Append *result* to the Resilience Hub backend's assessment queue."""
        from moto.resiliencehub.models import resiliencehub_backends

        target = resiliencehub_backends[account_id][region]
        target.app_assessments_queue.append(result)

    def set_sagemaker_result(
        self,
        body: str,
        content_type: str,
        prod_variant: str,
        custom_attrs: str,
        account_id: str,
        region: str,
    ) -> None:
        """Append one result tuple to the SageMaker runtime backend's queue.

        The queued item is ``(body, content_type, prod_variant, custom_attrs)``.
        """
        from moto.sagemakerruntime.models import sagemakerruntime_backends

        runtime = sagemakerruntime_backends[account_id][region]
        queued = (body, content_type, prod_variant, custom_attrs)
        runtime.results_queue.append(queued)

    def set_sagemaker_async_result(
        self,
        is_failure: bool,
        data: str,
        account_id: str,
        region: str,
    ) -> None:
        """Append ``(is_failure, data)`` to the SageMaker async result queue."""
        from moto.sagemakerruntime.models import sagemakerruntime_backends

        runtime = sagemakerruntime_backends[account_id][region]
        runtime.async_results_queue.append((is_failure, data))

    def set_rds_data_result(
        self,
        records: Optional[list[list[dict[str, Any]]]],
        column_metadata: Optional[list[dict[str, Any]]],
        nr_of_records_updated: Optional[int],
        generated_fields: Optional[list[dict[str, Any]]],
        formatted_records: Optional[str],
        account_id: str,
        region: str,
    ) -> None:
        """Append a ``QueryResults`` to the RDS Data backend's result queue.

        *nr_of_records_updated* is passed on under the keyword
        ``number_of_records_updated``; the other fields keep their names.
        """
        from moto.rdsdata.models import QueryResults, rdsdata_backends

        rdsdata = rdsdata_backends[account_id][region]
        queued = QueryResults(
            records=records,
            column_metadata=column_metadata,
            number_of_records_updated=nr_of_records_updated,
            generated_fields=generated_fields,
            formatted_records=formatted_records,
        )
        rdsdata.results_queue.append(queued)

    def set_ecr_scan_finding_result(
        self,
        results: dict[str, Any],
        account_id: str,
        region: str,
    ) -> None:
        """Append *results* to the ECR backend's scan-finding results."""
        from moto.ecr.models import ECRBackend, ecr_backends

        ecr: ECRBackend = ecr_backends[account_id][region]
        ecr.scan_finding_results.append(results)

    def set_inspector2_findings_result(
        self,
        results: Optional[list[list[dict[str, Any]]]],
        account_id: str,
        region: str,
    ) -> None:
        """Append *results* to the Inspector2 backend's findings queue."""
        from moto.inspector2.models import inspector2_backends

        inspector2_backends[account_id][region].findings_queue.append(results)

    def set_timestream_result(
        self,
        query: Optional[str],
        query_results: list[dict[str, Any]],
        account_id: str,
        region: str,
    ) -> None:
        """Extend the Timestream Query results stored under *query*.

        A new empty list is created for *query* when it has no entry yet;
        *query_results* are then added to the end of that list.
        """
        from moto.timestreamquery.models import (
            TimestreamQueryBackend,
            timestreamquery_backends,
        )

        timestream: TimestreamQueryBackend = timestreamquery_backends[account_id][
            region
        ]
        timestream.query_result_queue.setdefault(query, []).extend(query_results)

    def get_proxy_passthrough(self) -> tuple[set[str], set[str]]:
        """Return the passthrough sets as ``(urls, hosts)``.

        The live sets are returned, not copies.
        """
        return self.proxy_urls_to_passthrough, self.proxy_hosts_to_passthrough

    def set_proxy_passthrough(
        self, http_urls: list[str], https_hosts: list[str]
    ) -> None:
        """Add *http_urls* and *https_hosts* to the passthrough sets.

        Existing entries are kept; this only ever adds.
        """
        self.proxy_urls_to_passthrough.update(http_urls)
        self.proxy_hosts_to_passthrough.update(https_hosts)

    def delete_proxy_passthroughs(self) -> None:
        """Empty both passthrough sets in place."""
        for passthrough in (
            self.proxy_urls_to_passthrough,
            self.proxy_hosts_to_passthrough,
        ):
            passthrough.clear()

    def get_config(self) -> dict[str, Any]:
        """Return the ``batch`` and ``lambda`` entries of ``default_user_config``.

        NOTE(review): ``set_config`` also accepts ``"stepfunctions"``, but that
        entry is not reported here -- confirm whether this is intentional.
        """
        batch_settings = default_user_config["batch"]
        lambda_settings = default_user_config["lambda"]
        return {"batch": batch_settings, "lambda": lambda_settings}

    def set_config(self, config: dict[str, Any]) -> None:
        """Overwrite entries of ``default_user_config`` from *config*.

        Only the keys ``"batch"``, ``"lambda"`` and ``"stepfunctions"`` are
        looked at; a key that is absent from *config* leaves the current
        setting untouched, and any other key is ignored.
        """
        # Literal keys are spelled out one by one instead of looping over
        # names, so each subscript stays a plain string literal.
        if "batch" in config:
            default_user_config["batch"] = config["batch"]

        if "lambda" in config:
            default_user_config["lambda"] = config["lambda"]

        if "stepfunctions" in config:
            default_user_config["stepfunctions"] = config["stepfunctions"]
# Module-level backend instance, created once at import time for the
# "global" region under the default account id.
moto_api_backend = MotoAPIBackend(
    region_name="global",
    account_id=DEFAULT_ACCOUNT_ID,
)
|