1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
from uuid import uuid4
import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_aws
@mock_aws
def test_encryption_on_new_bucket_fails():
conn = boto3.client("s3", region_name="us-east-1")
conn.create_bucket(Bucket="mybucket")
with pytest.raises(ClientError) as exc:
conn.get_bucket_encryption(Bucket="mybucket")
err = exc.value.response["Error"]
assert err["Code"] == "ServerSideEncryptionConfigurationNotFoundError"
assert err["Message"] == "The server side encryption configuration was not found"
assert err["BucketName"] == "mybucket"
@mock_aws
def test_put_and_get_encryption():
# Create Bucket so that test can run
conn = boto3.client("s3", region_name="us-east-1")
conn.create_bucket(Bucket="mybucket")
sse_config = {
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "aws:kms",
"KMSMasterKeyID": "12345678",
}
}
]
}
conn.put_bucket_encryption(
Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config
)
resp = conn.get_bucket_encryption(Bucket="mybucket")
assert "ServerSideEncryptionConfiguration" in resp
return_config = sse_config.copy()
return_config["Rules"][0]["BucketKeyEnabled"] = False
assert resp["ServerSideEncryptionConfiguration"] == return_config
@mock_aws
def test_delete_and_get_encryption():
# Create Bucket so that test can run
conn = boto3.client("s3", region_name="us-east-1")
conn.create_bucket(Bucket="mybucket")
sse_config = {
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "aws:kms",
"KMSMasterKeyID": "12345678",
}
}
]
}
conn.put_bucket_encryption(
Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config
)
conn.delete_bucket_encryption(Bucket="mybucket")
# GET now fails, after deleting it, as it no longer exists
with pytest.raises(ClientError) as exc:
conn.get_bucket_encryption(Bucket="mybucket")
err = exc.value.response["Error"]
assert err["Code"] == "ServerSideEncryptionConfigurationNotFoundError"
@mock_aws
def test_encryption_status_on_new_objects():
bucket_name = str(uuid4())
s3_client = boto3.client("s3", region_name="us-east-1")
s3_client.create_bucket(Bucket=bucket_name)
s3_client.put_object(Bucket=bucket_name, Body=b"test", Key="file.txt")
# verify encryption status on object itself
res = s3_client.get_object(Bucket=bucket_name, Key="file.txt")
assert "ServerSideEncryption" not in res
# enable encryption
sse_config = {
"Rules": [{"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}]
}
s3_client.put_bucket_encryption(
Bucket=bucket_name, ServerSideEncryptionConfiguration=sse_config
)
# verify encryption status on existing object hasn't changed
res = s3_client.get_object(Bucket=bucket_name, Key="file.txt")
assert "ServerSideEncryption" not in res
# create object2
s3_client.put_object(Bucket=bucket_name, Body=b"test", Key="file2.txt")
# verify encryption status on object2
res = s3_client.get_object(Bucket=bucket_name, Key="file2.txt")
assert res["ServerSideEncryption"] == "AES256"
@mock_aws
def test_encryption_status_on_copied_objects():
bucket_name = str(uuid4())
s3_client = boto3.client("s3", region_name="us-east-1")
s3_client.create_bucket(Bucket=bucket_name)
s3_client.put_object(Bucket=bucket_name, Body=b"test", Key="file.txt")
# enable encryption
sse_config = {
"Rules": [{"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}]
}
s3_client.put_bucket_encryption(
Bucket=bucket_name, ServerSideEncryptionConfiguration=sse_config
)
# copy object
s3_client.copy_object(
CopySource=f"{bucket_name}/file.txt", Bucket=bucket_name, Key="file2.txt"
)
# verify encryption status on object1 hasn't changed
res = s3_client.get_object(Bucket=bucket_name, Key="file.txt")
assert "ServerSideEncryption" not in res
# verify encryption status on object2 does have encryption
res = s3_client.get_object(Bucket=bucket_name, Key="file2.txt")
assert res["ServerSideEncryption"] == "AES256"
@mock_aws
def test_encryption_bucket_key_for_aes_not_returned():
bucket_name = str(uuid4())
s3_client = boto3.client("s3", region_name="us-east-1")
s3_client.create_bucket(Bucket=bucket_name)
# enable encryption
sse_config = {
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"},
"BucketKeyEnabled": False,
}
]
}
s3_client.put_bucket_encryption(
Bucket=bucket_name, ServerSideEncryptionConfiguration=sse_config
)
res = s3_client.put_object(Bucket=bucket_name, Body=b"test", Key="file.txt")
assert "BucketKeyEnabled" not in res
|