# Tests for _RankedStorageAccountSet: account ranking, shuffling, and how recorded results age over time.
import time
from pytest import approx
from azure.kusto.ingest._storage_account_set import _RankedStorageAccountSet
# Names of the five storage accounts that create_storage_account_set() registers
# and that the tests below refer to when recording results and checking ranks.
ACCOUNT_1: str = "ACCOUNT_1"
ACCOUNT_2: str = "ACCOUNT_2"
ACCOUNT_3: str = "ACCOUNT_3"
ACCOUNT_4: str = "ACCOUNT_4"
ACCOUNT_5: str = "ACCOUNT_5"
def create_storage_account_set(time_provider=time.time) -> _RankedStorageAccountSet:
    """Build a ``_RankedStorageAccountSet`` holding the five test accounts.

    :param time_provider: Callable forwarded as the set's ``time_provider``
        argument; defaults to ``time.time``.
    :return: The set after ACCOUNT_1 through ACCOUNT_5 were added, in that order.
    """
    account_set = _RankedStorageAccountSet(time_provider=time_provider)
    for account_name in (ACCOUNT_1, ACCOUNT_2, ACCOUNT_3, ACCOUNT_4, ACCOUNT_5):
        account_set.add_storage_account(account_name)
    return account_set
def test_check_rank_when_no_data():
    """With no success/failure data recorded, every account reports rank 1."""
    account_set = create_storage_account_set()

    # Look each account up by name, in the order it was registered.
    for name in (ACCOUNT_1, ACCOUNT_2, ACCOUNT_3, ACCOUNT_4, ACCOUNT_5):
        assert account_set.get_storage_account(name).get_rank() == 1

    # The same must hold when the accounts come from the ranked/shuffled listing.
    shuffled_listing = account_set.get_ranked_shuffled_accounts()
    for entry in shuffled_listing:
        assert entry.get_rank() == 1
def test_accounts_are_shuffled():
    """Two consecutive ranked listings hold the same accounts in a different order."""
    account_set = _RankedStorageAccountSet()
    # Register a long list of accounts so the two listings can be compared.
    for i in range(100):
        account_set.add_storage_account(f"ACCOUNT_{i}")

    first_listing = account_set.get_ranked_shuffled_accounts()
    second_listing = account_set.get_ranked_shuffled_accounts()

    # Same membership: comparing as sets disregards ordering.
    assert set(first_listing) == set(second_listing)
    # Different ordering: the listings themselves must not compare equal.
    assert first_listing != second_listing
def test_check_rank_when_all_failure():
    """Accounts that only ever record failures all end up with rank 0."""
    # Fake clock: the set reads the current time through the lambda below.
    clock = {"now": 0}
    account_names = (ACCOUNT_1, ACCOUNT_2, ACCOUNT_3, ACCOUNT_4, ACCOUNT_5)
    account_set = create_storage_account_set(lambda: clock["now"])

    # Record one failure per account at each of ten consecutive time values (0..9).
    for tick in range(10):
        clock["now"] = tick
        for name in account_names:
            account_set.add_account_result(name, False)

    # Every account should share the same rank (0).
    for name in account_names:
        assert account_set.get_storage_account(name).get_rank() == 0
