File: _connection.py

package info (click to toggle)
python-gvm 26.7.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 5,220 kB
  • sloc: python: 45,844; makefile: 18
file content (124 lines) | stat: -rw-r--r-- 3,442 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

import logging
import socket as socketlib
from abc import ABC, abstractmethod
from time import time
from typing import Optional, Protocol, Union, runtime_checkable

from gvm.errors import GvmError

# Maximum number of bytes requested from the socket per receive call.
BUF_SIZE = 16 * 1024

# Timeout used by connections when the caller does not supply one.
DEFAULT_TIMEOUT = 60  # in seconds

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


@runtime_checkable
class GvmConnection(Protocol):
    """
    Python `protocol <https://docs.python.org/3/library/typing.html#typing.Protocol>`_
    for GvmConnection classes.

    The protocol is runtime checkable, so ``isinstance(obj, GvmConnection)``
    can be used. Such a check only verifies that the methods exist, not
    their signatures.
    """

    def connect(self) -> None:
        """Establish a connection to a remote server"""

    def disconnect(self) -> None:
        """Disconnect and close the connection to the remote server"""

    def send(self, data: bytes) -> None:
        """Send data to the connected remote server

        Args:
            data: Data to be sent to the server as bytes.
        """

    def read(self) -> bytes:
        """Read data from the remote server

        Returns:
            data as bytes
        """

    def finish_send(self) -> None:
        """Indicate to the remote server you are done with sending data"""


class AbstractGvmConnection(ABC):
    """
    Base class for establishing a connection to a remote server daemon.

    The non-abstract methods operate on ``self._socket``, which starts out
    as None. An implementation of :meth:`connect` therefore has to assign
    the connected socket to it.

    Arguments:
        timeout: Timeout in seconds for the connection. If None is passed,
            DEFAULT_TIMEOUT is used instead.
    """

    def __init__(self, timeout: Optional[Union[int, float]] = DEFAULT_TIMEOUT):
        # No socket exists until connect() of a subclass creates one.
        self._socket: Optional[socketlib.SocketType] = None
        # None is not kept as "no timeout"; it falls back to the default.
        self._timeout = timeout if timeout is not None else DEFAULT_TIMEOUT

    def _read(self) -> bytes:
        """Receive at most BUF_SIZE bytes from the socket

        Raises:
            GvmError: If the socket is not connected.
        """
        if self._socket is None:
            raise GvmError("Socket is not connected")

        return self._socket.recv(BUF_SIZE)

    @abstractmethod
    def connect(self) -> None:
        """Establish a connection to a remote server"""
        raise NotImplementedError

    def send(self, data: bytes) -> None:
        """Send data to the connected remote server

        Args:
            data: Data to be sent to the server as bytes.

        Raises:
            GvmError: If the socket is not connected.
        """
        if self._socket is None:
            raise GvmError("Socket is not connected")

        self._socket.sendall(data)

    def read(self) -> bytes:
        """Read data from the remote server

        A single receive of at most BUF_SIZE bytes is performed.

        Returns:
            data as bytes

        Raises:
            GvmError: If the socket is not connected, the remote side closed
                the connection, or the receive finished only after the
                timeout had elapsed.
        """
        break_timeout = (
            time() + self._timeout if self._timeout is not None else None
        )

        data = self._read()

        if not data:
            # Connection was closed by server
            raise GvmError("Remote closed the connection")

        if break_timeout and time() > break_timeout:
            raise GvmError("Timeout while reading the response")

        return data

    def disconnect(self) -> None:
        """Disconnect and close the connection to the remote server

        Errors raised while closing the socket are logged at debug level and
        not propagated. Afterwards the connection is marked as not
        connected, so calling this method again is a no-op.
        """
        if self._socket is None:
            return

        try:
            self._socket.close()
        except OSError as e:
            logger.debug("Connection closing error: %s", e)
        finally:
            # Drop the closed socket so later send()/read() calls report
            # "Socket is not connected" instead of failing on a stale
            # file descriptor.
            self._socket = None

    def finish_send(self) -> None:
        """Indicate to the remote server you are done with sending data

        Does nothing if the socket is not connected.
        """
        if self._socket is not None:
            # shutdown socket for sending. only allow reading data afterwards
            self._socket.shutdown(socketlib.SHUT_WR)