1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import logging
import sys
import os
import json
import pytest
import uuid
from msrest.serialization import UTC
import datetime as dt
from devtools_testutils import AzureRecordedTestCase
from azure.core.messaging import CloudEvent
from azure.core.exceptions import HttpResponseError
from azure.eventgrid import EventGridConsumerClient, EventGridPublisherClient
from eventgrid_preparer import (
EventGridPreparer,
)
class TestEventGridDualClient(AzureRecordedTestCase):
    """Live tests that exercise the EventGrid clients in two configurations.

    Each test builds one client with a ``namespace_topic`` and one without,
    then checks which of the two accepts a given call and which rejects it.
    """

    def create_eg_publisher_client(self, endpoint, topic=None):
        """Build an ``EventGridPublisherClient`` for ``endpoint``.

        :param endpoint: Endpoint passed through to the client.
        :param topic: Forwarded as ``namespace_topic``; defaults to ``None``.
        :return: The object returned by ``create_client_from_credential``.
        """
        client_cls = EventGridPublisherClient
        return self.create_client_from_credential(
            client_cls,
            credential=self.get_credential(client_cls),
            endpoint=endpoint,
            namespace_topic=topic,
        )

    def create_eg_consumer_client(self, endpoint, topic=None, subscription=None):
        """Build an ``EventGridConsumerClient`` for ``endpoint``.

        :param endpoint: Endpoint passed through to the client.
        :param topic: Forwarded as ``namespace_topic``; defaults to ``None``.
        :param subscription: Forwarded as ``subscription``; defaults to ``None``.
        :return: The object returned by ``create_client_from_credential``.
        """
        client_cls = EventGridConsumerClient
        return self.create_client_from_credential(
            client_cls,
            credential=self.get_credential(client_cls),
            endpoint=endpoint,
            namespace_topic=topic,
            subscription=subscription,
        )

    @pytest.mark.live_test_only
    @EventGridPreparer()
    def test_eg_dual_client_send_eventgrid_event(self, **kwargs):
        """Send an EventGrid-schema dict through both publisher clients.

        The client built without a topic must send it without raising; the
        client built with a ``namespace_topic`` must raise ``TypeError``.
        """
        ns_endpoint = kwargs["eventgrid_endpoint"]
        ns_topic = kwargs["eventgrid_topic_name"]
        topic_endpoint = kwargs["eventgrid_topic_endpoint"]

        ns_publisher = self.create_eg_publisher_client(ns_endpoint, ns_topic)
        topic_publisher = self.create_eg_publisher_client(topic_endpoint)

        eg_event = {
            "id": uuid.uuid4(),
            "data": {
                "team": "azure-sdk",
                "project": "azure-eventgrid",
            },
            "subject": "Door1",
            "eventType": "Azure.SDK.TestEvent",
            "eventTime": dt.datetime.now(UTC()),
            "dataVersion": "2.0",
        }

        # Nothing is asserted on the result; the call only has to complete.
        topic_publisher.send(eg_event)

        with pytest.raises(TypeError):
            ns_publisher.send(eg_event)

    @pytest.mark.live_test_only
    @EventGridPreparer()
    def test_eg_dual_client_send_custom_event(self, **kwargs):
        """Send a custom-schema dict through both publisher clients.

        The client built without a topic must send it without raising; the
        client built with a ``namespace_topic`` must raise
        ``HttpResponseError``.
        """
        ns_endpoint = kwargs["eventgrid_endpoint"]
        ns_topic = kwargs["eventgrid_topic_name"]
        custom_endpoint = kwargs["eventgrid_custom_event_topic_endpoint"]

        ns_publisher = self.create_eg_publisher_client(ns_endpoint, ns_topic)
        custom_publisher = self.create_eg_publisher_client(custom_endpoint)

        payload = {
            "customSubject": "sample",
            "customEventType": "sample.event",
            "customDataVersion": "2.0",
            "customId": "1234",
            "customEventTime": dt.datetime.now(UTC()).isoformat(),
            "customData": "sample data",
        }

        # Nothing is asserted on the result; the call only has to complete.
        custom_publisher.send(payload)

        with pytest.raises(HttpResponseError):
            ns_publisher.send(payload)

    @pytest.mark.live_test_only
    @EventGridPreparer()
    def test_eg_dual_client_send_channel_name(self, **kwargs):
        """Send a ``CloudEvent`` with ``channel_name`` through both clients.

        The client built without a topic must send it without raising; the
        client built with a ``namespace_topic`` must raise ``ValueError``.
        """
        ns_endpoint = kwargs["eventgrid_endpoint"]
        ns_topic = kwargs["eventgrid_topic_name"]
        partner_endpoint = kwargs["eventgrid_partner_namespace_topic_endpoint"]
        channel = kwargs["eventgrid_partner_channel_name"]

        ns_publisher = self.create_eg_publisher_client(ns_endpoint, ns_topic)
        partner_publisher = self.create_eg_publisher_client(partner_endpoint)

        event = CloudEvent(
            source="http://samplesource.dev",
            type="Sample.Cloud.Event",
            data={"sample": "cloudevent"},
        )

        # Nothing is asserted on the result; the call only has to complete.
        partner_publisher.send(event, channel_name=channel)

        with pytest.raises(ValueError):
            ns_publisher.send(event, channel_name=channel)

    @pytest.mark.live_test_only
    @EventGridPreparer()
    def test_eg_dual_client_consumer(self, **kwargs):
        """Receive with a fully configured consumer, then with a bare one.

        A consumer given a topic and subscription must be able to call
        ``receive()``. Creating and using a consumer with only an endpoint
        must raise ``ValueError``.
        """
        ns_endpoint = kwargs["eventgrid_endpoint"]
        ns_topic = kwargs["eventgrid_topic_name"]
        subscription = kwargs["eventgrid_event_subscription_name"]
        topic_endpoint = kwargs["eventgrid_cloud_event_topic_endpoint"]

        ns_consumer = self.create_eg_consumer_client(ns_endpoint, ns_topic, subscription)
        ns_consumer.receive()

        # Construction and receive() both stay inside the context manager:
        # the test does not pin down which of the two raises the ValueError.
        with pytest.raises(ValueError):
            bare_consumer = self.create_eg_consumer_client(topic_endpoint)
            bare_consumer.receive()
|