1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
from uuid import uuid4
import boto3
import pytest
from . import dynamodb_aws_verified
@pytest.mark.aws_verified
@dynamodb_aws_verified()
def test_batch_write_single_set(table_name=None):
ddb_client = boto3.client("dynamodb", region_name="us-east-1")
ddb_client.transact_write_items(
TransactItems=[
{
"Update": {
"TableName": table_name,
"Key": {"pk": {"S": "test"}},
"UpdateExpression": "SET xxx = :xxx",
"ConditionExpression": "attribute_not_exists(xxx)",
"ExpressionAttributeValues": {":xxx": {"S": "123"}},
}
}
]
)
results = ddb_client.scan(TableName=table_name)["Items"]
assert results == [{"pk": {"S": "test"}, "xxx": {"S": "123"}}]
@pytest.mark.aws_verified
@dynamodb_aws_verified(create_table=False)
def test_batch_write_item_to_multiple_tables():
conn = boto3.resource("dynamodb", region_name="us-west-2")
tables = [f"table-{str(uuid4())}-{i}" for i in range(3)]
for name in tables:
conn.create_table(
TableName=name,
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
for name in tables:
waiter = boto3.client("dynamodb", "us-west-2").get_waiter("table_exists")
waiter.wait(TableName=name)
try:
conn.batch_write_item(
RequestItems={
tables[0]: [{"PutRequest": {"Item": {"id": "0"}}}],
tables[1]: [{"PutRequest": {"Item": {"id": "1"}}}],
tables[2]: [{"PutRequest": {"Item": {"id": "2"}}}],
}
)
for idx, name in enumerate(tables):
table = conn.Table(name)
res = table.get_item(Key={"id": str(idx)})
assert res["Item"] == {"id": str(idx)}
assert table.scan()["Count"] == 1
conn.batch_write_item(
RequestItems={
tables[0]: [{"DeleteRequest": {"Key": {"id": "0"}}}],
tables[1]: [{"DeleteRequest": {"Key": {"id": "1"}}}],
tables[2]: [{"DeleteRequest": {"Key": {"id": "2"}}}],
}
)
for name in tables:
assert conn.Table(name).scan()["Count"] == 0
finally:
for name in tables:
try:
conn.Table(name).delete()
except Exception as e:
print(f"Failed to delete table {name}") # noqa
print(e) # noqa
@pytest.mark.aws_verified
@dynamodb_aws_verified()
def test_batch_write_using_arn(table_name=None):
ddb_client = boto3.client("dynamodb", region_name="us-east-1")
table_arn = ddb_client.describe_table(TableName=table_name)["Table"]["TableArn"]
ddb_client.transact_write_items(
TransactItems=[
{
"Update": {
"TableName": table_arn,
"Key": {"pk": {"S": "test"}},
"UpdateExpression": "SET xxx = :xxx",
"ExpressionAttributeValues": {":xxx": {"S": "123"}},
}
}
]
)
results = ddb_client.scan(TableName=table_arn)["Items"]
assert results == [{"pk": {"S": "test"}, "xxx": {"S": "123"}}]
items = ddb_client.batch_get_item(
RequestItems={table_arn: {"Keys": [{"pk": {"S": "test"}}]}}
)["Responses"]
assert items == {table_arn: [{"pk": {"S": "test"}, "xxx": {"S": "123"}}]}
|