File: test_ansible.py

package info (click to toggle)
scap-security-guide 0.1.78-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 114,600 kB
  • sloc: xml: 245,305; sh: 84,381; python: 33,093; makefile: 27
file content (317 lines) | stat: -rw-r--r-- 11,298 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
import os
import re
import textwrap
from collections import OrderedDict

import pytest

import ssg.ansible
from ssg.constants import min_ansible_version


def strings_equal_except_whitespaces(left, right):
    return re.sub(r'\s', '', left) == re.sub(r'\s', '', right)


def test_add_minimum_version():
    good_snippet = """
    # a comment
     - hosts: all
       vars:
       tasks:
    """
    good_snippet = textwrap.dedent(good_snippet)

    bad_snippet = """
    # a comment
     - hosts: all
       pre_tasks:
         - name: something_else
           assert:
             that: "2 > 3"
             msg: "two is less than three!"
       vars:
       tasks:
    """
    bad_snippet = textwrap.dedent(bad_snippet)

    unknown_snippet = """
    I don't think this is YAML
    """

    processed_snippet = """
    # a comment
     - hosts: all
       pre_tasks:
         - name: Verify Ansible meets SCAP-Security-Guide version requirements.
           assert:
             that: "ansible_version.full is version_compare('{min_version}', '>=')"
             msg: >
               "You must update Ansible to at least version {min_version} to use this role."

       vars:
       tasks:
    """.format(min_version=min_ansible_version)
    processed_snippet = textwrap.dedent(processed_snippet)

    output = ssg.ansible.add_minimum_version(good_snippet)
    assert strings_equal_except_whitespaces(output, processed_snippet)

    with pytest.raises(ValueError):
        ssg.ansible.add_minimum_version(bad_snippet)

    assert ssg.ansible.add_minimum_version(unknown_snippet) == unknown_snippet

    assert ssg.ansible.add_minimum_version(processed_snippet) == processed_snippet


def test_task_is():
    # Test with task containing one of the names
    task_with_package = {"ansible.builtin.package": {"name": "vim"}}
    assert ssg.ansible.task_is(task_with_package, ["package", "ansible.builtin.package"])

    # Test with task containing different name
    assert ssg.ansible.task_is(task_with_package, ["ansible.builtin.package"])

    # Test with task not containing any of the names
    task_with_copy = {"copy": {"src": "file", "dest": "/tmp"}}
    assert not ssg.ansible.task_is(task_with_copy, ["package", "ansible.builtin.package"])

    # Test with empty names list
    assert not ssg.ansible.task_is(task_with_package, [])

    # Test with empty task
    assert not ssg.ansible.task_is({}, ["package"])


