1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
|
#!/usr/bin/python3
import argparse
import csv
import enum
import os.path
import sys
from utils.create_srg_export import DisaStatus
try:
import ssg.constants
except (ModuleNotFoundError, ImportError):
sys.stderr.write("Unable to load ssg python modules.\n")
sys.stderr.write("Hint: run source ./.pyenv.sh\n")
exit(3)
@enum.unique
class Problems(enum.Enum):
    """Messages for the problems this audit can attach to a row.

    The ``value`` of a member is the human-readable text that ends up in the
    per-row error list. ``enum.unique`` makes an accidentally duplicated
    message fail loudly at import time instead of silently turning the second
    name into an alias of the first.
    """

    # Problems with the 'Requirement' column.
    MISSING_REQUIREMENT = "Missing requirement"
    REQUIREMENT_HAS_INVALID_CHARS = "Requirement contains \", &, ', <, or > and is invalid."
    # Wording problems in the 'Check' and 'Fix' columns.
    CHECK_BADWORDS = "Check contains should, shall, or please."
    FIX_BADWORDS = "Fix contains should, shall, or please."
    CHECK_BAD_START = "Check starts with ensure or interview"
    FIX_BAD_START = "Fix starts with ensure or interview"
    CHECK_IS_A_FIND = "Check doesn't contain ', this is a finding'"
    # Problems with the 'Status' and 'Severity' columns.
    INVALID_STATUS = "The status is not valid"
    INVALID_SEVERITY = "Severity is not CAT I, CAT II, or CAT III"
def has_element(search: list, target: str) -> bool:
    """Return True when at least one item of *search* occurs inside *target*.

    The test is a plain, case-sensitive substring check (``item in target``).
    An empty *search* yields False.
    """
    # any() stops at the first hit and avoids building a throwaway list.
    return any(elem in target for elem in search)
def has_startswith(search: list, target: str) -> bool:
    """Return True when *target* begins with at least one item of *search*.

    The comparison is case-sensitive (``str.startswith``). An empty *search*
    yields False.
    """
    # any() stops at the first matching prefix and avoids building a throwaway list.
    return any(target.startswith(elem) for elem in search)
def parse_args() -> argparse.Namespace:
    """Parse the command line and return the resulting namespace.

    The script takes a single positional argument, ``file``: the path of the
    CSV that is going to be audited.
    """
    cli = argparse.ArgumentParser(
        description="Audits a csv from utils/create_srg_export.py",
        epilog="example: ./utils/srg_audit.py build/1642443529_stig_export.csv",
    )
    cli.add_argument(
        'file',
        type=str,
        help="A CSV file created by utils/create_srg_export.py",
    )
    namespace = cli.parse_args()
    return namespace
def create_reduction_dicts() -> (dict, dict, dict, dict):
    """Build the per-status templates describing which columns must be filled.

    Each template maps a column name to a starting counter. While a row is
    audited, the counter of every non-blank column is decremented by one and
    the final value is then interpreted as follows:

    * start ``1`` - required column: ends at 0 when filled, stays 1 (reported
      as "not defined") when blank.
    * start ``0`` - column that must stay blank: ends at -1 (reported as
      "defined and should not be") when filled.
    * start ``3`` - optional column: ends at 2 when filled, stays 3 when
      blank; neither is reported.

    Returns:
        A tuple ``(config_dict, does_not_meet, meets_dict, not_applicable)``.
        Note the order: ``does_not_meet`` comes before ``meets_dict``.
    """
    # Template used for rows whose status is the "automated" one.
    config_dict = {'Requirement': 1,
                   'Vul Discussion': 1,
                   'Status': 1,
                   'Check': 1,
                   'Fix': 1,
                   'Severity': 1,
                   'Mitigation': 0,
                   'Artifact Description': 0,
                   'Status Justification': 3,
                   'CCI': 1,
                   'SRGID': 1}

    # Template used for rows whose status is the "inherently met" one.
    # The key was previously spelt 'VulDiscussion'; no column of that name is
    # used by the other templates, so the counter could never be reduced and
    # every such row was reported as missing the field.
    meets_dict = {'Requirement': 1,
                  'Vul Discussion': 1,
                  'Status': 1,
                  'Check': 0,
                  'Fix': 0,
                  'Severity': 1,
                  'Mitigation': 0,
                  'Artifact Description': 1,
                  'Status Justification': 1,
                  'CCI': 1,
                  'SRGID': 1}

    # Template used for rows whose status is the "does not meet" one.
    does_not_meet = {'Requirement': 1,
                     'Vul Discussion': 1,
                     'Status': 1,
                     'Check': 0,
                     'Fix': 0,
                     'Severity': 1,
                     'Mitigation': 1,
                     'Artifact Description': 0,
                     'Status Justification': 1,
                     'CCI': 1,
                     'SRGID': 1}

    # Template used for rows whose status is the "not applicable" one.
    not_applicable = {'Requirement': 1,
                      'Vul Discussion': 0,
                      'Status': 1,
                      'Check': 0,
                      'Fix': 0,
                      'Severity': 1,
                      'Mitigation': 1,
                      'Artifact Description': 0,
                      'Status Justification': 1,
                      'CCI': 1,
                      'SRGID': 1}

    return config_dict, does_not_meet, meets_dict, not_applicable
