# This file is part of cloud-init. See LICENSE file for license information.
import time
from contextlib import suppress
from unittest.mock import PropertyMock
import pytest
import responses
from cloudinit.reporting import flush_events
from cloudinit.reporting.events import report_start_event
from cloudinit.reporting.handlers import WebHookHandler
class TestWebHookHandler:
    """Tests for ``WebHookHandler`` posting reporting events over HTTP.

    Each test registers a canned ``responses`` reply for
    ``http://localhost``, emits one or more start events and then inspects
    ``caplog`` for the messages logged while those events are posted.
    """

    @pytest.fixture(autouse=True)
    def setup(self, mocker):
        """Expose a single ``WebHookHandler`` as the registered handlers.

        ``DictRegistry.registered_items`` is patched with a ``PropertyMock``
        whose value is ``{"webhook": handler}``, so code reading that
        property during a test sees only the handler created here.
        """
        handler = WebHookHandler(endpoint="http://localhost")
        m_registered_items = mocker.patch(
            "cloudinit.registry.DictRegistry.registered_items",
            new_callable=PropertyMock,
        )
        m_registered_items.return_value = {"webhook": handler}

    @staticmethod
    def _wait_for(condition, timeout=3.0, interval=0.01):
        """Poll ``condition`` until it is truthy or ``timeout`` elapses.

        :param condition: Zero-argument callable evaluated on every poll.
        :param timeout: Maximum number of seconds to keep polling.
        :param interval: Seconds to sleep between polls. Sleeping also
            forces a context switch so background work can make progress.
        :return: ``True`` if ``condition`` became truthy, else ``False``.
        """
        # A monotonic clock keeps the deadline immune to wall-clock jumps.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if condition():
                return True
            time.sleep(interval)
        # One last look in case the condition flipped during the final sleep.
        return bool(condition())

    @responses.activate
    def test_webhook_handler(self, caplog):
        """Test the happy path."""
        responses.add(responses.POST, "http://localhost", status=200)
        report_start_event("name", "description")
        flush_events()
        assert 1 == caplog.text.count(
            "Read from http://localhost (200, 0b) after 1 attempts"
        )

    @responses.activate
    def test_404(self, caplog):
        """Test failure"""
        responses.add(responses.POST, "http://localhost", status=404)
        report_start_event("name", "description")
        flush_events()
        assert 1 == caplog.text.count("Failed posting event")

    @responses.activate
    def test_background_processing(self, caplog):
        """Test that processing happens in background.

        In the non-flush case, ensure that the event is still posted.
        Since the event is posted in the background, wait while looping.
        """
        responses.add(responses.POST, "http://localhost", status=200)
        report_start_event("name", "description")

        expected = "Read from http://localhost (200, 0b) after 1 attempts"
        if not self._wait_for(lambda: expected in caplog.text):
            pytest.fail("Never got expected log message")

    @responses.activate
    @pytest.mark.parametrize(
        "num_failures,expected_log_count,expected_cancel",
        [(2, 2, False), (3, 3, True), (50, 3, True)],
    )
    def test_failures_cancel_flush(
        self, caplog, num_failures, expected_log_count, expected_cancel
    ):
        """Test that too many failures will cancel further processing on flush.

        2 messages should not cancel on flush
        3 or more should cancel on flush
        The number of received messages will be based on how many have
        been processed before the flush was initiated.
        """
        responses.add(responses.POST, "http://localhost", status=404)
        for _ in range(num_failures):
            report_start_event("name", "description")
        flush_events()
        # Force a context switch. Without this, it's possible that the
        # expected log message hasn't made it to the log file yet
        time.sleep(0.01)

        # If we've pushed a bunch of messages, any number could have been
        # processed before we get to the flush.
        assert (
            expected_log_count
            <= caplog.text.count("Failed posting event")
            <= num_failures
        )
        cancelled_message = (
            "Multiple consecutive failures in WebHookHandler. "
            "Cancelling all queued events"
        )
        if expected_cancel:
            assert cancelled_message in caplog.text
        else:
            assert cancelled_message not in caplog.text

    @responses.activate
    def test_multiple_failures_no_flush(self, caplog):
        """Test we don't cancel posting if flush hasn't been requested.

        Since processing happens in the background, wait in a loop
        for all messages to be posted
        """
        # Single source of truth for the event count, so the number posted,
        # the number asserted and the failure message cannot drift apart.
        expected_failures = 10

        responses.add(responses.POST, "http://localhost", status=404)
        for _ in range(expected_failures):
            report_start_event("name", "description")

        def failure_count():
            return caplog.text.count("Failed posting event")

        if not self._wait_for(lambda: failure_count() == expected_failures):
            pytest.fail(
                f"Expected {expected_failures} failures, only got "
                f"{failure_count()}"
            )
|