File: xml_operations.py

package info (click to toggle)
scap-security-guide 0.1.76-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 110,644 kB
  • sloc: xml: 241,883; sh: 73,777; python: 32,527; makefile: 27
file content (346 lines) | stat: -rw-r--r-- 12,434 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
#!/usr/bin/python3
import contextlib
import logging
import re
import subprocess
import xml.etree.cElementTree as ET

from ssg.constants import OSCAP_RULE
from ssg.constants import PREFIX_TO_NS
from ssg.constants import bash_system as bash_rem_system
from ssg.constants import ansible_system as ansible_rem_system
from ssg.constants import puppet_system as puppet_rem_system
from ssg.constants import anaconda_system as anaconda_rem_system
from ssg.constants import ignition_system as ignition_rem_system
from ssg.constants import oval_namespace, SCE_SYSTEM

# Maps the value of the "system" attribute found on a check element to a
# short check type name; consulted by find_checks_in_rule().
CHECK_SYSTEM_URI_TO_TYPE = {
    oval_namespace: "oval",
    SCE_SYSTEM: "sce",
}

# Maps a remediation type name to the "system" attribute value that
# find_fix_in_benchmark() looks for on fix elements.
SYSTEM_ATTRIBUTE = {
    'bash': bash_rem_system,
    'ansible': ansible_rem_system,
    'puppet': puppet_rem_system,
    'anaconda': anaconda_rem_system,
    'ignition': ignition_rem_system,
}


# ElementPath queries; their prefixes are resolved through PREFIX_TO_NS.
# BENCHMARK_QUERY matches Benchmark elements that are direct children of a
# ds:component and is the base query of find_elements() and _replace_in_fix().
BENCHMARK_QUERY = ".//ds:component/xccdf-1.2:Benchmark"
# Matches the "definitions" child of oval_definitions inside a ds:component.
OVAL_DEF_QUERY = ".//ds:component/oval-def:oval_definitions/oval-def:definitions"

# Attach a no-op handler to the logger named after this module.
logging.getLogger(__name__).addHandler(logging.NullHandler())


def get_all_xccdf_ids_in_datastream(datastream):
    """
    Return IDs of component references that point to an XCCDF benchmark.

    Every ds:component-ref directly under the first ds:checklists element is
    resolved through its xlink:href to a ds:component; the reference's "id"
    is collected when that component has a Benchmark child.

    datastream -- path or file object accepted by ET.parse()

    Returns a list of component-ref IDs. The list is empty when the data
    stream has no ds:checklists element; that case is logged as an error.
    """
    root = ET.parse(datastream).getroot()

    checklists_node = root.find(".//ds:checklists", PREFIX_TO_NS)
    if checklists_node is None:
        logging.error(
            "Checklists not found within data stream")
        # Nothing to iterate over; going on would dereference None.
        return []

    href_attribute = '{%s}href' % PREFIX_TO_NS["xlink"]
    xccdf_ids = []
    for cref in checklists_node.findall("ds:component-ref", PREFIX_TO_NS):
        href = cref.get(href_attribute)
        if href is None:
            # A reference without a target cannot point to a benchmark.
            continue
        comp_id = href.lstrip("#")
        query = ".//ds:component[@id='%s']/xccdf-1.2:Benchmark" % (comp_id)
        if root.find(query, PREFIX_TO_NS) is not None:
            xccdf_ids.append(cref.get("id"))
    return xccdf_ids


def infer_benchmark_id_from_component_ref_id(datastream, ref_id):
    """
    Return the "id" of the Benchmark a component reference points to.

    The ds:component-ref whose id equals ref_id is looked up, its xlink:href
    (with leading '#' characters stripped) is taken as a component id, and
    the Benchmark child of that ds:component is located.

    Raises RuntimeError when either the component reference or the
    benchmark cannot be found.
    """
    ds_root = ET.parse(datastream).getroot()

    ref_query = "*//ds:component-ref[@id='{0}']".format(ref_id)
    ref_node = ds_root.find(ref_query, PREFIX_TO_NS)
    if ref_node is None:
        raise RuntimeError(
            'Component reference of Ref-Id {} not found within data stream'
            .format(ref_id))

    href_attribute = '{%s}href' % PREFIX_TO_NS['xlink']
    comp_id = ref_node.get(href_attribute).lstrip('#')

    benchmark = ds_root.find(
        ".//ds:component[@id='{}']/xccdf-1.2:Benchmark".format(comp_id),
        PREFIX_TO_NS)
    if benchmark is None:
        raise RuntimeError(
            'Benchmark not found within component of Id {}'.format(comp_id))

    return benchmark.get('id')