def test_check_rank_when_success_rate_is_different():
    """Accounts with different success rates are ordered and ranked accordingly."""
    # Fake clock: the set reads the current time through the lambda below.
    clock = {"now": 0}
    account_set = create_storage_account_set(lambda: clock["now"])

    # Feed one result per account at each of 60 consecutive time values (0..59).
    for tick in range(60):
        clock["now"] = tick
        outcomes = (
            (ACCOUNT_1, True),  # 100% success
            (ACCOUNT_2, tick % 10 != 0),  # ~90% success
            (ACCOUNT_3, tick % 2 == 0),  # ~50% success
            (ACCOUNT_4, tick % 3 == 0),  # ~33% success
            (ACCOUNT_5, False),  # 0% success
        )
        for name, succeeded in outcomes:
            account_set.add_account_result(name, succeeded)

    ranked = account_set.get_ranked_shuffled_accounts()

    # Ordering: ACCOUNT_1 first, ACCOUNT_2 second, ACCOUNT_5 last;
    # ACCOUNT_3 and ACCOUNT_4 may occupy positions 2 and 3 in either order.
    middle_pair = (ACCOUNT_3, ACCOUNT_4)
    assert ranked[0].get_account_name() == ACCOUNT_1
    assert ranked[1].get_account_name() == ACCOUNT_2
    assert ranked[2].get_account_name() in middle_pair
    assert ranked[3].get_account_name() in middle_pair
    assert ranked[4].get_account_name() == ACCOUNT_5

    # Rank values. ACCOUNT_3 and ACCOUNT_4 are looked up by name because
    # their positions in the listing are not fixed.
    assert ranked[0].get_rank() == 1
    assert ranked[1].get_rank() == approx(0.9, rel=0.01)
    assert account_set.accounts[ACCOUNT_3].get_rank() == 0.5
    assert account_set.accounts[ACCOUNT_4].get_rank() == approx(0.32, rel=0.1)
    assert ranked[4].get_rank() == 0
def test_old_results_count_for_less():
    """Three old successes followed by three newer failures give a rank below 0.5."""
    # Fake clock: the set reads the current time through the lambda below.
    clock = {"now": 0}
    account_set = create_storage_account_set(lambda: clock["now"])

    # Samples are recorded at times 0, 11, 22, 33, 44 and 55.
    outcomes = (True, True, True, False, False, False)
    for position, succeeded in enumerate(outcomes):
        clock["now"] = 11 * position
        account_set.add_account_result(ACCOUNT_1, succeeded)

    # Newer samples are more important, so the rank should be smaller than 0.5.
    assert account_set.accounts[ACCOUNT_1].get_rank() < 0.5
def test_multiple_results():
    """Three successes and three failures recorded at the same time give rank 0.5."""
    frozen_time = 0
    account_set = create_storage_account_set(lambda: frozen_time)

    # The clock never advances, so all results go to the same bucket.
    for succeeded in (True, True, True, False, False, False):
        account_set.add_account_result(ACCOUNT_1, succeeded)

    assert account_set.accounts[ACCOUNT_1].get_rank() == 0.5
def test_buckets_are_recycled():
    """Early successes stop contributing once enough later failures are recorded."""
    # Fake clock: the set reads the current time through the lambda below.
    clock = {"now": 0}
    account_set = create_storage_account_set(lambda: clock["now"])

    # Three successes at times 0, 10 and 20.
    for moment in (0, 10, 20):
        clock["now"] = moment
        account_set.add_account_result(ACCOUNT_1, True)
    # Rank should be greater than 0 as we have 3 successful results.
    assert account_set.accounts[ACCOUNT_1].get_rank() > 0

    # Six failures at times 30, 40, 50, 60, 70 and 80.
    for moment in range(30, 90, 10):
        clock["now"] = moment
        account_set.add_account_result(ACCOUNT_1, False)
    # By now the earlier successes must no longer affect the rank.
    assert account_set.accounts[ACCOUNT_1].get_rank() == 0
def test_big_break():
    """After a long gap, a single failure leaves the account with rank 0."""
    # Fake clock: the set reads the current time through the lambda below.
    clock = {"now": 0}
    account_set = create_storage_account_set(lambda: clock["now"])

    # Three successes at times 0, 11 and 22.
    for moment in (0, 11, 22):
        clock["now"] = moment
        account_set.add_account_result(ACCOUNT_1, True)
    assert account_set.accounts[ACCOUNT_1].get_rank() > 0

    # Jump the clock forward by 90 (to 112) before recording one failure.
    clock["now"] += 90
    account_set.add_account_result(ACCOUNT_1, False)
    assert account_set.accounts[ACCOUNT_1].get_rank() == 0
|