1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
"""Handles incoming securityhub requests, invokes methods, returns responses."""
import json
from moto.core.responses import BaseResponse
from .models import SecurityHubBackend, securityhub_backends
class SecurityHubResponse(BaseResponse):
    """Response handler that forwards Security Hub API calls to the backend.

    Each public method reads its inputs from the request (the JSON body or
    ``_get_param``), passes them to the same-named ``SecurityHubBackend``
    method, and returns the result encoded as a JSON string.
    """

    def __init__(self) -> None:
        """Initialise the base response with the ``securityhub`` service name."""
        super().__init__(service_name="securityhub")

    @property
    def securityhub_backend(self) -> SecurityHubBackend:
        """Return the backend selected by the current account and region."""
        per_account = securityhub_backends[self.current_account]
        return per_account[self.region]

    def enable_security_hub(self) -> str:
        """Enable Security Hub with the options given in the request body.

        A falsy body is treated as ``{}``. ``EnableDefaultStandards`` defaults
        to ``True`` and ``Tags`` to an empty dict when they are absent.

        Returns:
            The JSON encoding of an empty object.
        """
        if self.body:
            payload = json.loads(self.body)
        else:
            payload = {}
        default_standards = payload.get("EnableDefaultStandards", True)
        hub_tags = payload.get("Tags", {})
        backend = self.securityhub_backend
        backend.enable_security_hub(
            enable_default_standards=default_standards,
            tags=hub_tags,
        )
        return json.dumps({})

    def disable_security_hub(self) -> str:
        """Ask the backend to disable Security Hub.

        Returns:
            The JSON encoding of an empty object.
        """
        backend = self.securityhub_backend
        backend.disable_security_hub()
        return json.dumps({})

    def describe_hub(self) -> str:
        """Return the backend's hub description as JSON.

        ``HubArn`` is taken from the JSON body and is ``None`` when the body
        is falsy or does not contain the key.
        """
        payload = {} if not self.body else json.loads(self.body)
        arn = payload.get("HubArn")
        description = self.securityhub_backend.describe_hub(hub_arn=arn)
        return json.dumps(description)

    def get_findings(self) -> str:
        """Return the findings produced by the backend as JSON.

        ``Filters``, ``SortCriteria``, ``MaxResults`` and ``NextToken`` are
        read with ``_get_param`` and forwarded to the backend unchanged.

        Returns:
            A JSON object with ``Findings`` and ``NextToken`` keys holding the
            two values returned by the backend.
        """
        # Request parameters are collected before the backend is looked up.
        query = {
            "filters": self._get_param("Filters"),
            "sort_criteria": self._get_param("SortCriteria"),
            "max_results": self._get_param("MaxResults"),
            "next_token": self._get_param("NextToken"),
        }
        page, following_token = self.securityhub_backend.get_findings(**query)
        return json.dumps({"Findings": page, "NextToken": following_token})

    def batch_import_findings(self) -> str:
        """Import the findings listed in the request body.

        The body is decoded as UTF-8 first when it is a ``bytes`` object. A
        missing ``Findings`` key is treated as an empty list.

        Returns:
            A JSON object with ``FailedCount``, ``FailedFindings`` (each entry
            reduced to ``ErrorCode``, ``ErrorMessage`` and ``Id``) and
            ``SuccessCount``.
        """
        text = self.body.decode("utf-8") if isinstance(self.body, bytes) else self.body
        payload = json.loads(text)
        incoming = payload.get("Findings", [])
        failed_total, imported_total, rejected = (
            self.securityhub_backend.batch_import_findings(findings=incoming)
        )

        # Only these three fields of each failed finding are reported back.
        reported_keys = ("ErrorCode", "ErrorMessage", "Id")
        failures = [
            {key: item.get(key) for key in reported_keys} for item in rejected
        ]
        result = {
            "FailedCount": failed_total,
            "FailedFindings": failures,
            "SuccessCount": imported_total,
        }
        return json.dumps(result)

    def enable_organization_admin_account(self) -> str:
        """Pass ``AdminAccountId`` from the JSON body to the backend.

        Returns:
            The JSON encoding of an empty object.
        """
        payload = json.loads(self.body)
        account_id = payload.get("AdminAccountId")
        backend = self.securityhub_backend
        backend.enable_organization_admin_account(admin_account_id=account_id)
        return json.dumps({})

    def update_organization_configuration(self) -> str:
        """Forward organization configuration settings to the backend.

        ``AutoEnable``, ``AutoEnableStandards`` and
        ``OrganizationConfiguration`` are read from the JSON body; any key
        that is absent is passed as ``None``.

        Returns:
            The JSON encoding of an empty object.
        """
        payload = json.loads(self.body)
        settings = {
            "auto_enable": payload.get("AutoEnable"),
            "auto_enable_standards": payload.get("AutoEnableStandards"),
            "organization_configuration": payload.get("OrganizationConfiguration"),
        }
        self.securityhub_backend.update_organization_configuration(**settings)
        return json.dumps({})

    def get_administrator_account(self) -> str:
        """Return the backend's administrator account data as JSON."""
        return json.dumps(self.securityhub_backend.get_administrator_account())

    def describe_organization_configuration(self) -> str:
        """Return the backend's organization configuration as JSON.

        The backend result is converted with ``dict()`` before encoding.
        """
        config = self.securityhub_backend.describe_organization_configuration()
        as_mapping = dict(config)
        return json.dumps(as_mapping)
|