@contextlib.contextmanager
def datastream_root(ds_location, save_location=None):
    """
    Context manager yielding the root element of a parsed data stream.

    ds_location -- source accepted by ET.parse()
    save_location -- optional target; when truthy, the tree is written
        there on exit, including when the managed block raised

    Parse errors propagate unchanged and nothing is written in that case.
    """
    # Parse before entering the try block: if parsing fails there is no tree
    # to save, and the cleanup code must not hide the parse error behind an
    # unbound-variable error.
    tree = ET.parse(ds_location)
    try:
        # Register the known prefixes so serialization uses them.
        for prefix, uri in PREFIX_TO_NS.items():
            ET.register_namespace(prefix, uri)
        yield tree.getroot()
    finally:
        if save_location:
            tree.write(save_location)


def find_elements(root, element_spec=None):
    """
    Return the elements matched below root.

    Without element_spec the benchmarks matched by BENCHMARK_QUERY are
    returned; otherwise all their descendants matching element_spec.
    """
    if element_spec is None:
        path = BENCHMARK_QUERY
    else:
        path = BENCHMARK_QUERY + "//{0}".format(element_spec)
    return root.findall(path, PREFIX_TO_NS)


def remove_platforms_from_element(root, element_spec=None, platforms=None):
    """
    Remove platform children from the elements selected by element_spec.

    root -- element the search starts from
    element_spec -- passed to find_elements() to select the parent elements
    platforms -- None removes every platform child; otherwise only those
        accepted by instance_in_platforms() are removed
    """
    for parent in find_elements(root, element_spec):
        for platform in parent.findall("./xccdf-1.2:platform", PREFIX_TO_NS):
            if platforms is not None and not instance_in_platforms(platform, platforms):
                continue
            parent.remove(platform)


def instance_in_platforms(inst, platforms):
    """
    Tell whether the "idref" of inst is among the given platforms.

    inst -- object exposing get("idref"), e.g. a platform element
    platforms -- a single platform string, or an iterable of them

    A string is compared for equality only. Strings are iterable too, so
    without this special case a non-matching idref would be tested as a
    substring of the platform, and a missing idref would raise TypeError.

    Returns a bool; False when platforms is neither a string nor iterable.
    """
    idref = inst.get("idref")
    if isinstance(platforms, str):
        return idref == platforms
    return hasattr(platforms, "__iter__") and idref in platforms


def make_applicable_in_containers(root):
    """
    Run remove_machine_platform() and then
    remove_machine_remediation_condition() on root.
    """
    for step in (remove_machine_platform, remove_machine_remediation_condition):
        step(root)


def remove_machine_platform(root):
    """
    Remove the machine-related platform references from rules and groups.

    For each of the listed platform idrefs, matching platform children are
    removed first from Rule and then from Group elements.
    """
    machine_platforms = ("cpe:/a:machine", "#machine", "#system_with_kernel")
    element_specs = ("xccdf-1.2:Rule", "xccdf-1.2:Group")
    for platform in machine_platforms:
        for spec in element_specs:
            remove_platforms_from_element(root, spec, platform)


def remove_platforms(root):
    """
    Remove every platform child from the benchmarks themselves and from
    their Profile elements.
    """
    # None selects the benchmark elements, see find_elements().
    for spec in (None, "xccdf-1.2:Profile"):
        remove_platforms_from_element(root, spec)


def remove_ocp4_platforms(root):
    """
    Remove the listed OpenShift 4 platform references from Rule and Group
    elements.
    """
    for spec in ("xccdf-1.2:Rule", "xccdf-1.2:Group"):
        ocp4_platforms = [
            "cpe:/o:redhat:openshift_container_platform:4",
            "cpe:/o:redhat:openshift_container_platform_node:4",
            "cpe:/a:ocp4-master-node",
        ]
        remove_platforms_from_element(root, spec, ocp4_platforms)


