1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
"""EBSBackend class with methods for supported APIs."""
from typing import Any, Optional
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
from moto.core.utils import unix_time
from moto.ec2.models import EC2Backend, ec2_backends
from moto.ec2.models.elastic_block_store import Snapshot
from moto.moto_api._internal import mock_random
class Block(BaseModel):
def __init__(
self, block_data: str, checksum: str, checksum_algorithm: str, data_length: str
):
self.block_data = block_data
self.checksum = checksum
self.checksum_algorithm = checksum_algorithm
self.data_length = data_length
self.block_token = str(mock_random.uuid4())
class EBSSnapshot(BaseModel):
def __init__(self, account_id: str, snapshot: Snapshot):
self.account_id = account_id
self.snapshot_id = snapshot.id
self.status = "pending"
self.start_time = unix_time()
self.volume_size = snapshot.volume.size
self.block_size = 512
self.tags = [
{"Key": t["key"], "Value": t["value"]} for t in snapshot.get_tags()
]
self.description = snapshot.description
self.blocks: dict[str, Block] = {}
def put_block(
self,
block_idx: str,
block_data: str,
checksum: str,
checksum_algorithm: str,
data_length: str,
) -> None:
block = Block(block_data, checksum, checksum_algorithm, data_length)
self.blocks[block_idx] = block
def to_json(self) -> dict[str, Any]:
return {
"SnapshotId": self.snapshot_id,
"OwnerId": self.account_id,
"Status": self.status,
"StartTime": self.start_time,
"VolumeSize": self.volume_size,
"BlockSize": self.block_size,
"Tags": self.tags,
"Description": self.description,
}
class EBSBackend(BaseBackend):
"""Implementation of EBS APIs."""
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.snapshots: dict[str, EBSSnapshot] = {}
@property
def ec2_backend(self) -> EC2Backend:
return ec2_backends[self.account_id][self.region_name]
def start_snapshot(
self, volume_size: int, tags: Optional[list[dict[str, str]]], description: str
) -> EBSSnapshot:
"""
The following parameters are not yet implemented: ParentSnapshotId, ClientToken, Encrypted, KmsKeyArn, Timeout
"""
zone_name = f"{self.region_name}a"
vol = self.ec2_backend.create_volume(size=volume_size, zone_name=zone_name)
snapshot = self.ec2_backend.create_snapshot(
volume_id=vol.id, description=description
)
if tags:
snapshot.add_tags({tag["Key"]: tag["Value"] for tag in tags})
ebs_snapshot = EBSSnapshot(account_id=self.account_id, snapshot=snapshot)
self.snapshots[ebs_snapshot.snapshot_id] = ebs_snapshot
return ebs_snapshot
def complete_snapshot(self, snapshot_id: str) -> dict[str, str]:
"""
The following parameters are not yet supported: ChangedBlocksCount, Checksum, ChecksumAlgorithm, ChecksumAggregationMethod
"""
self.snapshots[snapshot_id].status = "completed"
return {"Status": "completed"}
def put_snapshot_block(
self,
snapshot_id: str,
block_index: str,
block_data: str,
checksum: str,
checksum_algorithm: str,
data_length: str,
) -> tuple[str, str]:
"""
The following parameters are currently not taken into account: DataLength, Progress.
The Checksum and ChecksumAlgorithm are taken at face-value, but no validation takes place.
"""
snapshot = self.snapshots[snapshot_id]
snapshot.put_block(
block_index, block_data, checksum, checksum_algorithm, data_length
)
return checksum, checksum_algorithm
def get_snapshot_block(self, snapshot_id: str, block_index: str) -> Block:
"""
The BlockToken-parameter is not yet implemented
"""
snapshot = self.snapshots[snapshot_id]
return snapshot.blocks[block_index]
def list_changed_blocks(
self, first_snapshot_id: str, second_snapshot_id: str
) -> tuple[dict[str, tuple[str, Optional[str]]], EBSSnapshot]:
"""
The following parameters are not yet implemented: NextToken, MaxResults, StartingBlockIndex
"""
snapshot1 = self.snapshots[first_snapshot_id]
snapshot2 = self.snapshots[second_snapshot_id]
changed_blocks: dict[
str, tuple[str, Optional[str]]
] = {} # {idx: (token1, token2), ..}
for idx in snapshot1.blocks:
block1 = snapshot1.blocks[idx]
if idx in snapshot2.blocks:
block2 = snapshot2.blocks[idx]
if block1.block_data != block2.block_data:
changed_blocks[idx] = (block1.block_token, block2.block_token)
else:
changed_blocks[idx] = (block1.block_token, None)
return changed_blocks, snapshot1
def list_snapshot_blocks(self, snapshot_id: str) -> EBSSnapshot:
"""
The following parameters are not yet implemented: NextToken, MaxResults, StartingBlockIndex
"""
return self.snapshots[snapshot_id]
ebs_backends = BackendDict(EBSBackend, "ebs")
|