1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
|
# -*- coding: utf-8 -*-
"""Unit tests for resultdump."""
import datetime
import logging
from sbws.lib.relaylist import Relay
from sbws.lib.resultdump import (
ResultError,
ResultErrorStream,
ResultSuccess,
load_result_file,
trim_results_ip_changed,
)
def test_trim_results_ip_changed_defaults(resultdict_ip_not_changed):
    """Check that trimming with the default options keeps every result."""
    trimmed = trim_results_ip_changed(resultdict_ip_not_changed)
    # The output must compare equal to the untouched input.
    assert resultdict_ip_not_changed == trimmed
def test_trim_results_ip_changed_on_changed_ipv4_changed(
    resultdict_ip_changed, resultdict_ip_changed_trimmed
):
    """Check the trimmed output when ``on_changed_ipv4`` is enabled.

    The input fixture holds results for a relay whose IP changed; the
    output must equal the pre-computed trimmed fixture.
    """
    expected = resultdict_ip_changed_trimmed
    trimmed = trim_results_ip_changed(
        resultdict_ip_changed,
        on_changed_ipv4=True,
    )
    assert expected == trimmed
def test_trim_results_ip_changed_on_changed_ipv4_no_changed(
    resultdict_ip_not_changed,
):
    """Check that ``on_changed_ipv4`` keeps results when the IP is the same."""
    original = resultdict_ip_not_changed
    trimmed = trim_results_ip_changed(original, on_changed_ipv4=True)
    # Nothing should have been dropped from the input.
    assert original == trimmed
def test_trim_results_ip_changed_on_changed_ipv6(
    caplog, resultdict_ip_not_changed
):
    """Check that ``on_changed_ipv6=True`` keeps the results and logs a notice.

    The call is expected to return results equal to the input and to log
    that resetting results on IPv6 changes is not yet implemented.
    """
    # Set the capture level *before* the call under test: ``caplog.at_level``
    # only applies while the context is active, so wrapping just the
    # assertion would have no influence on which records are captured.
    with caplog.at_level(logging.WARNING):
        results_dict = trim_results_ip_changed(
            resultdict_ip_not_changed, on_changed_ipv6=True
        )
    assert resultdict_ip_not_changed == results_dict
    # There might be other logs from other threads, so only check that the
    # expected message is present instead of comparing the whole text.
    assert (
        "Resetting bandwidth results when IPv6 changes, "
        "is not yet implemented.\n" in caplog.text
    )
def test_resultdump(
    rd, args, conf_results, controller, router_status, server_descriptor
):
    """Store two results for one relay and check they can be read back.

    A ``ResultSuccess`` and then a ``ResultError`` are stored for the same
    relay; after each store the number of results and the length of the
    stored ``relay_recent_priority_list`` are checked.
    """
    from sbws import settings

    try:
        relay = Relay(
            router_status.fingerprint,
            controller,
            ns=router_status,
            desc=server_descriptor,
        )
        relay.increment_relay_recent_priority_list()
        relay.increment_relay_recent_measurement_attempt()
        r = ResultSuccess(
            [],
            2000,
            relay,
            ["A", "B"],
            "http://localhost/bw",
            "scanner_nick",
        )
        # Storing the result with `rd.queue.put` will not store the result to
        # disk because the thread is not spawned with pytest.
        rd.store_result(r)
        results = rd.results_for_relay(relay)
        # It has stored the result
        assert 1 == len(results)
        # The result has the correct attribute
        assert 1 == len(results[0].relay_recent_priority_list)
        # Store a second result for the same relay
        r = ResultError(
            relay,
            ["A", "B"],
            "http://localhost/bw",
            "scanner_nick",
        )
        rd.store_result(r)
        # Query again instead of reusing the earlier list, so the check does
        # not depend on that list aliasing the dump's internal storage.
        results = rd.results_for_relay(relay)
        assert 2 == len(results)
        assert 1 == len(results[1].relay_recent_priority_list)
    finally:
        # Always set the end event, even when an assertion above fails.
        # NOTE(review): presumably this signals background threads to stop;
        # confirm against sbws.settings.
        settings.set_end_event()
def test_load(datadir):
    """Load ``results.txt`` and check the second and third loaded results.

    The loaded mapping is flattened into a single list; the result at index 1
    must be a ``ResultSuccess`` and the one at index 2 a
    ``ResultErrorStream``. Both must carry the expected number of entries in
    their relay history attributes.
    """
    results_path = str(datadir.join("results.txt"))
    loaded = load_result_file(results_path)

    # Flatten the per-key lists into one list, preserving iteration order.
    flattened = []
    for values in loaded.values():
        flattened.extend(values)

    expectations = ((1, ResultSuccess), (2, ResultErrorStream))
    for index, expected_cls in expectations:
        result = flattened[index]
        assert isinstance(result, expected_cls)
        # Measurement attempts must have been parsed into datetime objects.
        assert isinstance(
            result.relay_recent_measurement_attempt[0], datetime.datetime
        )
        assert 2 == len(result.relay_recent_measurement_attempt)
        assert 3 == len(result.relay_recent_priority_list)
        assert 3 == len(result.relay_in_recent_consensus)
|