File: s3_lib.py

package info (click to toggle)
aptly 1.6.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 49,928 kB
  • sloc: python: 10,398; sh: 252; makefile: 184
file content (92 lines) | stat: -rw-r--r-- 2,866 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from lib import BaseTest
import uuid
import os

try:
    import boto

    if 'AWS_SECRET_ACCESS_KEY' in os.environ and 'AWS_ACCESS_KEY_ID' in os.environ and \
       os.environ['AWS_SECRET_ACCESS_KEY'] != "" and os.environ['AWS_ACCESS_KEY_ID'] != "":
        s3_conn = boto.connect_s3()
    else:
        print("S3 tests disabled: AWS creds not found in the environment (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)")
        s3_conn = None
except ImportError as e:
    print("S3 tests disabled: can't import boto", e)
    s3_conn = None


class S3Test(BaseTest):
    """
    BaseTest + support for S3
    """

    s3Overrides = {}

    def fixture_available(self):
        return super(S3Test, self).fixture_available() and \
                'AWS_SECRET_ACCESS_KEY' in os.environ and 'AWS_ACCESS_KEY_ID' in os.environ and \
                os.environ['AWS_SECRET_ACCESS_KEY'] != "" and os.environ['AWS_ACCESS_KEY_ID'] != ""

    def prepare(self):
        self.bucket_name = "aptly-sys-test-" + str(uuid.uuid1())
        self.bucket = s3_conn.create_bucket(self.bucket_name)
        self.configOverride = {"S3PublishEndpoints": {
            "test1": {
                "region": "us-east-1",
                "bucket": self.bucket_name,
                "awsAccessKeyID": os.environ["AWS_ACCESS_KEY_ID"],
                "awsSecretAccessKey": os.environ["AWS_SECRET_ACCESS_KEY"]
            }
        }}

        self.configOverride["S3PublishEndpoints"]["test1"].update(**self.s3Overrides)

        super(S3Test, self).prepare()

    def shutdown(self):
        if hasattr(self, "bucket_name"):
            if hasattr(self, "bucket"):
                keys = self.bucket.list()
                if keys:
                    self.bucket.delete_keys(keys)
            s3_conn.delete_bucket(self.bucket_name)

        super(S3Test, self).shutdown()

    def check_path(self, path):
        if not hasattr(self, "bucket_contents"):
            self.bucket_contents = [key.name for key in self.bucket.list()]

        if path.startswith("public/"):
            path = path[7:]

        if path in self.bucket_contents:
            return True

        if not path.endswith("/"):
            path = path + "/"

        for item in self.bucket_contents:
            if item.startswith(path):
                return True

        return False

    def check_exists(self, path):
        if not self.check_path(path):
            raise Exception("path %s doesn't exist" % (path, ))

    def check_not_exists(self, path):
        if self.check_path(path):
            raise Exception("path %s exists" % (path, ))

    def read_file(self, path, mode=''):
        # We don't support reading as binary here.
        assert not mode

        if path.startswith("public/"):
            path = path[7:]

        key = self.bucket.get_key(path)
        return key.get_contents_as_string()