1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
|
import logging
import os
import random
import shutil
import subprocess
import sys
import tempfile
import time
from ipaddress import ip_address
import dpkt
import pytest
import deckard
from contrib.namespaces import LinuxNamespace
from networking import InterfaceManager
def set_coverage_env(path, qmin):
"""Sets up enviroment variables so code coverage utility can work."""
if os.environ.get("COVERAGE"):
exports = subprocess.check_output([os.environ["COVERAGE_ENV_SCRIPT"],
os.environ["DAEMONSRCDIR"],
os.environ["COVERAGE_STATSDIR"],
path + "-qmin-" + str(qmin)]).decode()
for export in exports.split():
key, value = export.split("=", 1)
value = value.strip('"')
os.environ[key] = value
def check_platform():
if sys.platform == 'windows':
pytest.exit('Not supported at all on Windows')
# Suppress extensive Augeas logging
logging.getLogger("augeas").setLevel(logging.ERROR)
check_platform()
class DeckardUnderLoadError(Exception):
pass
class TCPDump:
"""This context manager captures a PCAP file and than checks it for obvious errors."""
DUMPCAP_CMD = ["dumpcap", "-i", "any", "-q", "-P", "-w"]
def __init__(self, config):
self.config = config
self.config["tmpdir"] = self.get_tmpdir()
self.tcpdump = None
self.config["pcap"] = os.path.join(self.config["tmpdir"], "deckard.pcap")
def __enter__(self):
cmd = self.DUMPCAP_CMD.copy()
cmd.append(self.config["pcap"])
# pylint: disable=consider-using-with
self.tcpdump = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
def __exit__(self, _, exc_value, __):
# Wait for the PCAP to be finalized
while not os.path.exists(self.config["pcap"]):
time.sleep(1)
self.tcpdump.terminate()
self.tcpdump.wait()
self.check_for_unknown_server()
if exc_value is None:
if self.config.get('noclean') or "DECKARD_NOCLEAN" in os.environ:
# Do not clear files if the server crashed (for analysis)
logging.getLogger('deckard.hint').info(
'test working directory %s', self.config["tmpdir"])
else:
shutil.rmtree(self.config["tmpdir"])
else:
if isinstance(exc_value, ValueError):
self.check_for_icmp()
raise
@staticmethod
def get_tmpdir():
if "DECKARD_DIR" in os.environ:
tmpdir = os.environ["DECKARD_DIR"]
if os.path.lexists(tmpdir):
raise ValueError(f'DECKARD_DIR "{tmpdir}" must not exist')
else:
tmpdir = tempfile.mkdtemp(suffix='', prefix='tmpdeckard')
return tmpdir
def check_for_icmp(self):
""" Checks Deckards's PCAP for ICMP packets """
# Deckard's responses to resolvers might be delayed due to load which
# leads the resolver to close the port and to the test failing in the
# end. We partially detect these by checking the PCAP for ICMP packets.
udp_seen = False
with open(self.config["pcap"], "rb") as f:
pcap = dpkt.pcap.Reader(f)
for _, packet in pcap:
ip = dpkt.sll.SLL(packet).data
if isinstance(ip.data, dpkt.udp.UDP):
udp_seen = True
if udp_seen:
if isinstance(ip.data, (dpkt.icmp.ICMP, dpkt.icmp6.ICMP6)):
raise DeckardUnderLoadError("Deckard is under load. "
"Other errors might be false negatives. "
"Consider retrying the job later.")
def check_for_unknown_server(self):
unknown_addresses = set()
with open(self.config["pcap"], "rb") as f:
pcap = dpkt.pcap.Reader(f)
for _, packet in pcap:
ip = dpkt.sll.SLL(packet).data
try:
if ip.p != dpkt.ip.IP_PROTO_TCP or ip.p != dpkt.ip.IP_PROTO_UDP:
continue
except AttributeError:
continue
dest = str(ip_address(ip.dst))
if dest not in self.config["if_manager"].added_addresses:
unknown_addresses.add(dest)
if unknown_addresses:
raise RuntimeError("Binary under test queried an IP address not present"
f" in scenario {unknown_addresses}")
def run_test(path, qmin, config, max_retries, retries=0):
set_coverage_env(path, qmin)
try:
with LinuxNamespace("net"):
config["if_manager"] = InterfaceManager()
with TCPDump(config):
deckard.process_file(path, qmin, config)
except deckard.DeckardUnderLoadError as e:
if retries < max_retries:
logging.error("Deckard under load. Retrying…")
# Exponential backoff
time.sleep((2 ** retries) + random.random())
run_test(path, qmin, config, max_retries, retries + 1)
else:
raise e
def test_passes_qmin_on(scenario, max_retries):
if scenario.qmin is True or scenario.qmin is None:
run_test(scenario.path, True, scenario.config, max_retries)
else:
pytest.skip("Query minimization is off in test config")
def test_passes_qmin_off(scenario, max_retries):
if scenario.qmin is False or scenario.qmin is None:
run_test(scenario.path, False, scenario.config, max_retries)
else:
pytest.skip("Query minimization is on in test config")
|