File: responses.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (111 lines) | stat: -rw-r--r-- 4,271 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import json
from typing import Any

from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse

from .models import GlacierBackend, glacier_backends
from .utils import vault_from_glacier_url


class GlacierResponse(BaseResponse):
    def __init__(self) -> None:
        """Initialise the handler, passing ``service_name="glacier"`` to the base class."""
        super().__init__(service_name="glacier")

    def setup_class(
        self, request: Any, full_url: str, headers: Any, use_raw_body: bool = False
    ) -> None:
        """Delegate request setup to the base class, always asking for the raw body.

        ``request``, ``full_url`` and ``headers`` are forwarded unchanged.
        The ``use_raw_body`` argument is ignored: ``True`` is passed to the
        parent regardless of what the caller supplied.
        """
        # NOTE(review): presumably forced because archive payloads are binary
        # and must not be decoded/parsed by the base class -- confirm.
        super().setup_class(request, full_url, headers, use_raw_body=True)

    @property
    def glacier_backend(self) -> GlacierBackend:
        """Return the backend registered for the current account and region."""
        backends_for_account = glacier_backends[self.current_account]
        return backends_for_account[self.region]

    def list_vaults(self) -> TYPE_RESPONSE:
        """Return all vaults reported by the backend as a JSON document.

        Returns:
            A ``(200, headers, body)`` tuple. ``body`` is a JSON object with
            a ``Marker`` that is always ``null`` and a ``VaultList`` holding
            the ``to_dict()`` form of each vault.
        """
        vault_dicts = [entry.to_dict() for entry in self.glacier_backend.list_vaults()]
        payload = {"Marker": None, "VaultList": vault_dicts}
        body = json.dumps(payload)
        return 200, {"content-type": "application/json"}, body

    def describe_vault(self) -> TYPE_RESPONSE:
        """Return the JSON description of a single vault.

        The vault name is extracted from the request URI by
        ``vault_from_glacier_url``.

        Returns:
            A ``(200, headers, body)`` tuple where ``body`` is the JSON
            encoding of the vault's ``to_dict()``.
        """
        name = vault_from_glacier_url(self.uri)
        described = self.glacier_backend.describe_vault(name)
        response_headers = {"content-type": "application/json"}
        body = json.dumps(described.to_dict())
        return 200, response_headers, body

    def create_vault(self) -> TYPE_RESPONSE:
        """Create a vault and acknowledge with an empty 201 response.

        The vault name is extracted from the request URI by
        ``vault_from_glacier_url``.
        """
        status = 201
        name = vault_from_glacier_url(self.uri)
        self.glacier_backend.create_vault(name)
        return status, {"status": status}, ""

    def delete_vault(self) -> TYPE_RESPONSE:
        """Delete a vault and acknowledge with an empty 204 response.

        The vault name is extracted from the request URI by
        ``vault_from_glacier_url``.
        """
        status = 204
        name = vault_from_glacier_url(self.uri)
        self.glacier_backend.delete_vault(name)
        return status, {"status": status}, ""

    def upload_archive(self) -> TYPE_RESPONSE:
        """Store the request body as an archive in a vault.

        The vault name is the second-to-last segment of the request path.
        The description comes from the ``x-amz-archive-description`` header
        and falls back to an empty string when the header is missing or empty.

        Returns:
            A ``(201, headers, "")`` tuple whose headers carry the archive id
            and SHA-256 tree hash reported by the backend.
        """
        description = self.headers.get("x-amz-archive-description")
        if not description:
            description = ""

        path_segments = self.parsed_url.path.split("/")
        vault_name = path_segments[-2]

        archive = self.glacier_backend.upload_archive(
            vault_name, self.body, description
        )
        response_headers = {
            "x-amz-archive-id": archive["archive_id"],
            "x-amz-sha256-tree-hash": archive["sha256"],
            "status": 201,
        }
        return 201, response_headers, ""

    def delete_archive(self) -> TYPE_RESPONSE:
        """Delete an archive from a vault and acknowledge with an empty 204.

        The vault name and archive id are the third-from-last and last
        segments of the request path, respectively.
        """
        path_segments = self.parsed_url.path.split("/")
        vault_name, archive_id = path_segments[-3], path_segments[-1]
        self.glacier_backend.delete_archive(vault_name, archive_id)

        status = 204
        return status, {"status": status}, ""

    def list_jobs(self) -> TYPE_RESPONSE:
        """Return the jobs of a vault as a JSON document.

        The vault name is the second-to-last segment of the request path.

        Returns:
            A ``(200, headers, body)`` tuple. ``body`` is a JSON object with
            a ``JobList`` holding the ``to_dict()`` form of each job and a
            ``Marker`` that is always ``null``.
        """
        path_segments = self.parsed_url.path.split("/")
        vault_jobs = self.glacier_backend.list_jobs(path_segments[-2])
        payload = {
            "JobList": [entry.to_dict() for entry in vault_jobs],
            "Marker": None,
        }
        return 200, {"content-type": "application/json"}, json.dumps(payload)

    def initiate_job(self) -> TYPE_RESPONSE:
        """Start a job on a vault and report its id and location.

        The account id and vault name are read from the request path (first
        and second-to-last segments). The JSON request body must contain
        ``Type``; ``ArchiveId`` is optional, and ``Tier`` falls back to
        ``"Standard"`` when it is missing or empty.

        Returns:
            A ``(202, headers, "")`` tuple whose headers carry
            ``x-amz-job-id`` and a ``Location`` of the form
            ``/{account_id}/vaults/{vault_name}/jobs/{job_id}``.

        Raises:
            KeyError: if the request body has no ``Type`` field.
        """
        path_segments = self.parsed_url.path.split("/")
        # Take the account id from the URL *path*, not from the full URL:
        # splitting "https://host/<account>/vaults/..." on "/" puts the empty
        # string between the two scheme slashes at index 1, which made the
        # Location header start with "//vaults/...".
        account_id = path_segments[1]
        vault_name = path_segments[-2]

        json_body = json.loads(self.body.decode("utf-8"))
        job_type = json_body["Type"]
        archive_id = json_body.get("ArchiveId")
        tier = json_body.get("Tier") or "Standard"

        job_id = self.glacier_backend.initiate_job(
            vault_name, job_type, tier, archive_id
        )
        headers = {
            "x-amz-job-id": job_id,
            "Location": f"/{account_id}/vaults/{vault_name}/jobs/{job_id}",
            "status": 202,
        }
        return 202, headers, ""

    def describe_job(self) -> str:
        """Return the JSON description of a single job.

        The vault name and job id are the third-from-last and last segments
        of the request path, respectively.
        """
        path_segments = self.parsed_url.path.split("/")
        vault_name, job_id = path_segments[-3], path_segments[-1]
        found = self.glacier_backend.describe_job(vault_name, job_id)
        return json.dumps(found.to_dict())  # type: ignore

    def get_job_output(self) -> TYPE_RESPONSE:
        """Return the output of a job, or a 404 when the backend has none.

        The vault name and job id are the fourth-from-last and second-to-last
        segments of the request path, respectively.

        Returns:
            ``(404, {"status": 404}, "404 Not Found")`` when the backend
            returns ``None``; a 200 JSON response when it returns a dict;
            otherwise a 200 ``application/octet-stream`` response carrying
            the output unchanged.
        """
        path_segments = self.parsed_url.path.split("/")
        vault_name = path_segments[-4]
        job_id = path_segments[-2]

        result = self.glacier_backend.get_job_output(vault_name, job_id)
        if result is None:
            return 404, {"status": 404}, "404 Not Found"
        if not isinstance(result, dict):
            # Non-dict output is passed through untouched as binary content.
            return 200, {"content-type": "application/octet-stream"}, result
        return 200, {"content-type": "application/json"}, json.dumps(result)