"""
Examples to show usage of the azure-core-tracing-opentelemetry
with the Eventhub SDK.
This example traces calls for sending a batch to eventhub.
An alternative path to export using the OpenTelemetry exporter for Azure Monitor
is also mentioned in the sample. Please take a look at the commented code.
"""
# Declare OpenTelemetry as enabled tracing plugin for Azure SDKs
from azure.core.settings import settings
settings.tracing_implementation = "opentelemetry"
# The example below uses a simple console exporter. To use the OpenTelemetry exporter
# for Azure Monitor instead, uncomment the following lines and remove the
# ConsoleSpanExporter assignment further down.
# Azure Monitor is only one example of a trace exporter; you can use anything OpenTelemetry supports.
# from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter
# exporter = AzureMonitorTraceExporter(
# connection_string="the connection string used for your Application Insights resource"
# )
# Regular OpenTelemetry usage from here on; see https://github.com/open-telemetry/opentelemetry-python
# for details.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
# Simple console exporter
exporter = ConsoleSpanExporter()
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# see issue https://github.com/open-telemetry/opentelemetry-python/issues/3713
trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(exporter)) # type: ignore
from azure.eventhub import EventHubConsumerClient
import os

EVENTHUB_NAME = os.environ["EVENT_HUB_NAME"]
# Despite its name, this holds the Event Hubs connection string.
credential = os.environ["EVENTHUB_CONN_STR"]
def on_event(partition_context, event):
    # Put your code here.
    # If the operation is I/O intensive, multithreading will give better performance.
    print("Received event from partition: {}.".format(partition_context.partition_id))


def on_partition_initialize(partition_context):
    # Put your code here.
    print("Partition: {} has been initialized.".format(partition_context.partition_id))


def on_partition_close(partition_context, reason):
    # Put your code here.
    print("Partition: {} has been closed, reason for closing: {}.".format(partition_context.partition_id, reason))
def on_error(partition_context, error):
    # Put your code here. partition_context can be None in the on_error callback.
    if partition_context:
        print(
            "An exception: {} occurred during receiving from Partition: {}.".format(
                error, partition_context.partition_id
            )
        )
    else:
        print("An exception: {} occurred during the load balance process.".format(error))
with tracer.start_as_current_span(name="MyApplication"):
    consumer_client = EventHubConsumerClient.from_connection_string(
        conn_str=credential,
        consumer_group="$Default",
        eventhub_name=EVENTHUB_NAME,
    )
    try:
        with consumer_client:
            consumer_client.receive(
                on_event=on_event,
                on_partition_initialize=on_partition_initialize,
                on_partition_close=on_partition_close,
                on_error=on_error,
                starting_position="-1",  # "-1" is from the beginning of the partition.
            )
    except KeyboardInterrupt:
        print("Stopped receiving.")