File: test_access_points.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (152 lines) | stat: -rw-r--r-- 4,988 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import pytest
from botocore.exceptions import ClientError

from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID

from . import fixture_efs  # noqa


@pytest.fixture(scope="function", name="file_system")
def fixture_file_system(efs):
    """Create a file system through the ``efs`` client and yield its description.

    The ``ResponseMetadata`` entry is removed from the create response before
    it is handed to the test.
    """
    file_system_description = efs.create_file_system(CreationToken="foobarbaz")
    del file_system_description["ResponseMetadata"]
    yield file_system_description


def test_describe_access_points__initial(efs):
    """Describing access points before creating any returns an empty list."""
    access_points = efs.describe_access_points()["AccessPoints"]
    assert access_points == []


def test_create_access_point__simple(efs, file_system):
    """Create an access point passing only a client token and file system ID.

    Checks the echoed client token, the presence of an ID and ARN, and the
    file system, owner, lifecycle state and root directory in the response.
    """
    file_system_id = file_system["FileSystemId"]
    created = efs.create_access_point(ClientToken="ct", FileSystemId=file_system_id)

    assert created["ClientToken"] == "ct"

    # The test does not know these values in advance; only presence is checked.
    for generated_key in ("AccessPointId", "AccessPointArn"):
        assert generated_key in created

    expected_fields = {
        "FileSystemId": file_system_id,
        "OwnerId": ACCOUNT_ID,
        "LifeCycleState": "available",
        # No RootDirectory was passed in the request above.
        "RootDirectory": {"Path": "/"},
    }
    for field, expected in expected_fields.items():
        assert created[field] == expected


