1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
#!/usr/bin/env python3
"""CLI for building a component definition for a product using a policy for control responses."""
import argparse
import logging
import os
import sys
from typing import Any, Dict
import ssg.environment
from utils.oscal import SSG_ROOT, VENDOR_ROOT, RULES_JSON, BUILD_CONFIG, LOGGER_NAME
from utils.oscal.control_selector import PolicyControlSelector
from utils.oscal.cd_generator import ComponentDefinitionGenerator
logger = logging.getLogger(LOGGER_NAME)
LOG_FILE = os.path.join(SSG_ROOT, "build", "build_cd_for_product.log")
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Create a component definition for a product."
)
parser.add_argument("-o", "--output", help="Path to write the cd to", required=True)
parser.add_argument(
"-r",
"--root",
help=f"Root of the SSG project. Defaults to {SSG_ROOT}",
default=SSG_ROOT,
)
parser.add_argument(
"-v",
"--vendor-dir",
help="Path to the vendor directory with third party OSCAL artifacts",
default=VENDOR_ROOT,
)
parser.add_argument(
"-p",
"--profile",
help="Main profile href, or name of the profile model in the trestle workspace",
required=True,
)
parser.add_argument(
"-pr",
"--product",
help="Product to build cd with",
required=True,
)
parser.add_argument(
"-c",
"--control",
help="Control to use as the source for control responses. \
To optionally filter by level, use the format <control_id>:<level>.",
required=True,
)
parser.add_argument(
"-j",
"--json",
type=str,
action="store",
default=RULES_JSON,
help=f"Path to the rules_dir.json (defaults to {RULES_JSON})",
)
parser.add_argument(
"-b",
"--build-config-yaml",
default=BUILD_CONFIG,
help="YAML file with information about the build configuration",
)
parser.add_argument(
"--component-definition-type",
choices=["service", "validation"],
default="service",
help="Type of component definition to create",
)
return parser.parse_args()
def configure_logger(log_file=None, log_level=logging.INFO):
"""Configure the logger."""
logger.setLevel(log_level)
log_format_file = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
formatter_file = logging.Formatter(log_format_file)
log_format_console = "%(levelname)s - %(message)s"
formatter_console = logging.Formatter(log_format_console)
if log_file:
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(log_level)
file_handler.setFormatter(formatter_file)
logger.addHandler(file_handler)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(log_level)
console_handler.setFormatter(formatter_console)
logger.addHandler(console_handler)
def get_env_yaml(ssg_root: str, build_config_yaml: str, product: str) -> Dict[str, Any]:
"""Get the environment yaml."""
product_yaml_path = ssg.products.product_yaml_path(ssg_root, product)
env_yaml = ssg.environment.open_environment(
build_config_yaml,
product_yaml_path,
os.path.join(ssg_root, "product_properties"),
)
return env_yaml
def main():
"""Main function."""
args = _parse_args()
configure_logger(LOG_FILE, log_level=logging.INFO)
filter_by_level = None
if ":" in args.control:
args.control, filter_by_level = args.control.split(":")
env_yaml = get_env_yaml(args.root, args.build_config_yaml, args.product)
control_selector = PolicyControlSelector(
args.control,
args.root,
env_yaml,
filter_by_level,
)
cd_generator = ComponentDefinitionGenerator(
args.root,
args.json,
env_yaml,
args.vendor_dir,
args.profile,
control_selector,
)
try:
cd_generator.create_cd(args.output, args.component_definition_type)
except ValueError as e:
logger.error(f"Invalid value: {e}", exc_info=True)
sys.exit(2)
except FileNotFoundError as e:
logger.error(f"File not found: {e}", exc_info=True)
sys.exit(3)
except Exception as e:
logger.error(f"An unexpected error occurred: {e}", exc_info=True)
sys.exit(1)
if __name__ == "__main__":
main()
|