File: add_kubernetes_rule.py

package info (click to toggle)
scap-security-guide 0.1.76-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 110,644 kB
  • sloc: xml: 241,883; sh: 73,777; python: 32,527; makefile: 27
file content (529 lines) | stat: -rwxr-xr-x 20,213 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
#!/usr/bin/python3

import argparse
import subprocess
import sys
import textwrap
import os
import re
import time
import yaml

from ssg.utils import mkdir_p

class JinjaString(str):
    pass

def jinja_string_representer(dumper, data):
    # yuumasato: The Jinja syntax is not compatible with YAML as curly braces are special characters.
    # So we use an exotic token to mark where opening and closing Jinja tags would be, they are substituted
    # by actual `{{{` and `}}}` right before being written to file.
    # A the moment this is restricted to macro calls, but could be expanded to conditionals as well.
    sanitized = re.sub(r"{{{", "JiNjA_OpEn", data)
    sanitized = re.sub(r"}}}", "JiNjA_ClOsE", sanitized)
    return dumper.represent_scalar(u'tag:yaml.org,2002:str', sanitized, style="|")

yaml.add_representer(JinjaString, jinja_string_representer)

PROG_DESC = (''' Create and test content files for Kubernetes API checks.

This script is intended to help content writers create a new application check
for OCP4/Kubernetes.

- The 'create' subcommand creates the initial files for a new rule and fetches
  the raw URL of the object in question (unless you specify the URL).

- The 'test' subcommand builds your content locally and tests directly using an
  openscap podman container. The scan container will test against yaml files
  staged under --objectdir.

- The 'cluster-test' subcommand pushes the content to your cluster, and then
  runs a Platform scan for your rule with compliance-operator.

Example workflow:

$ utils/add_kubernetes_rule.py create --rule=ocp_proxy_has_ca \
  --type="proxies.config" --name="cluster" \
  --yamlpath=".spec.trustedCA.name" --match="[a-zA-Z0-9]*"
creating check for "/apis/config.openshift.io/v1/proxies/cluster" with yamlpath ".spec.trustedCA.name" satisfying match of "[a-zA-Z0-9]*"
wrote applications/openshift/ocp_proxy_has_ca/rule.yml

$ mkdir -p /tmp/apis/config.openshift.io/v1/proxies/
$ oc get proxies.config/cluster -o yaml > /tmp/apis/config.openshift.io/v1/proxies/cluster
$ utils/add_kubernetes_rule.py test --rule=ocp_proxy_has_ca
testing rule ocp_proxy_has_ca locally
Title
        None
Rule
        xccdf_org.ssgproject.content_rule_ocp_proxy_has_ca
Ident
        CCE-84209-6
Result
        pass

$ utils/add_kubernetes_rule.py cluster-test --rule=ocp_proxy_has_ca
testing rule ocp_proxy_has_ca in-cluster
deploying compliance-operator
pushing image build to cluster
waiting for cleanup from previous test run
output from last phase check: LAUNCHING NOT-AVAILABLE
output from last phase check: RUNNING NOT-AVAILABLE
output from last phase check: AGGREGATING NOT-AVAILABLE
output from last phase check: DONE COMPLIANT
COMPLIANT

''')

OCP_RULE_DIR = 'applications/openshift'
OSCAP_TEST_IMAGE = 'quay.io/compliance-operator/openscap-ocp:1.3.4'
OSCAP_CMD_TEMPLATE = 'oscap xccdf eval --verbose %s --fetch-remote-resources --profile xccdf_org.ssgproject.content_profile_test --results-arf /tmp/report-arf.xml /content/ssg-ocp4-ds.xml'
PROFILE_PATH = 'products/ocp4/profiles/test.profile'

MOCK_VERSION = ('''status:
  versions:
  - name: operator
    version: 4.6.0-0.ci-2020-06-15-112708
  - name: openshift-apiserver
    version: 4.6.0-0.ci-2020-06-15-112708
''')


def set_operation_value(value, template_vars):
    if value:
        template_vars['operation'] = 'pattern match'
        template_vars['type'] = 'string'

