1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
from test import TestCase, generate_random_vector
import threading
import time
import struct
class ThreadingStressTest(TestCase):
def getname(self):
return "Concurrent VADD/DEL/VSIM operations stress test"
def estimated_runtime(self):
return 10 # Test runs for 10 seconds
def test(self):
# Constants - easy to modify if needed
NUM_VADD_THREADS = 10
NUM_VSIM_THREADS = 1
NUM_DEL_THREADS = 1
TEST_DURATION = 10 # seconds
VECTOR_DIM = 100
DEL_INTERVAL = 1 # seconds
# Shared flags and state
stop_event = threading.Event()
error_list = []
error_lock = threading.Lock()
def log_error(thread_name, error):
with error_lock:
error_list.append(f"{thread_name}: {error}")
def vadd_worker(thread_id):
"""Thread function to perform VADD operations"""
thread_name = f"VADD-{thread_id}"
try:
vector_count = 0
while not stop_event.is_set():
try:
# Generate random vector
vec = generate_random_vector(VECTOR_DIM)
vec_bytes = struct.pack(f'{VECTOR_DIM}f', *vec)
# Add vector with CAS option
self.redis.execute_command(
'VADD',
self.test_key,
'FP32',
vec_bytes,
f'{self.test_key}:item:{thread_id}:{vector_count}',
'CAS'
)
vector_count += 1
# Small sleep to reduce CPU pressure
if vector_count % 10 == 0:
time.sleep(0.001)
except Exception as e:
log_error(thread_name, f"Error: {str(e)}")
time.sleep(0.1) # Slight backoff on error
except Exception as e:
log_error(thread_name, f"Thread error: {str(e)}")
def del_worker():
"""Thread function that deletes the key periodically"""
thread_name = "DEL"
try:
del_count = 0
while not stop_event.is_set():
try:
# Sleep first, then delete
time.sleep(DEL_INTERVAL)
if stop_event.is_set():
break
self.redis.delete(self.test_key)
del_count += 1
except Exception as e:
log_error(thread_name, f"Error: {str(e)}")
except Exception as e:
log_error(thread_name, f"Thread error: {str(e)}")
def vsim_worker(thread_id):
"""Thread function to perform VSIM operations"""
thread_name = f"VSIM-{thread_id}"
try:
search_count = 0
while not stop_event.is_set():
try:
# Generate query vector
query_vec = generate_random_vector(VECTOR_DIM)
query_str = [str(x) for x in query_vec]
# Perform similarity search
args = ['VSIM', self.test_key, 'VALUES', VECTOR_DIM]
args.extend(query_str)
args.extend(['COUNT', 10])
self.redis.execute_command(*args)
search_count += 1
# Small sleep to reduce CPU pressure
if search_count % 10 == 0:
time.sleep(0.005)
except Exception as e:
# Don't log empty array errors, as they're expected when key doesn't exist
if "empty array" not in str(e).lower():
log_error(thread_name, f"Error: {str(e)}")
time.sleep(0.1) # Slight backoff on error
except Exception as e:
log_error(thread_name, f"Thread error: {str(e)}")
# Start all threads
threads = []
# VADD threads
for i in range(NUM_VADD_THREADS):
thread = threading.Thread(target=vadd_worker, args=(i,))
thread.start()
threads.append(thread)
# DEL threads
for _ in range(NUM_DEL_THREADS):
thread = threading.Thread(target=del_worker)
thread.start()
threads.append(thread)
# VSIM threads
for i in range(NUM_VSIM_THREADS):
thread = threading.Thread(target=vsim_worker, args=(i,))
thread.start()
threads.append(thread)
# Let the test run for the specified duration
time.sleep(TEST_DURATION)
# Signal all threads to stop
stop_event.set()
# Wait for threads to finish
for thread in threads:
thread.join(timeout=2.0)
# Check if Redis is still responsive
try:
ping_result = self.redis.ping()
assert ping_result, "Redis did not respond to PING after stress test"
except Exception as e:
assert False, f"Redis connection failed after stress test: {str(e)}"
# Report any errors for diagnosis, but don't fail the test unless PING fails
if error_list:
error_count = len(error_list)
print(f"\nEncountered {error_count} errors during stress test.")
print("First 5 errors:")
for error in error_list[:5]:
print(f"- {error}")
|