1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import pytest
import json
from azure.core.pipeline import PipelineRequest, PipelineContext
from azure.core.pipeline.transport import HttpRequest
from azure.core.messaging import CloudEvent
from azure.eventgrid._legacy._policies import CloudEventDistributedTracingPolicy
from _mocks import cloud_storage_dict
# spell-checker:disable
_content_type = "application/cloudevents-batch+json; charset=utf-8"
_traceparent_value = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
_tracestate_value = "rojo=00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01,congo=lZWRzIHRoNhcm5hbCBwbGVhc3VyZS4"
# spell-chcker:enable
class EventGridSerializationTests(object):
def test_cloud_event_policy_copies(self):
policy = CloudEventDistributedTracingPolicy()
body = json.dumps([cloud_storage_dict])
universal_request = HttpRequest("POST", "http://127.0.0.1/", data=body)
universal_request.headers["content-type"] = _content_type
universal_request.headers["traceparent"] = _traceparent_value
universal_request.headers["tracestate"] = _tracestate_value
request = PipelineRequest(universal_request, PipelineContext(None))
resp = policy.on_request(request)
body = json.loads(request.http_request.body)
for item in body:
assert "traceparent" in item
assert "tracestate" in item
def test_cloud_event_policy_no_copy_if_trace_exists(self):
policy = CloudEventDistributedTracingPolicy()
cloud_storage_dict.update({"traceparent": "exists", "tracestate": "state_exists"})
body = json.dumps([cloud_storage_dict])
universal_request = HttpRequest("POST", "http://127.0.0.1/", data=body)
universal_request.headers["content-type"] = _content_type
universal_request.headers["traceparent"] = _traceparent_value
universal_request.headers["tracestate"] = _tracestate_value
request = PipelineRequest(universal_request, PipelineContext(None))
resp = policy.on_request(request)
body = json.loads(request.http_request.body)
for item in body:
assert "traceparent" in item
assert "tracestate" in item
assert item["traceparent"] == "exists"
assert item["tracestate"] == "state_exists"
|