1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
from uuid import uuid4
import boto3
from moto import mock_aws
@mock_aws
def test_update_table__billing_mode():
client = boto3.client("dynamodb", region_name="us-east-1")
table_name = f"T{uuid4()}"
client.create_table(
TableName=table_name,
AttributeDefinitions=[
{"AttributeName": "client", "AttributeType": "S"},
{"AttributeName": "app", "AttributeType": "S"},
],
KeySchema=[
{"AttributeName": "client", "KeyType": "HASH"},
{"AttributeName": "app", "KeyType": "RANGE"},
],
BillingMode="PAY_PER_REQUEST",
)
client.update_table(
TableName=table_name,
BillingMode="PROVISIONED",
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
)
actual = client.describe_table(TableName=table_name)["Table"]
assert actual["BillingModeSummary"] == {"BillingMode": "PROVISIONED"}
assert actual["ProvisionedThroughput"] == {
"ReadCapacityUnits": 1,
"WriteCapacityUnits": 1,
}
@mock_aws
def test_update_table_throughput():
conn = boto3.resource("dynamodb", region_name="us-west-2")
table = conn.create_table(
TableName=f"T{uuid4()}",
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
)
assert table.provisioned_throughput["ReadCapacityUnits"] == 5
assert table.provisioned_throughput["WriteCapacityUnits"] == 5
table.update(
ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 6}
)
assert table.provisioned_throughput["ReadCapacityUnits"] == 5
assert table.provisioned_throughput["WriteCapacityUnits"] == 6
@mock_aws
def test_update_table_deletion_protection_enabled():
conn = boto3.resource("dynamodb", region_name="us-west-2")
table = conn.create_table(
TableName=f"T{uuid4()}",
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
DeletionProtectionEnabled=False,
)
assert not table.deletion_protection_enabled
table.update(DeletionProtectionEnabled=True)
assert table.deletion_protection_enabled
@mock_aws
def test_update_table_deletion_protection_disabled():
conn = boto3.resource("dynamodb", region_name="us-west-2")
table = conn.create_table(
TableName=f"T{uuid4()}",
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
DeletionProtectionEnabled=True,
)
assert table.deletion_protection_enabled
table.update(DeletionProtectionEnabled=False)
assert not table.deletion_protection_enabled
@mock_aws
def test_update_table__enable_stream():
conn = boto3.client("dynamodb", region_name="us-east-1")
table_name = f"T{uuid4()}"
resp = conn.create_table(
TableName=table_name,
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
)
assert "StreamSpecification" not in resp["TableDescription"]
resp = conn.update_table(
TableName=table_name,
StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_IMAGE"},
)
assert "StreamSpecification" in resp["TableDescription"]
assert resp["TableDescription"]["StreamSpecification"] == {
"StreamEnabled": True,
"StreamViewType": "NEW_IMAGE",
}
assert "LatestStreamLabel" in resp["TableDescription"]
assert "LatestStreamArn" in resp["TableDescription"]
@mock_aws
def test_update_table_warm_throughput():
client = boto3.client("dynamodb", region_name="us-east-1")
table_name = f"T{uuid4()}"
client.create_table(
TableName=table_name,
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
BillingMode="PAY_PER_REQUEST",
)
client.update_table(
TableName=table_name,
WarmThroughput={
"ReadUnitsPerSecond": 15000,
"WriteUnitsPerSecond": 5000,
},
)
table = client.describe_table(TableName=table_name)["Table"]
assert (
table["WarmThroughput"]
== {
"ReadUnitsPerSecond": 15000,
"Status": "ACTIVE",
"WriteUnitsPerSecond": 5000,
}
or table["WarmThroughput"]["Status"] == "UPDATING"
)
|