1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
|
# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from datetime import datetime, timedelta
from dateutil import parser, tz
from awscli.customizations.cloudtrail import utils
from awscli.customizations.cloudtrail.utils import (
PublicKeyProvider,
format_date,
normalize_date,
parse_date,
)
from awscli.testutils import mock, unittest
# Fixed reference timestamp (2014-08-10T00:00:00Z) that tests below use as
# the start of a public-key lookup window.
START_DATE = parser.parse("20140810T000000Z")
class TestCloudTrailUtils(unittest.TestCase):
    """Unit tests for the helper functions of the CloudTrail ``utils``."""

    def test_gets_account_id_from_arn(self):
        """The sample colon-delimited ARN yields the account id "1234"."""
        account_id = utils.get_account_id_from_arn("foo:bar:baz:qux:1234")
        self.assertEqual("1234", account_id)

    def test_gets_trail_by_arn(self):
        """The trail whose ``TrailARN`` matches the request is returned."""
        trails = [
            {"TrailARN": "a", "Foo": "Baz"},
            {"TrailARN": "b", "Foo": "Bar"},
        ]
        client = mock.Mock()
        client.describe_trails.return_value = {"trailList": trails}

        trail = utils.get_trail_by_arn(client, "b")

        self.assertEqual("Bar", trail["Foo"])

    def test_throws_when_unable_to_get_trail_by_arn(self):
        """An empty trail list makes the lookup raise ``ValueError``."""
        client = mock.Mock()
        client.describe_trails.return_value = {"trailList": []}

        with self.assertRaises(ValueError):
            utils.get_trail_by_arn(client, "b")

    def test_formats_dates(self):
        """A UTC datetime is rendered as ``YYYYMMDDTHHMMSSZ``."""
        midnight_utc = datetime(2015, 8, 21, tzinfo=tz.tzutc())
        formatted = format_date(midnight_utc)
        self.assertEqual("20150821T000000Z", formatted)

    def test_parses_dates_with_better_error_message(self):
        """An unparseable value raises ``ValueError`` naming that value."""
        try:
            parse_date("foo")
        except ValueError as error:
            self.assertIn("Unable to parse date value: foo", str(error))
        else:
            # Reaching this branch means no exception was raised at all.
            self.fail("Should have failed to parse")

    def test_parses_dates(self):
        """A human-readable UTC date string parses to an aware datetime."""
        parsed = parse_date("August 25, 2015 00:00:00 UTC")
        expected = datetime(2015, 8, 25, tzinfo=tz.tzutc())
        self.assertEqual(parsed, expected)

    def test_normalizes_date_timezones(self):
        """A locally-zoned datetime comes back with a UTC ``tzinfo``."""
        local_date = datetime(2015, 8, 21, tzinfo=tz.tzlocal())
        self.assertEqual(tz.tzutc(), normalize_date(local_date).tzinfo)
class TestPublicKeyProvider(unittest.TestCase):
    """Tests for ``PublicKeyProvider`` backed by a mocked CloudTrail client."""

    @staticmethod
    def _make_client(public_keys):
        """Return a mock client whose ``list_public_keys`` call responds
        with ``{"PublicKeyList": public_keys}``.
        """
        client = mock.Mock()
        client.list_public_keys.return_value = {"PublicKeyList": public_keys}
        return client

    def test_returns_public_keys_in_range(self):
        """``get_public_keys`` returns the listed entries as a dict and
        forwards the requested date range to ``list_public_keys``.
        """
        cloudtrail_client = self._make_client(
            [
                {"Fingerprint": "a", "OtherData": "a", "Value": "a"},
                {"Fingerprint": "b", "OtherData": "b", "Value": "b"},
                {"Fingerprint": "c", "OtherData": "c", "Value": "c"},
            ]
        )
        provider = PublicKeyProvider(cloudtrail_client)
        start_date = START_DATE
        end_date = start_date + timedelta(days=2)

        keys = provider.get_public_keys(start_date, end_date)

        # The expectation is spelled out literally (not derived from the
        # input list) so that it cannot alias the objects under test.
        self.assertEqual(
            {
                "a": {"Fingerprint": "a", "OtherData": "a", "Value": "a"},
                "b": {"Fingerprint": "b", "OtherData": "b", "Value": "b"},
                "c": {"Fingerprint": "c", "OtherData": "c", "Value": "c"},
            },
            keys,
        )
        cloudtrail_client.list_public_keys.assert_has_calls(
            [mock.call(EndTime=end_date, StartTime=start_date)]
        )

    def test_returns_public_key_in_range(self):
        """``get_public_key`` returns the ``Value`` of the entry whose
        ``Fingerprint`` matches the requested one.
        """
        cloudtrail_client = self._make_client(
            [
                {"Fingerprint": "a", "OtherData": "a1", "Value": "a2"},
                {"Fingerprint": "b", "OtherData": "b1", "Value": "b2"},
                {"Fingerprint": "c", "OtherData": "c1", "Value": "c2"},
            ]
        )
        provider = PublicKeyProvider(cloudtrail_client)

        public_key = provider.get_public_key(START_DATE, "c")

        self.assertEqual("c2", public_key)

    def test_key_not_found(self):
        """Requesting a fingerprint that is not listed raises
        ``RuntimeError``.
        """
        cloudtrail_client = self._make_client(
            [{"Fingerprint": "123", "OtherData": "456", "Value": "789"}]
        )
        provider = PublicKeyProvider(cloudtrail_client)

        # Only the lookup is expected to raise. Keeping the setup outside
        # the context prevents a RuntimeError from mock configuration or
        # provider construction from making this test pass spuriously.
        with self.assertRaises(RuntimeError):
            provider.get_public_key(START_DATE, "c")
|