1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
from unittest import SkipTest
import boto3
import pytest
from moto import mock_aws, settings
from moto.moto_api import state_manager
@pytest.fixture(scope="function")
def execution_state_transition():
    """Temporarily register a manual, ``times=2`` transition for DMS instances.

    The transition is installed on the ``dms::replicationinstance`` model
    before the test body runs and is put back to ``times=1`` afterwards.
    """
    model = "dms::replicationinstance"

    state_manager.set_transition(
        model_name=model,
        transition={"progression": "manual", "times": 2},
    )

    yield

    # Restore the default (manual, times=1) so later tests are not affected
    # by the configuration installed above.
    state_manager.set_transition(
        model_name=model,
        transition={"progression": "manual", "times": 1},
    )
@mock_aws
def test_describe_replication_instance_manual_transition(execution_state_transition):
    """Check the status only advances on the second describe call.

    With the ``execution_state_transition`` fixture active, the instance is
    expected to report ``creating`` on creation and on the first describe
    call, and ``available`` on the second describe call.
    """
    if not settings.TEST_DECORATOR_MODE:
        # We already test the state transitions in ServerMode in other tests
        raise SkipTest(
            "NO point in verifying state transitions when not using the decorator"
        )

    def check_instance(details, expected_status):
        # Same field-by-field checks for every response; only the expected
        # status differs between calls.
        assert details["ReplicationInstanceIdentifier"] == "test-instance"
        assert details["ReplicationInstanceClass"] == "dms.t2.micro"
        assert details["AllocatedStorage"] == 50
        assert details["AvailabilityZone"] == "us-east-1a"
        assert details["ReplicationInstanceStatus"] == expected_status
        assert details["MultiAZ"] is False
        assert details["EngineVersion"] == "3.4.6"
        assert details["AutoMinorVersionUpgrade"] is True
        assert details["PubliclyAccessible"] is True
        assert details["NetworkType"] == "IPV4"
        assert details["VpcSecurityGroups"] == [
            {"Status": "active", "VpcSecurityGroupId": "sg-12345"}
        ]

    def describe_single():
        # Look the instance up by id and insist that exactly one comes back.
        listing = dms.describe_replication_instances(
            Filters=[{"Name": "replication-instance-id", "Values": ["test-instance"]}]
        )
        found = listing["ReplicationInstances"]
        assert len(found) == 1
        return found[0]

    dms = boto3.client("dms", region_name="eu-west-1")

    creation_args = {
        "ReplicationInstanceIdentifier": "test-instance",
        "ReplicationInstanceClass": "dms.t2.micro",
        "AllocatedStorage": 50,
        "VpcSecurityGroupIds": ["sg-12345"],
        "AvailabilityZone": "us-east-1a",
        "ReplicationSubnetGroupIdentifier": "default-subnet-group",
        "PreferredMaintenanceWindow": "sun:06:00-sun:14:00",
        "MultiAZ": False,
        "EngineVersion": "3.4.6",
        "AutoMinorVersionUpgrade": True,
        "Tags": [{"Key": "Name", "Value": "Test Instance"}],
        "PubliclyAccessible": True,
        "NetworkType": "IPV4",
    }
    created = dms.create_replication_instance(**creation_args)
    check_instance(created["ReplicationInstance"], "creating")

    # First describe call: the status must still be "creating".
    check_instance(describe_single(), "creating")

    # Second describe call: the status must now be "available".
    check_instance(describe_single(), "available")
@mock_aws
def test_replication_instance_without_transition():
    """
    The default state transition is 'manual' with times=1, so the very first
    call to describe_replication_instances advances the status.
    """

    def verify(payload, status):
        # Field checks shared by the create and describe responses.
        assert payload["ReplicationInstanceIdentifier"] == "test-instance"
        assert payload["ReplicationInstanceClass"] == "dms.t2.micro"
        assert payload["AllocatedStorage"] == 50
        assert payload["AvailabilityZone"] == "us-east-1a"
        assert payload["ReplicationInstanceStatus"] == status
        assert payload["MultiAZ"] is False
        assert payload["EngineVersion"] == "3.4.6"
        assert payload["AutoMinorVersionUpgrade"] is True
        assert payload["PubliclyAccessible"] is True
        assert payload["NetworkType"] == "IPV4"
        assert payload["VpcSecurityGroups"] == [
            {"Status": "active", "VpcSecurityGroupId": "sg-12345"}
        ]

    conn = boto3.client("dms", region_name="eu-west-1")

    create_result = conn.create_replication_instance(
        ReplicationInstanceIdentifier="test-instance",
        ReplicationInstanceClass="dms.t2.micro",
        AllocatedStorage=50,
        VpcSecurityGroupIds=["sg-12345"],
        AvailabilityZone="us-east-1a",
        ReplicationSubnetGroupIdentifier="default-subnet-group",
        PreferredMaintenanceWindow="sun:06:00-sun:14:00",
        MultiAZ=False,
        EngineVersion="3.4.6",
        AutoMinorVersionUpgrade=True,
        Tags=[{"Key": "Name", "Value": "Test Instance"}],
        PubliclyAccessible=True,
        NetworkType="IPV4",
    )
    verify(create_result["ReplicationInstance"], "creating")

    # The first describe call is expected to advance the status.
    id_filter = {"Name": "replication-instance-id", "Values": ["test-instance"]}
    describe_result = conn.describe_replication_instances(Filters=[id_filter])
    matches = describe_result["ReplicationInstances"]
    assert len(matches) == 1
    verify(matches[0], "available")
|