File: test_ecs_task_definitions.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (130 lines) | stat: -rw-r--r-- 4,119 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""Unit tests for ecs-supported APIs."""

import boto3

from moto import mock_aws


@mock_aws
def test_delete_task_definitions():
    client = boto3.client("ecs", region_name="us-east-2")
    client.register_task_definition(
        family="test_ecs_task",
        containerDefinitions=[
            {
                "name": "hello_world",
                "image": "docker/hello-world:latest",
                "cpu": 1024,
                "memory": 400,
                "essential": True,
                "environment": [
                    {"name": "AWS_ACCESS_KEY_ID", "value": "SOME_ACCESS_KEY"}
                ],
                "logConfiguration": {"logDriver": "json-file"},
            }
        ],
    )

    client.deregister_task_definition(taskDefinition="test_ecs_task:1")
    resp = client.delete_task_definitions(taskDefinitions=["test_ecs_task:1"])

    assert resp["taskDefinitions"] == [
        {
            "family": "test_ecs_task",
            "revision": 1,
            "volumes": [],
            "compatibilities": ["EC2"],
            "status": "DELETE_IN_PROGRESS",
            "containerDefinitions": [
                {
                    "cpu": 1024,
                    "portMappings": [],
                    "essential": True,
                    "environment": [
                        {"name": "AWS_ACCESS_KEY_ID", "value": "SOME_ACCESS_KEY"}
                    ],
                    "mountPoints": [],
                    "volumesFrom": [],
                    "name": "hello_world",
                    "image": "docker/hello-world:latest",
                    "memory": 400,
                    "logConfiguration": {"logDriver": "json-file"},
                }
            ],
            "networkMode": "bridge",
            "placementConstraints": [],
            "taskDefinitionArn": "arn:aws:ecs:us-east-2:123456789012:task-definition/test_ecs_task:1",
        }
    ]
    assert resp["failures"] == []


@mock_aws
def test_delete_task_definitions_cannot_delete_active():
    client = boto3.client("ecs", region_name="us-east-2")
    task_def_1 = client.register_task_definition(
        family="test_ecs_task",
        containerDefinitions=[
            {
                "name": "hello_world",
                "image": "docker/hello-world:latest",
                "cpu": 1024,
                "memory": 400,
                "essential": True,
                "environment": [
                    {"name": "AWS_ACCESS_KEY_ID", "value": "SOME_ACCESS_KEY"}
                ],
                "logConfiguration": {"logDriver": "json-file"},
            }
        ],
    )

    resp = client.delete_task_definitions(
        taskDefinitions=[
            task_def_1["taskDefinition"]["taskDefinitionArn"],
        ]
    )

    assert resp["taskDefinitions"] == []
    assert resp["failures"] == [
        {
            "arn": task_def_1["taskDefinition"]["taskDefinitionArn"],
            "reason": "The specified task definition is still in ACTIVE status. Please deregister the target and try again.",
        }
    ]


@mock_aws
def test_delete_task_definitions_invalid_identifier():
    client = boto3.client("ecs", region_name="us-east-2")
    resp = client.delete_task_definitions(
        taskDefinitions=[
            "invalid-task-definition-name",
        ]
    )

    assert resp["taskDefinitions"] == []
    assert resp["failures"] == [
        {
            "arn": "invalid-task-definition-name",
            "reason": "The specified task definition identifier is invalid. Specify a valid name or ARN and try again.",
        }
    ]


@mock_aws
def test_delete_task_definitions_nonexistent():
    client = boto3.client("ecs", region_name="us-east-2")
    resp = client.delete_task_definitions(
        taskDefinitions=[
            "nonexistent-task-definition:1",
        ]
    )

    assert resp["taskDefinitions"] == []
    assert resp["failures"] == [
        {
            "arn": "nonexistent-task-definition:1",
            "reason": "The specified task definition does not exist. Specify a valid account, family, revision and try again.",
        }
    ]