1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
|
# SPDX-FileCopyrightText: 2018-2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
Module for communication to a daemon speaking `Open Scanner Protocol version 1`_
.. _Open Scanner Protocol version 1:
https://docs.greenbone.net/API/OSP/osp-20.08.html
"""
import logging
from typing import Any, Optional
from gvm.errors import InvalidArgument, RequiredArgument
from gvm.utils import to_bool
from gvm.xml import XmlCommand, XmlCommandElement
from ._protocol import GvmProtocol, T
from .core import Request, Response
logger = logging.getLogger(__name__)
PROTOCOL_VERSION = (1, 2)
def create_credentials_element(
xml_credentials: XmlCommandElement, credentials: dict[str, dict[str, str]]
):
"""Generates an xml element with credentials."""
for service, credential in credentials.items():
cred_type = credential.get("type", "")
server_port = credential.get("port", "")
username = credential.get("username")
password = credential.get("password")
xml_credential = xml_credentials.add_element("credential")
xml_credential.set_attribute("type", cred_type)
xml_credential.set_attribute("port", server_port)
xml_credential.set_attribute("service", service)
xml_credential.add_element("username", username)
xml_credential.add_element("password", password)
return xml_credentials
def create_vt_selection_element(
xml_vt_selection: XmlCommandElement, vt_selection: dict[str, Any]
):
"""Generates an xml element with a selection of Vulnerability tests."""
for vt_id, vt_values in vt_selection.items():
if vt_id != "vt_groups" and isinstance(vt_values, dict):
xml_vt = xml_vt_selection.add_element(
"vt_single", attrs={"id": vt_id}
)
if vt_values:
for key, value in vt_values.items():
xml_vt.add_element("vt_value", value, attrs={"id": key})
elif vt_id == "vt_groups" and isinstance(vt_values, list):
for group in vt_values:
xml_vt_selection.add_element(
"vt_group", attrs={"filter": group}
)
else:
raise InvalidArgument(
f"It was not possible to add {vt_id} to the VTs selection."
)
return xml_vt_selection
class Osp(GvmProtocol[T]):
@staticmethod
def get_protocol_version() -> tuple[int, int]:
"""Determine the Open Scanner Protocol version.
Returns:
tuple: Implemented version of the Open Scanner Protocol
"""
return PROTOCOL_VERSION
def _send_request(self, cmd: Request) -> Response:
try:
return super()._send_request(cmd)
finally:
# OSP is stateless. Therefore the connection is closed after each
# response and we must reset the connection
self.disconnect()
def get_version(self) -> T:
"""Get the version of the OSPD server which is connected to."""
cmd = XmlCommand("get_version")
return self._send_request_and_transform_response(cmd)
def help(self) -> T:
"""Get the help text."""
cmd = XmlCommand("help")
return self._send_request_and_transform_response(cmd)
def get_scans(
self,
scan_id: Optional[str] = None,
details: bool = True,
pop_results: bool = False,
) -> T:
"""Get the stored scans.
Arguments:
scan_id (str, optional): UUID identifier for a scan.
details (boolean, optional): Whether to get full scan reports.
Default: True
pop_results (boolean, optional) Whether to remove the fetched
results. Default: False
"""
cmd = XmlCommand("get_scans")
if scan_id:
cmd.set_attribute("scan_id", scan_id)
cmd.set_attribute("details", to_bool(details))
cmd.set_attribute("pop_results", to_bool(pop_results))
return self._send_request_and_transform_response(cmd)
def delete_scan(self, scan_id: str) -> T:
"""Delete a finished scan.
Arguments:
scan_id: UUID identifier for a finished scan.
"""
if not scan_id:
raise ValueError("delete_scan requires a scan_id element")
cmd = XmlCommand("delete_scan")
cmd.set_attribute("scan_id", scan_id)
return self._send_request_and_transform_response(cmd)
def get_scanner_details(self) -> T:
"""Return scanner description and parameters."""
cmd = XmlCommand("get_scanner_details")
return self._send_request_and_transform_response(cmd)
def get_vts(self, vt_id: Optional[str] = None) -> T:
"""Return information about vulnerability tests,
if offered by scanner.
Args:
vt_id: UUID identifier for a vulnerability test.
"""
cmd = XmlCommand("get_vts")
if vt_id:
cmd.set_attribute("vt_id", vt_id)
return self._send_request_and_transform_response(cmd)
def start_scan(
self,
scan_id: Optional[str] = None,
parallel: int = 1,
target=None,
ports=None,
targets: Optional[list[dict[str, str]]] = None,
scanner_params: Optional[dict[str, Any]] = None,
vt_selection: Optional[dict[str, Any]] = None,
) -> T:
"""Start a new scan.
Args:
scan_id: UUID identifier for a running scan.
parallel: Number of parallel scanned targets.
Default 1.
target: Deprecated. Please use targets instead.
targets: List of dictionaries. See example.
ports: Deprecated. Ports to use for target parameter.
scanner_params:: Dictionary of scanner parameters.
vt_selection:: Vulnerability tests to select. See example.
Examples:
Scanner Parameters::
scanner_parameters = {
'scan_param1': 'scan_param1_value',
'scan_param2': 'scan_param2_value',
}
Targets::
targets = [{
'hosts': 'localhost',
'ports': '80,43'
}, {
'hosts': '192.168.0.0/24',
'ports': '22',
}, {
'credentials': {
'smb': {
'password': 'pass',
'port': 'port',
'type': 'type',
'username': 'username',
}
}
}]
VT Selection::
vt_selection = {
'vt1': {},
'vt2': {'value_id': 'value'},
'vt_groups': ['family=debian', 'family=general']
}
"""
cmd = XmlCommand("start_scan")
if scan_id:
cmd.set_attribute("scan_id", scan_id)
cmd.set_attribute("parallel", str(parallel))
# Add <scanner_params> even if it is empty, since it is mandatory
xml_scan_params = cmd.add_element("scanner_params")
if scanner_params:
xml_scan_params.set_attributes(scanner_params)
if targets:
xml_targets = cmd.add_element("targets")
for target in targets:
xml_target = xml_targets.add_element("target")
hosts = target.get("hosts")
ports = target.get("ports")
credentials = target.get("credentials")
xml_target.add_element("hosts", hosts)
xml_target.add_element("ports", ports)
if credentials:
create_credentials_element(
xml_target.add_element("credentials"), credentials
)
# Check target as attribute for legacy mode compatibility. Deprecated.
elif target:
cmd.set_attribute("target", target)
if ports:
cmd.set_attribute("ports", ports)
else:
raise RequiredArgument(
function=self.start_scan.__name__, argument="targets"
)
if vt_selection:
create_vt_selection_element(
cmd.add_element("vt_selection"), vt_selection
)
return self._send_request_and_transform_response(cmd)
def stop_scan(self, scan_id: str) -> T:
"""Stop a currently running scan.
Args:
scan_id: UUID identifier for a running scan.
"""
if not scan_id:
raise RequiredArgument(
function=self.stop_scan.__name__, argument="scan_id"
)
cmd = XmlCommand("stop_scan")
cmd.set_attribute("scan_id", scan_id)
return self._send_request_and_transform_response(cmd)
|