File: test_cloudtrail.py

package info (click to toggle)
python-boto 2.49.0-4.1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 9,888 kB
  • sloc: python: 86,396; makefile: 112
file content (91 lines) | stat: -rw-r--r-- 3,057 bytes parent folder | download | duplicates (12)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import boto

from time import time
from tests.compat import unittest

DEFAULT_S3_POLICY = """{
    "Version": "2012-10-17",
    "Statement": [
                {
                    "Sid": "AWSCloudTrailAclCheck20131101",
                    "Effect": "Allow",
                    "Principal": {
                "AWS": [
                "arn:aws:iam::086441151436:root",
                "arn:aws:iam::113285607260:root"
            ]
                    },
                    "Action": "s3:GetBucketAcl",
                    "Resource": "arn:aws:s3:::<BucketName>"
                },
                {
                    "Sid": "AWSCloudTrailWrite20131101",
                    "Effect": "Allow",
                    "Principal": {
                "AWS": [
                "arn:aws:iam::086441151436:root",
                "arn:aws:iam::113285607260:root"
            ]
                    },
                    "Action": "s3:PutObject",
                    "Resource": "arn:aws:s3:::<BucketName>/<Prefix>/AWSLogs/<CustomerAccountID>/*",
                    "Condition": {
                        "StringEquals": {
                                    "s3:x-amz-acl": "bucket-owner-full-control"
                        }
                    }
                }
    ]
}"""

class TestCloudTrail(unittest.TestCase):
    def test_cloudtrail(self):
        cloudtrail = boto.connect_cloudtrail()

        # Don't delete existing customer data!
        res = cloudtrail.describe_trails()
        if len(res['trailList']):
            self.fail('A trail already exists on this account!')

        # Who am I?
        iam = boto.connect_iam()
        response = iam.get_user()
        account_id = response['get_user_response']['get_user_result'] \
                             ['user']['user_id']

        # Setup a new bucket
        s3 = boto.connect_s3()
        bucket_name = 'cloudtrail-integ-{0}'.format(time())
        policy = DEFAULT_S3_POLICY.replace('<BucketName>', bucket_name)\
                                  .replace('<CustomerAccountID>', account_id)\
                                  .replace('<Prefix>/', '')
        b = s3.create_bucket(bucket_name)
        b.set_policy(policy)

        # Setup CloudTrail
        cloudtrail.create_trail(trail={'Name': 'test', 'S3BucketName': bucket_name})

        cloudtrail.update_trail(trail={'Name': 'test', 'IncludeGlobalServiceEvents': False})

        trails = cloudtrail.describe_trails()

        self.assertEqual('test', trails['trailList'][0]['Name'])
        self.assertFalse(trails['trailList'][0]['IncludeGlobalServiceEvents'])

        cloudtrail.start_logging(name='test')

        status = cloudtrail.get_trail_status(name='test')
        self.assertTrue(status['IsLogging'])

        cloudtrail.stop_logging(name='test')

        status = cloudtrail.get_trail_status(name='test')
        self.assertFalse(status['IsLogging'])

        # Clean up
        cloudtrail.delete_trail(name='test')

        for key in b.list():
            key.delete()

        s3.delete_bucket(bucket_name)