File: ansible.py

package info (click to toggle)
scap-security-guide 0.1.78-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 114,600 kB
  • sloc: xml: 245,305; sh: 84,381; python: 33,093; makefile: 27
file content (206 lines) | stat: -rw-r--r-- 6,636 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
"""
Common functions for processing Ansible in SSG
"""

from __future__ import absolute_import
from __future__ import print_function

import collections
import copy
import re

from .constants import ansible_version_requirement_pre_task_name
from .constants import min_ansible_version
from . import yaml


def add_minimum_version(ansible_src):
    """
    Insert a minimum-Ansible-version assertion into an Ansible script.

    The first occurrence of the literal text `` - hosts: all`` is replaced by
    the same play header followed by a ``pre_tasks`` section. That section
    holds one ``assert`` task checking that ``ansible_version.full`` is greater
    than or equal to ``min_ansible_version``.

    The script is returned unchanged when either:
      * it does not contain `` - hosts: all``; or
      * it contains the text ``pre_task`` together with the text
        ``ansible_version.full is version_compare`` (the check is assumed to
        be in place already).

    Args:
        ansible_src (str): The source code of the Ansible script.

    Returns:
        str: The Ansible script, with the version requirement added when
             applicable.

    Raises:
        ValueError: If the script contains ``pre_task`` but not the version
                    check.
    """
    play_marker = ' - hosts: all'
    # The whitespace inside this template is significant YAML indentation.
    version_guard = (""" - hosts: all
   pre_tasks:
     - name: %s
       assert:
         that: "ansible_version.full is version_compare('%s', '>=')"
         msg: >
           "You must update Ansible to at least version %s to use this role."
          """ % (ansible_version_requirement_pre_task_name,
                 min_ansible_version, min_ansible_version))

    # No play header to attach the requirement to.
    if play_marker not in ansible_src:
        return ansible_src

    # Plain substring test, so it matches "pre_tasks" as well as "pre_task".
    if 'pre_task' in ansible_src:
        if 'ansible_version.full is version_compare' not in ansible_src:
            raise ValueError(
                "A pre_task already exists in ansible_src; failing to process: %s" %
                ansible_src)
        return ansible_src

    # Only the first play header receives the version guard.
    return ansible_src.replace(play_marker, version_guard, 1)


def remove_too_many_blank_lines(ansible_src):
    """
    Condenses three or more consecutive empty lines into two empty lines.

    Concretely, every run of four or more newline characters is replaced by
    exactly three newline characters.

    Args:
        ansible_src (str): The source string from an Ansible file.

    Returns:
        str: The modified string with excessive blank lines reduced.
    """
    # ``count`` is left at its default (replace every match) instead of being
    # passed positionally: positional ``count``/``flags`` arguments to
    # ``re.sub`` are deprecated since Python 3.13. No flags are needed because
    # the pattern contains no ``^``/``$`` anchors.
    return re.sub(r'\n{4,}', '\n\n\n', ansible_src)


def remove_trailing_whitespace(ansible_src):
    """
    Remove trailing whitespace from each line in the given Ansible source string.

    Only spaces and tabs at the end of a line are removed; newline characters
    are left in place.

    Args:
        ansible_src (str): The Ansible source code as a string.

    Returns:
        str: The Ansible source code with trailing whitespace removed from each line.
    """
    # re.M makes ``$`` match at the end of every line, not just the end of the
    # string. ``flags`` is passed by keyword and ``count`` is left at its
    # default: positional ``count``/``flags`` arguments to ``re.sub`` are
    # deprecated since Python 3.13.
    return re.sub(r'[ \t]+$', '', ansible_src, flags=re.M)


# Template for the task that gathers package facts; key order is preserved
# by the OrderedDict (name, module, tags).
package_facts_task = collections.OrderedDict()
package_facts_task['name'] = 'Gather the package facts'
package_facts_task['ansible.builtin.package_facts'] = {'manager': 'auto'}
package_facts_task['tags'] = ['always']

# Template for the task that gathers service facts; the module takes no
# arguments, hence the None value.
service_facts_task = collections.OrderedDict()
service_facts_task['name'] = 'Gather the service facts'
service_facts_task['ansible.builtin.service_facts'] = None
service_facts_task['tags'] = ['always']


