1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
|
#!/usr/bin/python3
import sys
import os
import argparse
import subprocess
import json
import ssg.build_remediations
import ssg.fixes
import ssg.rule_yaml
import ssg.utils
# Absolute path of the parent of the directory that contains this script
# (presumably the repository root -- confirm against the checkout layout).
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
# Remediation language names, obtained by iterating
# ssg.build_remediations.REMEDIATION_TO_EXT_MAP; used below as the allowed
# values of the ``fix_lang`` command-line argument.
REMEDIATION_LANGS = list(ssg.build_remediations.REMEDIATION_TO_EXT_MAP)
def parse_args():
    """Parse this script's command-line arguments.

    Returns:
        argparse.Namespace with the attributes ``json`` (path of the rule
        directory JSON file), ``rule_id``, ``fix_lang`` (one of
        REMEDIATION_LANGS), ``action`` and ``products`` (a possibly empty
        list of strings).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-j", "--json", type=str, action="store",
                        default="build/rule_dirs.json",
                        help="File to read json output of rule_dir_json from (defaults "
                             "to build/rule_dirs.json)")
    parser.add_argument("rule_id", type=str, help="Rule to change, by id")
    parser.add_argument("fix_lang", choices=REMEDIATION_LANGS, help="Remediation to change")
    # The help text used to repeat the rule_id description; describe the
    # action instead so ``--help`` output is not misleading.
    parser.add_argument("action",
                        choices=['add', 'remove', 'list', 'replace', 'delete',
                                 'make_shared', 'diff'],
                        help="Action to perform on the rule's remediation")
    # add/remove/replace operate on the 'shared' fix file, so the help text
    # refers to the shared fix rather than to an OVAL check.
    parser.add_argument("products", type=str, nargs='*',
                        help="Products or platforms to perform action with on rule_id. For replace, "
                             "the expected format is "
                             "platform[,other_platform]~platform[,other_platform] "
                             "where the first half is the platforms that are required to "
                             "match, and are replaced by the platforms in the second half "
                             "if all match. Add and remove require platforms and only apply "
                             "to the shared fix; delete, make_shared, and diff require products.")
    return parser.parse_args()
def list_platforms(rule_obj, lang):
    """Print the computed products and the actual platforms of a rule's fixes.

    Two sections are written to stdout. "Computed products" lists, for each
    fix of language ``lang``, the entries of its ``products`` key in
    ``rule_obj``. "Actual platforms" lists, for each fix, whatever
    ssg.fixes.applicable_platforms reports for the fix file on disk.

    Args:
        rule_obj: Rule description dict; fixes are looked up under
            ``rule_obj['remediations'][lang]``, keyed by fix id.
        lang: Remediation language whose fixes are listed.
    """
    # A rule without a 'remediations' key (or without fixes for ``lang``)
    # simply has nothing to list; the other actions tolerate this as well.
    fixes = rule_obj.get('remediations', {}).get(lang, {})
    fix_ids = sorted(fixes)

    print("Computed products:")
    for fix_id in fix_ids:
        print(" - %s" % fix_id)
        for product in sorted(fixes[fix_id].get('products', [])):
            print(" - %s" % product)
    print("")

    print("Actual platforms:")
    for fix_id in fix_ids:
        fix_file = ssg.fixes.get_fix_path(rule_obj, lang, fix_id)
        platforms = ssg.fixes.applicable_platforms(fix_file)
        # Print the id of the fix being inspected in *this* iteration; the
        # previous code printed a stale variable left over from the first
        # loop, so every entry was labelled with the last fix id.
        print(" - %s" % fix_id)
        for platform in platforms:
            print(" - %s" % platform)
    print("")
def add_platforms(rule_obj, lang, platforms):
    """Add ``platforms`` to the platforms of the rule's shared fix.

    Does nothing when the shared fix already lists ``multi_platform_all``.
    Otherwise prints the current and the resulting platform lists and writes
    the updated fix contents back to the shared fix file.

    Args:
        rule_obj: Rule description dict passed through to ssg.fixes.
        lang: Remediation language of the shared fix.
        platforms: Iterable of platform names to add.
    """
    shared_path, shared_text = ssg.fixes.get_fix_contents(rule_obj, lang, 'shared')
    existing = ssg.fixes.applicable_platforms(shared_path)

    # A fix marked for all platforms is left untouched.
    if "multi_platform_all" in existing:
        return

    combined = set(existing).union(platforms)

    print("Current platforms: %s" % ','.join(sorted(existing)))
    print("New platforms: %s" % ','.join(sorted(combined)))

    updated_text = ssg.fixes.set_applicable_platforms(shared_text, combined)
    ssg.utils.write_list_file(shared_path, updated_text)
def remove_platforms(rule_obj, lang, platforms):
    """Remove ``platforms`` from the platforms of the rule's shared fix.

    Prints the current and the resulting platform lists, then writes the
    updated fix contents back to the shared fix file.

    Args:
        rule_obj: Rule description dict passed through to ssg.fixes.
        lang: Remediation language of the shared fix.
        platforms: Iterable of platform names to drop.
    """
    shared_path, shared_text = ssg.fixes.get_fix_contents(rule_obj, lang, 'shared')
    existing = ssg.fixes.applicable_platforms(shared_path)

    remaining = set(existing)
    remaining.difference_update(platforms)

    print("Current platforms: %s" % ','.join(sorted(existing)))
    print("New platforms: %s" % ','.join(sorted(remaining)))

    updated_text = ssg.fixes.set_applicable_platforms(shared_text, remaining)
    ssg.utils.write_list_file(shared_path, updated_text)
def replace_platforms(rule_obj, lang, platforms):
    """Apply ``match~replacement`` platform substitutions to the shared fix.

    Each entry of ``platforms`` must contain exactly one ``~``. Both halves
    are parsed with ssg.utils.parse_platform. When every platform of the
    first half is among the fix's current platforms, those platforms are
    removed from the result and the platforms of the second half are added.
    A malformed entry prints an error to stderr and exits with status 1.

    Args:
        rule_obj: Rule description dict passed through to ssg.fixes.
        lang: Remediation language of the shared fix.
        platforms: Iterable of ``match~replacement`` description strings.
    """
    shared_path, shared_text = ssg.fixes.get_fix_contents(rule_obj, lang, 'shared')
    existing = ssg.fixes.applicable_platforms(shared_path)
    updated = set(existing)

    for spec in platforms:
        halves = spec.split('~')
        if len(halves) != 2:
            print("Invalid platform replacement description: %s" % spec,
                  file=sys.stderr)
            sys.exit(1)

        required = ssg.utils.parse_platform(halves[0])
        substitute = ssg.utils.parse_platform(halves[1])

        # Matching is always done against the platforms read from disk, not
        # against the partially updated set.
        if not required.issubset(existing):
            continue
        updated.difference_update(required)
        updated.update(substitute)

    print("Current platforms: %s" % ','.join(sorted(existing)))
    print("New platforms: %s" % ','.join(sorted(updated)))

    updated_text = ssg.fixes.set_applicable_platforms(shared_text, updated)
    ssg.utils.write_list_file(shared_path, updated_text)
def delete_fixes(rule_obj, lang, products):
    """Delete the fix file of each given product and report every removal.

    Args:
        rule_obj: Rule description dict passed through to ssg.fixes.
        lang: Remediation language of the fixes.
        products: Iterable of product names whose fix files are deleted.
    """
    for name in products:
        doomed_path = ssg.fixes.get_fix_path(rule_obj, lang, name)
        # os.unlink is the same operation as os.remove.
        os.unlink(doomed_path)
        print("Removed: %s" % doomed_path)
def make_shared_fix(rule_obj, lang, products):
    """Rename one product-specific fix file so it becomes the shared fix.

    Args:
        rule_obj: Rule description dict; reads its ``remediations``, ``id``
            and ``dir`` keys.
        lang: Remediation language of the fix.
        products: List holding exactly one product name.

    Raises:
        ValueError: If ``products`` does not hold exactly one entry, if the
            rule has no fixes for ``lang``, or if a shared fix already
            exists for the rule.
    """
    exactly_one = products and len(products) <= 1
    if not exactly_one:
        raise ValueError("Must pass exactly one product for the make_shared option.")

    has_fixes = 'remediations' in rule_obj and lang in rule_obj['remediations']
    if not has_fixes:
        raise ValueError("Rule is missing fixes.")

    # The shared fix is named "shared" plus the extension mapped to ``lang``.
    shared_name = "shared" + ssg.build_remediations.REMEDIATION_TO_EXT_MAP[lang]
    if shared_name in rule_obj['remediations'][lang]:
        raise ValueError("Already have shared fix for rule_id:%s; refusing "
                         "to continue." % rule_obj['id'])

    source_path = ssg.fixes.get_fix_path(rule_obj, lang, products[0])
    target_path = os.path.join(rule_obj['dir'], lang, shared_name)
    os.rename(source_path, target_path)

    print("Moved %s -> %s" % (source_path, target_path))
def diff_fixes(rule_obj, lang, products):
    """Run ``diff --color=always`` on the fix files of two products.

    Args:
        rule_obj: Rule description dict; reads its ``remediations`` key.
        lang: Remediation language of the fixes.
        products: List holding exactly two different product names.

    Raises:
        ValueError: If ``products`` is not two distinct entries, or if the
            rule has no fixes for ``lang``.
    """
    two_distinct = (products and len(products) == 2
                    and products[0] != products[1])
    if not two_distinct:
        raise ValueError("Must pass exactly two products for the diff option.")

    has_fixes = 'remediations' in rule_obj and lang in rule_obj['remediations']
    if not has_fixes:
        raise ValueError("Rule is missing fixes.")

    first, second = products[0], products[1]
    left_path = ssg.fixes.get_fix_path(rule_obj, lang, first)
    right_path = ssg.fixes.get_fix_path(rule_obj, lang, second)

    command = ['diff', '--color=always', left_path, right_path]
    subprocess.run(command)
def main():
    """Entry point: load the rule JSON and dispatch the requested action.

    Exits with status 1 (after printing to stderr) when the rule id is not
    present in the JSON file, or when an action other than ``list`` is
    requested without any products.
    """
    args = parse_args()

    # Read the JSON inside a context manager so the file handle is closed
    # even when decoding fails; it was previously left open.
    with open(args.json, 'r') as json_file:
        known_rules = json.load(json_file)

    if args.rule_id not in known_rules:
        print("Error: rule_id:%s is not known!" % args.rule_id, file=sys.stderr)
        print("If you think this is an error, try regenerating the JSON.", file=sys.stderr)
        sys.exit(1)

    # Every action except "list" needs at least one product/platform argument.
    if args.action != "list" and not args.products:
        print("Error: expected a list of products or replace transformations but "
              "none given.", file=sys.stderr)
        sys.exit(1)

    rule_obj = known_rules[args.rule_id]
    print("rule_id:%s\n" % args.rule_id)

    if args.action == "list":
        list_platforms(rule_obj, args.fix_lang)
    elif args.action == "add":
        add_platforms(rule_obj, args.fix_lang, args.products)
    elif args.action == "remove":
        remove_platforms(rule_obj, args.fix_lang, args.products)
    elif args.action == "replace":
        replace_platforms(rule_obj, args.fix_lang, args.products)
    elif args.action == 'delete':
        delete_fixes(rule_obj, args.fix_lang, args.products)
    elif args.action == 'make_shared':
        make_shared_fix(rule_obj, args.fix_lang, args.products)
    elif args.action == 'diff':
        diff_fixes(rule_obj, args.fix_lang, args.products)
    else:
        print("Unknown option: %s" % args.action)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|