1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
from threading import Thread
from typing import TYPE_CHECKING, Any, Optional
from uuid import uuid4
from moto.core.utils import gzip_decompress
from moto.utilities.utils import get_partition
if TYPE_CHECKING:
from moto.dynamodb.models import DynamoDBBackend
from moto.dynamodb.models.table import Table
from moto.s3.models import FakeBucket, S3Backend
class TableImport(Thread):
    """Thread that creates a DynamoDB table and loads items into it from S3.

    The work is done in :meth:`run`; progress counters, the final status and
    any failure details are kept on the instance and exposed through
    :meth:`response`.
    """

    def __init__(
        self,
        account_id: str,
        s3_source: dict[str, str],
        region_name: str,
        table_name: str,
        billing_mode: str,
        throughput: Optional[dict[str, int]],
        key_schema: list[dict[str, str]],
        global_indexes: Optional[list[dict[str, Any]]],
        attrs: list[dict[str, str]],
        compression_type: Optional[str],
    ):
        """Store the import parameters and initialise status and counters.

        ``s3_source`` is read for the keys ``S3Bucket`` and (optionally)
        ``S3KeyPrefix``. The remaining table arguments are forwarded to
        ``create_table`` when the import runs. A ``compression_type`` of
        ``"GZIP"`` makes the import decompress every object before parsing.
        """
        super().__init__()

        # Where the data comes from and where it goes.
        self.account_id = account_id
        self.region_name = region_name
        self.partition = get_partition(region_name)
        self.s3_source = s3_source
        self.compression_type = compression_type

        # Definition of the table that will be created.
        self.table_name = table_name
        self.billing_mode = billing_mode
        self.throughput = throughput
        self.key_schema = key_schema
        self.global_indexes = global_indexes
        self.attrs = attrs
        self.table: Optional[Table] = None

        # The import ARN is the table ARN followed by a dash-less UUID.
        self.table_arn = f"arn:{self.partition}:dynamodb:{region_name}:{account_id}:table/{table_name}"
        self.arn = f"{self.table_arn}/import/{uuid4().hex}"

        # Outcome of the import, updated while the thread runs.
        self.status = "IN_PROGRESS"
        self.failure_code: Optional[str] = None
        self.failure_message: Optional[str] = None
        self.processed_count = 0
        self.processed_bytes = 0
        self.error_count = 0
        self.imported_count = 0

    def _mark_failed(self, code: str, message: str) -> None:
        """Record a failed import together with its code and message."""
        self.status = "FAILED"
        self.failure_code = code
        self.failure_message = message

    def run(self) -> None:
        """Look up the source bucket and import its contents.

        A missing S3 backend or bucket is reported as ``S3NoSuchBucket``; any
        exception raised while processing the files is reported as
        ``UNKNOWN`` with the exception text as the failure message.
        """
        bucket_name = self.s3_source["S3Bucket"]
        try:
            from moto.s3.models import s3_backends

            backend = s3_backends[self.account_id][self.partition]
            source_bucket = backend.buckets[bucket_name]
        except KeyError:
            self._mark_failed("S3NoSuchBucket", "The specified bucket does not exist")
            return

        try:
            self._process_s3_files(backend, source_bucket)
        except Exception as exc:
            self._mark_failed("UNKNOWN", str(exc))

    def _process_s3_files(self, s3_backend: "S3Backend", bucket: "FakeBucket") -> None:
        """Create the target table, then put every item found in the bucket.

        Each listed object is decoded as UTF-8 (after gzip decompression when
        ``compression_type`` is ``"GZIP"``), parsed with ``JsonParser`` and
        the ``"Item"`` entry of every parsed object is put into the table.
        A failing item is counted and does not stop the import; the final
        status is ``COMPLETED`` only when no item failed.
        """
        # Step 1: create the table.
        from moto.dynamodb.models import dynamodb_backends

        dynamo: DynamoDBBackend = dynamodb_backends[self.account_id][self.region_name]
        table = dynamo.create_table(
            name=self.table_name,
            billing_mode=self.billing_mode,
            throughput=self.throughput,
            schema=self.key_schema,
            global_indexes=self.global_indexes,
            indexes=None,
            attr=self.attrs,
            sse_specification=None,
            streams=None,
            tags=[],
            deletion_protection_enabled=False,
            warm_throughput=None,
        )
        self.table = table

        # Step 2: list the source objects (optionally restricted by prefix).
        s3_objects, _, _, _ = s3_backend.list_objects(
            bucket,
            prefix=self.s3_source.get("S3KeyPrefix"),
            delimiter=None,
            marker=None,
            max_keys=None,
        )

        from py_partiql_parser import JsonParser

        # Step 3: parse every object and insert the items it contains.
        for s3_object in s3_objects:
            raw = (
                gzip_decompress(s3_object.value)
                if self.compression_type == "GZIP"
                else s3_object.value
            )
            text = raw.decode("utf-8")

            for record in JsonParser.parse(original=text):
                try:
                    self.processed_count += 1
                    # NOTE(review): this adds len() of the parsed object, not
                    # a byte count of the source data - confirm it is intended.
                    self.processed_bytes += len(record)
                    table.put_item(item_attrs=record["Item"])
                except Exception as exc:
                    # Keep going; only the most recent message is retained.
                    self.failure_message = str(exc)
                    self.error_count += 1
                else:
                    self.imported_count += 1

        if self.error_count == 0:
            self.status = "COMPLETED"
        else:
            self.status = "FAILED"

    def response(self) -> dict[str, Any]:
        """Return the current state of the import as a plain dictionary."""
        description: dict[str, Any] = {
            "ImportArn": self.arn,
            "ImportStatus": self.status,
            "TableArn": self.table_arn,
            "FailureCode": self.failure_code,
            "FailureMessage": self.failure_message,
            "ProcessedItemCount": self.processed_count,
            "ProcessedSizeBytes": self.processed_bytes,
            "ErrorCount": self.error_count,
            "ImportedItemCount": self.imported_count,
        }
        return description
|