def check_paths(file: str) -> None:
    """Abort the program unless *file* names an existing regular file.

    Writes a two-line explanation to stderr and exits with status 1 when the
    path does not exist, or with status 2 when it exists but is not a file.
    Returns None when the path is usable.

    Raises:
        SystemExit: with code 1 or 2 as described above.
    """
    if not os.path.exists(file):
        sys.stderr.write("Unable to perform audit.\n")
        sys.stderr.write(f"File not found: {file}\n")
        # sys.exit rather than the bare exit(): the latter is a helper that
        # the site module installs for interactive use and may be missing.
        sys.exit(1)
    if not os.path.isfile(file):
        sys.stderr.write("Unable to perform audit.\n")
        sys.stderr.write(f"Input must be a file: {file}\n")
        sys.exit(2)
def validate_check_fix(results: dict, row: dict, row_id: str) -> None:
    """Record wording problems found in the 'Check' and 'Fix' columns of *row*.

    Every problem found is appended, as its message text, to the list stored
    at ``results[row_id]``; that list must already exist.

    NOTE(review): the word and prefix matching is case-sensitive, so a text
    starting with a capitalised "Ensure" is not flagged - confirm that this
    is intended. The final test looks for 'this is a finding.' while the
    reported message quotes ', this is a finding' - confirm which is meant.
    """
    banned_words = ['shall', 'should', 'please']
    banned_openers = ['ensure', 'interview']

    # (matcher, needles, column, problem) - evaluated strictly in this order
    # so the messages are appended in a stable sequence.
    rules = (
        (has_element, banned_words, 'Fix', Problems.FIX_BADWORDS),
        (has_element, banned_words, 'Check', Problems.CHECK_BADWORDS),
        (has_startswith, banned_openers, 'Check', Problems.CHECK_BAD_START),
        (has_startswith, banned_openers, 'Fix', Problems.FIX_BAD_START),
    )
    for matcher, needles, column, problem in rules:
        if matcher(needles, row[column]):
            results[row_id].append(problem.value)

    if 'this is a finding.' not in row['Check']:
        results[row_id].append(Problems.CHECK_IS_A_FIND.value)
def get_results_dict(row: dict) -> dict:
    """Pick the column-requirement template matching the row's 'Status'.

    Returns a fresh dict for the four statuses handled here (automated,
    inherently met, does not meet, not applicable) and None for any other
    status value.
    """
    automated, unmet, inherently_met, not_applicable = create_reduction_dicts()
    status = row['Status']

    # Compared in order; the first equal status wins.
    templates = (
        (DisaStatus.AUTOMATED, automated),
        (DisaStatus.INHERENTLY_MET, inherently_met),
        (DisaStatus.DOES_NOT_MEET, unmet),
        (DisaStatus.NOT_APPLICABLE, not_applicable),
    )
    for candidate, template in templates:
        if status == candidate:
            return dict(template)
    return None
