1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
|
#!/usr/bin/python3
import argparse
import datetime
import time
import json
import os
import re
import sys
import xml.etree.ElementTree as ET
import ssg.build_yaml
import ssg.constants
import ssg.rules
import ssg.yaml
import ssg.environment
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
SSG_BUILD_ROOT = os.path.join(SSG_ROOT, "build")
RULES_JSON = os.path.join(SSG_BUILD_ROOT, "rule_dirs.json")
BUILD_CONFIG = os.path.join(SSG_BUILD_ROOT, "build_config.yml")
NS = {'scap': ssg.constants.datastream_namespace,
'xccdf-1.2': ssg.constants.XCCDF12_NS}
PROFILE = 'stig'
def get_profile(product, profile_name):
ds_root = ET.parse(os.path.join(SSG_ROOT, 'build', 'ssg-{product}-ds.xml'
.format(product=product))).getroot()
profiles = ds_root.findall(
'.//{{{scap}}}component/{{{xccdf}}}Benchmark/{{{xccdf}}}Profile'.format(
scap=NS["scap"], xccdf=NS["xccdf-1.2"])
)
profile_name_fqdn = "xccdf_org.ssgproject.content_profile_{profile_name}".format(
profile_name=profile_name)
for profile in profiles:
if profile.attrib['id'] == profile_name_fqdn:
return profile
raise ValueError("Profile %s was not found." % profile_name_fqdn)
get_profile.__annotations__ = {'product': str, 'profile_name': str, 'return': ET.Element}
def filter_out_implemented_rules(known_rules, ns, root):
needed_rules = known_rules.copy()
groups = root.findall('.//scap:component/xccdf-1.2:Benchmark/xccdf-1.2:Group', ns)
for group in groups:
for stig in group.findall('xccdf-1.2:Rule', ns):
stig_id = stig.find('xccdf-1.2:version', ns).text
check = stig.find('xccdf-1.2:check', ns)
if stig_id in known_rules.keys() and len(check) > 0:
del needed_rules[stig_id]
return needed_rules
filter_out_implemented_rules.__annotations__ = {'known_rules': dict, 'ns': dict,
'root': ET.Element, 'return': dict}
def handle_rule_yaml(product, rule_id, rule_dir, guide_dir, env_yaml):
rule_obj = {'id': rule_id, 'dir': rule_dir, 'guide': guide_dir}
rule_file = ssg.rules.get_rule_dir_yaml(rule_dir)
rule_yaml = ssg.build_yaml.Rule.from_yaml(rule_file, env_yaml=env_yaml)
rule_yaml.normalize(product)
rule_obj['references'] = rule_yaml.references
return rule_obj
handle_rule_yaml.__annotations__ = {'product': str, 'rule_id': str, 'rule_dir': str,
'guide_dir': str, 'env_yaml': dict, 'return': dict}
def get_platform_rules(product, json_path, resolved_rules_dir, build_root):
platform_rules = list()
if resolved_rules_dir:
rules_path = os.path.join(build_root, product, 'rules')
for file in os.listdir(rules_path):
if not file.endswith('.yml'):
continue
rule_id = file.replace('.yml', '')
rule = dict()
rule_yaml = ssg.yaml.open_raw(os.path.join(rules_path, file))
rule['id'] = rule_id
rule['references'] = dict()
rule['references'] = rule_yaml['references']
platform_rules.append(rule)
else:
rules_json_file = open(json_path, 'r')
rules_json = json.load(rules_json_file)
for rule in rules_json.values():
if product in rule['products']:
platform_rules.append(rule)
if not rules_json_file.closed:
rules_json_file.close()
return platform_rules
get_platform_rules.__annotations__ = {'product': str, 'json_path': str, 'resolved_rules_dir': bool,
'build_root': str, 'return': list}
def _open_rule_obj(resolved_rules_dir, rule, product, env_yaml):
if resolved_rules_dir:
return rule
try:
rule_obj = handle_rule_yaml(
product, rule['id'], rule['dir'], rule['guide'], env_yaml)
except ssg.yaml.DocumentationNotComplete:
msg = 'Rule %s throw DocumentationNotComplete' % rule['id']
sys.stderr.write(msg)
# Happens on non-debug build when a rule is "documentation-incomplete"
return None
return rule_obj
def get_implemented_stigs(product, root_path, build_config_yaml_path,
reference_str, json_path, resolved_rules_dir,
build_root):
platform_rules = get_platform_rules(product, json_path, resolved_rules_dir, build_root)
product_dir = os.path.join(root_path, "products", product)
product_yaml_path = os.path.join(product_dir, "product.yml")
env_yaml = ssg.environment.open_environment(
build_config_yaml_path, product_yaml_path, os.path.join(root_path, "product_properties"))
known_rules = dict()
for rule in platform_rules:
rule_obj = _open_rule_obj(resolved_rules_dir, rule, product, env_yaml)
if not rule_obj:
continue
if reference_str not in rule_obj['references'].keys():
continue
refs = rule_obj['references'][reference_str]
for ref in refs:
if ref in known_rules:
known_rules[ref].append(rule['id'])
else:
known_rules[ref] = [rule['id']]
return known_rules
get_implemented_stigs.__annotations__ = {'product': str, 'root_path': str,
'build_config_yaml_path': str, 'reference_str': str,
'json_path': str, 'resolved_rules_dir': bool,
'build_root': str, 'return': dict}
def setup_tailoring_profile(profile_id, profile_root):
tailoring_profile = ET.Element('xccdf-1.2:Profile')
if profile_id:
tailoring_profile.set('id', '{oscap}{profile_id}'
.format(oscap=ssg.constants.OSCAP_PROFILE, profile_id=profile_id))
else:
tailoring_profile.set('id', '{id}_delta_tailoring'.format(id=profile_root.attrib["id"]))
tailoring_profile.append(profile_root.find('xccdf-1.2:title', NS))
tailoring_profile.append(profile_root.find('xccdf-1.2:description', NS))
tailoring_profile.set('extends', profile_root.get('id'))
return tailoring_profile
setup_tailoring_profile.__annotations__ = {'profile_id': str, 'profile_root': ET.Element}
def _get_datetime():
return datetime.datetime.fromtimestamp(
int(os.environ.get('SOURCE_DATE_EPOCH', time.time()))).isoformat()
def create_tailoring(args):
benchmark_root = ET.parse(args.manual).getroot()
known_rules = get_implemented_stigs(args.product, args.root, args.build_config_yaml,
args.reference, args.json, args.resolved_rules_dir,
args.build_root)
needed_rules = filter_out_implemented_rules(known_rules, NS, benchmark_root)
needed_rule_names_set = set(rulename for ruleset in needed_rules.values() for rulename in ruleset)
profile_root = get_profile(args.product, args.profile)
selections = profile_root.findall('xccdf-1.2:select', NS)
tailoring_profile = setup_tailoring_profile(args.profile_id, profile_root)
for selection in selections:
if selection.attrib['idref'].startswith(ssg.constants.OSCAP_RULE):
cac_rule_id = selection.attrib['idref'].replace(ssg.constants.OSCAP_RULE, '')
desired_value = str(cac_rule_id in needed_rule_names_set).lower()
if selection.get('selected') != desired_value:
selection.set('selected', desired_value)
tailoring_profile.append(selection)
if not args.quiet:
print('Set rule "{cac_rule_id}" selection state to {desired_value}'
.format(cac_rule_id=cac_rule_id, desired_value=desired_value))
tailoring_root = ET.Element('xccdf-1.2:Tailoring')
version = ET.SubElement(tailoring_root, 'xccdf-1.2:version',
attrib={'time': _get_datetime()})
version.text = '1'
tailoring_root.set('id', args.tailoring_id)
tailoring_root.append(tailoring_profile)
return tailoring_root
create_tailoring.__annotations__ = {'args': argparse.Namespace, 'return': ET.Element}
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("-r", "--root", type=str, action="store", default=SSG_ROOT,
help="Path to SSG root directory (defaults to {ssg_root})"
.format(ssg_root=SSG_ROOT))
parser.add_argument("-p", "--product", type=str, action="store", required=True,
help="What product to produce the tailoring file for")
parser.add_argument("-m", "--manual", type=str, action="store", required=True,
help="Path to XML XCCDF manual file to use as the source")
parser.add_argument("-j", "--json", type=str, action="store", default=RULES_JSON,
help="Path to the rules_dir.json (defaults to build/stig_control.json)")
parser.add_argument("-c", "--build-config-yaml", default=BUILD_CONFIG,
help="YAML file with information about the build configuration. ")
parser.add_argument("-b", "--profile", type=str, default="stig",
help="What profile to use. Defaults to stig")
parser.add_argument("-ref", "--reference", type=str, default="stigid",
help="Reference system to check for, defaults to stigid")
parser.add_argument("-o", "--output", type=str,
help="Defaults build/PRODUCT_PROFILE_tailoring.xml")
parser.add_argument("--profile-id", type=str,
help="Id of the created profile. Defaults to PROFILE_delta")
parser.add_argument("--tailoring-id", type=str,
default='xccdf_content-disa-delta_tailoring_default',
help="Id of the created tailoring file. "
"Defaults to xccdf_content-disa-delta_tailoring_default")
parser.add_argument("-q", "--quiet", action='store_true',
help="The script will not produce any output.")
parser.add_argument("--resolved-rules-dir", action='store_true',
help="The script will not use rules_dir.json, "
"but instead uses the data from the build.")
parser.add_argument('-B', '--build-root', type=str, default=SSG_BUILD_ROOT,
help="The root of the CMake working directory, "
"defaults to {ssg_build_root}".format(ssg_build_root=SSG_BUILD_ROOT))
parser.add_argument("-d", "--dry-run", action="store_true",
help="If set the script will not output.")
return parser.parse_args()
parse_args.__annotations__ = {'return': argparse.Namespace}
def main():
args = parse_args()
ET.register_namespace('xccdf-1.2', ssg.constants.XCCDF12_NS)
tailoring_root = create_tailoring(args)
tree = ET.ElementTree(tailoring_root)
manual_version = re.search(r'(v[0-9]+r[0-9]+)', args.manual)
if manual_version is None:
sys.stderr.write("Unable to find version from file name.\n")
sys.stderr.write("The string v[NUM]r[NUM] must be in the filename.\n")
exit(1)
if args.dry_run:
return 0
if args.output:
out = os.path.join(args.output)
else:
out = os.path.join(SSG_ROOT, 'build',
'{product}_{profile}_{manual_version}_delta_tailoring.xml'
.format(product=args.product, profile=args.profile,
manual_version=manual_version.group(0)))
tree.write(out)
if not args.quiet:
print("Wrote tailoring file to {out}.".format(out=out))
if __name__ == '__main__':
main()
|