File: _protocol.py

package info (click to toggle)
python-gvm 26.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 5,132 kB
  • sloc: python: 44,662; makefile: 18
file content (145 lines) | stat: -rw-r--r-- 4,440 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# SPDX-FileCopyrightText: 2019-2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from types import TracebackType
from typing import Callable, Generic, Optional, Type, TypeVar

from gvm.connections import GvmConnection

from .core import Connection, Request, Response

T = TypeVar("T")
Self = TypeVar("Self", bound="GvmProtocol")


def str_transform(response: Response) -> str:
    return str(response)


class GvmProtocol(Generic[T]):
    """Base class for different GVM protocols"""

    def __init__(
        self,
        connection: GvmConnection,
        *,
        transform: Callable[[Response], T] = str_transform,  # type: ignore[assignment]
    ):
        """
        Create a new GvmProtocol instance.

        Args:
            connection: Connection to use to talk with the remote daemon. See
                :mod:`gvm.connections` for possible connection types.
            transform: Optional transform callable to convert response data.
                After each request the callable gets passed the plain response data
                which can be used to check the data and/or conversion into different
                representations like a xml dom.
        """
        self._connection = connection
        self._protocol = Connection()

        self._connected = False

        self._transform_callable = transform

    def __enter__(self: Self) -> Self:
        self.connect()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        self.disconnect()

    def _read(self) -> bytes:
        """Read a command response from gvmd

        Returns:
            str: Response from server.
        """
        return self._connection.read()

    def _send(self, data: bytes) -> None:
        """Send a command to the server

        Arguments:
            data (str): Data to be send over the connection to the server
        """
        self.connect()
        self._connection.send(data)

    def is_connected(self) -> bool:
        """Status of the current connection

        Returns:
            True if a connection to the remote server has been established.
        """
        return self._connected

    def connect(self) -> None:
        """Initiates a protocol connection

        Normally connect is not called directly. Either it is called
        automatically when sending a protocol command or when using a
        `with statement <https://docs.python.org/3/reference/datamodel.html#with-statement-context-managers>`_.
        """
        if not self.is_connected():
            self._connection.connect()
            self._connected = True

    def disconnect(self) -> None:
        """Disconnect the connection

        Ends and closes the connection.
        """
        if self.is_connected():
            self._connection.disconnect()
            self._connected = False

        self._protocol.close()

    def send_command(self, cmd: str) -> str:
        """
        Send a string command to the remote daemon and return the response as
        string
        """
        return bytes(
            self._send_request(cmd.encode("utf-8", errors="ignore"))  # type: ignore[arg-type] # it seems mypy on Python < 3.11 can't handle bytes here
        ).decode("utf-8", errors="ignore")

    def _transform(self, response: Response) -> T:
        transform = self._transform_callable
        return transform(response)

    def _send_request(self, request: Request) -> Response:
        """
        Send a request to the remote daemon and return the response

        Args:
            request: The request to be send.
        """
        try:
            send_data = self._protocol.send(request)
            self._send(send_data)
            response: Optional[Response] = None
            while not response:
                received_data = self._read()
                response = self._protocol.receive_data(received_data)
            return response
        except Exception as e:
            self.disconnect()
            raise e

    def _send_request_and_transform_response(self, request: Request) -> T:
        """
        Send a request and transform its response using the transform callable.

        Args:
            request: The request to be send.
        """
        return self._transform(self._send_request(request))