def remove_machine_remediation_condition(root):
    """
    Neutralize the machine-platform conditions in the Bash fixes first and
    in the Ansible fixes second.
    """
    for neutralize in (remove_bash_machine_remediation_condition,
                       remove_ansible_machine_remediation_condition):
        neutralize(root)


def remove_bash_machine_remediation_condition(root):
    """
    Replace machine-platform checks in shell fixes with "true".

    Text matching any of the patterns below is substituted by
    _replace_in_fix() in fix elements of the shell script system.
    """
    machine_check_patterns = [
        r"\[\s+!\s+-f\s+/\.dockerenv\s+\]\s+&&\s+\[\s+!\s+-f\s+/run/\.containerenv\s+\]",
        r"rpm\s+--quiet\s+-q\s+kernel"
    ]
    _replace_in_fix(
        root, "urn:xccdf:fix:script:sh", machine_check_patterns, "true")


def remove_ansible_machine_remediation_condition(root):
    """
    Replace machine-platform checks in Ansible fixes with "True".

    Text matching any of the patterns below is substituted by
    _replace_in_fix() in fix elements of the Ansible script system.
    """
    machine_check_patterns = [
        r"\bansible_virtualization_type\s+not\s+in.*docker.*",
        r"\"kernel\"\s+in\s+ansible_facts.packages"
    ]
    _replace_in_fix(
        root, "urn:xccdf:fix:script:ansible", machine_check_patterns, "True")


def _replace_in_fix(root, system, considered_machine_platform_checks, repl):
    """
    Substitute repl for every match of the given patterns in fix elements.

    root -- element the benchmark search starts from
    system -- value of the "system" attribute selecting the fix elements
    considered_machine_platform_checks -- iterable of regular expressions
    repl -- replacement passed to the regex substitution

    The substitution is applied to the text of each fix element and to the
    tail of each of its "sub" descendants. Missing text or tail (None) is
    left alone instead of being handed to the regex engine, which would
    raise TypeError.
    """
    query = BENCHMARK_QUERY + '//xccdf-1.2:fix[@system="' + system + '"]'
    patterns = [re.compile(check) for check in considered_machine_platform_checks]
    for fix_el in root.findall(query, PREFIX_TO_NS):
        # The set of sub elements does not depend on the pattern.
        sub_els = fix_el.findall(".//xccdf-1.2:sub", PREFIX_TO_NS)
        for pattern in patterns:
            for sub_el in sub_els:
                if sub_el.tail is not None:
                    sub_el.tail = pattern.sub(repl, sub_el.tail)
            if fix_el.text is not None:
                fix_el.text = pattern.sub(repl, fix_el.text)

