1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
|
import json
from devtools_testutils import AzureRecordedTestCase, recorded_by_proxy
from azure.eventgrid import EventGridPublisherClient
from cloudevents.http import CloudEvent
from eventgrid_preparer import (
EventGridPreparer,
)
class TestEventGridPublisherClientCncf(AzureRecordedTestCase):
def create_eg_publisher_client(self, endpoint):
credential = self.get_credential(EventGridPublisherClient)
client = self.create_client_from_credential(EventGridPublisherClient, credential=credential, endpoint=endpoint)
return client
@EventGridPreparer()
@recorded_by_proxy
def test_send_cloud_event_data_dict(self, eventgrid_cloud_event_topic_endpoint):
client = self.create_eg_publisher_client(eventgrid_cloud_event_topic_endpoint)
attributes = {
"type": "com.example.sampletype1",
"source": "https://example.com/event-producer",
}
data = {"message": "Hello World!"}
cloud_event = CloudEvent(attributes, data)
def callback(request):
req = json.loads(request.http_request.body)
assert req[0].get("data") is not None
assert isinstance(req[0], dict)
assert req[0].get("type") == "com.example.sampletype1"
assert req[0].get("source") == "https://example.com/event-producer"
client.send(cloud_event, raw_request_hook=callback)
@EventGridPreparer()
@recorded_by_proxy
def test_send_cloud_event_data_base64_using_data(self, eventgrid_cloud_event_topic_endpoint):
client = self.create_eg_publisher_client(eventgrid_cloud_event_topic_endpoint)
attributes = {
"type": "com.example.sampletype1",
"source": "https://example.com/event-producer",
}
data = b"hello world"
cloud_event = CloudEvent(attributes, data)
def callback(request):
req = json.loads(request.http_request.body)
assert req[0].get("data_base64") is not None
assert req[0].get("data") is None
client.send(cloud_event, raw_request_hook=callback)
@EventGridPreparer()
@recorded_by_proxy
def test_send_cloud_event_data_none(self, eventgrid_cloud_event_topic_endpoint):
client = self.create_eg_publisher_client(eventgrid_cloud_event_topic_endpoint)
attributes = {
"type": "com.example.sampletype1",
"source": "https://example.com/event-producer",
}
data = None
cloud_event = CloudEvent(attributes, data)
client.send(cloud_event)
@EventGridPreparer()
@recorded_by_proxy
def test_send_cloud_event_data_str(self, eventgrid_cloud_event_topic_endpoint):
client = self.create_eg_publisher_client(eventgrid_cloud_event_topic_endpoint)
attributes = {
"type": "com.example.sampletype1",
"source": "https://example.com/event-producer",
}
data = "hello world"
def callback(request):
req = json.loads(request.http_request.body)
assert req[0].get("data_base64") is None
assert req[0].get("data") is not None
cloud_event = CloudEvent(attributes, data)
client.send(cloud_event, raw_request_hook=callback)
@EventGridPreparer()
@recorded_by_proxy
def test_send_cloud_event_data_as_list(self, eventgrid_cloud_event_topic_endpoint):
client = self.create_eg_publisher_client(eventgrid_cloud_event_topic_endpoint)
attributes = {
"type": "com.example.sampletype1",
"source": "https://example.com/event-producer",
}
data = "hello world"
cloud_event = CloudEvent(attributes, data)
client.send([cloud_event])
@EventGridPreparer()
@recorded_by_proxy
def test_send_cloud_event_data_with_extensions(self, eventgrid_cloud_event_topic_endpoint):
client = self.create_eg_publisher_client(eventgrid_cloud_event_topic_endpoint)
attributes = {
"type": "com.example.sampletype1",
"source": "https://example.com/event-producer",
"ext1": "extension",
}
data = "hello world"
cloud_event = CloudEvent(attributes, data)
client.send([cloud_event])
|