def process_row_results(errors: dict, results: dict, row: dict) -> None:
    """Turn the final per-column counters of a row into error messages.

    A counter of 1 means the column was required but left blank; -1 means the
    column was filled although it must stay blank. Both are appended to the
    row's list in *errors*. Counters of 0, 2 and 3 are fine.

    Raises:
        AttributeError: if a counter holds any other value, which indicates a
            broken template rather than a bad row.
    """
    acceptable = (0, 2, 3)
    for field, remaining in results.items():
        if remaining == 1:
            errors[get_row_id(row)].append(f'Field {field} is not defined.')
            continue
        if remaining == -1:
            errors[get_row_id(row)].append(f'Field {field} is defined and should not be.')
            continue
        if remaining in acceptable:
            continue
        raise AttributeError(f"Config data is not defined correctly. "
                             f"Result = {field}, {remaining}")
def validate_base_rows(errors: dict, row: dict) -> None:
    """Run the checks that apply to every row, whatever its status.

    Makes sure *errors* has a list for this row's id, then records a problem
    when the 'Requirement' column is empty and another when it contains one
    of the characters ``"``, ``&``, ``'``, ``<`` or ``>``.
    """
    row_id = get_row_id(row)
    # Create the row's bucket on first sight; later checks append to it.
    errors.setdefault(row_id, [])

    requirement = row['Requirement']
    if not requirement:
        errors[row_id].append(Problems.MISSING_REQUIREMENT.value)

    forbidden_chars = ['\"', "&", "'", "<", ">"]
    if has_element(forbidden_chars, requirement):
        errors[row_id].append(Problems.REQUIREMENT_HAS_INVALID_CHARS.value)
def set_blank_rows_results(results: dict, row: dict) -> None:
    """Decrement the counter of every tracked column that *row* fills in.

    A column counts as filled when its value contains something other than
    whitespace. Columns that are not keys of *results* are ignored. *results*
    is modified in place.

    Values that are not strings are treated as blank: ``csv.DictReader``
    supplies ``None`` for columns missing from a short row and a list (under
    the key ``None``) for surplus fields, and neither can be stripped.
    """
    for col, raw_value in row.items():
        if col not in results:
            continue
        if isinstance(raw_value, str) and raw_value.strip() != '':
            results[col] -= 1
def print_results(errors: dict) -> None:
    """Print every row id that has problems, followed by its problems.

    Each id is printed on its own line and each of its messages on a
    following line prefixed with a tab. Ids whose list is empty are skipped.
    """
    # Iterating items() avoids looking each id up again through str(), which
    # raised KeyError whenever an id was not already a string.
    for row_id, row_errors in errors.items():
        if not row_errors:
            continue
        print(row_id)
        for error in row_errors:
            print(f'\t{error}')
def get_row_id(row: dict) -> str:
    """Return the identifier used to report problems for *row*.

    The 'STIGID' column is preferred; the 'SRGID' column is the fallback
    when 'STIGID' is None or the empty string.
    """
    stig_id = row['STIGID']
    if stig_id is None or stig_id == '':
        return row['SRGID']
    return stig_id
def main():
    """Audit the CSV named on the command line and print the problems found.

    For every row: run the checks common to all rows, run the Check/Fix
    wording checks when the status is the automated one, flag rows whose
    status is not a known one, and otherwise verify that exactly the columns
    required for that status are filled in. The collected problems are
    printed once the whole file has been read.
    """
    args = parse_args()
    check_paths(args.file)

    errors = {}
    # newline='' is what the csv module asks for: without it, line breaks
    # inside quoted fields are not guaranteed to be read back correctly.
    with open(args.file, 'r', newline='') as f:
        for row in csv.DictReader(f):
            validate_base_rows(errors, row)
            row_id = get_row_id(row)
            status = row['Status']

            if status == DisaStatus.AUTOMATED:
                validate_check_fix(errors, row, row_id)

            if status not in DisaStatus.STATUSES:
                errors[row_id].append(Problems.INVALID_STATUS.value)
                # There is no column template for an unknown status.
                continue

            results = get_results_dict(row)
            set_blank_rows_results(results, row)
            process_row_results(errors, results, row)

    print_results(errors)
# Run the audit only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|