1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
"""Unit tests for ecs-supported APIs."""
import boto3
from moto import mock_aws
@mock_aws
def test_delete_task_definitions():
client = boto3.client("ecs", region_name="us-east-2")
client.register_task_definition(
family="test_ecs_task",
containerDefinitions=[
{
"name": "hello_world",
"image": "docker/hello-world:latest",
"cpu": 1024,
"memory": 400,
"essential": True,
"environment": [
{"name": "AWS_ACCESS_KEY_ID", "value": "SOME_ACCESS_KEY"}
],
"logConfiguration": {"logDriver": "json-file"},
}
],
)
client.deregister_task_definition(taskDefinition="test_ecs_task:1")
resp = client.delete_task_definitions(taskDefinitions=["test_ecs_task:1"])
assert resp["taskDefinitions"] == [
{
"family": "test_ecs_task",
"revision": 1,
"volumes": [],
"compatibilities": ["EC2"],
"status": "DELETE_IN_PROGRESS",
"containerDefinitions": [
{
"cpu": 1024,
"portMappings": [],
"essential": True,
"environment": [
{"name": "AWS_ACCESS_KEY_ID", "value": "SOME_ACCESS_KEY"}
],
"mountPoints": [],
"volumesFrom": [],
"name": "hello_world",
"image": "docker/hello-world:latest",
"memory": 400,
"logConfiguration": {"logDriver": "json-file"},
}
],
"networkMode": "bridge",
"placementConstraints": [],
"taskDefinitionArn": "arn:aws:ecs:us-east-2:123456789012:task-definition/test_ecs_task:1",
}
]
assert resp["failures"] == []
@mock_aws
def test_delete_task_definitions_cannot_delete_active():
client = boto3.client("ecs", region_name="us-east-2")
task_def_1 = client.register_task_definition(
family="test_ecs_task",
containerDefinitions=[
{
"name": "hello_world",
"image": "docker/hello-world:latest",
"cpu": 1024,
"memory": 400,
"essential": True,
"environment": [
{"name": "AWS_ACCESS_KEY_ID", "value": "SOME_ACCESS_KEY"}
],
"logConfiguration": {"logDriver": "json-file"},
}
],
)
resp = client.delete_task_definitions(
taskDefinitions=[
task_def_1["taskDefinition"]["taskDefinitionArn"],
]
)
assert resp["taskDefinitions"] == []
assert resp["failures"] == [
{
"arn": task_def_1["taskDefinition"]["taskDefinitionArn"],
"reason": "The specified task definition is still in ACTIVE status. Please deregister the target and try again.",
}
]
@mock_aws
def test_delete_task_definitions_invalid_identifier():
client = boto3.client("ecs", region_name="us-east-2")
resp = client.delete_task_definitions(
taskDefinitions=[
"invalid-task-definition-name",
]
)
assert resp["taskDefinitions"] == []
assert resp["failures"] == [
{
"arn": "invalid-task-definition-name",
"reason": "The specified task definition identifier is invalid. Specify a valid name or ARN and try again.",
}
]
@mock_aws
def test_delete_task_definitions_nonexistent():
client = boto3.client("ecs", region_name="us-east-2")
resp = client.delete_task_definitions(
taskDefinitions=[
"nonexistent-task-definition:1",
]
)
assert resp["taskDefinitions"] == []
assert resp["failures"] == [
{
"arn": "nonexistent-task-definition:1",
"reason": "The specified task definition does not exist. Specify a valid account, family, revision and try again.",
}
]
|