def set_entity_value(value, template_vars):
    if value is not None:
        template_vars['entity_check'] = value

def set_check_existence_value(value, template_vars):
    if value is not None:
        template_vars['check_existence'] = value

def set_template_vars(value, template_vars):
    for var in value.split(","):
        key, value = var.strip().split(":")
        template_vars[key.strip()] = value.strip()

PROFILE_TEMPLATE = ('''documentation_complete: true

title: 'Test Profile for {RULE_NAME}'

platform: ocp4

description: Test Profile
selections:
- {RULE_NAME}
''')


TEST_SCAN_TEMPLATE = ('''apiVersion: compliance.openshift.io/v1alpha1
kind: ComplianceScan
metadata:
  name: test
  namespace: {NAMESPACE}
spec:
  scanType: {TYPE}
  profile: {PROFILE}
  content: ssg-ocp4-ds.xml
  contentImage: image-registry.openshift-image-registry.svc:5000/openshift-compliance/openscap-ocp4-ds:latest
  debug: true
''')


def needs_oc(func):
    def wrapper(args):
        if which('oc') is None:
            print('oc is required for this command.')
            return 1

        return func(args)
    return wrapper


def needs_working_cluster(func):
    def wrapper(args):
        ret_code, output = subprocess.getstatusoutput(
            'oc whoami')
        if ret_code != 0:
            print("* Error connecting to cluster")
            print(output)
            return ret_code

        return func(args)
    return wrapper

def which(program):
    fpath, fname = os.path.split(program)
    if fpath:
        if os.path.isfile(fpath) and os.access(fpath, os.X_OK):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            exe_file = os.path.join(path, program)
            if os.path.isfile(exe_file) and os.access(exe_file, os.X_OK):
                return exe_file

    return None


def create_base_rule(args, url=None, node_rule=False):
    rule_yaml = dict()
    rule_yaml['documentation_complete'] = True
    rule_yaml['title'] = args.title
    if node_rule:
        rule_yaml['platform'] = 'ocp4-node'
    rule_yaml['description'] = args.description
    rule_yaml['rationale'] = 'TBD'
    rule_yaml['identifiers'] = dict()
    rule_yaml['severity'] = args.severity
    if args.jqfilter:
        rule_yaml['warnings'] = [{'general': JinjaString("{{{ openshift_filtered_cluster_setting({'%s': '%s'}) | indent(4) }}}" % (url, args.jqfilter))}]
    elif url:
        rule_yaml['warnings'] = [{'general': JinjaString('{{{ openshift_cluster_setting("%s") | indent(4) }}}' % (url))}]
    rule_yaml['template'] = dict()

    return rule_yaml


def save_rule(rule_yaml_path, rule_yaml):
    with open(rule_yaml_path, 'w') as f:
        yaml_contents = yaml.dump(rule_yaml, None, indent=4, sort_keys=False, canonical=False, default_flow_style=False, width=120)
        # Adds a blank line between keys
        formatted_yaml_contents = re.sub(r"\n(\w+:.*)", r"\n\n\1", yaml_contents)

        # Replace placeholders for CaC/content Jinja2 expressions
        formatted_yaml_contents = re.sub(r"JiNjA_OpEn", r"{{{", formatted_yaml_contents)
        formatted_yaml_contents = re.sub(r"JiNjA_ClOsE", r"}}}", formatted_yaml_contents)

        f.write(formatted_yaml_contents)
    print('* Wrote ' + rule_yaml_path)


def createNodeRuleFunc(args):
    group_path = os.path.join(OCP_RULE_DIR, args.group)
    if args.group:
        if not os.path.isdir(group_path):
            print("ERROR: The specified group '%s' doesn't exist in the '%s' directory" % (
                args.group, OCP_RULE_DIR))
            return 0

    rule_path = os.path.join(group_path, args.rule)
    rule_yaml_path = os.path.join(rule_path, 'rule.yml')

    mkdir_p(rule_path)

    rule_yaml = create_base_rule(args)

    template = rule_yaml['template']
    template['name'] = args.template

    template['vars'] = dict()
    template_vars = set_template_vars(args.template_vars, template['vars'])

    save_rule(rule_yaml_path, rule_yaml)
    return 0


