File: vadd_cas.py

package info (click to toggle)
redis 5:8.0.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 22,304 kB
  • sloc: ansic: 216,903; tcl: 51,562; sh: 4,625; perl: 4,214; cpp: 3,568; python: 2,954; makefile: 2,055; ruby: 639; javascript: 30; csh: 7
file content (98 lines) | stat: -rw-r--r-- 3,733 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from test import TestCase, generate_random_vector
import threading
import struct
import math
import time
import random
from typing import List, Dict

class ConcurrentCASTest(TestCase):
    """Insert vectors with ``VADD ... CAS`` from several threads at once.

    Each vector is sent exactly once by one of a fixed number of worker
    threads. After all workers finish, the test checks the cardinality of
    the set and compares every stored embedding with the vector that was
    originally sent.
    """

    def getname(self):
        """Return the display name of this test."""
        return "Concurrent VADD with CAS"

    def estimated_runtime(self):
        """Return the expected duration of the test (presumably in seconds)."""
        return 1.5

    def worker(self, vectors: List[List[float]], start_idx: int, end_idx: int,
              dim: int, results: Dict[str, bool]):
        """Add the vectors at indexes ``[start_idx, end_idx)`` using VADD CAS.

        Args:
            vectors: Full list of vectors; only the assigned index range is
                read.
            start_idx: First index handled by this worker (inclusive).
            end_idx: Index at which this worker stops (exclusive).
            dim: Number of components packed per vector.
            results: Shared mapping filled with ``item name -> True`` when
                the command replied ``1``, and ``False`` when it replied
                anything else or raised.
        """
        # The packing format depends only on dim, so build it once.
        fmt = f'{dim}f'

        for i in range(start_idx, end_idx):
            blob = struct.pack(fmt, *vectors[i])
            name = f"{self.test_key}:item:{i}"

            # A failing command must not kill the thread: record the
            # failure, report it, and carry on with the next vector.
            try:
                reply = self.redis.execute_command(
                    'VADD', self.test_key, 'FP32', blob, name, 'CAS')
                results[name] = (reply == 1)
            except Exception as e:
                results[name] = False
                print(f"Error adding {name}: {e}")

    def verify_vector_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """Return the cosine similarity of two vectors.

        Returns 0 when either vector does not have a strictly positive
        norm, so the division is never attempted on a zero-length vector.
        """
        dot = sum(a * b for a, b in zip(vec1, vec2))
        len1 = math.sqrt(sum(x * x for x in vec1))
        len2 = math.sqrt(sum(x * x for x in vec2))

        if not (len1 > 0 and len2 > 0):
            return 0
        return dot / (len1 * len2)

    def test(self):
        """Run the concurrent insertion and verify everything that was stored."""
        # Workload shape
        dim = 128
        total_vectors = 5000
        num_threads = 8
        chunk = total_vectors // num_threads

        # Seed the RNG so the run is reproducible, then build all vectors
        # before any thread starts.
        random.seed(42)
        vectors = [generate_random_vector(dim) for _ in range(total_vectors)]

        # Shared outcome map, written by the workers and read afterwards
        results = {}
        workers = []
        last = num_threads - 1

        # Split the index space into contiguous slices; the last worker
        # also takes whatever is left over by the integer division.
        for slot in range(num_threads):
            lo = slot * chunk
            hi = total_vectors if slot == last else lo + chunk
            runner = threading.Thread(
                target=self.worker,
                args=(vectors, lo, hi, dim, results),
            )
            runner.start()
            workers.append(runner)

        # Block until every worker is done
        for runner in workers:
            runner.join()

        # The set must contain exactly one element per vector sent
        card = self.redis.execute_command('VCARD', self.test_key)
        assert card == total_vectors, \
            f"Expected {total_vectors} elements, but found {card}"

        # Check each element individually
        num_verified = 0
        for i, original in enumerate(vectors):
            name = f"{self.test_key}:item:{i}"

            # The worker must have seen a successful reply for this item
            assert results[name], f"Vector {name} was not successfully added"

            # Read the embedding back and convert every component to float
            raw = self.redis.execute_command('VEMB', self.test_key, name)
            stored_vec = list(map(float, raw))

            assert len(stored_vec) == dim, \
                f"Stored vector dimension mismatch for {name}: {len(stored_vec)} != {dim}"

            # Compared by cosine similarity rather than exact equality
            # (presumably the stored form is not bit-identical to the input).
            similarity = self.verify_vector_similarity(original, stored_vec)
            assert similarity > 0.99, \
                f"Low similarity ({similarity}) for {name}"

            num_verified += 1

        # Every vector must have gone through the per-item checks
        assert num_verified == total_vectors, \
            f"Only verified {num_verified} out of {total_vectors} vectors"