File: test_s3_encryption.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (149 lines) | stat: -rw-r--r-- 5,126 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from uuid import uuid4

import boto3
import pytest
from botocore.exceptions import ClientError

from moto import mock_aws


@mock_aws
def test_encryption_on_new_bucket_fails():
    """Fetching encryption settings from a brand-new bucket raises ClientError.

    The error payload must carry the not-found code, its message and the
    name of the bucket that was queried.
    """
    bucket = "mybucket"
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket=bucket)

    # No encryption configuration has been stored yet, so the GET must fail.
    with pytest.raises(ClientError) as raised:
        s3.get_bucket_encryption(Bucket=bucket)

    error = raised.value.response["Error"]
    assert error["Code"] == "ServerSideEncryptionConfigurationNotFoundError"
    assert (
        error["Message"] == "The server side encryption configuration was not found"
    )
    assert error["BucketName"] == "mybucket"


@mock_aws
def test_put_and_get_encryption():
    """Round-trip a KMS encryption configuration through put/get.

    The configuration returned by ``get_bucket_encryption`` is expected to be
    the one that was stored, with ``BucketKeyEnabled: False`` added to the rule.
    """
    # Create Bucket so that test can run
    conn = boto3.client("s3", region_name="us-east-1")
    conn.create_bucket(Bucket="mybucket")

    sse_config = {
        "Rules": [
            {
                "ApplyServerSideEncryptionByDefault": {
                    "SSEAlgorithm": "aws:kms",
                    "KMSMasterKeyID": "12345678",
                }
            }
        ]
    }

    conn.put_bucket_encryption(
        Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config
    )

    resp = conn.get_bucket_encryption(Bucket="mybucket")
    assert "ServerSideEncryptionConfiguration" in resp

    # Build the expected config from fresh containers. A shallow
    # ``sse_config.copy()`` shares the nested "Rules" list and rule dict, so
    # adding the key through the copy would also modify ``sse_config`` itself.
    expected_rule = {**sse_config["Rules"][0], "BucketKeyEnabled": False}
    expected_config = {"Rules": [expected_rule]}
    assert resp["ServerSideEncryptionConfiguration"] == expected_config


@mock_aws
def test_delete_and_get_encryption():
    """Deleting a bucket's encryption configuration makes a later GET fail."""
    bucket = "mybucket"

    # The bucket has to exist before any encryption call can be made.
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket=bucket)

    kms_default = {
        "SSEAlgorithm": "aws:kms",
        "KMSMasterKeyID": "12345678",
    }
    encryption = {"Rules": [{"ApplyServerSideEncryptionByDefault": kms_default}]}
    s3.put_bucket_encryption(
        Bucket=bucket,
        ServerSideEncryptionConfiguration=encryption,
    )

    s3.delete_bucket_encryption(Bucket=bucket)

    # With the configuration removed, reading it back must raise.
    with pytest.raises(ClientError) as raised:
        s3.get_bucket_encryption(Bucket=bucket)
    error = raised.value.response["Error"]
    assert error["Code"] == "ServerSideEncryptionConfigurationNotFoundError"


@mock_aws
def test_encryption_status_on_new_objects():
    """Only objects written after enabling bucket encryption report AES256.

    An object stored beforehand must keep returning no ``ServerSideEncryption``
    field, both before and after the bucket configuration is applied.
    """
    bucket = str(uuid4())
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket=bucket)

    # Object written while the bucket has no encryption configuration.
    s3.put_object(Bucket=bucket, Key="file.txt", Body=b"test")
    first = s3.get_object(Bucket=bucket, Key="file.txt")
    assert "ServerSideEncryption" not in first

    # Switch on AES256 default encryption for the bucket.
    aes_rule = {"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}
    s3.put_bucket_encryption(
        Bucket=bucket,
        ServerSideEncryptionConfiguration={"Rules": [aes_rule]},
    )

    # The pre-existing object is unaffected by the new configuration.
    first = s3.get_object(Bucket=bucket, Key="file.txt")
    assert "ServerSideEncryption" not in first

    # A newly written object reports the bucket's algorithm.
    s3.put_object(Bucket=bucket, Key="file2.txt", Body=b"test")
    second = s3.get_object(Bucket=bucket, Key="file2.txt")
    assert second["ServerSideEncryption"] == "AES256"


@mock_aws
def test_encryption_status_on_copied_objects():
    """A copy made after enabling bucket encryption reports AES256.

    The source object, written before the configuration was applied, must
    still return no ``ServerSideEncryption`` field.
    """
    bucket = str(uuid4())
    source_key = "file.txt"
    copy_key = "file2.txt"

    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket=bucket)
    s3.put_object(Bucket=bucket, Key=source_key, Body=b"test")

    # Switch on AES256 default encryption for the bucket.
    aes_rule = {"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}
    s3.put_bucket_encryption(
        Bucket=bucket,
        ServerSideEncryptionConfiguration={"Rules": [aes_rule]},
    )

    # Copy the unencrypted object within the same bucket.
    s3.copy_object(
        Bucket=bucket,
        Key=copy_key,
        CopySource=f"{bucket}/file.txt",
    )

    # The source object is unchanged.
    original = s3.get_object(Bucket=bucket, Key=source_key)
    assert "ServerSideEncryption" not in original

    # The copy reports the bucket's algorithm.
    copied = s3.get_object(Bucket=bucket, Key=copy_key)
    assert copied["ServerSideEncryption"] == "AES256"


@mock_aws
def test_encryption_bucket_key_for_aes_not_returned():
    """put_object omits ``BucketKeyEnabled`` for an AES256-encrypted bucket.

    The bucket rule sets ``BucketKeyEnabled`` to ``False`` explicitly; the
    put_object response must still not contain that key.
    """
    bucket = str(uuid4())
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket=bucket)

    # Enable AES256 default encryption with the bucket key switched off.
    aes_rule = {
        "ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"},
        "BucketKeyEnabled": False,
    }
    s3.put_bucket_encryption(
        Bucket=bucket,
        ServerSideEncryptionConfiguration={"Rules": [aes_rule]},
    )

    put_response = s3.put_object(Bucket=bucket, Key="file.txt", Body=b"test")
    assert "BucketKeyEnabled" not in put_response