@needs_oc
def createPlatformRuleFunc(args):
    url = args.url
    retries = 0
    namespace_flag = ''
    if args.namespace is not None:
        namespace_flag = '-n ' + args.namespace
    elif args.all_namespaces:
        namespace_flag = '-A'

    group_path = os.path.join(OCP_RULE_DIR, args.group)
    if args.group:
        if not os.path.isdir(group_path):
            print("ERROR: The specified group '%s' doesn't exist in the '%s' directory" % (
                args.group, OCP_RULE_DIR))
            return 0

    rule_path = os.path.join(group_path, args.rule)
    while url is None and retries < 5:
        retries += 1
        cmdstr = 'oc get %s' % (args.type)

        if args.name:
            cmdstr += ' ' + args.name

        cmdstr += ' %s --loglevel=6' % (namespace_flag)

        print("Running: " + cmdstr)
        ret_code, output = subprocess.getstatusoutput(cmdstr)

        if ret_code != 0:
            print('error running oc, check connection to the cluster: %d\n %s' % (
                ret_code, output))
            continue

        fetch_line = ""
        url_part = ""
        lines = output.splitlines()
        for line in lines:
            if 'GET' in line:
                fetch_line = line
                break

        if len(fetch_line) > 0:
            # extract the object url from the debug line
            full_url = fetch_line[fetch_line.index("GET"):].split(" ")[1]
            url_part = full_url[full_url.rfind("/api"):]

        if len(url_part) > 0 and '/api' in url_part:
            url = url_part

    if url is None:
        print('there was a problem finding the URL from the oc debug output. Hint: override this automatic check with --url')
        return 1

    print('* Creating check for "%s" with yamlpath "%s" satisfying match of "%s"' % (
        url, args.yamlpath, args.match))
    rule_yaml_path = os.path.join(rule_path, 'rule.yml')

    mkdir_p(rule_path)

    rule_yaml = create_base_rule(args, url)

    template = rule_yaml['template']
    template['name'] = 'yamlfile_value'

    template['vars'] = dict()
    template_vars = template['vars']
    template_vars['ocp_data'] = "true"
    if args.jqfilter:
        template_vars['filepath'] = JinjaString("{{{ openshift_filtered_path('%s', '%s') }}}" % (url, args.jqfilter))
    else:
        template_vars['filepath'] = url
    template_vars['yamlpath'] = args.yamlpath

    set_entity_value(args.match_entity, template_vars)
    set_check_existence_value(args.check_existence, template_vars)

    if args.match:
        value_dict = dict()
        value_dict['value'] = args.match
        set_operation_value(args.regex, value_dict)

        template_vars['values'] = [value_dict]
    else:
        template_vars['xccdf_variable'] = args.variable

    save_rule(rule_yaml_path, rule_yaml)

    return 0


def createTestProfile(rule):
    # create a solo profile for rule
    with open(PROFILE_PATH, 'w') as f:
        f.write(PROFILE_TEMPLATE.format(RULE_NAME=rule))


