File: responses.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (143 lines) | stat: -rw-r--r-- 5,587 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""Handles incoming emrcontainers requests, invokes methods, returns responses."""

import json

from moto.core.common_types import TYPE_RESPONSE
from moto.core.responses import BaseResponse

from .models import EMRContainersBackend, emrcontainers_backends

# Fallback for the "maxResults" request parameter in the list_* handlers.
DEFAULT_MAX_RESULTS = 100
# Fallback for the "nextToken" request parameter in the list_* handlers.
DEFAULT_NEXT_TOKEN = ""
# Fallback for "containerProviderType" when listing virtual clusters.
DEFAULT_CONTAINER_PROVIDER_TYPE = "EKS"


class EMRContainersResponse(BaseResponse):
    """Request handlers for the emr-containers service.

    Each handler reads its parameters from the incoming request, forwards
    them to the matching backend method, and returns a
    ``(status, headers, body)`` tuple whose body is a JSON string.
    """

    def __init__(self) -> None:
        super().__init__(service_name="emr-containers")

    @property
    def emrcontainers_backend(self) -> EMRContainersBackend:
        """Backend registered for the current account and region."""
        per_account = emrcontainers_backends[self.current_account]
        return per_account[self.region]

    def create_virtual_cluster(self) -> TYPE_RESPONSE:
        """Forward a create-virtual-cluster request to the backend.

        The backend result is converted with ``dict()`` and serialized
        as the response body.
        """
        backend_kwargs = {
            "name": self._get_param("name"),
            "container_provider": self._get_param("containerProvider"),
            "client_token": self._get_param("clientToken"),
            "tags": self._get_param("tags"),
        }
        created = self.emrcontainers_backend.create_virtual_cluster(**backend_kwargs)
        body = json.dumps(dict(created))
        return 200, {}, body

    def delete_virtual_cluster(self) -> TYPE_RESPONSE:
        """Forward a delete request for the cluster named by ``virtualClusterId``."""
        target_id = self._get_param("virtualClusterId")
        deleted = self.emrcontainers_backend.delete_virtual_cluster(
            cluster_id=target_id
        )
        body = json.dumps(dict(deleted))
        return 200, {}, body

    def describe_virtual_cluster(self) -> TYPE_RESPONSE:
        """Return the backend's description under the ``virtualCluster`` key."""
        target_id = self._get_param("virtualClusterId")
        described = self.emrcontainers_backend.describe_virtual_cluster(
            cluster_id=target_id
        )
        body = json.dumps({"virtualCluster": described})
        return 200, {}, body

    def list_virtual_clusters(self) -> TYPE_RESPONSE:
        """List virtual clusters using the filters and paging values in the request.

        Module-level defaults are used for ``containerProviderType``,
        ``maxResults`` and ``nextToken`` when the request omits them;
        ``states`` is read from the query string and defaults to an
        empty list.
        """
        filters = {
            "container_provider_id": self._get_param("containerProviderId"),
            "container_provider_type": self._get_param(
                "containerProviderType", DEFAULT_CONTAINER_PROVIDER_TYPE
            ),
            "created_after": self._get_param("createdAfter"),
            "created_before": self._get_param("createdBefore"),
            "states": self.querystring.get("states", []),
            "max_results": self._get_int_param("maxResults", DEFAULT_MAX_RESULTS),
            "next_token": self._get_param("nextToken", DEFAULT_NEXT_TOKEN),
        }
        page, following_token = self.emrcontainers_backend.list_virtual_clusters(
            **filters
        )
        payload = {"virtualClusters": page, "nextToken": following_token}
        return 200, {}, json.dumps(payload)

    def start_job_run(self) -> TYPE_RESPONSE:
        """Forward a start-job-run request to the backend.

        The backend result is converted with ``dict()`` and serialized
        as the response body.
        """
        backend_kwargs = {
            "name": self._get_param("name"),
            "virtual_cluster_id": self._get_param("virtualClusterId"),
            "client_token": self._get_param("clientToken"),
            "execution_role_arn": self._get_param("executionRoleArn"),
            "release_label": self._get_param("releaseLabel"),
            "job_driver": self._get_param("jobDriver"),
            "configuration_overrides": self._get_param("configurationOverrides"),
            "tags": self._get_param("tags"),
        }
        started = self.emrcontainers_backend.start_job_run(**backend_kwargs)
        body = json.dumps(dict(started))
        return 200, {}, body

    def cancel_job_run(self) -> TYPE_RESPONSE:
        """Forward a cancel request for ``jobRunId`` within ``virtualClusterId``."""
        run_id = self._get_param("jobRunId")
        cluster_id = self._get_param("virtualClusterId")
        cancelled = self.emrcontainers_backend.cancel_job_run(
            job_id=run_id, virtual_cluster_id=cluster_id
        )
        body = json.dumps(dict(cancelled))
        return 200, {}, body

    def list_job_runs(self) -> TYPE_RESPONSE:
        """List job runs using the filters and paging values in the request.

        Module-level defaults are used for ``maxResults`` and
        ``nextToken`` when the request omits them; ``states`` is read
        from the query string and defaults to an empty list.
        """
        filters = {
            "virtual_cluster_id": self._get_param("virtualClusterId"),
            "created_before": self._get_param("createdBefore"),
            "created_after": self._get_param("createdAfter"),
            "name": self._get_param("name"),
            "states": self.querystring.get("states", []),
            "max_results": self._get_int_param("maxResults", DEFAULT_MAX_RESULTS),
            "next_token": self._get_param("nextToken", DEFAULT_NEXT_TOKEN),
        }
        page, following_token = self.emrcontainers_backend.list_job_runs(**filters)
        payload = {"jobRuns": page, "nextToken": following_token}
        return 200, {}, json.dumps(payload)

    def describe_job_run(self) -> TYPE_RESPONSE:
        """Return the backend's job-run description under the ``jobRun`` key."""
        run_id = self._get_param("jobRunId")
        cluster_id = self._get_param("virtualClusterId")
        described = self.emrcontainers_backend.describe_job_run(
            job_id=run_id, virtual_cluster_id=cluster_id
        )
        body = json.dumps({"jobRun": described})
        return 200, {}, body