1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
|
#!/usr/bin/python3
import argparse
import os
import sys
import time
import glob
import xml.etree.ElementTree as ET
from ssg.build_sce import collect_sce_checks
from ssg.constants import (
cat_namespace, datastream_namespace, oval_namespace, sce_namespace,
XCCDF12_NS, xlink_namespace)
import ssg.xml
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
# Namespace token embedded in every identifier generated by this script
# (collection, data stream, component, extended-component and component-ref
# ids all follow the pattern "scap_<ID_NS>_<kind>_<name>").
ID_NS = "org.open-scap"
# Prefix of a catalog URI that points at a component-ref of this data stream:
# a leading '#' followed by the component-ref id prefix. The file-derived
# name is appended to it.
component_ref_prefix = "#scap_org.open-scap_cref_"
# Inspired by openscap ds_sds_mangle_filepath() function
def mangle_path(path):
    """Return *path* with every '/', '@' and '~' replaced by '-'.

    The result is used as the file-derived part of component and
    component-ref identifiers.
    """
    for unsafe_char in ('/', '@', '~'):
        path = path.replace(unsafe_char, '-')
    return path
# From the list generated by collect_sce_checks, extract the path to the check content,
# and embed the script into the data stream
def embed_sce_checks_in_datastream(datastreamtree, checklists, checks, build_dir):
    """Embed every SCE check script into the data stream being composed.

    For each path returned by ``collect_sce_checks(datastreamtree)`` this
    adds three elements:

    * an ``<extended-component>`` under *datastreamtree* whose ``<script>``
      child holds the text of the script,
    * a ``<component-ref>`` under *checks* pointing at that component, and
    * a ``<uri>`` in the catalog of the first ``<component-ref>`` of
      *checklists*, mapping the script path to the new component-ref.

    Args:
        datastreamtree: element the extended components are appended to
            (the data-stream-collection root when called from compose_ds).
        checklists: the ``<checklists>`` element of the data stream.
        checks: the ``<checks>`` element of the data stream.
        build_dir: directory each script path is joined with to read it.
    """
    # Resolved on first use only, so nothing is looked up (or created) when
    # there are no SCE checks at all.
    catalog = None

    for sce_file in collect_sce_checks(datastreamtree):
        path = os.path.join(build_dir, sce_file)
        mangled_path = mangle_path(sce_file)

        with open(path, 'rt', encoding='utf8') as fd:
            sce_script_content = fd.read()

        component_id = "scap_{}_ecomp_{}".format(ID_NS, mangled_path)
        component = ET.SubElement(
            datastreamtree, '{%s}extended-component' % datastream_namespace,
            attrib={
                'id': component_id,
                'timestamp': get_timestamp(path)
            })
        # Append the file content
        script_data = ET.SubElement(component, '{%s}script' % sce_namespace)
        script_data.text = sce_script_content

        # Create a component reference to map the checklist to the extended
        # component
        component_ref_id = "scap_{}_cref_{}".format(ID_NS, mangled_path)
        ET.SubElement(
            checks, '{%s}component-ref' % datastream_namespace,
            attrib={
                'id': component_ref_id,
                ('{%s}href' % xlink_namespace): '#' + component_id
            })

        # Add the component reference to the catalog of XCCDF checklists
        if catalog is None:
            catalog = _get_or_create_checklist_catalog(checklists)
        ET.SubElement(
            catalog, '{%s}uri' % cat_namespace,
            attrib={
                'name': sce_file,
                'uri': '#' + component_ref_id
            })


def _get_or_create_checklist_catalog(checklists):
    """Return the catalog of the first checklist component-ref, creating it if absent."""
    checklists_component_ref = checklists.find(
        "{%s}component-ref" % datastream_namespace)
    catalog = checklists_component_ref.find('{%s}catalog' % cat_namespace)
    if catalog is None:
        # No catalog exists when none of the checklist's dependency files
        # were present; without this ET.SubElement(None, ...) would raise
        # TypeError.
        catalog = ET.SubElement(
            checklists_component_ref, '{%s}catalog' % cat_namespace)
    return catalog
