File: gen_ai_trace_verifier.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (134 lines) | stat: -rw-r--r-- 6,115 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import numbers
import json
from opentelemetry.sdk.trace import Span


class GenAiTraceVerifier:

    def check_span_attributes(self, span, attributes):
        # Convert the list of tuples to a dictionary for easier lookup
        attribute_dict = dict(attributes)

        for attribute_name in span.attributes.keys():

            # Check if the attribute name exists in the input attributes
            if attribute_name not in attribute_dict:
                print(f"Attribute name {attribute_name} not in dictionary. Return False.")
                return False

            attribute_value = attribute_dict[attribute_name]
            print(f"Attribute name: `{attribute_name}`. Expected attribute value: `{attribute_value}`.")

            if isinstance(attribute_value, list):
                # Check if the attribute value in the span matches the provided list
                if span.attributes[attribute_name] != attribute_value:
                    print(
                        f"Case 1: Attribute values do not match (`{span.attributes[attribute_name]}` != `{attribute_value}`). Return False."
                    )
                    return False
            elif isinstance(attribute_value, tuple):
                # Check if the attribute value in the span matches the provided list
                if span.attributes[attribute_name] != attribute_value:
                    print(
                        f"Case 2: Attribute values do not match (`{span.attributes[attribute_name]}` != `{attribute_value}`). Return False."
                    )
                    return False
            else:
                # Check if the attribute value matches the provided value
                if attribute_value == "+":
                    if not isinstance(span.attributes[attribute_name], numbers.Number):
                        print(f"Actual attribute value is not a number.")
                        return False
                    if span.attributes[attribute_name] < 0:
                        print(f"Actual attribute value is negative.")
                        return False
                elif attribute_value != "" and span.attributes[attribute_name] != attribute_value:
                    print(
                        f"Case 3: Attribute values do not match (`{span.attributes[attribute_name]}` != `{attribute_value}`). Return False."
                    )
                    return False
                # Check if the attribute value in the span is not empty when the provided value is ""
                elif attribute_value == "" and not span.attributes[attribute_name]:
                    print(f"Case 4. Return False.")
                    return False

        return True

    def is_valid_json(self, my_string):
        try:
            json_object = json.loads(my_string)
        except ValueError as e1:
            return False
        except TypeError as e2:
            return False
        return True

    def check_json_string(self, expected_json, actual_json):
        if self.is_valid_json(expected_json) and self.is_valid_json(actual_json):
            return self.check_event_attributes(json.loads(expected_json), json.loads(actual_json))
        else:
            return False

    def check_event_attributes(self, expected_dict, actual_dict):
        if set(expected_dict.keys()) != set(actual_dict.keys()):
            return False
        for key, expected_val in expected_dict.items():
            if key not in actual_dict:
                return False
            actual_val = actual_dict[key]

            if self.is_valid_json(expected_val):
                if not self.is_valid_json(actual_val):
                    return False
                if not self.check_json_string(expected_val, actual_val):
                    return False
            elif isinstance(expected_val, dict):
                if not isinstance(actual_val, dict):
                    return False
                if not self.check_event_attributes(expected_val, actual_val):
                    return False
            elif isinstance(expected_val, list):
                if not isinstance(actual_val, list):
                    return False
                if len(expected_val) != len(actual_val):
                    return False
                for expected_list, actual_list in zip(expected_val, actual_val):
                    if not self.check_event_attributes(expected_list, actual_list):
                        return False
            elif isinstance(expected_val, str) and expected_val == "*":
                if actual_val == "":
                    return False
            elif expected_val != actual_val:
                return False
        return True

    def check_span_events(self, span, expected_events):
        span_events = list(span.events)  # Create a list of events from the span

        for expected_event in expected_events:
            for actual_event in span_events:
                if expected_event["name"] == actual_event.name:
                    if not self.check_event_attributes(expected_event["attributes"], actual_event.attributes):
                        return False
                    span_events.remove(actual_event)  # Remove the matched event from the span_events
                    break
            else:
                return False  # If no match found for an expected event

        if len(span_events) > 0:  # If there are any additional events in the span_events
            return False

        prev_event = None
        for actual_event in list(span.events):
            if prev_event is not None and actual_event.timestamp <= prev_event.timestamp:
                print(
                    f"Event {actual_event.name} has a timestamp {actual_event.timestamp} that is not greater than the previous event's timestamp {prev_event.timestamp}, {prev_event.name}"
                )
                return False
            prev_event = actual_event

        return True