File: _gmp.py

package info (click to toggle)
python-gvm 26.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 5,132 kB
  • sloc: python: 44,662; makefile: 18
file content (140 lines) | stat: -rw-r--r-- 5,236 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# SPDX-FileCopyrightText: 2019-2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

import warnings
from types import TracebackType
from typing import Callable, Optional, Type, Union

from gvm.__version__ import __version__
from gvm.connections import GvmConnection
from gvm.errors import GvmError
from gvm.protocols.core import Response

from .._protocol import GvmProtocol, T, str_transform
from ._gmp224 import GMPv224
from ._gmp225 import GMPv225
from ._gmp226 import GMPv226
from ._gmp227 import GMPv227
from ._gmpnext import GMPNext
from .requests import Version

# Union of every protocol class that GMP can hand back to the caller. It is
# used as the return annotation of GMP.determine_supported_gmp() and
# GMP.__enter__().
SUPPORTED_GMP_VERSIONS = Union[
    GMPv224[T], GMPv225[T], GMPv226[T], GMPv227[T], GMPNext[T]
]
# Version strings interpolated into the warning and error messages built by
# GMP.determine_supported_gmp(). Version 22.8 (and newer 22.x) is served by
# GMPNext there.
_SUPPORTED_GMP_VERSION_STRINGS = ["22.4", "22.5", "22.6", "22.7", "22.8"]


class GMP(GvmProtocol[T]):
    """Dynamically select supported GMP protocol of the remote manager daemon.

    Must be used as a `Context Manager <https://docs.python.org/3/reference/datamodel.html#context-managers>`_

    Entering the context asks the remote daemon for its GMP version, creates
    the matching protocol class instance on the same connection, connects it
    and returns it. Leaving the context disconnects that instance again.

    Example:

        .. code-block:: python

            from gvm.protocols.gmp import GMP

            with GMP(connection) as gmp:
                # gmp can be an instance of
                # gvm.protocols.gmp.GMPv224,
                # gvm.protocols.gmp.GMPv225,
                # gvm.protocols.gmp.GMPv226,
                # gvm.protocols.gmp.GMPv227,
                # or gvm.protocols.gmp.GMPNext
                # depending on the supported GMP version of the remote manager daemon
                resp = gmp.get_tasks()
    """

    def __init__(
        self,
        connection: GvmConnection,
        *,
        transform: Callable[[Response], T] = str_transform,  # type: ignore[assignment]
    ):
        """
        Create a new GMP instance.

        Args:
            connection: Connection to use to talk with the remote daemon. See
                :mod:`gvm.connections` for possible connection types.
            transform: Optional transform `callable <https://docs.python.org/3/library/functions.html#callable>`_
                to convert response data. After each request the callable gets passed the plain response data
                which can be used to check the data and/or conversion into different
                representations like a xml dom.

                See :mod:`gvm.transforms` for existing transforms.
        """
        super().__init__(connection, transform=transform)
        # Version specific protocol instance; only set while the context
        # manager is entered.
        self._gmp: Optional[SUPPORTED_GMP_VERSIONS] = None

    def determine_remote_gmp_version(self) -> str:
        """Determine the supported GMP version of the remote daemon.

        Connects, sends a version request and disconnects again before the
        response is evaluated.

        Returns:
            The text of the ``version`` element of the response.

        Raises:
            GvmError: If the response contains no ``version`` element or the
                element has no text.
        """
        self.connect()
        try:
            resp = self._send_request(Version.get_version())
        finally:
            # Release the probing connection even if the request fails, so a
            # broken request does not leave the connection open.
            self.disconnect()

        version_el = resp.xml().find("version")
        if version_el is None or not version_el.text:
            raise GvmError(
                "Invalid response from manager daemon while requesting the "
                "version information."
            )

        return version_el.text

    def determine_supported_gmp(self) -> SUPPORTED_GMP_VERSIONS:
        """Determine supported GMP version of the remote daemon and return a
        corresponding GMP class instance.

        Only the first two dot separated components (major and minor) of the
        remote version are evaluated. A 22.x version with a minor version
        above 8 is served by :class:`GMPNext` and additionally emits a
        warning via :func:`warnings.warn`.

        Returns:
            A new instance of the matching protocol class, created with the
            connection and transform of this instance.

        Raises:
            GvmError: If the version can't be determined, can't be parsed
                into a numeric major and minor version, or is not supported.
        """
        version = self.determine_remote_gmp_version()
        try:
            # Unpacking fails with ValueError if there is no minor component,
            # int() fails with ValueError for non-numeric components.
            major_str, minor_str = version.split(".")[:2]
            major_version = int(major_str)
            minor_version = int(minor_str)
        except ValueError as e:
            raise GvmError(
                "Invalid GMP version received from manager daemon. "
                f"The GMP version was {version!r}."
            ) from e

        if major_version == 22 and minor_version == 4:
            gmp_class = GMPv224
        elif major_version == 22 and minor_version == 5:
            gmp_class = GMPv225
        elif major_version == 22 and minor_version == 6:
            gmp_class = GMPv226
        elif major_version == 22 and minor_version == 7:
            gmp_class = GMPv227
        elif major_version == 22 and minor_version >= 8:
            gmp_class = GMPNext
            if minor_version > 8:
                warnings.warn(
                    "Remote manager daemon uses a newer GMP version than "
                    f"supported by python-gvm {__version__}. Please update to "
                    "a newer release of python-gvm if possible. "
                    f"Remote GMP version is {major_version}.{minor_version}. "
                    f"Supported GMP versions are {', '.join(_SUPPORTED_GMP_VERSION_STRINGS)}."
                )
        else:
            raise GvmError(
                "Remote manager daemon uses an unsupported version of GMP. "
                f"The GMP version was {major_version}.{minor_version}. "
                f"Supported GMP versions are {', '.join(_SUPPORTED_GMP_VERSION_STRINGS)}."
            )

        return gmp_class(self._connection, transform=self._transform_callable)  # type: ignore[arg-type]

    def __enter__(self) -> SUPPORTED_GMP_VERSIONS:  # type: ignore[override]
        """
        Returns the corresponding GMP class of the supported GMP version of the
        remote manager daemon.

        The returned instance is already connected.
        """
        gmp = self.determine_supported_gmp()
        gmp.connect()

        # Only remember the instance once it is connected. __exit__ is not
        # called when __enter__ raises, so nothing stale is left behind.
        self._gmp = gmp

        return gmp

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Disconnect the protocol instance created by ``__enter__``.

        Returns ``None``, so an exception raised inside the ``with`` block is
        not suppressed.
        """
        try:
            if self._gmp is not None:
                self._gmp.disconnect()
        finally:
            # Drop the reference even if disconnecting fails.
            self._gmp = None