File: test_dms_replication_instance_state.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (161 lines) | stat: -rw-r--r-- 6,724 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
from unittest import SkipTest

import boto3
import pytest

from moto import mock_aws, settings
from moto.moto_api import state_manager


@pytest.fixture(scope="function")
def execution_state_transition():
    state_manager.set_transition(
        model_name="dms::replicationinstance",
        transition={"progression": "manual", "times": 2},
    )
    yield
    # Reset to default - we don't want to affect other tests
    state_manager.set_transition(
        model_name="dms::replicationinstance",
        transition={"progression": "manual", "times": 1},
    )


@mock_aws
def test_describe_replication_instance_manual_transition(execution_state_transition):
    if not settings.TEST_DECORATOR_MODE:
        # We already test the state transitions in ServerMode in other tests
        raise SkipTest(
            "NO point in verifying state transitions when not using the decorator"
        )

    client = boto3.client("dms", region_name="eu-west-1")
    response = client.create_replication_instance(
        ReplicationInstanceIdentifier="test-instance",
        ReplicationInstanceClass="dms.t2.micro",
        AllocatedStorage=50,
        VpcSecurityGroupIds=["sg-12345"],
        AvailabilityZone="us-east-1a",
        ReplicationSubnetGroupIdentifier="default-subnet-group",
        PreferredMaintenanceWindow="sun:06:00-sun:14:00",
        MultiAZ=False,
        EngineVersion="3.4.6",
        AutoMinorVersionUpgrade=True,
        Tags=[{"Key": "Name", "Value": "Test Instance"}],
        PubliclyAccessible=True,
        NetworkType="IPV4",
    )

    instance = response["ReplicationInstance"]
    assert instance["ReplicationInstanceIdentifier"] == "test-instance"
    assert instance["ReplicationInstanceClass"] == "dms.t2.micro"
    assert instance["AllocatedStorage"] == 50
    assert instance["AvailabilityZone"] == "us-east-1a"
    assert instance["ReplicationInstanceStatus"] == "creating"
    assert instance["MultiAZ"] is False
    assert instance["EngineVersion"] == "3.4.6"
    assert instance["AutoMinorVersionUpgrade"] is True
    assert instance["PubliclyAccessible"] is True
    assert instance["NetworkType"] == "IPV4"
    assert instance["VpcSecurityGroups"] == [
        {"Status": "active", "VpcSecurityGroupId": "sg-12345"}
    ]

    # first call status should stay creating
    response = client.describe_replication_instances(
        Filters=[{"Name": "replication-instance-id", "Values": ["test-instance"]}]
    )
    assert len(response["ReplicationInstances"]) == 1
    instance = response["ReplicationInstances"][0]
    assert instance["ReplicationInstanceIdentifier"] == "test-instance"
    assert instance["ReplicationInstanceClass"] == "dms.t2.micro"
    assert instance["AllocatedStorage"] == 50
    assert instance["AvailabilityZone"] == "us-east-1a"
    assert instance["ReplicationInstanceStatus"] == "creating"
    assert instance["MultiAZ"] is False
    assert instance["EngineVersion"] == "3.4.6"
    assert instance["AutoMinorVersionUpgrade"] is True
    assert instance["PubliclyAccessible"] is True
    assert instance["NetworkType"] == "IPV4"
    assert instance["VpcSecurityGroups"] == [
        {"Status": "active", "VpcSecurityGroupId": "sg-12345"}
    ]

    # Second call should update the status to available
    response = client.describe_replication_instances(
        Filters=[{"Name": "replication-instance-id", "Values": ["test-instance"]}]
    )
    assert len(response["ReplicationInstances"]) == 1
    instance = response["ReplicationInstances"][0]
    assert instance["ReplicationInstanceIdentifier"] == "test-instance"
    assert instance["ReplicationInstanceClass"] == "dms.t2.micro"
    assert instance["AllocatedStorage"] == 50
    assert instance["AvailabilityZone"] == "us-east-1a"
    assert instance["ReplicationInstanceStatus"] == "available"
    assert instance["MultiAZ"] is False
    assert instance["EngineVersion"] == "3.4.6"
    assert instance["AutoMinorVersionUpgrade"] is True
    assert instance["PubliclyAccessible"] is True
    assert instance["NetworkType"] == "IPV4"
    assert instance["VpcSecurityGroups"] == [
        {"Status": "active", "VpcSecurityGroupId": "sg-12345"}
    ]


@mock_aws
def test_replication_instance_without_transition():
    """
    Default state transition is 'manual', with 1 times  meaning that the first time we call describe_replication_instance the status advances
    """

    client = boto3.client("dms", region_name="eu-west-1")
    response = client.create_replication_instance(
        ReplicationInstanceIdentifier="test-instance",
        ReplicationInstanceClass="dms.t2.micro",
        AllocatedStorage=50,
        VpcSecurityGroupIds=["sg-12345"],
        AvailabilityZone="us-east-1a",
        ReplicationSubnetGroupIdentifier="default-subnet-group",
        PreferredMaintenanceWindow="sun:06:00-sun:14:00",
        MultiAZ=False,
        EngineVersion="3.4.6",
        AutoMinorVersionUpgrade=True,
        Tags=[{"Key": "Name", "Value": "Test Instance"}],
        PubliclyAccessible=True,
        NetworkType="IPV4",
    )

    instance = response["ReplicationInstance"]
    assert instance["ReplicationInstanceIdentifier"] == "test-instance"
    assert instance["ReplicationInstanceClass"] == "dms.t2.micro"
    assert instance["AllocatedStorage"] == 50
    assert instance["AvailabilityZone"] == "us-east-1a"
    assert instance["ReplicationInstanceStatus"] == "creating"
    assert instance["MultiAZ"] is False
    assert instance["EngineVersion"] == "3.4.6"
    assert instance["AutoMinorVersionUpgrade"] is True
    assert instance["PubliclyAccessible"] is True
    assert instance["NetworkType"] == "IPV4"
    assert instance["VpcSecurityGroups"] == [
        {"Status": "active", "VpcSecurityGroupId": "sg-12345"}
    ]

    # first call status should advance the state
    response = client.describe_replication_instances(
        Filters=[{"Name": "replication-instance-id", "Values": ["test-instance"]}]
    )
    assert len(response["ReplicationInstances"]) == 1
    instance = response["ReplicationInstances"][0]
    assert instance["ReplicationInstanceIdentifier"] == "test-instance"
    assert instance["ReplicationInstanceClass"] == "dms.t2.micro"
    assert instance["AllocatedStorage"] == 50
    assert instance["AvailabilityZone"] == "us-east-1a"
    assert instance["ReplicationInstanceStatus"] == "available"
    assert instance["MultiAZ"] is False
    assert instance["EngineVersion"] == "3.4.6"
    assert instance["AutoMinorVersionUpgrade"] is True
    assert instance["PubliclyAccessible"] is True
    assert instance["NetworkType"] == "IPV4"
    assert instance["VpcSecurityGroups"] == [
        {"Status": "active", "VpcSecurityGroupId": "sg-12345"}
    ]