def test_create_access_point__full(efs, file_system):
    """Create an access point with tags, a POSIX user and a root directory.

    Checks that the response echoes each supplied setting, reports ``Name``
    as the value of the ``Name`` tag, and carries an ID, ARN, owner and
    lifecycle state.
    """
    file_system_id = file_system["FileSystemId"]

    request = dict(
        ClientToken="ct",
        Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
        FileSystemId=file_system_id,
        PosixUser={"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]},
        RootDirectory={
            "Path": "/root/path",
            "CreationInfo": {
                "OwnerUid": 987,
                "OwnerGid": 986,
                "Permissions": "root_permissions",
            },
        },
    )
    created = efs.create_access_point(**request)

    # Expected values are written out again rather than read from ``request``
    # so the comparison does not share objects with the call arguments.
    echoed_before_ids = {
        "ClientToken": "ct",
        "Name": "myname",
        "Tags": [
            {"Key": "key", "Value": "value"},
            {"Key": "Name", "Value": "myname"},
        ],
    }
    for field, expected in echoed_before_ids.items():
        assert created[field] == expected

    # The test does not know these values in advance; only presence is checked.
    for generated_key in ("AccessPointId", "AccessPointArn"):
        assert generated_key in created

    remaining_fields = {
        "FileSystemId": file_system_id,
        "PosixUser": {"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]},
        "RootDirectory": {
            "Path": "/root/path",
            "CreationInfo": {
                "OwnerUid": 987,
                "OwnerGid": 986,
                "Permissions": "root_permissions",
            },
        },
        "OwnerId": ACCOUNT_ID,
        "LifeCycleState": "available",
    }
    for field, expected in remaining_fields.items():
        assert created[field] == expected


def test_describe_access_point(efs, file_system):
    """Describe one access point by ID and check the fields it reports.

    Creates a single access point, looks it up with the ``AccessPointId``
    filter, and verifies that exactly that access point comes back.
    """
    fs_id = file_system["FileSystemId"]

    access_point_id = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id)[
        "AccessPointId"
    ]

    resp = efs.describe_access_points(AccessPointId=access_point_id)
    assert len(resp["AccessPoints"]) == 1
    access_point = resp["AccessPoints"][0]

    assert access_point["ClientToken"] == "ct"
    # Compare against the requested ID rather than merely checking the key is
    # present, so a lookup that returned a different access point would fail.
    assert access_point["AccessPointId"] == access_point_id
    assert "AccessPointArn" in access_point
    assert access_point["FileSystemId"] == fs_id
    assert access_point["OwnerId"] == ACCOUNT_ID
    assert access_point["LifeCycleState"] == "available"


def test_describe_access_points__multiple(efs, file_system):
    """An unfiltered describe call lists every access point that was created.

    Two access points are created on the same file system; both must appear
    in the listing, identified by their distinct client tokens.
    """
    fs_id = file_system["FileSystemId"]

    efs.create_access_point(ClientToken="ct1", FileSystemId=fs_id)
    efs.create_access_point(ClientToken="ct2", FileSystemId=fs_id)

    resp = efs.describe_access_points()
    access_points = resp["AccessPoints"]
    assert len(access_points) == 2

    # A bare count would also pass if the same access point were listed twice;
    # sorting keeps the check independent of the listing order.
    assert sorted(ap["ClientToken"] for ap in access_points) == ["ct1", "ct2"]
    assert all(ap["FileSystemId"] == fs_id for ap in access_points)


def test_describe_access_points__filters(efs):
    """Filter the access point listing by file system ID and by access point ID.

    Two file systems each get one access point. Each filter must return only
    the matching access point and no ``NextToken``.
    """
    first_fs_id = efs.create_file_system(CreationToken="foobarbaz")["FileSystemId"]
    second_fs_id = efs.create_file_system(CreationToken="bazbarfoo")["FileSystemId"]

    first_ap = efs.create_access_point(ClientToken="ct", FileSystemId=first_fs_id)
    first_ap_id = first_ap["AccessPointId"]
    second_ap = efs.create_access_point(ClientToken="ct", FileSystemId=second_fs_id)
    second_ap_id = second_ap["AccessPointId"]

    # (filter arguments, expected file system ID, expected access point ID)
    cases = (
        ({"FileSystemId": first_fs_id}, first_fs_id, first_ap_id),
        ({"AccessPointId": second_ap_id}, second_fs_id, second_ap_id),
    )
    for filter_kwargs, expected_fs_id, expected_ap_id in cases:
        listing = efs.describe_access_points(**filter_kwargs)
        assert len(listing["AccessPoints"]) == 1
        assert listing.get("NextToken") is None
        assert listing["AccessPoints"][0]["FileSystemId"] == expected_fs_id
        assert listing["AccessPoints"][0]["AccessPointId"] == expected_ap_id


def test_delete_access_points(efs, file_system):
    """Deleting one access point removes it and leaves the other in place.

    Two access points are created and the second is deleted. Afterwards the
    first must still be listed and retrievable by ID, while looking up the
    second must raise ``ClientError`` with code ``AccessPointNotFound``.
    """
    fs_id = file_system["FileSystemId"]

    ap_id1 = efs.create_access_point(ClientToken="ct1", FileSystemId=fs_id)[
        "AccessPointId"
    ]
    ap_id2 = efs.create_access_point(ClientToken="ct2", FileSystemId=fs_id)[
        "AccessPointId"
    ]

    # Delete one access point
    efs.delete_access_point(AccessPointId=ap_id2)

    # We can only find one, and it is the one that was not deleted
    resp = efs.describe_access_points()
    assert len(resp["AccessPoints"]) == 1
    assert resp["AccessPoints"][0]["AccessPointId"] == ap_id1

    # The first one still exists; assert on the result so this step cannot
    # pass on an empty or unrelated listing.
    resp = efs.describe_access_points(AccessPointId=ap_id1)
    assert [ap["AccessPointId"] for ap in resp["AccessPoints"]] == [ap_id1]

    # The second one is gone
    with pytest.raises(ClientError) as exc_info:
        efs.describe_access_points(AccessPointId=ap_id2)
    err = exc_info.value.response["Error"]
    assert err["Code"] == "AccessPointNotFound"