@needs_oc
@needs_working_cluster
def clusterTestFunc(args):

    print('* Testing rule %s in-cluster' % args.rule)

    findout = subprocess.getoutput(
        "find %s -name '%s' -type d" % (OCP_RULE_DIR, args.rule))
    if findout == "":
        print('ERROR: no rule for %s, run "create" first' % args.rule)
        return 1

    if not args.skip_deploy:
        subprocess.run("utils/deploy_compliance_operator.sh")

    if not args.skip_build:
        createTestProfile(args.rule)
        print('* Pushing image build to cluster')
        # execute the build_ds_container script
        buildp = subprocess.run(
            ['utils/build_ds_container.py', '-P', 'ocp4', 'rhcos4'])
        if buildp.returncode != 0:
            try:
                os.remove(PROFILE_PATH)
            except OSError:
                pass
            return 1

    ret_code, _ = subprocess.getstatusoutput(
        'oc delete -n {NAMESPACE} compliancescans/test'.format(NAMESPACE=args.namespace))
    if ret_code == 0:
        # if previous compliancescans were actually deleted, wait a bit to allow resources to clean up.
        print('* Waiting for cleanup from a previous test run')
        time.sleep(20)

    # create a single-rule scan
    print("* Running scan with rule '%s'" % args.rule)
    profile = 'xccdf_org.ssgproject.content_profile_test'
    apply_cmd = ['oc', 'apply', '-f', '-']
    with subprocess.Popen(apply_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
        _, err = proc.communicate(
            input=TEST_SCAN_TEMPLATE.format(PROFILE=profile, TYPE=args.scantype, NAMESPACE=args.namespace).encode())
        if proc.returncode != 0:
            print('Error applying scan object: %s' % err)
            try:
                os.remove(PROFILE_PATH)
            except OSError:
                pass
            return 1

    # poll for the DONE result
    timeout = time.time() + 120   # A couple of minutes is generous for the platform scan.
    scan_result = None
    while True:
        ret_code, output = subprocess.getstatusoutput(
            'oc get -n {NAMESPACE} compliancescans/test -o template="{{{{.status.phase}}}} {{{{.status.result}}}}"'.format(NAMESPACE=args.namespace))
        if output is not None:
            print('> Output from last phase check: %s' % output)
        if output.startswith('DONE'):
            scan_result = output[5:]
            break
        if time.time() >= timeout:
            break
        time.sleep(2)

    if scan_result is None:
        print('ERROR: Timeout waiting for scan to finish')
        return 1

    print("* The result is '%s'" % scan_result)
    return 0


def testFunc(args):
    if which('podman') is None:
        print('podman is required')
        return 1

    print('testing rule %s locally' % args.rule)

    if not args.skip_build:
        createTestProfile(args.rule)
        ret_code, out = subprocess.getstatusoutput('./build_product --datastream-only ocp4')
        if ret_code != 0:
            print('build failed: %s' % out)
            return 1

    # mock a passing result for the implicit ocp4 version check
    version_dir = args.objectdir + '/apis/config.openshift.io/v1/clusteroperators'
    mock_version_file = os.path.join(version_dir, 'openshift-apiserver')
    if not os.path.exists(mock_version_file):
        mkdir_p(version_dir)
        with open(mock_version_file, 'w') as f:
            f.write(MOCK_VERSION)

    oscap_cmd_opts = OSCAP_CMD_TEMPLATE % (args.verbosity)
    pod_cmd = 'podman run -it --security-opt label=disable -v "%s:/content" -v "%s:/kubernetes-api-resources" %s %s' % (args.contentdir,
                                                                                                                        args.objectdir, OSCAP_TEST_IMAGE, oscap_cmd_opts)
    print(subprocess.getoutput(pod_cmd))


def main():
    parser = argparse.ArgumentParser(
        prog="add_kubernetes_rule.py",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent(PROG_DESC))
    subparser = parser.add_subparsers(
        dest='subcommand', title='subcommands', help='pick one')
    create_parser = subparser.add_parser(
        'create', help='Bootstrap the XML and YML files under %s for a new check.' % OCP_RULE_DIR)

    common_rule_args = argparse.ArgumentParser(add_help=False)
    common_rule_args.add_argument(
        '--rule', required=True, help='The name of the rule to create. Required.')
    common_rule_args.add_argument(
        '--group', default="", help='The group directory of the rule to create.')
    common_rule_args.add_argument(
        '--name', help='The name of the Kubernetes object to check.')
    common_rule_args.add_argument(
        '--title', help='A short description of the check.')
    common_rule_args.add_argument(
        '--description', help='A human-readable description of the provided matching criteria.')
    common_rule_args.add_argument(
        '--severity', default="unknown", help='the severity of the rule.')
    common_rule_args.add_argument(
        '--identifiers', default="TBD", help='an identifier for the rule (CCE number)')
    common_rule_args.add_argument(
        '--jqfilter', default="", help='A JQ filter to select the data passed down for OVAL evaluation.')

    type_parser = create_parser.add_subparsers(dest='rule types', title='Creates a rule', help='Types of rules')
    platform_parser = type_parser.add_parser('platform', help='Creates a Platform rule',  parents=[common_rule_args])
    platform_parser.add_argument('--yamlpath',
                               help='The yaml-path of the element to match against.')
    value_or_variable = platform_parser.add_mutually_exclusive_group()
    value_or_variable.add_argument(
        '--match', help='A string value or regex providing the matching criteria. One of "match" or "variable" are required')
    value_or_variable.add_argument(
        '--variable', help='A string name of the XCCDF variable to with the value to check for. Mutually exclusive with "match" option')
    platform_parser.add_argument(
        '--namespace', help='The namespace of the Kubernetes object (optional for cluster-scoped objects)', default=None)
    platform_parser.add_argument(
        '--all-namespaces', action="store_true", help='The namespace of the Kubernetes object (optional for cluster-scoped objects)',
        default=False)
    platform_parser.add_argument(
        '--type', required=True, help='The type of Kubernetes object, e.g., configmap. Required.')
    platform_parser.add_argument(
        '--url', help='The direct api path (metadata.selfLink) of the object, which overrides --type --name and --namespace options.')
    platform_parser.add_argument(
        '--regex', default=False, action="store_true", help='treat the --match value as a regex')
    platform_parser.add_argument(
        '--match-entity', help='the entity_check value to apply, i.e., "all", "at least one", "none exist"')
    platform_parser.add_argument(
        '--check-existence', help='check_existence` value for the `yamlfilecontent_test`.')
    platform_parser.add_argument(
        '--negate', default=False, action="store_true", help='negate the given matching criteria (does NOT match). Default is false.')
    platform_parser.set_defaults(func=createPlatformRuleFunc)

    node_parser = type_parser.add_parser('node', help='Creates a Node rule',  parents=[common_rule_args])
    node_parser.add_argument(
        '--template',  help='The tempate to use in a Node rule')
    node_parser.add_argument(
        '--template-vars',  help='The inputs for the template, coma separated')
    node_parser.set_defaults(func=createNodeRuleFunc)

    cluster_test_parser = subparser.add_parser(
        'cluster-test', help='Test a rule on a running OCP cluster using the compliance-operator.')
    cluster_test_parser.add_argument(
        '--rule', required=True, help='The name of the rule to test. Required.')
    cluster_test_parser.add_argument(
        '--skip-deploy', default=False, action="store_true", help='Skip deploying the compliance-operator. Default is to deploy.')
    cluster_test_parser.add_argument(
        '--skip-build', default=False, action="store_true", help='Skip building and pushing the data stream. Default is true.')
    cluster_test_parser.add_argument(
        '--scan-type', help='Type of scan to execute.', dest="scantype",
        default="Platform",
        choices=["Node", "Platform"])
    cluster_test_parser.add_argument(
        '--namespace', help='Namespace where compliance operator is installed. Default is "openshift-compliance".', dest="namespace", default="openshift-compliance"
    )
    cluster_test_parser.set_defaults(func=clusterTestFunc)

    test_parser = subparser.add_parser(
        'test', help='Test a rule locally against a directory of mocked object files using podman and an oscap container.')
    test_parser.add_argument('--rule', required=True,
                             help='The name of the rule to test.')
    test_parser.add_argument(
        '--contentdir', default="./build", help='The path to the directory containing the data stream')
    test_parser.add_argument(
        '--skip-build', default=False, action="store_true", help='Skip building the data stream. Default is false.')
    test_parser.add_argument('--objectdir', default="/tmp",
                             help='The path to a directory structure of yaml objects to test against.')
    test_parser.add_argument('--verbosity', default="INFO",
                             choices=['INFO', 'DEVEL'],
                             help='How verbose should OpenScap be')
    test_parser.set_defaults(func=testFunc)

    args = parser.parse_args()

    return args.func(args)


if __name__ == "__main__":
    sys.exit(main())