class TestAnsibleSnippetsProcessor:

    def test_initialization(self):
        """Test processor initialization."""
        snippets = ["snippet1", "snippet2"]
        processor = ssg.ansible.AnsibleSnippetsProcessor(snippets)

        assert processor.all_snippets == snippets
        assert processor.package_tasks == []
        assert processor.other_tasks == []

    def test_process_task_package(self):
        """Test processing of package tasks."""
        processor = ssg.ansible.AnsibleSnippetsProcessor([])

        package_task = {"ansible.builtin.package": {"name": "vim"}}
        result = processor._process_task(package_task)

        assert result is None  # Package tasks should be skipped from immediate processing
        assert package_task in processor.package_tasks

    def test_process_task_package_facts(self):
        """Test processing of package_facts tasks."""
        processor = ssg.ansible.AnsibleSnippetsProcessor([])

        package_facts_task = {"ansible.builtin.package_facts": {"manager": "auto"}}
        result = processor._process_task(package_facts_task)

        assert result is None  # package_facts tasks should be skipped
        assert len(processor.package_tasks) == 0  # Should not be added to package_tasks

    def test_process_task_package_facts_short_name(self):
        """Test processing of package_facts tasks with short name."""
        processor = ssg.ansible.AnsibleSnippetsProcessor([])

        package_facts_task = {"package_facts": {"manager": "auto"}}
        result = processor._process_task(package_facts_task)

        assert result is None  # package_facts tasks should be skipped

    def test_process_task_service_facts(self):
        """Test processing of service_facts tasks."""
        processor = ssg.ansible.AnsibleSnippetsProcessor([])

        service_facts_task = {"ansible.builtin.service_facts": None}
        result = processor._process_task(service_facts_task)

        assert result is None  # service_facts tasks should be skipped
        assert len(processor.other_tasks) == 0  # Should not be added to other_tasks

    def test_process_task_other(self):
        """Test processing of other tasks."""
        processor = ssg.ansible.AnsibleSnippetsProcessor([])

        copy_task = {"copy": {"src": "file", "dest": "/tmp"}}
        result = processor._process_task(copy_task)

        assert result == copy_task  # Other tasks should be returned as-is
        assert len(processor.package_tasks) == 0
        assert len(processor.other_tasks) == 0  # _process_task doesn't add to other_tasks

    def test_process_task_block_empty(self):
        """Test processing of empty block tasks."""
        processor = ssg.ansible.AnsibleSnippetsProcessor([])

        block_task = {"block": []}
        result = processor._process_task(block_task)

        assert result is None  # Empty blocks should be filtered out

    def test_process_task_block_with_content(self):
        """Test processing of block tasks with content."""
        processor = ssg.ansible.AnsibleSnippetsProcessor([])

        block_task = {
            "block": [
                {"copy": {"src": "file", "dest": "/tmp"}},
                {"ansible.builtin.package": {"name": "vim"}},
                {"ansible.builtin.package_facts": {"manager": "auto"}}
            ]
        }
        result = processor._process_task(block_task)

        # Should return block with only the copy task (package tasks are collected, package_facts skipped)
        assert result is not None
        assert "block" in result
        assert len(result["block"]) == 1
        assert result["block"][0] == {"copy": {"src": "file", "dest": "/tmp"}}
        assert len(processor.package_tasks) == 1

    def test_process_task_block_all_filtered(self):
        """Test processing of block tasks where all subtasks are filtered."""
        processor = ssg.ansible.AnsibleSnippetsProcessor([])

        block_task = {
            "block": [
                {"ansible.builtin.package": {"name": "vim"}},
                {"ansible.builtin.package_facts": {"manager": "auto"}}
            ]
        }
        result = processor._process_task(block_task)

        assert result is None  # Block should be None if all subtasks are filtered
        assert len(processor.package_tasks) == 1

    def test_process_snippet(self):
        """Test processing of a complete snippet."""
        processor = ssg.ansible.AnsibleSnippetsProcessor([])

        snippet = """
        - name: Install vim
          ansible.builtin.package:
            name: vim
        - name: Gather facts
          ansible.builtin.package_facts:
            manager: auto
        - name: Copy file
          copy:
            src: file
            dest: /tmp
        """

        processor._process_snippet(snippet)

        assert len(processor.package_tasks) == 1
        assert len(processor.other_tasks) == 1
        assert processor.other_tasks[0]["copy"]["src"] == "file"

    def test_process_snippets(self):
        """Test processing multiple snippets."""
        snippet1 = """
        - name: Install vim
          ansible.builtin.package:
            name: vim
        """

        snippet2 = """
        - name: Copy file
          copy:
            src: file
            dest: /tmp
        """

        processor = ssg.ansible.AnsibleSnippetsProcessor([snippet1, snippet2])
        processor.process_snippets()

        assert len(processor.package_tasks) == 1
        assert len(processor.other_tasks) == 1

    def test_get_ansible_tasks(self):
        """Test getting the final ansible tasks."""
        snippet = """
        - name: Install vim
          ansible.builtin.package:
            name: vim
        - name: Copy file
          copy:
            src: file
            dest: /tmp
        """

        processor = ssg.ansible.AnsibleSnippetsProcessor([snippet])
        processor.process_snippets()

        tasks = processor.get_ansible_tasks()

        # Should have: package_facts_task + package_task + package_facts_task + service_facts_task + other_task
        assert len(tasks) == 5
        assert tasks[0] == ssg.ansible.package_facts_task  # First package facts task
        assert "ansible.builtin.package" in tasks[1]  # Package task
        assert tasks[2] == ssg.ansible.package_facts_task  # Second package facts task
        assert "ansible.builtin.service_facts" in tasks[3]  # Service facts task
        assert "copy" in tasks[4]  # Other task

    def test_get_ansible_tasks_no_package_tasks(self):
        """Test getting ansible tasks when there are no package tasks."""
        snippet = """
        - name: Copy file
          copy:
            src: file
            dest: /tmp
        """

        processor = ssg.ansible.AnsibleSnippetsProcessor([snippet])
        processor.process_snippets()

        tasks = processor.get_ansible_tasks()

        # Should have: package_facts_task + package_facts_task + service_facts_task + other_task
        assert len(tasks) == 4
        assert tasks[0] == ssg.ansible.package_facts_task
        assert tasks[1] == ssg.ansible.package_facts_task
        assert tasks[2] == ssg.ansible.service_facts_task
        assert "copy" in tasks[3]  # Other task

    def test_empty_snippets(self):
        """Test processing empty snippets list."""
        processor = ssg.ansible.AnsibleSnippetsProcessor([])
        processor.process_snippets()

        tasks = processor.get_ansible_tasks()

        # Should have only the two package facts tasks and service facts task
        assert len(tasks) == 3
        assert tasks[0] == ssg.ansible.package_facts_task
        assert tasks[1] == ssg.ansible.package_facts_task
        assert tasks[2] == ssg.ansible.service_facts_task

    def test_malformed_snippet(self):
        """Test handling of malformed YAML snippets."""
        processor = ssg.ansible.AnsibleSnippetsProcessor(["invalid: yaml: content:"])

        # This should raise SystemExit when trying to parse invalid YAML
        with pytest.raises(SystemExit):
            processor.process_snippets()

    def test_process_task_special_service_block(self):
      """Test processing of special service block tasks."""
      processor = ssg.ansible.AnsibleSnippetsProcessor([])

      service_task = {
        "name": "Enable service sshd",
        "block": [{"name":"Gather package facts", "ansible.builtin.package_facts": {"manager": "auto"}}, {"name": "Enable service sshd", "ansible.builtin.systemd": {"name": "sshd"}}],
        "tags": ["special_service_block"]
      }
      result = processor._process_task(service_task)

      assert result is None  # Service tasks should be skipped
      assert len(processor.service_tasks) == 1
      assert processor.service_tasks[0] == service_task
      assert "ansible.builtin.package_facts" not in processor.service_tasks[0]["block"][0]