1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
# SPDX-FileCopyrightText: 2019-2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later
from types import TracebackType
from typing import Callable, Generic, Optional, Type, TypeVar
from gvm.connections import GvmConnection
from .core import Connection, Request, Response
T = TypeVar("T")
Self = TypeVar("Self", bound="GvmProtocol")
def str_transform(response: Response) -> str:
return str(response)
class GvmProtocol(Generic[T]):
"""Base class for different GVM protocols"""
def __init__(
self,
connection: GvmConnection,
*,
transform: Callable[[Response], T] = str_transform, # type: ignore[assignment]
):
"""
Create a new GvmProtocol instance.
Args:
connection: Connection to use to talk with the remote daemon. See
:mod:`gvm.connections` for possible connection types.
transform: Optional transform callable to convert response data.
After each request the callable gets passed the plain response data
which can be used to check the data and/or conversion into different
representations like a xml dom.
"""
self._connection = connection
self._protocol = Connection()
self._connected = False
self._transform_callable = transform
def __enter__(self: Self) -> Self:
self.connect()
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
self.disconnect()
def _read(self) -> bytes:
"""Read a command response from gvmd
Returns:
str: Response from server.
"""
return self._connection.read()
def _send(self, data: bytes) -> None:
"""Send a command to the server
Arguments:
data (str): Data to be send over the connection to the server
"""
self.connect()
self._connection.send(data)
def is_connected(self) -> bool:
"""Status of the current connection
Returns:
True if a connection to the remote server has been established.
"""
return self._connected
def connect(self) -> None:
"""Initiates a protocol connection
Normally connect is not called directly. Either it is called
automatically when sending a protocol command or when using a
`with statement <https://docs.python.org/3/reference/datamodel.html#with-statement-context-managers>`_.
"""
if not self.is_connected():
self._connection.connect()
self._connected = True
def disconnect(self) -> None:
"""Disconnect the connection
Ends and closes the connection.
"""
if self.is_connected():
self._connection.disconnect()
self._connected = False
self._protocol.close()
def send_command(self, cmd: str) -> str:
"""
Send a string command to the remote daemon and return the response as
string
"""
return bytes(
self._send_request(cmd.encode("utf-8", errors="ignore")) # type: ignore[arg-type] # it seems mypy on Python < 3.11 can't handle bytes here
).decode("utf-8", errors="ignore")
def _transform(self, response: Response) -> T:
transform = self._transform_callable
return transform(response)
def _send_request(self, request: Request) -> Response:
"""
Send a request to the remote daemon and return the response
Args:
request: The request to be send.
"""
try:
send_data = self._protocol.send(request)
self._send(send_data)
response: Optional[Response] = None
while not response:
received_data = self._read()
response = self._protocol.receive_data(received_data)
return response
except Exception as e:
self.disconnect()
raise e
def _send_request_and_transform_response(self, request: Request) -> T:
"""
Send a request and transform its response using the transform callable.
Args:
request: The request to be send.
"""
return self._transform(self._send_request(request))
|