def move_patches_up_to_date_to_source_data_stream_component(datastreamtree):
    """Route the OVAL check of security_patches_up_to_date through a component-ref.

    Looks up the component referenced by the first checklist component-ref,
    finds the Rule whose id ends with ``rule_security_patches_up_to_date``
    and, if that Rule has an OVAL check:

    * sets ``multi-check="true"`` on the check,
    * rewrites the ``href`` of its ``check-content-ref`` from the original
      URL to a name derived from the URL path,
    * adds a catalog ``<uri>`` mapping that name to a component-ref id, and
    * adds a ``<component-ref>`` under ``<checks>`` whose xlink:href is the
      original URL.

    The tree is modified in place and nothing is returned. The function
    returns early when no such OVAL check exists, and also when a matching
    catalog entry or component-ref is already present.

    Args:
        datastreamtree: the data stream tree to update (compose_ds passes an
            ``ET.ElementTree``).

    Raises:
        SystemExit: with status 1 when the component referenced by the
            checklist component-ref cannot be found.
    """
    ds_checklists = datastreamtree.find(
        ".//{%s}checklists" % datastream_namespace)
    checklists_component_ref = ds_checklists.find(
        "{%s}component-ref" % datastream_namespace)
    checklists_component_ref_id = checklists_component_ref.get('id')
    # The component ID is the component-ref href without leading '#'
    checklists_component_id = checklists_component_ref.get(
        '{%s}href' % xlink_namespace)[1:]

    # Locate the <xccdf:check> element of an <xccdf:Rule> with id
    # security_patches_up_to_date
    checklist_component = None
    oval_check = None
    ds_components = datastreamtree.findall(
        ".//{%s}component" % datastream_namespace)
    # No break here: if several components shared the id, the last one wins.
    for ds_component in ds_components:
        if ds_component.get('id') == checklists_component_id:
            checklist_component = ds_component
    if checklist_component is None:
        # Something strange happened
        sys.stderr.write(
            "Couldn't find <component> %s referenced by <component-ref> %s" % (
                checklists_component_id, checklists_component_ref_id))
        sys.exit(1)

    rules = checklist_component.findall(".//{%s}Rule" % XCCDF12_NS)
    for rule in rules:
        if rule.get('id').endswith('rule_security_patches_up_to_date'):
            rule_checks = rule.findall("{%s}check" % XCCDF12_NS)
            for check in rule_checks:
                if check.get('system') == oval_namespace:
                    oval_check = check
                    # Only leaves the inner loop; the scan over rules goes on.
                    break

    if oval_check is None:
        # The component doesn't have a security patches up to date rule
        # with an OVAL check
        return

    # SCAP 1.3 demands multi-check true if the Rule
    # security_patches_up_to_date is evaluated by multiple OVAL patch class
    # definitions. See 3.2.4.3, SCAP 1.3 standard (NIST.SP.800-126r3)
    oval_check.set('multi-check', 'true')

    check_content_ref = oval_check.find('{%s}check-content-ref' % XCCDF12_NS)
    href_url = check_content_ref.get('href')

    # Use URL's path to define the component name and URI
    # Path attribute returned from urlparse contains a leading '/'; when
    # mangling it, it will get replaced by '-'. Let's strip the '/' to avoid
    # a sequence of "_-" in the component-ref ID.
    component_ref_name = mangle_path(urlparse(href_url).path[1:])
    component_ref_uri = component_ref_prefix + component_ref_name

    # update @href to refer the datastream component name
    check_content_ref.set('href', component_ref_name)

    # Add a uri referring to the component in Rule's Benchmark component-ref
    # catalog
    uri_exists = False
    # NOTE(review): catalog is assumed to exist here; find() returns None when
    # the checklist component-ref has no <catalog> child -- confirm callers
    # always provide one.
    catalog = checklists_component_ref.find('{%s}catalog' % cat_namespace)
    uris = catalog.findall("{%s}uri" % cat_namespace)
    for uri in uris:
        if uri.get('name') == component_ref_name:
            uri_exists = True
            # NOTE(review): this returns from the whole function, so the
            # component-ref registration below is skipped as well, and
            # uri_exists can never be True at the check that follows.
            # Confirm `return` (rather than `break`) is intended.
            return
    if not uri_exists:
        uri = ET.Element('{%s}uri' % cat_namespace)
        uri.set('name', component_ref_name)
        uri.set('uri', component_ref_uri)
        catalog.append(uri)

    # The component-ref ID is the catalog uri without leading '#'
    component_ref_feed_id = component_ref_uri[1:]

    # Add the component-ref to list of datastreams' checks
    check_component_ref_exists = False
    ds_checks = datastreamtree.find(".//{%s}checks" % datastream_namespace)
    check_component_refs = ds_checks.findall(
        "{%s}component-ref" % datastream_namespace)
    for check_component_ref in check_component_refs:
        if check_component_ref.get('id') == component_ref_feed_id:
            check_component_ref_exists = True
            # Already registered: nothing left to do.
            return
    if not check_component_ref_exists:
        component_ref_feed = ET.Element(
            '{%s}component-ref' % datastream_namespace)
        component_ref_feed.set('id', component_ref_feed_id)
        # The component-ref keeps pointing at the original (remote) URL.
        component_ref_feed.set('{%s}href' % xlink_namespace, href_url)
        ds_checks.append(component_ref_feed)
