"""Shared helper utilities for cloud-init integration tests."""
import logging
import multiprocessing
import os
import re
import time
from collections import namedtuple
from contextlib import contextmanager
from functools import lru_cache
from itertools import chain
from pathlib import Path
from typing import Set
import pytest
from cloudinit.subp import subp
from tests.integration_tests.instances import IntegrationInstance
log = logging.getLogger("integration_testing")
key_pair = namedtuple("key_pair", "public_key private_key")
ASSETS_DIR = Path("tests/integration_tests/assets")
KEY_PATH = ASSETS_DIR / "keys"
def verify_ordered_items_in_text(to_verify: list, text: str):
    """Assert all items in list appear in order in text.

    Each item is tried first as a regex pattern; if it is not a valid
    pattern it is matched literally.

    :param to_verify: items (regex patterns or literal strings) that must
        each appear in ``text``, in the given order.
    :param text: the text to search.
    :raises AssertionError: if any item is missing or out of order.

    Examples:
    verify_ordered_items_in_text(['a', '1'], 'ab1') # passes
    verify_ordered_items_in_text(['1', 'a'], 'ab1') # raises AssertionError
    """
    index = 0
    for item in to_verify:
        try:
            matched = re.search(item, text[index:])
        except re.error:
            # item is not a valid regex; fall back to a literal match
            matched = re.search(re.escape(item), text[index:])
        assert matched, "Expected item not found: '{}'".format(item)
        # Match offsets are relative to the text[index:] slice. Advance the
        # absolute cursor past this match so the next item must appear
        # strictly later (the old `index = matched.start()` reset the cursor
        # to a slice-relative position, letting earlier matches count).
        index += matched.end()
def verify_clean_log(log: str, ignore_deprecations: bool = True):
    """Assert no unexpected tracebacks or warnings in logs"""
    if ignore_deprecations:
        # Drop any line mentioning deprecation before counting problems.
        deprecation_re = re.compile("deprecat", flags=re.IGNORECASE)
        kept_lines = [
            line
            for line in log.split("\n")
            if not deprecation_re.search(line)
        ]
        log = "\n".join(kept_lines)

    # Any CRITICAL/ERROR line is an immediate failure.
    error_logs = re.findall("CRITICAL.*", log) + re.findall("ERROR.*", log)
    if error_logs:
        raise AssertionError(
            "Found unexpected errors: %s" % "\n".join(error_logs)
        )

    # Warnings/tracebacks are allowed only when they match a known-benign
    # message; every other occurrence is unexpected.
    warning_texts = [
        # Consistently on all Azure launches:
        # azure.py[WARNING]: No lease found; using default endpoint
        "No lease found; using default endpoint",
        # Ubuntu lxd storage
        "thinpool by default on Ubuntu due to LP #1982780",
        "WARNING]: Could not match supplied host pattern, ignoring:",
    ]
    traceback_texts = []
    if "oracle" in log:
        # LP: #1842752
        lease_exists_text = "Stderr: RTNETLINK answers: File exists"
        warning_texts.append(lease_exists_text)
        traceback_texts.append(lease_exists_text)
        # LP: #1833446
        fetch_error_text = (
            "UrlError: 404 Client Error: Not Found for url: "
            "http://169.254.169.254/latest/meta-data/"
        )
        warning_texts.append(fetch_error_text)
        traceback_texts.append(fetch_error_text)
        # Oracle has a file in /etc/cloud/cloud.cfg.d that contains
        # users:
        # - default
        # - name: opc
        #   ssh_redirect_user: true
        # This can trigger a warning about opc having no public key
        warning_texts.append(
            "Unable to disable SSH logins for opc given ssh_redirect_user"
        )

    expected_warnings = sum(log.count(text) for text in warning_texts)
    expected_tracebacks = sum(log.count(text) for text in traceback_texts)

    assert log.count("WARN") == expected_warnings, (
        f"Unexpected warning count != {expected_warnings}. Found: "
        f"{re.findall('WARNING.*', log)}"
    )
    assert log.count("Traceback") == expected_tracebacks
def get_inactive_modules(log: str) -> Set[str]:
    """Return the set of module names cloud-init reported as skipped.

    Parses every "Skipping modules '...'" line in the log and collects the
    comma-separated, whitespace-stripped module names.
    """
    matches = re.findall(
        r"Skipping modules '(.*)' because no applicable config is provided.",
        log,
    )
    return {
        name.strip()
        for match in matches
        for name in match.split(",")
    }
@contextmanager
def emit_dots_on_travis():
    """emit a dot every 60 seconds if running on Travis.

    Travis will kill jobs that don't emit output for a certain amount of time.
    This context manager spins up a background process which will emit a dot to
    stdout every 60 seconds to avoid being killed.

    It should be wrapped selectively around operations that are known to take a
    long time.
    """
    on_travis = os.environ.get("TRAVIS") == "true"
    if not on_travis:
        # Outside Travis there is nothing to keep alive; be a no-op.
        yield
        return

    def _heartbeat():
        # Runs forever; the parent terminates this process on exit.
        while True:
            log.info(".")
            time.sleep(60)

    heartbeat_proc = multiprocessing.Process(target=_heartbeat)
    heartbeat_proc.start()
    try:
        yield
    finally:
        heartbeat_proc.terminate()
def get_test_rsa_keypair(key_name: str = "test1") -> key_pair:
    """Load a named RSA test keypair from the assets directory.

    :param key_name: suffix of the id_rsa.<key_name>[.pub] files under
        KEY_PATH.
    :return: key_pair(public_key, private_key) file contents.
    """
    public_key = (KEY_PATH / "id_rsa.{}.pub".format(key_name)).read_text()
    private_key = (KEY_PATH / "id_rsa.{}".format(key_name)).read_text()
    return key_pair(public_key, private_key)
def get_console_log(client: IntegrationInstance):
    """Fetch the instance's console log, skipping/failing as appropriate.

    Skips the test when the platform does not implement console logs, and
    fails it when the platform reports no output at all.
    """
    try:
        output = client.instance.console_log()
    except NotImplementedError:
        pytest.skip("NotImplementedError when requesting console log")
    if output.lower().startswith("no console output"):
        pytest.fail("no console output")
    return output
@lru_cache()
def lxd_has_nocloud(client: IntegrationInstance) -> bool:
    """Report whether the LXD image metadata seeds NoCloud.

    Cached per client because the lxc invocation is comparatively slow.
    """
    # Bionic or Focal may be detected as NoCloud rather than LXD
    metadata = subp(
        ["lxc", "config", "metadata", "show", client.instance.name]
    )
    return "/var/lib/cloud/seed/nocloud" in metadata.stdout