1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
|
"""Build a component definition for a product from pre-existing OSCAL profiles"""
import logging
import pathlib
import re
from typing import Any, Dict, List, Optional, Set, Tuple
from trestle.common.common_types import TypeWithProps, TypeWithParts
from trestle.common.const import TRESTLE_HREF_HEADING, IMPLEMENTATION_STATUS, REPLACE_ME
from trestle.common.list_utils import as_list, none_if_empty
from trestle.core.generators import generate_sample_model
from trestle.core.catalog.catalog_interface import CatalogInterface
from trestle.core.control_interface import ControlInterface
from trestle.core.profile_resolver import ProfileResolver
from trestle.oscal import catalog as cat
from trestle.oscal.common import Property
from trestle.oscal.component import (
ComponentDefinition,
DefinedComponent,
ControlImplementation,
ImplementedRequirement,
Statement,
SetParameter,
)
from ssg.controls import Status, Control
from ssg.utils import required_key
from utils.oscal import add_prop
from utils.oscal.control_selector import ControlSelector
from utils.oscal.params_extractor import ParameterExtractor
from utils.oscal.rules_transformer import RulesTransformer, RuleInfo
from utils.oscal import LOGGER_NAME
logger = logging.getLogger(LOGGER_NAME)
SECTION_PATTERN = r"Section ([a-z]):"
class OscalStatus:
"""
Represent the status of a control in OSCAL.
Notes:
This transforms the status from SSG to OSCAL in the from
string method.
"""
PLANNED = "planned"
NOT_APPLICABLE = "not-applicable"
ALTERNATIVE = "alternative"
IMPLEMENTED = "implemented"
PARTIAL = "partial"
@staticmethod
def from_string(source: str) -> str:
data = {
Status.INHERENTLY_MET: OscalStatus.IMPLEMENTED,
Status.DOES_NOT_MEET: OscalStatus.ALTERNATIVE,
Status.DOCUMENTATION: OscalStatus.IMPLEMENTED,
Status.AUTOMATED: OscalStatus.IMPLEMENTED,
Status.MANUAL: OscalStatus.ALTERNATIVE,
Status.PLANNED: OscalStatus.PLANNED,
Status.PARTIAL: OscalStatus.PARTIAL,
Status.SUPPORTED: OscalStatus.IMPLEMENTED,
Status.PENDING: OscalStatus.ALTERNATIVE,
Status.NOT_APPLICABLE: OscalStatus.NOT_APPLICABLE,
}
if source not in data.keys():
raise ValueError(f"Invalid status: {source}. Use one of {data.keys()}")
return data.get(source) # type: ignore
STATUSES = {PLANNED, NOT_APPLICABLE, ALTERNATIVE, IMPLEMENTED, PARTIAL}
class OSCALProfileHelper:
"""Helper class to handle OSCAL profile."""
def __init__(self, trestle_root: pathlib.Path) -> None:
"""Initialize."""
self._root = trestle_root
self.profile_controls: Set[str] = set()
self.controls_by_label: Dict[str, str] = dict()
def load(self, profile_path: str) -> None:
"""Load the profile catalog."""
profile_resolver = ProfileResolver()
resolved_catalog: cat.Catalog = profile_resolver.get_resolved_profile_catalog(
self._root,
profile_path,
block_params=False,
params_format="[.]",
show_value_warnings=True,
)
for control in CatalogInterface(resolved_catalog).get_all_controls_from_dict():
self.profile_controls.add(control.id)
label = ControlInterface.get_label(control)
if label:
self.controls_by_label[label] = control.id
self._handle_parts(control)
def _handle_parts(
self,
control: TypeWithParts,
) -> None:
"""Handle parts of a control."""
if control.parts:
for part in control.parts:
if not part.id:
continue
self.profile_controls.add(part.id)
label = ControlInterface.get_label(part)
# Avoiding key collision here. The higher level control object will take
# precedence.
if label and label not in self.controls_by_label.keys():
self.controls_by_label[label] = part.id
self._handle_parts(part)
def validate(self, control_id: str) -> Optional[str]:
"""Validate that the control id exists in the catalog and return the id"""
if control_id in self.controls_by_label.keys():
logger.debug(f"Found control {control_id} in control labels")
return self.controls_by_label.get(control_id)
elif control_id in self.profile_controls:
logger.debug(f"Found control {control_id} in profile control ids")
return control_id
logger.debug(f"Control {control_id} does not exist in the profile")
return None
class ComponentDefinitionGenerator:
"""Generate a component definition from a product"""
def __init__(
self,
root: str,
json_path: str,
env_yaml: Dict[str, Any],
vendor_dir: str,
profile_name_or_href,
control_selector: ControlSelector,
) -> None:
"""
Initialize the component definition generator and load the necessary files.
Args:
root: Root of the SSG project
json_path: Path to the rules_dir.json file
env_yaml: Yaml file with environment information
vendor_dir: Path to the vendor directory
profile_name_or_href: Name or href of the profile to use
control_selector: Control selector that contains control responses
"""
self.ssg_root = root
self.trestle_root = pathlib.Path(vendor_dir)
self.product = required_key(env_yaml, "product")
self.env_yaml = env_yaml
self.control_selector = control_selector
profile_path, profile_href = self.get_source(profile_name_or_href)
self.profile_href = profile_href
self.profile = OSCALProfileHelper(self.trestle_root)
self.profile.load(profile_path)
self.params_extractor = ParameterExtractor(root, self.env_yaml)
self.rules_transformer = RulesTransformer(
root, self.env_yaml, json_path, self.params_extractor
)
def get_source(self, profile_name_or_href: str) -> Tuple[str, str]:
"""Get the source of the profile."""
profile_in_trestle_dir = "://" not in profile_name_or_href
profile_href = profile_name_or_href
if profile_in_trestle_dir:
local_path = f"profiles/{profile_name_or_href}/profile.json"
profile_href = TRESTLE_HREF_HEADING + local_path
profile_path = str(self.trestle_root / local_path)
else:
profile_path = profile_href
return profile_path, profile_href
def create_implemented_requirement(
self, control: Control
) -> Optional[ImplementedRequirement]:
"""Create implemented requirement from a control object"""
logger.info(f"Creating implemented requirement for {control.id}")
control_id = self.profile.validate(control.id)
if control_id:
implemented_req = generate_sample_model(ImplementedRequirement)
implemented_req.control_id = control_id
self.handle_response(implemented_req, control)
rule_ids, params_values = self._process_rule_ids(control.rules)
self.add_rules(implemented_req, rule_ids, params_values)
return implemented_req
return None
def add_rules(
self,
type_with_props: TypeWithProps,
rule_ids: List[str],
params_values: Optional[Dict[str, str]] = None,
) -> None:
"""Add rules to a type with props."""
all_props: List[Property] = as_list(type_with_props.props)
self.rules_transformer.add_rules(rule_ids, params_values)
rule_properties: List[Property] = self.rules_transformer.get_rule_id_props(
rule_ids
)
all_props.extend(rule_properties)
type_with_props.props = none_if_empty(all_props)
def _process_rule_ids(
self, rule_ids: List[str]
) -> Tuple[List[str], Dict[str, str]]:
"""
Process rule ids.
Returns:
A tuple of processed rule ids and parameter selection values.
Notes: Rule ids with an "=" are parameters and should not be included when searching for
rules.
"""
processed_rule_ids: List[str] = list()
params_values: Dict[str, str] = dict()
for rule_id in rule_ids:
parts = rule_id.split("=")
if len(parts) == 2:
param_id, value = parts
params_values[param_id] = value
else:
processed_rule_ids.append(rule_id)
return (processed_rule_ids, params_values)
def handle_response(self, implemented_req, control: Control) -> None:
"""
Break down the response into parts.
Args:
implemented_req: The implemented requirement to add the response and statements to.
control_response: The control response to add to the implemented requirement.
"""
control_response = control.notes
pattern = re.compile(SECTION_PATTERN, re.IGNORECASE)
sections_dict = self.build_sections_dict(control_response, pattern)
oscal_status = OscalStatus.from_string(control.status)
if sections_dict:
self._add_response_by_status(implemented_req, oscal_status, REPLACE_ME)
# process into statements
implemented_req.statements = list()
for section_label, section_content in sections_dict.items():
statement_id = self.profile.validate(
f"{implemented_req.control_id}_smt.{section_label}"
)
if statement_id is None:
continue
section_content_str = "\n".join(section_content)
section_content_str = pattern.sub("", section_content_str)
statement = self.create_statement(
statement_id, section_content_str.strip()
)
implemented_req.statements.append(statement)
else:
self._add_response_by_status(
implemented_req, oscal_status, control_response.strip()
)
@staticmethod
def build_sections_dict(
control_response: str, section_pattern: re.Pattern
) -> Dict[str, List[str]]:
"""Find all sections in the control response and build a dictionary of them."""
lines = control_response.split("\n")
sections_dict: Dict[str, List[str]] = dict()
current_section_label = None
for line in lines:
match = section_pattern.match(line)
if match:
current_section_label = match.group(1)
sections_dict[current_section_label] = [line]
elif current_section_label is not None:
sections_dict[current_section_label].append(line)
return sections_dict
@staticmethod
def _add_response_by_status(
impl_req: ImplementedRequirement,
implementation_status: str,
control_response: str,
) -> None:
"""
Add the response to the implemented requirement depending on the status.
Notes: Per OSCAL requirements, any status other than implemented and partial should have
remarks with justification for the status.
"""
status_prop = add_prop(IMPLEMENTATION_STATUS, implementation_status, "")
if (
implementation_status == OscalStatus.IMPLEMENTED
or implementation_status == OscalStatus.PARTIAL
):
impl_req.description = control_response
else:
status_prop.remarks = control_response
impl_req.props = as_list(impl_req.props)
impl_req.props.append(status_prop)
def create_statement(self, statement_id, description="") -> Statement:
"""Create a statement."""
statement = generate_sample_model(Statement)
statement.statement_id = statement_id
if description:
statement.description = description
return statement
def add_set_parameters(self, control_implementation: ControlImplementation) -> None:
"""Add set parameters to a type with props."""
param_selections: Dict[
str, str
] = self.params_extractor.get_all_selected_values()
if param_selections:
all_set_params: List[SetParameter] = as_list(
control_implementation.set_parameters
)
for param_id, value in param_selections.items():
set_param = generate_sample_model(SetParameter)
set_param.param_id = param_id
set_param.values = [value]
all_set_params.append(set_param)
control_implementation.set_parameters = none_if_empty(all_set_params)
def create_control_implementation(self) -> ControlImplementation:
"""Get the control implementation for a component."""
ci = generate_sample_model(ControlImplementation)
ci.source = self.profile_href
all_implement_reqs = list()
for control in self.control_selector.get_controls():
implemented_req = self.create_implemented_requirement(control)
if implemented_req:
all_implement_reqs.append(implemented_req)
ci.implemented_requirements = all_implement_reqs
self.add_set_parameters(ci)
return ci
def create_cd(
self, output: str, component_definition_type: str = "service"
) -> None:
"""Create a component definition and write it to a file."""
logger.info(f"Creating component definition for {self.product}")
component_definition = generate_sample_model(ComponentDefinition)
component_definition.metadata.title = f"Component definition for {self.product}"
component_definition.components = list()
control_implementation: ControlImplementation = (
self.create_control_implementation()
)
if not control_implementation.implemented_requirements:
logger.warning(
f"No implemented requirements found for {self.product}, exiting"
)
return
oscal_component = generate_sample_model(DefinedComponent)
oscal_component.title = self.product
oscal_component.type = component_definition_type
oscal_component.description = self.product
oscal_component.control_implementations = [control_implementation]
# Create all of the top-level component properties for rules
rules: List[RuleInfo] = self.rules_transformer.get_all_rules()
all_rule_properties: List[Property] = self.rules_transformer.transform(rules)
oscal_component.props = none_if_empty(all_rule_properties)
component_definition.components.append(oscal_component)
output_str = output
out_path = pathlib.Path(output_str)
logger.info(f"Writing component definition to {out_path}")
component_definition.oscal_write(out_path)
|