1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
|
import re
import pytest
from botocore.exceptions import ClientError
from tests.test_efs.junk_drawer import has_status_code
from . import fixture_efs # noqa
ARN_PATT = r"^arn:(?P<Partition>[^:\n]*):(?P<Service>[^:\n]*):(?P<Region>[^:\n]*):(?P<AccountID>[^:\n]*):(?P<Ignore>(?P<ResourceType>[^:\/\n]*)[:\/])?(?P<Resource>.*)$"
STRICT_ARN_PATT = r"^arn:aws:[a-z]+:[a-z]{2}-[a-z]+-[0-9]:[0-9]+:[a-z-]+\/[a-z0-9-]+$"
SAMPLE_1_PARAMS = {
"CreationToken": "myFileSystem1",
"PerformanceMode": "generalPurpose",
"Backup": True,
"Encrypted": True,
"Tags": [{"Key": "Name", "Value": "Test Group1"}],
}
SAMPLE_2_PARAMS = {
"CreationToken": "myFileSystem2",
"PerformanceMode": "generalPurpose",
"Backup": True,
"AvailabilityZoneName": "us-west-2b",
"Encrypted": True,
"ThroughputMode": "provisioned",
"ProvisionedThroughputInMibps": 60,
"Tags": [{"Key": "Name", "Value": "Test Group1"}],
}
# Testing Create
# ==============
def test_create_file_system_correct_use(efs):
from datetime import datetime
creation_token = "test_efs_create"
create_fs_resp = efs.create_file_system(
CreationToken=creation_token,
Tags=[{"Key": "Name", "Value": "Test EFS Container"}],
)
# Check the response.
assert has_status_code(create_fs_resp, 201)
assert create_fs_resp["CreationToken"] == creation_token
assert "fs-" in create_fs_resp["FileSystemId"]
assert isinstance(create_fs_resp["CreationTime"], datetime)
assert create_fs_resp["LifeCycleState"] == "available"
assert create_fs_resp["Tags"][0] == {"Key": "Name", "Value": "Test EFS Container"}
assert create_fs_resp["ThroughputMode"] == "bursting"
assert create_fs_resp["PerformanceMode"] == "generalPurpose"
assert create_fs_resp["Encrypted"] is False
assert create_fs_resp["NumberOfMountTargets"] == 0
assert create_fs_resp["Name"] == "Test EFS Container"
for key_name in ["Value", "ValueInIA", "ValueInStandard"]:
assert key_name in create_fs_resp["SizeInBytes"]
assert create_fs_resp["SizeInBytes"][key_name] == 0
assert re.match(STRICT_ARN_PATT, create_fs_resp["FileSystemArn"])
# Check the (lack of the) backup policy.
with pytest.raises(ClientError) as exc_info:
efs.describe_backup_policy(FileSystemId=create_fs_resp["FileSystemId"])
resp = exc_info.value.response
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 404
assert "PolicyNotFound" == resp["Error"]["Code"]
# Check the arn in detail
match_obj = re.match(ARN_PATT, create_fs_resp["FileSystemArn"])
arn_parts = match_obj.groupdict()
assert arn_parts["ResourceType"] == "file-system"
assert arn_parts["Resource"] == create_fs_resp["FileSystemId"]
assert arn_parts["Service"] == "elasticfilesystem"
assert arn_parts["AccountID"] == create_fs_resp["OwnerId"]
def test_create_file_system_aws_sample_1(efs):
resp = efs.create_file_system(**SAMPLE_1_PARAMS)
resp_metadata = resp.pop("ResponseMetadata")
assert resp_metadata["HTTPStatusCode"] == 201
assert set(resp.keys()) == {
"OwnerId",
"CreationToken",
"Encrypted",
"PerformanceMode",
"FileSystemId",
"FileSystemArn",
"CreationTime",
"LifeCycleState",
"NumberOfMountTargets",
"SizeInBytes",
"Tags",
"ThroughputMode",
"Name",
}
assert resp["Tags"] == [{"Key": "Name", "Value": "Test Group1"}]
assert resp["PerformanceMode"] == "generalPurpose"
assert resp["Encrypted"]
assert resp["Name"] == "Test Group1"
policy_resp = efs.describe_backup_policy(FileSystemId=resp["FileSystemId"])
assert policy_resp["BackupPolicy"]["Status"] == "ENABLED"
def test_create_file_system_aws_sample_2(efs):
resp = efs.create_file_system(**SAMPLE_2_PARAMS)
resp_metadata = resp.pop("ResponseMetadata")
assert resp_metadata["HTTPStatusCode"] == 201
assert set(resp.keys()) == {
"AvailabilityZoneId",
"AvailabilityZoneName",
"PerformanceMode",
"ProvisionedThroughputInMibps",
"SizeInBytes",
"Tags",
"ThroughputMode",
"CreationTime",
"CreationToken",
"Encrypted",
"LifeCycleState",
"FileSystemId",
"FileSystemArn",
"NumberOfMountTargets",
"OwnerId",
"Name",
}
assert resp["ProvisionedThroughputInMibps"] == 60
assert resp["AvailabilityZoneId"] == "usw2-az1"
assert resp["AvailabilityZoneName"] == "us-west-2b"
assert resp["ThroughputMode"] == "provisioned"
assert resp["Name"] == "Test Group1"
policy_resp = efs.describe_backup_policy(FileSystemId=resp["FileSystemId"])
assert policy_resp["BackupPolicy"]["Status"] == "ENABLED"
def test_create_file_system_az_name_given_backup_default(efs):
resp = efs.create_file_system(AvailabilityZoneName="us-east-1e")
policy_resp = efs.describe_backup_policy(FileSystemId=resp["FileSystemId"])
assert policy_resp["BackupPolicy"]["Status"] == "ENABLED"
def test_create_file_system_no_creation_token_given(efs):
# Note that from the API docs, it would seem this should create an error. However it
# turns out that botocore just automatically assigns a UUID.
resp = efs.create_file_system()
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 201
assert "CreationToken" in resp
def test_create_file_system_file_system_already_exists(efs):
efs.create_file_system(CreationToken="foo")
with pytest.raises(ClientError) as exc_info:
efs.create_file_system(CreationToken="foo")
resp = exc_info.value.response
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 409
assert "FileSystemAlreadyExists" == resp["Error"]["Code"]
# Testing Describe
# ================
def test_describe_file_systems_using_identifier(efs):
# Create the file system.
create_fs_resp = efs.create_file_system(CreationToken="foobar")
create_fs_resp.pop("ResponseMetadata")
fs_id = create_fs_resp["FileSystemId"]
# Describe the file system.
desc_fs_resp = efs.describe_file_systems(FileSystemId=fs_id)
assert len(desc_fs_resp["FileSystems"]) == 1
assert desc_fs_resp["FileSystems"][0]["FileSystemId"] == fs_id
assert desc_fs_resp["FileSystems"][0]["Name"] == ""
def test_describe_file_systems_using_unknown_identifier(efs):
with pytest.raises(ClientError) as exc:
efs.describe_file_systems(FileSystemId="unknown")
err = exc.value.response["Error"]
assert err["Code"] == "FileSystemNotFound"
# Verified against AWS
assert err["Message"] == "File system 'unknown' does not exist."
def test_describe_file_systems_minimal_case(efs):
# Create the file system.
create_fs_resp = efs.create_file_system(CreationToken="foobar")
create_fs_resp.pop("ResponseMetadata")
# Describe the file systems.
desc_fs_resp = efs.describe_file_systems()
desc_fs_resp_metadata = desc_fs_resp.pop("ResponseMetadata")
assert desc_fs_resp_metadata["HTTPStatusCode"] == 200
# Check the list results.
fs_list = desc_fs_resp["FileSystems"]
assert len(fs_list) == 1
file_system = fs_list[0]
assert set(file_system.keys()) == {
"CreationTime",
"CreationToken",
"Encrypted",
"LifeCycleState",
"PerformanceMode",
"SizeInBytes",
"Tags",
"ThroughputMode",
"FileSystemId",
"FileSystemArn",
"NumberOfMountTargets",
"OwnerId",
"Name",
}
assert file_system["FileSystemId"] == create_fs_resp["FileSystemId"]
assert file_system["Name"] == ""
# Pop out the timestamps and see if the rest of the description is the same.
create_fs_resp["SizeInBytes"].pop("Timestamp")
file_system["SizeInBytes"].pop("Timestamp")
assert file_system == create_fs_resp
def test_describe_file_systems_aws_create_sample_2(efs):
efs.create_file_system(**SAMPLE_2_PARAMS)
# Describe the file systems.
desc_resp = efs.describe_file_systems()
desc_fs_resp_metadata = desc_resp.pop("ResponseMetadata")
assert desc_fs_resp_metadata["HTTPStatusCode"] == 200
# Check the list results.
fs_list = desc_resp["FileSystems"]
assert len(fs_list) == 1
file_system = fs_list[0]
assert set(file_system.keys()) == {
"AvailabilityZoneId",
"AvailabilityZoneName",
"CreationTime",
"CreationToken",
"Encrypted",
"LifeCycleState",
"PerformanceMode",
"ProvisionedThroughputInMibps",
"SizeInBytes",
"Tags",
"ThroughputMode",
"FileSystemId",
"FileSystemArn",
"NumberOfMountTargets",
"OwnerId",
"Name",
}
assert file_system["ProvisionedThroughputInMibps"] == 60
assert file_system["AvailabilityZoneId"] == "usw2-az1"
assert file_system["AvailabilityZoneName"] == "us-west-2b"
assert file_system["ThroughputMode"] == "provisioned"
assert file_system["Name"] == "Test Group1"
def test_describe_file_systems_paging(efs):
# Create several file systems.
for i in range(10):
efs.create_file_system(CreationToken=f"foobar_{i}")
# First call (Start)
# ------------------
# Call the tested function
resp1 = efs.describe_file_systems(MaxItems=4)
# Check the response status
assert has_status_code(resp1, 200)
# Check content of the result.
resp1.pop("ResponseMetadata")
assert set(resp1.keys()) == {"NextMarker", "FileSystems"}
assert len(resp1["FileSystems"]) == 4
fs_id_set_1 = {fs["FileSystemId"] for fs in resp1["FileSystems"]}
# Second call (Middle)
# --------------------
# Get the next marker.
resp2 = efs.describe_file_systems(MaxItems=4, Marker=resp1["NextMarker"])
# Check the response status
resp2_metadata = resp2.pop("ResponseMetadata")
assert resp2_metadata["HTTPStatusCode"] == 200
# Check the response contents.
assert set(resp2.keys()) == {"NextMarker", "FileSystems", "Marker"}
assert len(resp2["FileSystems"]) == 4
assert resp2["Marker"] == resp1["NextMarker"]
fs_id_set_2 = {fs["FileSystemId"] for fs in resp2["FileSystems"]}
assert fs_id_set_1 & fs_id_set_2 == set()
# Third call (End)
# ----------------
# Get the last marker results
resp3 = efs.describe_file_systems(MaxItems=4, Marker=resp2["NextMarker"])
# Check the response status
resp3_metadata = resp3.pop("ResponseMetadata")
assert resp3_metadata["HTTPStatusCode"] == 200
# Check the response contents.
assert set(resp3.keys()) == {"FileSystems", "Marker"}
assert len(resp3["FileSystems"]) == 2
assert resp3["Marker"] == resp2["NextMarker"]
fs_id_set_3 = {fs["FileSystemId"] for fs in resp3["FileSystems"]}
assert fs_id_set_3 & (fs_id_set_1 | fs_id_set_2) == set()
def test_describe_file_systems_invalid_marker(efs):
with pytest.raises(ClientError) as exc_info:
efs.describe_file_systems(Marker="fiddlesticks")
resp = exc_info.value.response
assert has_status_code(resp, 400)
assert "BadRequest" == resp["Error"]["Code"]
def test_describe_file_systems_invalid_creation_token(efs):
resp = efs.describe_file_systems(CreationToken="fizzle")
assert has_status_code(resp, 200)
assert len(resp["FileSystems"]) == 0
def test_describe_file_systems_invalid_file_system_id(efs):
with pytest.raises(ClientError) as exc_info:
efs.describe_file_systems(FileSystemId="fs-29879313")
resp = exc_info.value.response
assert has_status_code(resp, 404)
assert "FileSystemNotFound" == resp["Error"]["Code"]
def test_describe_file_system_creation_token_and_file_system_id(efs):
with pytest.raises(ClientError) as exc_info:
efs.describe_file_systems(CreationToken="fizzle", FileSystemId="fs-07987987")
resp = exc_info.value.response
assert has_status_code(resp, 400)
assert "BadRequest" == resp["Error"]["Code"]
# Testing Delete
# ==============
def test_delete_file_system_minimal_case(efs):
# Create the file system
resp = efs.create_file_system()
# Describe the file system, prove it shows up.
desc1 = efs.describe_file_systems()
assert len(desc1["FileSystems"]) == 1
assert resp["FileSystemId"] in {fs["FileSystemId"] for fs in desc1["FileSystems"]}
# Delete the file system.
del_resp = efs.delete_file_system(FileSystemId=resp["FileSystemId"])
assert has_status_code(del_resp, 204)
# Check that the file system is no longer there.
desc2 = efs.describe_file_systems()
assert len(desc2["FileSystems"]) == 0
def test_delete_file_system_invalid_file_system_id(efs):
with pytest.raises(ClientError) as exc_info:
efs.delete_file_system(FileSystemId="fs-2394287")
resp = exc_info.value.response
assert has_status_code(resp, 404)
assert "FileSystemNotFound" == resp["Error"]["Code"]
|