def get_oscap_supported_cpes():
    """
    Obtain a list of CPEs that the scanner supports

    Runs "oscap --version" and collects, from each output line, the
    trailing whitespace-free token that starts with "cpe:".

    Returns a list of CPE strings. The list is empty when the scanner
    cannot be started or does not answer within 3 seconds; both cases are
    logged as warnings. A non-zero exit status is logged as a warning, and
    whatever was printed to standard output is still parsed.
    """
    try:
        proc = subprocess.Popen(
            ("oscap", "--version"),
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except OSError as exc:
        # E.g. the scanner is not installed.
        logging.warning(
            "Could not run the scanner to get supported CPEs: {msg}".format(msg=exc))
        return []

    try:
        outs, errs = proc.communicate(timeout=3)
    except subprocess.TimeoutExpired:
        logging.warning("Scanner timeouted when asked about supported CPEs")
        proc.kill()
        # Reap the killed process and close its pipes.
        proc.communicate()
        return []

    if proc.returncode != 0:
        # The pipes are binary, so decode before splitting into lines.
        first_error_line = errs.decode(errors="replace").split("\n")[0]
        logging.warning(
            "Error getting CPEs from the scanner: {msg}".format(msg=first_error_line))

    cpe_regex = re.compile(r'\bcpe:\S+$')
    matches = (
        cpe_regex.search(line)
        for line in outs.decode(errors="replace").split("\n")
    )
    return [match.group(0) for match in matches if match]


def add_platform_to_benchmark(root, cpe_regex):
    """
    Add platform elements to every benchmark found below root.

    root -- element the benchmark search starts from
    cpe_regex -- regular expression selecting CPEs out of those reported by
        get_oscap_supported_cpes(); when nothing matches, cpe_regex itself
        is used as the only CPE to add

    The new elements are inserted at the position of the first existing
    platform child of the benchmark. Benchmarks without any platform child
    are skipped with a warning.

    Raises RuntimeError when no benchmark is found.
    """
    benchmarks = root.findall(BENCHMARK_QUERY, PREFIX_TO_NS)
    if not benchmarks:
        msg = (
            "No benchmarks found in the data stream"
        )
        raise RuntimeError(msg)

    regex = re.compile(cpe_regex)
    cpes_to_add = [
        cpe_str for cpe_str in get_oscap_supported_cpes()
        if regex.search(cpe_str)
    ]
    if not cpes_to_add:
        cpes_to_add = [cpe_regex]

    # ElementTree expects namespace-qualified tags in "{uri}local" form; a
    # literal "prefix:local" tag would not be found by later namespace-aware
    # queries and relies on the prefix happening to be declared on output.
    platform_tag = "{%s}platform" % PREFIX_TO_NS["xccdf-1.2"]

    for benchmark in benchmarks:
        existing_platform_element = benchmark.find("xccdf-1.2:platform", PREFIX_TO_NS)
        if existing_platform_element is None:
            logging.warning(
                "Couldn't find platform element in a benchmark, "
                "not adding any additional platforms as a result.")
            continue
        platform_index = list(benchmark).index(existing_platform_element)
        for cpe_str in cpes_to_add:
            benchmark.insert(
                platform_index, ET.Element(platform_tag, idref=cpe_str))


def remove_fips_certified(root):
    """
    Remove every extend_definition element that references the
    "installed OS is FIPS certified" OVAL definition from its parent.
    """
    fips_def_id = "oval:ssg-installed_OS_is_FIPS_certified:def:1"
    reference_query = (
        "oval-def:extend_definition[@definition_ref='{0}']".format(fips_def_id))
    # Appending "/.." selects the parents of the matching references.
    holders = root.findall(".//" + reference_query + "/..", PREFIX_TO_NS)
    for holder in holders:
        for reference in holder.findall(reference_query, PREFIX_TO_NS):
            holder.remove(reference)


def _get_benchmark_node(datastream, benchmark_id, logging):
    """
    Parse the data stream and return the Benchmark element with the given
    id, or None when there is none.

    logging -- object with an error() method used to report a missing
        benchmark; None disables the report
    """
    ds_root = ET.parse(datastream).getroot()
    query = "*//xccdf-1.2:Benchmark[@id='{0}']".format(benchmark_id)
    node = ds_root.find(query, PREFIX_TO_NS)
    if node is None and logging is not None:
        logging.error(
            "Benchmark ID '{}' not found within data stream"
            .format(benchmark_id))
    return node


def get_all_profiles_in_benchmark(datastream, benchmark_id, logging=None):
    """
    Return the Profile elements that are direct children of the benchmark.

    datastream -- source accepted by ET.parse()
    benchmark_id -- value of the benchmark's "id" attribute
    logging -- optional object with an error() method, used to report a
        missing benchmark

    Returns a list of elements; empty when the benchmark is not found.
    """
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
    if benchmark_node is None:
        # The lookup helper returns None for an unknown benchmark.
        return []
    return benchmark_node.findall('xccdf-1.2:Profile', PREFIX_TO_NS)


def get_all_rule_selections_in_profile(datastream, benchmark_id, profile_id, logging=None):
    """
    Return the select elements of a profile whose "selected" is 'true'.

    datastream -- source accepted by ET.parse()
    benchmark_id -- value of the benchmark's "id" attribute
    profile_id -- value of the profile's "id" attribute
    logging -- optional object with an error() method, used to report a
        missing benchmark or profile

    Returns a list of elements; empty when the benchmark or the profile is
    not found.
    """
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
    if benchmark_node is None:
        # The lookup helper returns None for an unknown benchmark.
        return []

    profile = benchmark_node.find(
        "xccdf-1.2:Profile[@id='{0}']".format(profile_id), PREFIX_TO_NS)
    if profile is None:
        if logging is not None:
            logging.error(
                "Profile ID '{}' not found within benchmark '{}'"
                .format(profile_id, benchmark_id))
        return []

    return profile.findall("xccdf-1.2:select[@selected='true']", PREFIX_TO_NS)


def get_all_rule_ids_in_profile(datastream, benchmark_id, profile_id, logging=None):
    """
    Return the rule IDs selected by a profile, without the XCCDF 1.2 prefix.

    datastream -- source accepted by ET.parse()
    benchmark_id -- value of the benchmark's "id" attribute
    profile_id -- value of the profile's "id" attribute
    logging -- optional object with an error() method; forwarded to the
        selection lookup so that lookup failures are actually reported

    Returns a list of strings: each selected idref with its first
    len(OSCAP_RULE) characters cut off.
    """
    rule_selections = get_all_rule_selections_in_profile(
        datastream, benchmark_id, profile_id, logging=logging)
    rule_ids = [select.get("idref") for select in rule_selections]

    # Strip xccdf 1.2 prefixes from rule ids
    # Necessary to search for the rules within test scenarios tree
    prefix_len = len(OSCAP_RULE)
    return [rule[prefix_len:] for rule in rule_ids]


def benchmark_get_applicable_platforms(datastream, benchmark_id, logging=None):
    """
    Returns a set of CPEs the given benchmark is applicable to.

    The CPEs are the "idref" values of the platform elements that are
    direct children of the benchmark. The set is empty when the benchmark
    is not found; logging, if given, is used to report that.
    """
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
    if benchmark_node is None:
        # The lookup helper returns None for an unknown benchmark.
        return set()
    platform_elements = benchmark_node.findall('xccdf-1.2:platform', PREFIX_TO_NS)
    return {platform_el.get("idref") for platform_el in platform_elements}


def find_rule_in_benchmark(datastream, benchmark_id, rule_id, logging=None):
    """
    Returns rule node from the given benchmark.

    The rule is searched at any depth below the benchmark. None is returned
    when either the benchmark or the rule is not found; logging, if given,
    is used to report a missing benchmark.
    """
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
    if benchmark_node is None:
        # The lookup helper returns None for an unknown benchmark.
        return None
    return benchmark_node.find(
        ".//xccdf-1.2:Rule[@id='{0}']".format(rule_id), PREFIX_TO_NS)


def get_all_rules_in_benchmark(datastream, benchmark_id, logging=None):
    """
    Returns all rule IDs in the given benchmark.

    Rules are collected at any depth below the benchmark. The list is empty
    when the benchmark is not found; logging, if given, is used to report
    that.
    """
    benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
    if benchmark_node is None:
        # The lookup helper returns None for an unknown benchmark.
        return []
    return [
        rule.get("id")
        for rule in benchmark_node.findall(".//xccdf-1.2:Rule", PREFIX_TO_NS)
    ]


def find_fix_in_benchmark(datastream, benchmark_id, rule_id, fix_type='bash', logging=None):
    """
    Return the fix of the requested type for a rule, or None.

    fix_type is translated through SYSTEM_ATTRIBUTE into the "system"
    attribute value to look for; unknown types fall back to the Bash
    remediation system. None is returned when the rule is not found or has
    no such fix among its direct children.
    """
    rule_node = find_rule_in_benchmark(datastream, benchmark_id, rule_id, logging)
    if rule_node is not None:
        wanted_system = SYSTEM_ATTRIBUTE.get(fix_type, bash_rem_system)
        fix_query = "xccdf-1.2:fix[@system='{0}']".format(wanted_system)
        return rule_node.find(fix_query, PREFIX_TO_NS)
    return None


def find_checks_in_rule(datastream, benchmark_id, rule_id):
    """
    Return the set of check type names used by a rule.

    The "system" attribute of each check child of the rule is translated
    through CHECK_SYSTEM_URI_TO_TYPE; systems without a translation are
    ignored. The set is empty when the rule is not found.
    """
    rule_node = find_rule_in_benchmark(datastream, benchmark_id, rule_id)
    if rule_node is None:
        return set()
    check_types = (
        CHECK_SYSTEM_URI_TO_TYPE.get(check_el.get("system"), None)
        for check_el in rule_node.findall("xccdf-1.2:check", PREFIX_TO_NS)
    )
    return {check_type for check_type in check_types if check_type is not None}