1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
|
#!/usr/bin/python3
import sys
import os
import os.path
import argparse
import ssg.build_remediations as remediation
import ssg.build_yaml
import ssg.rules
import ssg.jinja
import ssg.environment
import ssg.utils
from ssg.utils import mkdir_p
import ssg.xml
from ssg.build_cpe import ProductCPEs
def parse_args():
p = argparse.ArgumentParser(
description="This script collects all remediation scripts, both "
"static and templated, processes them with Jinja and puts them to "
"the given output directory.")
p.add_argument(
"--build-config-yaml", required=True,
help="YAML file with information about the build configuration. "
"e.g.: ~/scap-security-guide/build/build_config.yml"
)
p.add_argument(
"--product-yaml", required=True,
help="YAML file with information about the product we are building. "
"e.g.: ~/scap-security-guide/rhel9/product.yml"
)
p.add_argument(
"--resolved-rules-dir", required=True,
help="Directory with <rule-id>.yml resolved rule YAMLs"
)
p.add_argument(
"--remediation-type", required=True, action="append",
help="language or type of the remediations we are combining."
"example: ansible")
p.add_argument(
"--output-dir", required=True,
help="output directory where all remediations will be saved"
)
p.add_argument(
"--fixes-from-templates-dir", required=True,
help="directory from which we will collect fixes generated from "
"templates")
p.add_argument(
"--platforms-dir", required=True,
help="directory from which we collect compiled platforms")
p.add_argument(
"--cpe-items-dir", required=True,
help="directory from which we collect compiled CPE items")
return p.parse_args()
def prepare_output_dirs(output_dir, remediation_types):
output_dirs = dict()
for lang in remediation_types:
language_output_dir = os.path.join(output_dir, lang)
mkdir_p(language_output_dir)
output_dirs[lang] = language_output_dir
return output_dirs
def find_remediation(
fixes_from_templates_dir, rule_dir, lang, product, expected_file_name):
language_fixes_from_templates_dir = os.path.join(
fixes_from_templates_dir, lang)
fix_path = None
# first look for a static remediation
rule_dir_remediations = remediation.get_rule_dir_remediations(
rule_dir, lang, product)
if len(rule_dir_remediations) > 0:
# first item in the list has the highest priority
fix_path = rule_dir_remediations[0]
if fix_path is None:
# check if we have a templated remediation instead
if os.path.isdir(language_fixes_from_templates_dir):
templated_fix_path = os.path.join(
language_fixes_from_templates_dir, expected_file_name)
if os.path.exists(templated_fix_path):
fix_path = templated_fix_path
return fix_path
def process_remediation(
rule, fix_path, lang, output_dirs, expected_file_name, env_yaml, cpe_platforms):
remediation_cls = remediation.REMEDIATION_TO_CLASS[lang]
remediation_obj = remediation_cls(fix_path)
remediation_obj.associate_rule(rule)
fix = remediation.process(remediation_obj, env_yaml, cpe_platforms)
if fix:
output_file_path = os.path.join(output_dirs[lang], expected_file_name)
remediation.write_fix_to_file(fix, output_file_path)
def collect_remediations(
rule, langs, fixes_from_templates_dir, product, output_dirs,
env_yaml, cpe_platforms):
rule_dir = os.path.dirname(rule.definition_location)
for lang in langs:
ext = remediation.REMEDIATION_TO_EXT_MAP[lang]
expected_file_name = rule.id_ + ext
fix_path = find_remediation(
fixes_from_templates_dir, rule_dir, lang, product,
expected_file_name)
if fix_path is None:
# neither static nor templated remediation found
continue
try:
process_remediation(
rule, fix_path, lang, output_dirs, expected_file_name, env_yaml, cpe_platforms)
except Exception as exc:
msg = (
"Failed to dispatch {lang} remediation for {rule_id}: {error}"
.format(lang=lang, rule_id=rule.id_, error=str(exc)))
raise RuntimeError(msg)
def main():
args = parse_args()
env_yaml = ssg.environment.open_environment(
args.build_config_yaml, args.product_yaml)
product = ssg.utils.required_key(env_yaml, "product")
output_dirs = prepare_output_dirs(args.output_dir, args.remediation_type)
product_cpes = ProductCPEs()
product_cpes.load_cpes_from_directory_tree(args.cpe_items_dir, env_yaml)
cpe_platforms = dict()
for platform_file in os.listdir(args.platforms_dir):
platform_path = os.path.join(args.platforms_dir, platform_file)
try:
platform = ssg.build_yaml.Platform.from_yaml(platform_path, env_yaml, product_cpes)
except ssg.build_yaml.DocumentationNotComplete:
# Happens on non-debug build when a platform is
# "documentation-incomplete"
continue
cpe_platforms[platform.name] = platform
for rule_file in os.listdir(args.resolved_rules_dir):
rule_path = os.path.join(args.resolved_rules_dir, rule_file)
try:
rule = ssg.build_yaml.Rule.from_yaml(rule_path, env_yaml)
except ssg.build_yaml.DocumentationNotComplete:
# Happens on non-debug build when a rule is
# "documentation-incomplete"
continue
collect_remediations(
rule, args.remediation_type, args.fixes_from_templates_dir,
product, output_dirs, env_yaml, cpe_platforms)
sys.exit(0)
if __name__ == "__main__":
main()
|