File: misc.py

package info (click to toggle)
ospd-openvas 22.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,664 kB
  • sloc: python: 14,268; xml: 1,913; makefile: 45; sh: 29
file content (140 lines) | stat: -rw-r--r-- 3,886 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2014-2023 Greenbone AG
#
# SPDX-License-Identifier: AGPL-3.0-or-later

# pylint: disable=too-many-lines

"""Miscellaneous classes and functions related to OSPD."""

import logging
import os
import sys
import uuid
import multiprocessing

from typing import Any, Callable, Iterable
from pathlib import Path

import psutil

logger = logging.getLogger(__name__)


def create_process(
    func: Callable, *, args: Iterable[Any] = None
) -> multiprocessing.Process:
    return multiprocessing.Process(target=func, args=args)


class ResultType(object):
    """Various scan results types values."""

    ALARM = 0
    LOG = 1
    ERROR = 2
    HOST_DETAIL = 3

    @classmethod
    def get_str(cls, result_type: int) -> str:
        """Return string name of a result type."""
        if result_type == cls.ALARM:
            return "Alarm"
        elif result_type == cls.LOG:
            return "Log Message"
        elif result_type == cls.ERROR:
            return "Error Message"
        elif result_type == cls.HOST_DETAIL:
            return "Host Detail"
        else:
            assert False, f"Erroneous result type {result_type}."

    @classmethod
    def get_type(cls, result_name: str) -> int:
        """Return string name of a result type."""
        if result_name == "Alarm":
            return cls.ALARM
        elif result_name == "Log Message":
            return cls.LOG
        elif result_name == "Error Message":
            return cls.ERROR
        elif result_name == "Host Detail":
            return cls.HOST_DETAIL
        else:
            assert False, f"Erroneous result name {result_name}."


def valid_uuid(value) -> bool:
    """Check if value is a valid UUID."""

    try:
        uuid.UUID(value, version=4)
        return True
    except (TypeError, ValueError, AttributeError):
        return False


def go_to_background() -> None:
    """Daemonize the running process."""
    try:
        if os.fork():
            sys.exit()
    except OSError as errmsg:
        logger.error('Fork failed: %s', errmsg)
        sys.exit(1)


def create_pid(pidfile: str) -> bool:
    """Check if there is an already running daemon and creates the pid file.
    Otherwise gives an error."""

    pid = os.getpid()
    current_process = psutil.Process(pid)
    current_process_name = current_process.name()

    pidpath = Path(pidfile)
    pf_process_name = ""
    pf_pid = ""

    if pidpath.is_file():
        with pidpath.open('r', encoding='utf-8') as file:
            pf_pid = file.read().strip()
            try:
                pf_pid = int(pf_pid)
            except (TypeError, ValueError):
                pf_pid = None

        if pf_pid:
            try:
                process = psutil.Process(pf_pid)
                pf_process_name = process.name()
            except psutil.NoSuchProcess:
                pass

            if pf_process_name == current_process_name and pf_pid != pid:
                logger.error(
                    "There is an already running process. See %s.",
                    str(pidpath.absolute()),
                )
                return False
            else:
                logger.debug(
                    "There is an existing pid file '%s', but the PID %s "
                    "belongs to the process %s. It seems that %s was "
                    "abruptly stopped. Removing the pid file.",
                    str(pidpath.absolute()),
                    pf_pid,
                    pf_process_name,
                    current_process_name,
                )

    try:
        with pidpath.open(mode='w', encoding='utf-8') as f:
            f.write(str(pid))
    except (FileNotFoundError, PermissionError) as e:
        logger.error(
            "Failed to create pid file %s. %s", str(pidpath.absolute()), e
        )
        return False

    return True