File: test_cloud_event_tracing.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (63 lines) | stat: -rw-r--r-- 2,625 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

import pytest
import json

from azure.core.pipeline import PipelineRequest, PipelineContext
from azure.core.pipeline.transport import HttpRequest
from azure.core.messaging import CloudEvent
from azure.eventgrid._legacy._policies import CloudEventDistributedTracingPolicy
from _mocks import cloud_storage_dict

# Header values shared by the tests in this module.
# spell-checker:disable
# Value assigned to the "content-type" header of every request built below.
_content_type = "application/cloudevents-batch+json; charset=utf-8"
# Values assigned to the "traceparent" and "tracestate" request headers.
_traceparent_value = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
_tracestate_value = "rojo=00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01,congo=lZWRzIHRoNhcm5hbCBwbGVhc3VyZS4"
# spell-checker:enable


class EventGridSerializationTests(object):
    """Tests for ``CloudEventDistributedTracingPolicy.on_request``.

    Each test sends a one-event JSON batch, whose request carries
    ``traceparent`` and ``tracestate`` headers, through the policy and then
    inspects the request body the policy leaves behind.
    """

    @staticmethod
    def _apply_policy(event):
        """Run the policy over a single-event batch and return the decoded body.

        :param dict event: The event serialized as the only item of the batch.
        :return: The request body after ``on_request`` has run, parsed from JSON.
        """
        policy = CloudEventDistributedTracingPolicy()

        http_request = HttpRequest("POST", "http://127.0.0.1/", data=json.dumps([event]))
        http_request.headers["content-type"] = _content_type
        http_request.headers["traceparent"] = _traceparent_value
        http_request.headers["tracestate"] = _tracestate_value

        request = PipelineRequest(http_request, PipelineContext(None))

        # The return value of on_request is not needed; the tests only look at
        # the request body afterwards.
        policy.on_request(request)

        return json.loads(request.http_request.body)

    def test_cloud_event_policy_copies(self):
        """Every event in the body carries trace fields after the policy runs."""
        # Work on a copy so this test never shares mutable state with others.
        body = self._apply_policy(dict(cloud_storage_dict))

        for item in body:
            assert "traceparent" in item
            assert "tracestate" in item

    def test_cloud_event_policy_no_copy_if_trace_exists(self):
        """Trace fields already present on an event are left untouched."""
        # Build the event from a copy: updating the shared ``cloud_storage_dict``
        # fixture in place would leak the trace fields into every other test
        # that imports it and make results depend on execution order.
        event = dict(cloud_storage_dict, traceparent="exists", tracestate="state_exists")
        body = self._apply_policy(event)

        for item in body:
            assert "traceparent" in item
            assert "tracestate" in item
            assert item["traceparent"] == "exists"
            assert item["tracestate"] == "state_exists"