File: responses.py

package info (click to toggle)
python-moto 5.1.18-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 116,520 kB
  • sloc: python: 636,725; javascript: 181; makefile: 39; sh: 3
file content (122 lines) | stat: -rw-r--r-- 4,321 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import json
from urllib.parse import unquote

from moto.core.responses import BaseResponse

from .models import GuardDutyBackend, guardduty_backends


class GuardDutyResponse(BaseResponse):
    """Request handlers for the GuardDuty service.

    Every handler reads its inputs from the request path and parameters,
    forwards them to the backend selected for the current account and
    region, and returns the response body as a JSON string.
    """

    def __init__(self) -> None:
        super().__init__(service_name="guardduty")

    @property
    def guardduty_backend(self) -> GuardDutyBackend:
        """Backend belonging to the account and region of this request."""
        per_account = guardduty_backends[self.current_account]
        return per_account[self.region]

    def create_filter(self) -> str:
        """Create a filter and return its name as JSON.

        The detector id is read from the second-to-last path segment.
        """
        owner_id = self.path.rsplit("/", 2)[-2]
        filter_name = self._get_param("name")
        filter_action = self._get_param("action")
        filter_description = self._get_param("description")
        criteria = self._get_param("findingCriteria")
        filter_rank = self._get_param("rank")

        backend = self.guardduty_backend
        backend.create_filter(
            owner_id,
            filter_name,
            filter_action,
            filter_description,
            criteria,
            filter_rank,
        )
        return json.dumps({"name": filter_name})

    def create_detector(self) -> str:
        """Create a detector and return its id as JSON."""
        # Parameters are read in the positional order the backend expects.
        param_keys = (
            "enable",
            "findingPublishingFrequency",
            "dataSources",
            "tags",
            "features",
        )
        param_values = [self._get_param(key) for key in param_keys]

        new_detector_id = self.guardduty_backend.create_detector(*param_values)
        return json.dumps({"detectorId": new_detector_id})

    def delete_detector(self) -> str:
        """Delete the detector named by the last path segment."""
        target_id = self.path.rsplit("/", 1)[-1]
        self.guardduty_backend.delete_detector(target_id)
        return "{}"

    def delete_filter(self) -> str:
        """Delete a filter from a detector.

        The detector id is the third-to-last path segment; the filter name
        is the URL-decoded last segment.
        """
        segments = self.path.split("/")
        owner_id = segments[-3]
        decoded_name = unquote(segments[-1])
        self.guardduty_backend.delete_filter(owner_id, decoded_name)
        return "{}"

    def enable_organization_admin_account(self) -> str:
        """Pass the ``adminAccountId`` parameter on to the backend."""
        account_id = self._get_param("adminAccountId")
        backend = self.guardduty_backend
        backend.enable_organization_admin_account(account_id)
        return "{}"

    def list_organization_admin_accounts(self) -> str:
        """Return the backend's admin account ids, each reported as ENABLED."""
        admin_ids = self.guardduty_backend.list_organization_admin_accounts()
        payload = {
            "adminAccounts": [
                {"adminAccountId": admin_id, "adminStatus": "ENABLED"}
                for admin_id in admin_ids
            ]
        }
        return json.dumps(payload)

    def list_detectors(self) -> str:
        """Return the detector ids reported by the backend as JSON."""
        payload = {"detectorIds": self.guardduty_backend.list_detectors()}
        return json.dumps(payload)

    def get_detector(self) -> str:
        """Return the JSON form of the detector named by the last path segment."""
        target_id = self.path.rsplit("/", 1)[-1]
        found = self.guardduty_backend.get_detector(target_id)
        body = found.to_json()
        return json.dumps(body)

    def get_filter(self) -> str:
        """Return the JSON form of a detector's filter.

        The detector id is the third-to-last path segment; the filter name
        is the URL-decoded last segment.
        """
        segments = self.path.split("/")
        owner_id = segments[-3]
        decoded_name = unquote(segments[-1])
        found = self.guardduty_backend.get_filter(owner_id, decoded_name)
        body = found.to_json()
        return json.dumps(body)

    def update_detector(self) -> str:
        """Forward updated detector settings to the backend."""
        target_id = self.path.rsplit("/", 1)[-1]
        enabled = self._get_param("enable")
        publishing_frequency = self._get_param("findingPublishingFrequency")
        sources = self._get_param("dataSources")
        detector_features = self._get_param("features")

        backend = self.guardduty_backend
        backend.update_detector(
            target_id, enabled, publishing_frequency, sources, detector_features
        )
        return "{}"

    def update_filter(self) -> str:
        """Forward updated filter settings and return the filter name as JSON.

        The detector id is the third-to-last path segment; the filter name
        is the URL-decoded last segment.
        """
        segments = self.path.split("/")
        owner_id = segments[-3]
        decoded_name = unquote(segments[-1])
        changes = {
            "action": self._get_param("action"),
            "description": self._get_param("description"),
            "finding_criteria": self._get_param("findingCriteria"),
            "rank": self._get_param("rank"),
        }

        self.guardduty_backend.update_filter(owner_id, decoded_name, **changes)
        return json.dumps({"name": decoded_name})

    def get_administrator_account(self) -> str:
        """Return the backend's administrator-account result as JSON.

        The detector id is read from the second-to-last path segment.
        """
        owner_id = self.path.rsplit("/", 2)[-2]
        details = self.guardduty_backend.get_administrator_account(owner_id)
        return json.dumps(details)