1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
|
import json
from unittest import SkipTest
from uuid import uuid4
import requests
from moto import settings
"""
Test the different server responses
Docs:
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Appendix.APIv20111205.html
"""
def test_table_list():
if not settings.TEST_SERVER_MODE:
raise SkipTest("Only run test with external server")
headers = {
"X-Amz-Target": "DynamoDB_20111205.ListTables",
"AUTHORIZATION": "AWS4-HMAC-SHA256 Credential=ACCESS_KEY/20220226/us-east-1/dynamodb/aws4_request, SignedHeaders=content-type;host;x-amz-date;x-amz-target, Signature=sig",
}
requests.post(settings.test_server_mode_endpoint() + "/moto-api/reset")
res = requests.get(settings.test_server_mode_endpoint(), headers=headers)
assert res.status_code == 200
assert json.loads(res.content) == {"TableNames": []}
def test_create_table():
if not settings.TEST_SERVER_MODE:
raise SkipTest("Only run test with external server")
table_name = str(uuid4())
headers = {
"X-Amz-Target": "DynamoDB_20111205.CreateTable",
"Content-Type": "application/x-amz-json-1.0",
"AUTHORIZATION": "AWS4-HMAC-SHA256 Credential=ACCESS_KEY/20220226/us-east-1/dynamodb/aws4_request, SignedHeaders=content-type;host;x-amz-date;x-amz-target, Signature=sig",
}
request_body = {
"TableName": table_name,
"KeySchema": {
"HashKeyElement": {"AttributeName": "hkey", "AttributeType": "S"}
},
"ProvisionedThroughput": {"ReadCapacityUnits": 5, "WriteCapacityUnits": 10},
}
res = requests.post(
settings.test_server_mode_endpoint(), headers=headers, json=request_body
)
res = json.loads(res.content)["Table"]
assert "CreationDateTime" in res
del res["CreationDateTime"]
assert res == {
"KeySchema": {
"HashKeyElement": {"AttributeName": "hkey", "AttributeType": "S"}
},
"ProvisionedThroughput": {"ReadCapacityUnits": 5, "WriteCapacityUnits": 10},
"TableName": table_name,
"TableStatus": "ACTIVE",
"ItemCount": 0,
"TableSizeBytes": 0,
}
headers["X-Amz-Target"] = "DynamoDB_20111205.ListTables"
res = requests.get(settings.test_server_mode_endpoint(), headers=headers)
res = json.loads(res.content)
assert table_name in res["TableNames"]
|