1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
import pytest
from botocore.exceptions import ClientError
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from . import fixture_efs # noqa
@pytest.fixture(scope="function", name="file_system")
def fixture_file_system(efs):
create_fs_resp = efs.create_file_system(CreationToken="foobarbaz")
create_fs_resp.pop("ResponseMetadata")
yield create_fs_resp
def test_describe_access_points__initial(efs):
resp = efs.describe_access_points()
assert resp["AccessPoints"] == []
def test_create_access_point__simple(efs, file_system):
fs_id = file_system["FileSystemId"]
resp = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id)
assert resp["ClientToken"] == "ct"
assert "AccessPointId" in resp
assert "AccessPointArn" in resp
assert resp["FileSystemId"] == fs_id
assert resp["OwnerId"] == ACCOUNT_ID
assert resp["LifeCycleState"] == "available"
assert resp["RootDirectory"] == {"Path": "/"}
def test_create_access_point__full(efs, file_system):
fs_id = file_system["FileSystemId"]
resp = efs.create_access_point(
ClientToken="ct",
Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
FileSystemId=fs_id,
PosixUser={"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]},
RootDirectory={
"Path": "/root/path",
"CreationInfo": {
"OwnerUid": 987,
"OwnerGid": 986,
"Permissions": "root_permissions",
},
},
)
assert resp["ClientToken"] == "ct"
assert resp["Name"] == "myname"
assert resp["Tags"] == [
{"Key": "key", "Value": "value"},
{"Key": "Name", "Value": "myname"},
]
assert "AccessPointId" in resp
assert "AccessPointArn" in resp
assert resp["FileSystemId"] == fs_id
assert resp["PosixUser"] == {"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]}
assert resp["RootDirectory"] == {
"Path": "/root/path",
"CreationInfo": {
"OwnerUid": 987,
"OwnerGid": 986,
"Permissions": "root_permissions",
},
}
assert resp["OwnerId"] == ACCOUNT_ID
assert resp["LifeCycleState"] == "available"
def test_describe_access_point(efs, file_system):
fs_id = file_system["FileSystemId"]
access_point_id = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id)[
"AccessPointId"
]
resp = efs.describe_access_points(AccessPointId=access_point_id)
assert len(resp["AccessPoints"]) == 1
access_point = resp["AccessPoints"][0]
assert access_point["ClientToken"] == "ct"
assert "AccessPointId" in access_point
assert "AccessPointArn" in access_point
assert access_point["FileSystemId"] == fs_id
assert access_point["OwnerId"] == ACCOUNT_ID
assert access_point["LifeCycleState"] == "available"
def test_describe_access_points__multiple(efs, file_system):
fs_id = file_system["FileSystemId"]
efs.create_access_point(ClientToken="ct1", FileSystemId=fs_id)
efs.create_access_point(ClientToken="ct2", FileSystemId=fs_id)
resp = efs.describe_access_points()
assert len(resp["AccessPoints"]) == 2
def test_describe_access_points__filters(efs):
fs_id1 = efs.create_file_system(CreationToken="foobarbaz")["FileSystemId"]
fs_id2 = efs.create_file_system(CreationToken="bazbarfoo")["FileSystemId"]
ap_id1 = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id1)[
"AccessPointId"
]
ap_id2 = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id2)[
"AccessPointId"
]
resp = efs.describe_access_points(FileSystemId=fs_id1)
assert len(resp["AccessPoints"]) == 1
assert resp.get("NextToken") is None
assert resp["AccessPoints"][0]["FileSystemId"] == fs_id1
assert resp["AccessPoints"][0]["AccessPointId"] == ap_id1
resp = efs.describe_access_points(AccessPointId=ap_id2)
assert len(resp["AccessPoints"]) == 1
assert resp.get("NextToken") is None
assert resp["AccessPoints"][0]["FileSystemId"] == fs_id2
assert resp["AccessPoints"][0]["AccessPointId"] == ap_id2
def test_delete_access_points(efs, file_system):
fs_id = file_system["FileSystemId"]
ap_id1 = efs.create_access_point(ClientToken="ct1", FileSystemId=fs_id)[
"AccessPointId"
]
ap_id2 = efs.create_access_point(ClientToken="ct2", FileSystemId=fs_id)[
"AccessPointId"
]
# Delete one access point
efs.delete_access_point(AccessPointId=ap_id2)
# We can only find one
resp = efs.describe_access_points()
assert len(resp["AccessPoints"]) == 1
# The first one still exists
efs.describe_access_points(AccessPointId=ap_id1)
# The second one is gone
with pytest.raises(ClientError) as exc_info:
efs.describe_access_points(AccessPointId=ap_id2)
err = exc_info.value.response["Error"]
assert err["Code"] == "AccessPointNotFound"
|