def task_is(task, names):
    """
    Tell whether a task uses any of the given keys.

    Args:
        task (dict): The task to check.
        names (list): Keys (typically module names) to look for in the task.

    Returns:
        bool: True if at least one entry of ``names`` is a key of ``task``,
              False otherwise (including when ``names`` is empty).
    """
    return any(candidate in task for candidate in names)


class AnsibleSnippetsProcessor:
    """
    Processes Ansible snippets to group package and service handling.

    This class processes a collection of Ansible snippets by:
    - Collecting package tasks into ``package_tasks``
    - Collecting blocks tagged ``special_service_block`` into ``service_tasks``
    - Dropping package_facts and service_facts tasks (single replacements are
      added by ``get_ansible_tasks``)
    - Handling block tasks recursively
    - Preserving other tasks as-is in ``other_tasks``
    """

    def __init__(self, all_snippets):
        """
        Initialize the processor with a collection of snippets.

        Args:
            all_snippets (list): List of Ansible snippet strings to process.
        """
        self.all_snippets = all_snippets
        self.package_tasks = []
        self.service_tasks = []
        self.other_tasks = []

    def _process_task(self, task):
        """
        Process a single task, determining how to handle it.

        Block tasks are modified in place: their ``block`` list is replaced by
        the list of subtasks that were not skipped or collected elsewhere.

        Args:
            task (dict): The task to process.

        Returns:
            dict or None: The task itself if it should stay where it is, or
                None if it was skipped or moved to ``package_tasks`` /
                ``service_tasks``.
        """

        if task_is(task, ["block"]):
            # Process block tasks recursively
            new_block = []
            for subtask in task["block"]:
                if self._process_task(subtask) is not None:
                    new_block.append(subtask)
            task["block"] = new_block
            if "special_service_block" in task.get("tags", []):
                # Collect service tasks to be processed later
                self.service_tasks.append(task)
                return None
            # A block left without any subtask is dropped entirely
            return task if new_block else None
        if task_is(task, ["ansible.builtin.package_facts", "package_facts"]):
            # Skip package_facts tasks because they will be replaced by
            # a single package_facts task that will be added later
            return None
        if task_is(task, ["ansible.builtin.service_facts", "service_facts"]):
            # Skip service_facts tasks because they will be replaced by
            # a single service_facts task that will be added later
            return None
        if task_is(task, ["ansible.builtin.package", "package"]):
            # Collect package tasks to be processed later
            self.package_tasks.append(task)
            return None
        return task

    def _process_snippet(self, snippet):
        """
        Process a single snippet, extracting tasks from it.

        Tasks that are neither skipped nor collected as package/service tasks
        are appended to ``other_tasks``.

        Args:
            snippet (str): The YAML snippet string to process.
        """
        # NOTE(review): presumably ordered_load returns None for an empty or
        # comment-only snippet (as PyYAML's loaders do) -- verify. Such a
        # snippet is treated as containing no tasks instead of raising
        # TypeError when iterated.
        tasks = yaml.ordered_load(snippet) or []
        for task in tasks:
            if self._process_task(task) is not None:
                self.other_tasks.append(task)

    def process_snippets(self):
        """
        Process all snippets provided during initialization.
        """
        for snippet in self.all_snippets:
            self._process_snippet(snippet)

    def get_ansible_tasks(self):
        """
        Get the final list of processed Ansible tasks.

        The resulting order is: a package facts task, the collected package
        tasks, another package facts task, the collected service tasks, a
        service facts task, and finally all other tasks.

        Every facts task in the result is an independent deep copy of the
        module-level template, so callers may modify the returned tasks
        without altering the templates, and no object appears twice in the
        result.

        Returns:
            list: Combined list of all processed tasks.
        """
        tasks = [copy.deepcopy(package_facts_task)]
        tasks.extend(self.package_tasks)
        tasks.append(copy.deepcopy(package_facts_task))
        tasks.extend(self.service_tasks)
        tasks.append(copy.deepcopy(service_facts_task))
        tasks.extend(self.other_tasks)
        return tasks