File: reaper.py

package info (click to toggle)
cloud-init 25.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 12,412 kB
  • sloc: python: 135,894; sh: 3,883; makefile: 141; javascript: 30; xml: 22
file content (205 lines) | stat: -rw-r--r-- 7,821 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""Defines _Reaper, which destroys instances in a background thread

This class is intended to be a singleton which is instantiated on session setup
and cleaned on session teardown. Any instances submitted to the reaper are
destroyed. Instances that refuse to be destroyed due to external library errors
or flaky infrastructure are tracked, retried and upon test session completion
are reported to the end user as a test warning.
"""

from __future__ import annotations  # required for Python 3.8

import logging
import queue
import threading
import warnings
from typing import Final, List, Optional

from tests.integration_tests.instances import IntegrationInstance

LOG = logging.getLogger()


class Reaper:
    """Destroy submitted instances in a background thread.

    Instances handed to :meth:`reap` are queued and destroyed by the thread
    spawned in :meth:`start`. Instances whose ``destroy()`` raises are kept
    in ``undead_ledger`` and retried on every wake-up of the reaper loop.
    :meth:`stop` asks the loop to exit; any instance still in the ledger at
    that point is reported through ``warnings.warn``.
    """

    def __init__(self, timeout: float = 30.0):
        """Initialise reaper state; no thread is started here.

        :param timeout: maximum number of seconds the reaper sleeps between
            passes when nothing wakes it earlier.
        """
        # self.timeout sets the amount of time to sleep before retrying
        self.timeout = timeout
        # self.wake_reaper tells the reaper to wake up.
        #
        # A Condition is used for synchronization. The reaper only holds its
        # lock while checking for work and going to sleep (wait releases the
        # lock), so notify() merely contends briefly for that lock.
        #
        # It is notified when:
        # - an instance is reaped, indicating work to be done
        # - stop() is called, e.g. on session completion or on a signal
        #   interrupt indicating cleanup
        self.wake_reaper: Final[threading.Condition] = threading.Condition()

        # self.exit_reaper tells the reaper loop to tear down, called once at
        # end of tests
        self.exit_reaper: Final[threading.Event] = threading.Event()

        # List of instances which temporarily escaped death
        # The primary purpose of the reaper is to coax these instances towards
        # eventual demise and report their insubordination on shutdown.
        self.undead_ledger: Final[List[IntegrationInstance]] = []

        # Queue of newly reaped instances
        self.reaped_instances: Final[queue.Queue[IntegrationInstance]] = (
            queue.Queue()
        )

        # Thread object, handle used to re-join the thread
        self.reaper_thread: Optional[threading.Thread] = None

        # Count the dead (successful destroy() calls)
        self.counter = 0

    def reap(self, instance: IntegrationInstance):
        """reap() submits an instance to the reaper thread.

        An instance that is passed to the reaper must not be used again. It may
        not be dead yet, but it has no place among the living.
        """
        LOG.info("Reaper: receiving %s", instance.instance.id)

        # Enqueue before notifying so the woken reaper always sees the work.
        self.reaped_instances.put(instance)
        with self.wake_reaper:
            self.wake_reaper.notify()
            LOG.info("Reaper: awakened to reap")

    def start(self):
        """Spawn the reaper background thread."""
        LOG.info("Reaper: starting")
        self.reaper_thread = threading.Thread(
            target=self._reaper_loop, name="reaper"
        )
        self.reaper_thread.start()

    def stop(self):
        """Stop the reaper background thread and wait for completion."""
        LOG.info("Reaper: stopping")
        # Set the exit flag before notifying so the woken reaper observes it.
        self.exit_reaper.set()
        with self.wake_reaper:
            self.wake_reaper.notify()
            LOG.info("Reaper: awakened to reap")
        if self.reaper_thread and self.reaper_thread.is_alive():
            self.reaper_thread.join()
        LOG.info("Reaper: stopped")

    def _destroy(self, instance: IntegrationInstance) -> bool:
        """destroy() destroys an instance and returns True on success.

        Any exception raised by ``instance.destroy()`` is logged and
        swallowed (best effort); in that case False is returned and the
        success counter is left untouched.
        """
        try:
            LOG.info("Reaper: destroying %s", instance.instance.id)
            instance.destroy()
            self.counter += 1
            return True
        except Exception as e:
            LOG.warning(
                "Error while tearing down instance %s: %s ", instance, e
            )
            return False

    def _has_pending_work(self) -> bool:
        """Return True if the reaper should run a pass without sleeping."""
        return self.exit_reaper.is_set() or not self.reaped_instances.empty()

    def _reaper_loop(self) -> None:
        """reaper_loop() manages all instances that have been reaped

        tasks:
        - destroy newly reaped instances
        - manage a ledger of undead instances
        - periodically attempt to kill undead instances
        - die when instructed to
        - ensure that every reaped instance is destroyed at least once before
          reaper dies
        """
        LOG.info("Reaper: exalted in life, to assist others in death")
        while True:
            # nap until woken or timeout
            #
            # wait_for() checks the predicate before sleeping, so a notify()
            # sent while the reaper was busy in _do_reap() is not lost: if
            # work was queued or exit was requested in the meantime, the
            # reaper proceeds immediately instead of sleeping a full timeout.
            with self.wake_reaper:
                self.wake_reaper.wait_for(
                    self._has_pending_work, timeout=self.timeout
                )
            if self._do_reap():
                break
        LOG.info("Reaper: exited")

    def _do_reap(self) -> bool:
        """_do_reap does a single pass of the reaper loop

        return True if the loop should exit
        """

        new_undead_instances: List[IntegrationInstance] = []

        # first destroy all newly reaped instances
        while not self.reaped_instances.empty():
            instance = self.reaped_instances.get_nowait()
            # capture the id up front and use it for all logging of this
            # instance, whether or not destroy() succeeds
            instance_id = instance.instance.id
            if self._destroy(instance):
                LOG.info("Reaper: destroyed %s", instance_id)
            else:
                LOG.warning("Reaper: failed to destroy %s", instance_id)
                # failure to delete, add to the ledger
                new_undead_instances.append(instance)

        # every instance has tried at least once and the reaper has been
        # instructed to tear down - so do it
        if self.exit_reaper.is_set():
            if not self.reaped_instances.empty():
                # race: an instance was added to the queue after iteration
                # completed. Destroy the latest instance.
                self._update_undead_ledger(new_undead_instances)
                return False
            self._update_undead_ledger(new_undead_instances)
            LOG.info("Reaper: exiting")
            if self.undead_ledger:
                # undead instances exist - unclean teardown
                LOG.info(
                    "Reaper: the faults of incompetent abilities will be "
                    "consigned to oblivion, as myself must soon be to the "
                    "mansions of rest."
                )
                warnings.warn(f"Test instance(s) leaked: {self.undead_ledger}")
            else:
                LOG.info("Reaper: duties complete, my turn to rest")
            LOG.info(
                "Reaper: reaped %s/%s instances",
                self.counter,
                self.counter + len(self.undead_ledger),
            )
            return True

        # attempt to destroy all instances which previously refused to destroy
        #
        # Iterate over a snapshot: removing from the list being iterated
        # would skip the element following each removed instance.
        for instance in list(self.undead_ledger):
            if self.exit_reaper.is_set() and self.reaped_instances.empty():
                # don't retry instances if the exit_reaper Event is set
                break
            instance_id = instance.instance.id
            if self._destroy(instance):
                self.undead_ledger.remove(instance)
                LOG.info("Reaper: destroyed %s (undead)", instance_id)

        self._update_undead_ledger(new_undead_instances)
        return False

    def _update_undead_ledger(
        self, new_undead_instances: List[IntegrationInstance]
    ):
        """update the ledger with newly undead instances

        Logs the newcomers (if any) and appends them to ``undead_ledger``.
        Always returns False; callers in this class ignore the value.
        """
        if new_undead_instances:
            if self.undead_ledger:
                LOG.info(
                    "Reaper: instance(s) not ready to die %s, will now join "
                    "the ranks of the undead: %s",
                    new_undead_instances,
                    self.undead_ledger,
                )
            else:
                LOG.info(
                    "Reaper: instance(s) not ready to die %s",
                    new_undead_instances,
                )
        self.undead_ledger.extend(new_undead_instances)
        return False