1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
|
#!/usr/bin/python3
import contextlib
import logging
import re
import subprocess
import xml.etree.cElementTree as ET
from ssg.constants import OSCAP_RULE
from ssg.constants import PREFIX_TO_NS
from ssg.constants import bash_system as bash_rem_system
from ssg.constants import ansible_system as ansible_rem_system
from ssg.constants import puppet_system as puppet_rem_system
from ssg.constants import anaconda_system as anaconda_rem_system
from ssg.constants import ignition_system as ignition_rem_system
from ssg.constants import oval_namespace, SCE_SYSTEM
CHECK_SYSTEM_URI_TO_TYPE = {
oval_namespace: "oval",
SCE_SYSTEM: "sce",
}
SYSTEM_ATTRIBUTE = {
'bash': bash_rem_system,
'ansible': ansible_rem_system,
'puppet': puppet_rem_system,
'anaconda': anaconda_rem_system,
'ignition': ignition_rem_system,
}
BENCHMARK_QUERY = ".//ds:component/xccdf-1.2:Benchmark"
OVAL_DEF_QUERY = ".//ds:component/oval-def:oval_definitions/oval-def:definitions"
logging.getLogger(__name__).addHandler(logging.NullHandler())
def get_all_xccdf_ids_in_datastream(datastream):
root = ET.parse(datastream).getroot()
checklists_node = root.find(".//ds:checklists", PREFIX_TO_NS)
if checklists_node is None:
logging.error(
"Checklists not found within data stream")
xccdf_ids = []
for cref in checklists_node.findall("ds:component-ref", PREFIX_TO_NS):
href = cref.get('{%s}href' % PREFIX_TO_NS["xlink"])
comp_id = href.lstrip("#")
query = ".//ds:component[@id='%s']/xccdf-1.2:Benchmark" % (comp_id)
benchmark_node = root.find(query, PREFIX_TO_NS)
if benchmark_node is not None:
xccdf_ids.append(cref.get("id"))
return xccdf_ids
def infer_benchmark_id_from_component_ref_id(datastream, ref_id):
root = ET.parse(datastream).getroot()
component_ref_node = root.find("*//ds:component-ref[@id='{0}']"
.format(ref_id), PREFIX_TO_NS)
if component_ref_node is None:
msg = (
'Component reference of Ref-Id {} not found within data stream'
.format(ref_id))
raise RuntimeError(msg)
comp_id = component_ref_node.get('{%s}href' % PREFIX_TO_NS['xlink'])
comp_id = comp_id.lstrip('#')
query = ".//ds:component[@id='{}']/xccdf-1.2:Benchmark".format(comp_id)
benchmark_node = root.find(query, PREFIX_TO_NS)
if benchmark_node is None:
msg = (
'Benchmark not found within component of Id {}'
.format(comp_id)
)
raise RuntimeError(msg)
return benchmark_node.get('id')
@contextlib.contextmanager
def datastream_root(ds_location, save_location=None):
try:
tree = ET.parse(ds_location)
for prefix, uri in PREFIX_TO_NS.items():
ET.register_namespace(prefix, uri)
root = tree.getroot()
yield root
finally:
if save_location:
tree.write(save_location)
def find_elements(root, element_spec=None):
query = BENCHMARK_QUERY
if element_spec is not None:
query = query + "//{0}".format(element_spec)
return root.findall(query, PREFIX_TO_NS)
def remove_platforms_from_element(root, element_spec=None, platforms=None):
elements = find_elements(root, element_spec)
for el in elements:
platforms_xml = el.findall("./xccdf-1.2:platform", PREFIX_TO_NS)
for p in platforms_xml:
if platforms is None or instance_in_platforms(p, platforms):
el.remove(p)
def instance_in_platforms(inst, platforms):
return (isinstance(platforms, str) and inst.get("idref") == platforms) or \
(hasattr(platforms, "__iter__") and inst.get("idref") in platforms)
def make_applicable_in_containers(root):
remove_machine_platform(root)
remove_machine_remediation_condition(root)
def remove_machine_platform(root):
remove_platforms_from_element(root, "xccdf-1.2:Rule", "cpe:/a:machine")
remove_platforms_from_element(root, "xccdf-1.2:Group", "cpe:/a:machine")
remove_platforms_from_element(root, "xccdf-1.2:Rule", "#machine")
remove_platforms_from_element(root, "xccdf-1.2:Group", "#machine")
remove_platforms_from_element(root, "xccdf-1.2:Rule", "#system_with_kernel")
remove_platforms_from_element(root, "xccdf-1.2:Group", "#system_with_kernel")
def remove_platforms(root):
remove_platforms_from_element(root)
remove_platforms_from_element(root, "xccdf-1.2:Profile")
def remove_ocp4_platforms(root):
remove_platforms_from_element(
root, "xccdf-1.2:Rule",
["cpe:/o:redhat:openshift_container_platform:4",
"cpe:/o:redhat:openshift_container_platform_node:4",
"cpe:/a:ocp4-master-node"])
remove_platforms_from_element(
root, "xccdf-1.2:Group",
["cpe:/o:redhat:openshift_container_platform:4",
"cpe:/o:redhat:openshift_container_platform_node:4",
"cpe:/a:ocp4-master-node"])
def remove_machine_remediation_condition(root):
remove_bash_machine_remediation_condition(root)
remove_ansible_machine_remediation_condition(root)
def remove_bash_machine_remediation_condition(root):
system = "urn:xccdf:fix:script:sh"
considered_machine_platform_checks = [
r"\[\s+!\s+-f\s+/\.dockerenv\s+\]\s+&&\s+\[\s+!\s+-f\s+/run/\.containerenv\s+\]",
r"rpm\s+--quiet\s+-q\s+kernel"
]
repl = "true"
_replace_in_fix(root, system, considered_machine_platform_checks, repl)
def remove_ansible_machine_remediation_condition(root):
system = "urn:xccdf:fix:script:ansible"
considered_machine_platform_checks = [
r"\bansible_virtualization_type\s+not\s+in.*docker.*",
r"\"kernel\"\s+in\s+ansible_facts.packages"
]
repl = "True"
_replace_in_fix(root, system, considered_machine_platform_checks, repl)
def _replace_in_fix(root, system, considered_machine_platform_checks, repl):
query = BENCHMARK_QUERY + '//xccdf-1.2:fix[@system="' + system + '"]'
fix_elements = root.findall(query, PREFIX_TO_NS)
for el in fix_elements:
for check in considered_machine_platform_checks:
sub_els = el.findall(".//xccdf-1.2:sub", PREFIX_TO_NS)
for sub_el in sub_els:
sub_el.tail = re.sub(check, repl, sub_el.tail)
el.text = re.sub(check, repl, el.text)
def get_oscap_supported_cpes():
"""
Obtain a list of CPEs that the scanner supports
"""
result = []
proc = subprocess.Popen(
("oscap", "--version"),
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
outs, errs = proc.communicate(timeout=3)
except subprocess.TimeoutExpired:
logging.warn("Scanner timeouted when asked about supported CPEs")
proc.kill()
return []
if proc.returncode != 0:
first_error_line = errs.split("\n")[0]
logging.warn("Error getting CPEs from the scanner: {msg}".format(msg=first_error_line))
cpe_regex = re.compile(r'\bcpe:\S+$')
for line in outs.decode().split("\n"):
match = cpe_regex.search(line)
if match:
result.append(match.group(0))
return result
def add_platform_to_benchmark(root, cpe_regex):
benchmark_query = ".//ds:component/xccdf-1.2:Benchmark"
benchmarks = root.findall(benchmark_query, PREFIX_TO_NS)
if not benchmarks:
msg = (
"No benchmarks found in the data stream"
)
raise RuntimeError(msg)
all_cpes = get_oscap_supported_cpes()
regex = re.compile(cpe_regex)
cpes_to_add = []
for cpe_str in all_cpes:
if regex.search(cpe_str):
cpes_to_add.append(cpe_str)
if not cpes_to_add:
cpes_to_add = [cpe_regex]
for benchmark in benchmarks:
existing_platform_element = benchmark.find("xccdf-1.2:platform", PREFIX_TO_NS)
if existing_platform_element is None:
logging.warn(
"Couldn't find platform element in a benchmark, "
"not adding any additional platforms as a result.")
continue
platform_index = list(benchmark).index(existing_platform_element)
for cpe_str in cpes_to_add:
e = ET.Element("xccdf-1.2:platform", idref=cpe_str)
benchmark.insert(platform_index, e)
def remove_fips_certified(root):
def_id = "oval:ssg-installed_OS_is_FIPS_certified:def:1"
parent_query = ".//oval-def:extend_definition[@definition_ref='{0}']/..".format(def_id)
child_query = "oval-def:extend_definition[@definition_ref='{0}']".format(def_id)
for parent in root.findall(parent_query, PREFIX_TO_NS):
for child in parent.findall(child_query, PREFIX_TO_NS):
parent.remove(child)
def _get_benchmark_node(datastream, benchmark_id, logging):
root = ET.parse(datastream).getroot()
benchmark_node = root.find(
"*//xccdf-1.2:Benchmark[@id='{0}']".format(benchmark_id), PREFIX_TO_NS)
if benchmark_node is None:
if logging is not None:
logging.error(
"Benchmark ID '{}' not found within data stream"
.format(benchmark_id))
return benchmark_node
def get_all_profiles_in_benchmark(datastream, benchmark_id, logging=None):
benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
all_profiles = benchmark_node.findall('xccdf-1.2:Profile', PREFIX_TO_NS)
return all_profiles
def get_all_rule_selections_in_profile(datastream, benchmark_id, profile_id, logging=None):
benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
profile = benchmark_node.find("xccdf-1.2:Profile[@id='{0}']".format(profile_id), PREFIX_TO_NS)
rule_selections = profile.findall("xccdf-1.2:select[@selected='true']", PREFIX_TO_NS)
return rule_selections
def get_all_rule_ids_in_profile(datastream, benchmark_id, profile_id, logging=None):
rule_selections = get_all_rule_selections_in_profile(datastream, benchmark_id,
profile_id, logging=None)
rule_ids = [select.get("idref") for select in rule_selections]
# Strip xccdf 1.2 prefixes from rule ids
# Necessary to search for the rules within test scenarios tree
prefix_len = len(OSCAP_RULE)
return [rule[prefix_len:] for rule in rule_ids]
def benchmark_get_applicable_platforms(datastream, benchmark_id, logging=None):
"""
Returns a set of CPEs the given benchmark is applicable to.
"""
benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
platform_elements = benchmark_node.findall('xccdf-1.2:platform', PREFIX_TO_NS)
cpes = {platform_el.get("idref") for platform_el in platform_elements}
return cpes
def find_rule_in_benchmark(datastream, benchmark_id, rule_id, logging=None):
"""
Returns rule node from the given benchmark.
"""
benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
rule = benchmark_node.find(".//xccdf-1.2:Rule[@id='{0}']".format(rule_id), PREFIX_TO_NS)
return rule
def get_all_rules_in_benchmark(datastream, benchmark_id, logging=None):
"""
Returns all rule IDs in the given benchmark.
"""
rules = []
benchmark_node = _get_benchmark_node(datastream, benchmark_id, logging)
for rule in benchmark_node.findall(".//xccdf-1.2:Rule", PREFIX_TO_NS):
rules.append(rule.get("id"))
return rules
def find_fix_in_benchmark(datastream, benchmark_id, rule_id, fix_type='bash', logging=None):
"""
Return fix from benchmark. None if not found.
"""
rule = find_rule_in_benchmark(datastream, benchmark_id, rule_id, logging)
if rule is None:
return None
system_attribute = SYSTEM_ATTRIBUTE.get(fix_type, bash_rem_system)
fix = rule.find("xccdf-1.2:fix[@system='{0}']".format(system_attribute), PREFIX_TO_NS)
return fix
def find_checks_in_rule(datastream, benchmark_id, rule_id):
"""
Return check types for given rule from benchmark.
"""
checks = set()
rule = find_rule_in_benchmark(datastream, benchmark_id, rule_id)
if rule is None:
return checks
check_els = rule.findall("xccdf-1.2:check", PREFIX_TO_NS)
for check_el in check_els:
system = check_el.get("system")
check = CHECK_SYSTEM_URI_TO_TYPE.get(system, None)
if check is not None:
checks.add(check)
return checks
|