def parse_args():
    """Parse the command line of the data stream composer.

    Returns:
        argparse.Namespace with the attributes build_dir, xccdf, oval, ocil,
        cpe_dict, cpe_oval, enable_sce, output and multiple_ds. Only
        ``--output`` is required; the other string options are None when
        omitted.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Compose an SCAP source data "
            "stream from individual SCAP components"))

    # Plain optional string options, listed in the order shown by --help.
    plain_options = (
        ("--build-dir", "the '/build' directory"),
        ("--xccdf", "XCCDF 1.2 checklist file name"),
        ("--oval", "OVAL file name"),
        ("--ocil", "OCIL file name"),
        ("--cpe-dict", "CPE dictionary file name"),
        ("--cpe-oval", "CPE OVAL file name"),
    )
    for flag, help_text in plain_options:
        parser.add_argument(flag, help=help_text)

    parser.add_argument(
        "--enable-sce", help="Enable building sce data", action='store_true')
    parser.add_argument(
        "--output",
        help="Output SCAP 1.3 source data stream file name",
        required=True)

    multiple_ds_help = (
        "Directory where XCCDF, OVAL, OCIL files with lower case prefixes "
        "xccdf, oval, ocil are stored to build multiple data streams. "
        "Multiple streams are generated in the thin_ds subdirectory. (off: to disable) "
        "e.g.: ~/scap-security-guide/build/rhel9/thin_ds_component/"
    )
    parser.add_argument("--multiple-ds", help=multiple_ds_help)

    return parser.parse_args()
def get_timestamp(file_name):
    """Return the timestamp to record for *file_name* as ``YYYY-MM-DDTHH:MM:SS``.

    When the ``SOURCE_DATE_EPOCH`` environment variable is set to a non-empty
    value (reproducible builds), that value is used and rendered in UTC so
    the result does not depend on the time zone of the build machine.
    Otherwise the modification time of *file_name* is rendered in local time.

    Args:
        file_name: path of the file; only read when SOURCE_DATE_EPOCH is
            not set.

    Returns:
        The formatted timestamp string (no time zone designator).

    Raises:
        ValueError: if SOURCE_DATE_EPOCH is set but is not a number.
        OSError: if the modification time of *file_name* cannot be read.
    """
    source_date_epoch = os.getenv("SOURCE_DATE_EPOCH")
    if source_date_epoch:
        # A fixed epoch must map to the same string everywhere; localtime()
        # would shift it by the builder's UTC offset.
        time_struct = time.gmtime(float(source_date_epoch))
    else:
        time_struct = time.localtime(os.path.getmtime(file_name))
    return time.strftime("%Y-%m-%dT%H:%M:%S", time_struct)
def add_component(
        ds_collection, component_ref_parent, component_file_name,
        dependencies=None):
    """Add a file as a ``<component>`` plus a ``<component-ref>`` pointing at it.

    Nothing is added when *component_file_name* is empty, None or does not
    exist, so optional inputs can simply be left out.

    Args:
        ds_collection: element the ``<component>`` is appended to.
        component_ref_parent: element the ``<component-ref>`` is appended to
            (dictionaries, checklists or checks when called from compose_ds).
        component_file_name: path of the XML file to embed.
        dependencies: optional list of file paths the component refers to;
            when given, a catalog is created on the component-ref via
            create_catalog.
    """
    # An omitted command-line option arrives here as None, and
    # os.path.exists(None) raises TypeError instead of returning False.
    if not component_file_name or not os.path.exists(component_file_name):
        return

    # Both ids are derived from the bare file name.
    base_name = os.path.basename(component_file_name)
    component_id = "scap_%s_comp_%s" % (ID_NS, base_name)

    component = ET.SubElement(
        ds_collection, "{%s}component" % datastream_namespace)
    component.set("id", component_id)
    component.set("timestamp", get_timestamp(component_file_name))

    component_ref = ET.SubElement(
        component_ref_parent, "{%s}component-ref" % datastream_namespace)
    component_ref.set("id", "scap_%s_cref_%s" % (ID_NS, base_name))
    component_ref.set("{%s}href" % xlink_namespace, "#" + component_id)

    # The parsed document root becomes the single child of the component.
    component.append(ET.parse(component_file_name).getroot())

    if dependencies:
        create_catalog(component_ref, dependencies)
def create_catalog(component_ref, dependencies):
    """Attach a ``<catalog>`` to *component_ref* listing its existing dependencies.

    One ``<uri>`` is added per dependency file that exists on disk; its
    ``name`` is the file's base name and its ``uri`` is the id of the
    matching component-ref prefixed with '#'. No catalog is created when
    none of the dependencies exist.

    Args:
        component_ref: the ``<component-ref>`` element to extend.
        dependencies: iterable of file paths; None or empty entries (inputs
            that were not supplied) and missing files are skipped.
    """
    # `dep and ...` guards against None: os.path.exists(None) raises
    # TypeError rather than returning False.
    existing = [dep for dep in dependencies if dep and os.path.exists(dep)]
    if not existing:
        return

    catalog = ET.SubElement(component_ref, "{%s}catalog" % cat_namespace)
    for dep in existing:
        dep_base_name = os.path.basename(dep)
        uri = ET.SubElement(catalog, "{%s}uri" % cat_namespace)
        uri.set("name", dep_base_name)
        uri.set("uri", "#scap_%s_cref_%s" % (ID_NS, dep_base_name))
def compose_ds(
        build_dir, xccdf_file_name, oval_file_name, ocil_file_name,
        cpe_dict_file_name, cpe_oval_file_name, sce_enabled):
    """Compose an SCAP 1.3 source data stream from the given component files.

    Args:
        build_dir: directory SCE script paths are resolved against.
        xccdf_file_name: XCCDF checklist; its base name also names the
            collection and the data stream.
        oval_file_name: OVAL checks file.
        ocil_file_name: OCIL checks file.
        cpe_dict_file_name: CPE dictionary file.
        cpe_oval_file_name: CPE OVAL file.
        sce_enabled: when true, SCE scripts are embedded as well.

    Returns:
        An ``ET.ElementTree`` rooted at the data-stream-collection element.
    """
    stream_name = "from_xccdf_" + os.path.basename(xccdf_file_name)

    collection = ET.Element(
        "{%s}data-stream-collection" % datastream_namespace)
    collection.set("id", "scap_%s_collection_%s" % (ID_NS, stream_name))
    collection.set("schematron-version", "1.3")

    data_stream = ET.SubElement(
        collection, "{%s}data-stream" % datastream_namespace)
    data_stream.set("id", "scap_%s_datastream_%s" % (ID_NS, stream_name))
    data_stream.set("scap-version", "1.3")
    data_stream.set("use-case", "OTHER")

    # The three sections are created in this fixed order.
    dictionaries, checklists, checks = [
        ET.SubElement(data_stream, "{%s}%s" % (datastream_namespace, section))
        for section in ("dictionaries", "checklists", "checks")
    ]

    # The CPE dictionary depends on the CPE OVAL file.
    add_component(
        collection, dictionaries, cpe_dict_file_name, [cpe_oval_file_name])
    # The checklist depends on all three check files.
    add_component(
        collection, checklists, xccdf_file_name,
        [oval_file_name, ocil_file_name, cpe_oval_file_name])
    for check_file_name in (oval_file_name, ocil_file_name, cpe_oval_file_name):
        add_component(collection, checks, check_file_name)

    if sce_enabled:
        embed_sce_checks_in_datastream(
            collection, checklists, checks, build_dir)

    # ET.indent is only available on Python 3.9 and newer.
    if hasattr(ET, "indent"):
        ET.indent(collection, space=" ", level=0)

    tree = ET.ElementTree(collection)
    # Replace the reference to remote OVAL content with a reference to a
    # source data stream component.
    move_patches_up_to_date_to_source_data_stream_component(tree)
    return tree
def _store_ds(ds, output):
    """Write the data stream tree *ds* to *output* as UTF-8 XML with an XML declaration."""
    ds.write(output, encoding="utf-8", xml_declaration=True)
def append_id_to_file_name(path, id_):
    """Return *path* with ``_<id_>`` inserted just before its extension."""
    root, extension = os.path.splitext(path)
    return "{0}_{1}{2}".format(root, id_, extension)
def add_dir(path, dir):
    """Return *path* with the directory *dir* inserted right above the file name."""
    parent, file_name = os.path.split(path)
    return os.path.join(parent, dir, file_name)
def _get_thin_ds_output_path(output, file_name):
    """Return the output path of the thin data stream built from *file_name*.

    The base name of *file_name* without its extension is appended to the
    file name of *output*, and the result is placed in a ``thin_ds``
    directory next to *output*.
    """
    stream_id, _extension = os.path.splitext(os.path.basename(file_name))
    output_with_id = append_id_to_file_name(output, stream_id)
    return add_dir(output_with_id, "thin_ds")
def _compose_multiple_ds(args):
    """Build one data stream per ``xccdf*.xml`` file found in ``args.multiple_ds``.

    For every matching XCCDF file the sibling OVAL, OCIL, CPE dictionary and
    CPE OVAL files are expected in the same directory under the same name
    with the ``xccdf`` prefix swapped for ``oval``, ``ocil``, ``cpe_dict``
    and ``cpe_oval``. Each composed stream is written to the path computed
    by _get_thin_ds_output_path from ``args.output``.

    Args:
        args: parsed command line; reads multiple_ds, build_dir, enable_sce
            and output.
    """
    for xccdf in glob.glob("{}/xccdf*.xml".format(args.multiple_ds)):
        oval = _sibling_component_path(xccdf, "oval")
        ocil = _sibling_component_path(xccdf, "ocil")
        cpe_dict = _sibling_component_path(xccdf, "cpe_dict")
        cpe_oval = _sibling_component_path(xccdf, "cpe_oval")

        ds = compose_ds(
            args.build_dir, xccdf, oval, ocil, cpe_dict, cpe_oval, args.enable_sce
        )

        output = _get_thin_ds_output_path(args.output, xccdf.replace("xccdf_", ""))
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(os.path.dirname(output), exist_ok=True)
        _store_ds(ds, output)


def _sibling_component_path(xccdf_path, prefix):
    """Return the path next to *xccdf_path* whose file name starts with *prefix* instead of 'xccdf'."""
    directory, base_name = os.path.split(xccdf_path)
    # Only the leading "xccdf" of the file name is swapped (the glob pattern
    # guarantees it is there); a str.replace over the whole path would also
    # rewrite directory names that happen to contain "xccdf".
    return os.path.join(directory, prefix + base_name[len("xccdf"):])
if __name__ == "__main__":
    args = parse_args()
    ssg.xml.register_namespaces()

    # --multiple-ds has no default, so it is None when omitted. Treat that
    # (and an empty value) like "off" instead of globbing a directory that is
    # literally named "None".
    if args.multiple_ds and args.multiple_ds != "off":
        _compose_multiple_ds(args)

    # The full data stream is always built, whether or not thin data streams
    # were requested.
    ds = compose_ds(
        args.build_dir, args.xccdf, args.oval, args.ocil, args.cpe_dict,
        args.cpe_oval, args.enable_sce
    )
    _store_ds(ds, args.output)
|