File: assert_ansible_schema.py

package info (click to toggle)
scap-security-guide 0.1.76-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 110,644 kB
  • sloc: xml: 241,883; sh: 73,777; python: 32,527; makefile: 27
file content (137 lines) | stat: -rw-r--r-- 3,833 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/python3

from __future__ import print_function

import argparse
import re
import sys

import yaml


# Metadata keys that validate_comments() requires to appear in the file
# as "# <key> = <value>" comment lines.
COMMENT_KEYS = {
    "platform",
    "reboot",
    "strategy",
    "complexity",
    "disruption",
}

# Names that validate_playbook() requires to appear, preceded by an
# underscore, in at least one of the playbook's tags.
TAGS = {
    "strategy",
    "complexity",
    "disruption",
    "severity",
}


def make_parser():
    """Build the command-line parser of the script.

    The parser accepts one or more positional ``input`` values and the
    boolean switches ``--verbose``/``-v`` and ``--cce``.

    :return: the configured ``argparse.ArgumentParser``
    """
    argument_specs = (
        (("input",), {"nargs": "+"}),
        (("--verbose", "-v"), {"action": "store_true"}),
        (("--cce",), {
            "action": "store_true",
            "help": "Require CCEs to be defined as well",
        }),
    )

    cli = argparse.ArgumentParser()
    for flags, options in argument_specs:
        cli.add_argument(*flags, **options)
    return cli


def validate_comments(fname, args):
    """Check that the file carries all the required comment-based metadata.

    Every key of COMMENT_KEYS has to occur at least once in the file in the
    form ``# <key> = <value>``.

    :param fname: path of the file to scan
    :param args: parsed command-line arguments; only ``args.verbose`` is read
    :raises AssertionError: if one or more keys of COMMENT_KEYS are not found
    """
    caught_comments = set()
    regex = re.compile(
        r"# ({keys}) = .*"
        .format(keys="|".join(COMMENT_KEYS)))
    with open(fname, "r") as lines:
        for line in lines:
            found = regex.search(line)
            if found:
                caught_comments.add(found.group(1))

    # Only keys of COMMENT_KEYS can be caught, so the two sets are equal
    # exactly when nothing is missing. The missing keys are sorted because
    # set iteration order is arbitrary and the message should be reproducible.
    missing_keys = sorted(COMMENT_KEYS.difference(caught_comments))
    assert not missing_keys, (
        "Did not find key(s) in comments: {keys}"
        .format(keys=", ".join(missing_keys)))

    if args.verbose:
        print("Comment-based metadata OK")


def validate_playbook(playbook, args):
    """Assert that one playbook entry has the expected structure and tags.

    The playbook needs the ``name``, ``hosts`` (equal to ``@@HOSTS@@``),
    ``become`` (true-ish), ``tasks`` and ``tags`` keys, a non-empty ``vars``
    if that key is present, and a named first task (the first task of the
    first block if the first task is a block). Its tags must not contain
    ``@`` and must cover every entry of TAGS; with ``args.cce`` set, a tag
    containing a CCE identifier is required as well.

    :param playbook: mapping describing one playbook entry
    :param args: parsed command-line arguments; ``args.verbose`` and
        ``args.cce`` are read
    :raises AssertionError: if any of the checks fails
    """
    assert "name" in playbook, "playbook doesn't have a name"
    assert "hosts" in playbook, "playbook doesn't have the hosts entry"
    assert playbook["hosts"] == "@@HOSTS@@", "playbook's hosts is not set to @@HOSTS@@"
    assert "become" in playbook, "playbook doesn't have a become key"
    assert playbook["become"], "become in the playbook is not set to a true-ish value"
    if "vars" in playbook:
        assert playbook["vars"], "there are no variables under the 'vars' key of the playbook"
    assert "tasks" in playbook, "there are no tasks in the playbook"
    # An empty (or null) task list has to be reported as a validation error;
    # indexing it would raise an exception that the caller doesn't handle.
    assert playbook["tasks"], "the list of tasks in the playbook is empty"

    first_task = playbook["tasks"][0]
    if "block" in first_task:
        # Same reasoning as above, applied to the tasks nested in the block.
        assert first_task["block"], "the first block in the playbook is empty"
        first_task = first_task["block"][0]

    assert "name" in first_task, "The first task doesn't have name"
    assert "tags" in playbook, "the playbook doesn't have tags"

    if args.verbose:
        print("Basic playbook properties OK")

    caught_tags = set()
    tag_regex = re.compile(
        r".*_({keys})"
        .format(keys="|".join(TAGS)))
    cce_regex = re.compile(r"CCE-[0-9]+-[0-9]+")
    for tag in playbook["tags"]:
        assert "@" not in tag, \
            "A playbook tag {tag} contains @, which is unexpected.".format(tag=tag)

        found = tag_regex.search(tag)
        if found:
            caught_tags.add(found.group(1))

        # "CCE" is not a member of TAGS, so it only matters for the --cce check below.
        if cce_regex.search(tag):
            caught_tags.add("CCE")

    # Sorted because set iteration order is arbitrary and the message
    # should be reproducible.
    tags_not_caught = sorted(TAGS.difference(caught_tags))
    assert not tags_not_caught, \
        "Missing tags: {stray}".format(stray=", ".join(tags_not_caught))

    if args.verbose:
        print("Playbook tags OK")

    if args.cce:
        assert "CCE" in caught_tags, "There is no CCE tag in the playbook"

        if args.verbose:
            print("Playbook CCE OK")


def validate_yaml(fname, args):
    """Load the YAML file and validate every playbook it contains.

    :param fname: path of the YAML file to load
    :param args: parsed command-line arguments, passed on to
        validate_playbook()
    :raises AssertionError: if the document is not a list, or if one of the
        playbooks fails validation
    """
    # NOTE(review): yaml.Loader is able to construct arbitrary Python objects,
    # so this is acceptable only for trusted input files. Consider switching
    # to yaml.SafeLoader if that assumption doesn't hold.
    # The context manager closes the file even if the parsing fails.
    with open(fname, "r") as stream:
        data = yaml.load(stream, Loader=yaml.Loader)

    # E.g. a file consisting only of comments is loaded as None; report that
    # as a validation error rather than crashing while iterating over it.
    assert isinstance(data, list), "the file doesn't contain a list of playbooks"

    for playbook in data:
        validate_playbook(playbook, args)


def validate_input(fname, args):
    """Run all the validations on one file and report the outcome.

    A failed assertion of the validators is printed to the standard error
    output; it is not propagated to the caller.

    :param fname: path of the file to validate
    :param args: parsed command-line arguments; ``args.verbose`` is read here
        and the object is passed on to the validators
    :return: 0 if the file passed, 1 if an AssertionError was raised
    """
    verbose = args.verbose
    if verbose:
        print("Analyzing {fname}".format(fname=fname))

    try:
        validate_comments(fname, args)
        validate_yaml(fname, args)
    except AssertionError as err:
        print(
            "Error processing {fname}: {err}".format(fname=fname, err=err),
            file=sys.stderr)
        return 1

    if verbose:
        print("Analysis OK")
    return 0


if __name__ == "__main__":
    parser = make_parser()
    args = parser.parse_args()

    # Every input is validated even if an earlier one failed; the exit
    # status is the highest status seen (0 when everything passed).
    statuses = [validate_input(fname, args) for fname in args.input]
    sys.exit(max([0] + statuses))