File: json.py

package info (click to toggle)
notus-scanner 22.7.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,556 kB
  • sloc: python: 4,229; sh: 36; makefile: 4
file content (137 lines) | stat: -rw-r--r-- 5,165 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# SPDX-FileCopyrightText: 2021-2024 Greenbone AG
#
# SPDX-License-Identifier: AGPL-3.0-or-later

import json
import logging
from json.decoder import JSONDecodeError
from pathlib import Path
from typing import Callable, Dict, Optional

from ..errors import AdvisoriesLoadingError
from ..models.packages import package_class_by_type
from ..models.packages.package import PackageAdvisories, PackageType
from .gpg_sha_verifier import VerificationResult
from .loader import AdvisoriesLoader

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


def _get_operating_system_file_name(operating_system: str) -> str:
    return operating_system.strip().replace(" ", "_").lower()


class JSONAdvisoriesLoader(AdvisoriesLoader):
    """Load package advisories from JSON ``.notus`` files in a directory.

    The file for an operating system is looked up as
    ``<advisories_directory_path>/<normalised os name>.notus`` and is passed
    to the ``verify`` callable before its content is parsed.
    """

    def __init__(
        self,
        advisories_directory_path: Path,
        verify: Callable[[Path], VerificationResult],
    ):
        """Remember where the advisory files live and how to verify them.

        Args:
            advisories_directory_path: Directory containing the ``.notus``
                files.
            verify: Called with the path of an advisory file before it is
                read; any result other than ``VerificationResult.SUCCESS``
                aborts loading with an ``AdvisoriesLoadingError``.
        """
        self._advisories_directory_path = advisories_directory_path
        self._verify = verify

    def __load_data(self, operating_system: str) -> Optional[Dict]:
        """Read and decode the advisory file of an operating system.

        Args:
            operating_system: Operating system name used to derive the file
                name.

        Returns:
            The decoded top-level JSON object, or ``None`` if the file does
            not exist, is smaller than two bytes or does not hold a JSON
            object at its top level.

        Raises:
            AdvisoriesLoadingError: If verification of an existing file
                fails, or its content is not valid UTF-8 encoded JSON.
        """
        os_file_name = _get_operating_system_file_name(operating_system)
        json_file_path = (
            self._advisories_directory_path / f"{os_file_name}.notus"
        )
        # Since the request comes from the outside, a missing file must not
        # crash the application: print a warning and return None instead.
        if not json_file_path.exists():
            logger.warning(
                "Could not load advisories from %s. File does not exist.",
                json_file_path.absolute(),
            )
            return None

        # If there is a file but it cannot be verified, the feed could be
        # corrupted and the application should stop.
        verify_result = self._verify(json_file_path)
        if verify_result != VerificationResult.SUCCESS:
            reason = (
                "File verification failed."
                if verify_result != VerificationResult.INVALID_NAME
                else "OS name does not match filename."
            )
            raise AdvisoriesLoadingError(
                f"could not load advisories from {json_file_path.absolute()}. "
                f"{reason}"
            )
        logger.debug("File '%s' verification successful.", json_file_path)

        if json_file_path.stat().st_size < 2:
            # the minimum size of a json file is 2 bytes ({} or [])
            return None

        with json_file_path.open("r", encoding="utf-8") as f:
            try:
                data = json.load(f)
            except JSONDecodeError as e:
                raise AdvisoriesLoadingError(
                    "Could not load advisories from "
                    f"{json_file_path.absolute()}. Error in line {e.lineno} "
                    "while decoding JSON data."
                ) from None
            except UnicodeDecodeError:
                # Reading decodes the file as UTF-8; report undecodable
                # bytes through the loader's own error type as well.
                raise AdvisoriesLoadingError(
                    "Could not load advisories from "
                    f"{json_file_path.absolute()}. File is not valid UTF-8."
                ) from None

        # Valid JSON may still be an array or a scalar; callers rely on a
        # mapping, so reject everything else instead of failing later.
        if not isinstance(data, dict):
            logger.warning(
                "Could not load advisories from %s. Expected a JSON object "
                "but found %s.",
                json_file_path.absolute(),
                type(data).__name__,
            )
            return None
        return data

    def load_package_advisories(
        self, operating_system: str
    ) -> Optional[PackageAdvisories]:
        """Build the package advisories for an operating system.

        Malformed advisory or package entries are logged and skipped so that
        one bad entry does not prevent loading the remaining ones.

        Args:
            operating_system: Operating system name whose advisory file
                should be loaded.

        Returns:
            The collected ``PackageAdvisories``, or ``None`` if no data
            could be loaded, the ``package_type`` value is not a valid
            ``PackageType`` or no package class exists for that type.

        Raises:
            AdvisoriesLoadingError: If the advisory file exists but cannot
                be verified or decoded.
        """
        data = self.__load_data(operating_system=operating_system)
        if not data:
            return None
        package_type_id = data.get("package_type", "")
        try:
            package_type = PackageType(package_type_id)
        except ValueError:
            logger.warning("%s invalid package type.", package_type_id)
            return None
        package_advisories = PackageAdvisories(package_type)
        package_class = package_class_by_type(package_type)
        if not package_class:
            logger.warning("%s has no package implementation", package_type_id)
            return None

        advisories = data.get("advisories", [])
        if not isinstance(advisories, list):
            # e.g. ``"advisories": null`` - nothing that can be iterated
            # as a sequence of advisory objects.
            logger.warning(
                "Invalid advisories for %s. Expected a JSON array but "
                "found %s.",
                operating_system,
                type(advisories).__name__,
            )
            return package_advisories

        for advisory_data in advisories:
            # Non-mapping entries (numbers, null, strings, ...) cannot carry
            # an OID either, so they are reported the same way.
            if not isinstance(advisory_data, dict) or "oid" not in advisory_data:
                logger.error("No OID found for JSON advisory %s", advisory_data)
                continue

            try:
                # parse required data
                oid = advisory_data["oid"]
                fixed_packages = advisory_data["fixed_packages"]
            except (KeyError, TypeError) as e:
                logger.warning(
                    "Error while parsing %s for %s. Error was %s",
                    advisory_data,
                    operating_system,
                    e,
                )
                continue

            if not isinstance(fixed_packages, list):
                logger.warning(
                    "Error while parsing %s for %s. Error was %s",
                    advisory_data,
                    operating_system,
                    "fixed_packages is not a list",
                )
                continue

            advisory = oid

            for package_dict in fixed_packages:
                if not isinstance(package_dict, dict):
                    logger.warning(
                        "Could not parse fixed package information from %s "
                        "in %s",
                        package_dict,
                        operating_system,
                    )
                    continue

                full_name = package_dict.get("full_name")
                if full_name:
                    package = package_class.from_full_name(full_name)
                else:
                    package = package_class.from_name_and_full_version(
                        package_dict.get("name"),
                        package_dict.get("full_version"),
                    )
                if not package:
                    logger.warning(
                        "Could not parse fixed package information from %s "
                        "in %s",
                        package_dict,
                        operating_system,
                    )
                    continue

                package_advisories.add_advisory_for_package(
                    package, advisory, package_dict.get("specifier")
                )

        return package_advisories