# Guide for migrating azure-eventhub to v5 from v1
This guide is intended to assist in the migration to `azure-eventhub` v5 from v1. It will focus on side-by-side comparisons for similar operations between the two packages.
Familiarity with the `azure-eventhub` v1 package is assumed. For those new to the Event Hubs client library for Python, please refer to the [README for `azure-eventhub`](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/eventhub/azure-eventhub/README.md) rather than this guide.
## Table of contents
* [Migration benefits](#migration-benefits)
    - [Cross Service SDK improvements](#cross-service-sdk-improvements)
    - [New features](#new-features)
* [Important changes](#important-changes)
    - [Client hierarchy](#client-hierarchy)
    - [Client constructors](#client-constructors)
    - [Sending events](#sending-events)
    - [Receiving events](#receiving-events)
    - [Migrating code from EventProcessorHost to EventHubConsumerClient for receiving events](#migrating-code-from-eventprocessorhost-to-eventhubconsumerclient-for-receiving-events)
* [Additional samples](#additional-samples)
## Migration benefits
A natural question to ask when considering whether or not to adopt a new version or library is what the benefits of doing so would be. As Azure has matured and been embraced by a more diverse group of developers, we have been focused on learning the patterns and practices to best support developer productivity and to understand the gaps that the Python client libraries have.
There were several areas of consistent feedback expressed across the Azure client library ecosystem. One of the most important is that the client libraries for different Azure services have not had a consistent approach to organization, naming, and API structure. Additionally, many developers have felt that the learning curve was difficult, and the APIs did not offer a good, approachable, and consistent onboarding story for those learning Azure or exploring a specific Azure service.
To try and improve the development experience across Azure services, a set of uniform [design guidelines](https://azure.github.io/azure-sdk/general_introduction.html) was created for all languages to drive a consistent experience with established API patterns for all services. A set of [Python-specific guidelines](https://azure.github.io/azure-sdk/python/guidelines/index.html) was also introduced to ensure that Python clients have a natural and idiomatic feel with respect to the Python ecosystem. Further details are available in the guidelines for those interested.
### Cross Service SDK improvements
The modern Event Hubs client library also provides the ability to share in some of the cross-service improvements made to the Azure development experience, such as:
- using the new [`azure-identity`](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/README.md) library to share a single authentication approach between clients
- a unified logging and diagnostics pipeline offering a common view of the activities across each of the client libraries
### New features
We have a variety of new features in version 5 of the Event Hubs library.
- Ability to create a batch of messages with the `EventHubProducerClient.create_batch()` and `EventDataBatch.add()` APIs. This helps you pack events into batches that stay within the service's size limit.
- Ability to configure the retry policy used by operations on the clients.
- Authentication with AAD credentials using [`azure-identity`](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/README.md).
Refer to the [changelog](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/eventhub/azure-eventhub/CHANGELOG.md) for more new features, changes and bug fixes.
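As an illustration of the retry configuration mentioned above, the sketch below passes retry-related keyword arguments when constructing a client. The values are illustrative rather than recommendations, `create_producer` is a hypothetical helper name, and the keyword names (`retry_total`, `retry_backoff_factor`, `retry_backoff_max`, `retry_mode`) should be confirmed against the API reference for the package version you have installed.

```python
# Retry-related keyword arguments for the v5 client constructors.
# Values are illustrative only; tune them for your workload.
RETRY_OPTIONS = {
    "retry_total": 5,             # maximum number of retry attempts
    "retry_backoff_factor": 0.8,  # base delay, in seconds, between attempts
    "retry_backoff_max": 60,      # upper bound, in seconds, on the delay
    "retry_mode": "exponential",  # "exponential" or "fixed"
}


def create_producer(conn_str, eventhub_name):
    """Hypothetical helper: build a producer client using RETRY_OPTIONS."""
    # Imported inside the function so the options above can be reviewed
    # even where azure-eventhub is not installed.
    from azure.eventhub import EventHubProducerClient

    return EventHubProducerClient.from_connection_string(
        conn_str, eventhub_name=eventhub_name, **RETRY_OPTIONS
    )
```

The same keyword arguments can be passed to `EventHubConsumerClient` in the same way.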
## Important changes
### Client hierarchy
In the interest of simplifying the API surface, we've made two distinct clients: the `EventHubProducerClient` for sending events and the `EventHubConsumerClient` for receiving events. This is in contrast to the single `EventHubClient` that was used to create senders and receivers.
We've also merged the functionality from `EventProcessorHost` into `EventHubConsumerClient`.
#### Approachability
By having a single entry point for sending, the `EventHubProducerClient` helps with the discoverability of the API
as you can explore all available features for sending events through methods from a single client, as opposed to searching
through documentation or exploring the namespace for the types that you can instantiate.
Similarly, by having a single entry point for receiving of any type (from a single partition, from all partitions, or with load balancing and checkpointing features) within Event Hubs, the `EventHubConsumerClient` helps with the discoverability of the API as you can explore all available features for receiving events through methods from a single client, as opposed to searching
through documentation or exploring the namespace for the types that you can instantiate.
#### Consistency
We now have methods with similar names, signatures, and locations for sending and receiving.
This provides consistency and predictability on the various features of the library.
### Client constructors
While we continue to support connection strings when constructing a client, below are the differences in the two versions:
- In v5, we now support the use of Azure Active Directory for authentication.
The new [`azure-identity`](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/README.md) library allows us
to share a single authentication solution between clients of different Azure services.
- The option to construct a client using an address of the form `amqps://<SAS-policy-name>:<SAS-key>@<fully-qualified-namespace>/<eventhub-name>` is no longer supported in v5. This address is not readily available in the Azure portal or in any tooling and so was subject to human error. We instead recommend using the connection string if you want to use a SAS policy.
In v1:
```python
# Authenticate with address
eventhub_client = EventHubClient(address)
# Authenticate with connection string
eventhub_client = EventHubClient.from_connection_string(conn_str)
# Authenticate the EventProcessorHost and StorageCheckpointLeaseManager
eh_config = EventHubConfig(eh_namespace, eventhub_name, user, key, consumer_group="$default")
storage_manager = AzureStorageCheckpointLeaseManager(storage_account_name, storage_key, lease_container_name)
host = EventProcessorHost(EventProcessor, eh_config, storage_manager)
```
In v5:
```python
from azure.eventhub import EventHubConsumerClient, EventHubProducerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import EnvironmentCredential
# Address is no longer used for authentication.
# Authenticate with connection string
producer_client = EventHubProducerClient.from_connection_string(conn_str)
consumer_client = EventHubConsumerClient.from_connection_string(conn_str, consumer_group='$Default')
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_conn_str, container_name)
consumer_client_with_checkpoint_store = EventHubConsumerClient.from_connection_string(conn_str, consumer_group='$Default', checkpoint_store=checkpoint_store)
# Authenticate with Active Directory
producer_client = EventHubProducerClient(fully_qualified_namespace, eventhub_name, credential=EnvironmentCredential())
consumer_client = EventHubConsumerClient(fully_qualified_namespace, eventhub_name, consumer_group='$Default', credential=EnvironmentCredential())
checkpoint_store = BlobCheckpointStore(blob_account_url, container_name, credential=EnvironmentCredential())
consumer_client_with_checkpoint_store = EventHubConsumerClient(fully_qualified_namespace, eventhub_name, consumer_group='$Default', credential=EnvironmentCredential(), checkpoint_store=checkpoint_store)
```
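If your v1 code only has an address of the `amqps://` form, one possible way to move to a connection string is to rebuild it from the parts of the address. The following is a minimal, unofficial sketch: `address_to_connection_string` is a hypothetical helper name, and it assumes the policy name and key in the address are URL-encoded. Where possible, copying the connection string from the Azure portal remains the less error-prone route.

```python
from urllib.parse import unquote_plus, urlparse


def address_to_connection_string(address):
    """Hypothetical helper: convert a v1-style amqps address to a connection string.

    Expected input form:
    amqps://<SAS-policy-name>:<SAS-key>@<fully-qualified-namespace>/<eventhub-name>
    """
    parsed = urlparse(address)
    eventhub_name = parsed.path.strip("/")
    if not (parsed.username and parsed.password and parsed.hostname and eventhub_name):
        raise ValueError("Address must contain policy name, key, namespace and event hub name.")
    # Assumption: the policy name and key were URL-encoded when the address was built.
    policy_name = unquote_plus(parsed.username)
    key = unquote_plus(parsed.password)
    return "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
        parsed.hostname, policy_name, key, eventhub_name
    )
```

The result can then be passed to `EventHubProducerClient.from_connection_string` or `EventHubConsumerClient.from_connection_string`.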
### Sending events
- The `run` and `stop` methods were previously used since the single `EventHubClient` controlled the lifecycle for all senders and receivers. In v5, the `run` and `stop` methods are no longer needed since the `EventHubProducerClient` controls its own lifecycle.
- The `add_sender` method is no longer used to create sender clients. Instead, the `EventHubProducerClient` is used for sending events.
- The `send` method that allowed sending single events in each call is removed in favor of `send_batch`, to encourage sending events in batches for better throughput.
- The new `send_batch` method takes a list of `EventData` objects that is batched into a single message by the client before sending.
- The above approach fails if the list of events exceeds the size limit of the message. To send safely within size limits, use the `EventDataBatch` object, to which you can add `EventData` objects until the size limit is reached, and then send it using the same `send_batch` method.
In v1:
```python
client = EventHubClient(address)
sender = client.add_sender()
client.run()
sender.send(EventData('Single message'))
client.stop()
```
In v5:
```python
producer_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)
# Send list of EventData. This can fail if the list exceeds size limit.
event_data_batch = [EventData('Single message')]
producer_client.send_batch(event_data_batch)
# Send EventDataBatch. Multiple messages will safely be sent by using `create_batch` to create a batch object.
# `add` will raise a ValueError if added size results in the batch exceeding the maximum batch size.
event_data_batch = producer_client.create_batch()
event_data_batch.add(EventData('Single message'))
producer_client.send_batch(event_data_batch)
```
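When there are more events than fit in one batch, a common pattern is to send the full batch and start a new one whenever `add` raises `ValueError`. The sketch below illustrates that loop. `send_in_batches` is a hypothetical helper, not part of the library; it relies only on the `create_batch`, `add`, and `send_batch` calls shown above.

```python
def send_in_batches(producer, events):
    """Hypothetical helper: send all events using as few batches as possible.

    Returns the number of batches sent.
    """
    batch = producer.create_batch()
    pending = 0  # number of events in the current batch
    batches_sent = 0
    for event in events:
        try:
            batch.add(event)
        except ValueError:
            if pending == 0:
                # The event does not fit even in an empty batch.
                raise
            # The current batch is full: send it and start a new one.
            producer.send_batch(batch)
            batches_sent += 1
            batch = producer.create_batch()
            batch.add(event)  # still raises if the event alone is too large
            pending = 0
        pending += 1
    if pending:
        # Send whatever is left in the final, partially filled batch.
        producer.send_batch(batch)
        batches_sent += 1
    return batches_sent
```

With a real client, this would be called as `send_in_batches(producer_client, [EventData('one'), EventData('two')])`.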
### Receiving events
- The `run` and `stop` methods were previously used since the single `EventHubClient` controlled the lifecycle for all senders and receivers. In v5, the `run` and `stop` methods are no longer needed since the `EventHubConsumerClient` controls its own lifecycle.
- The `add_receiver` method is no longer used to create receiver clients. Instead, the `EventHubConsumerClient` is used for receiving events.
- In v1, the `receive` method returned a list of `EventData`. You would call this method repeatedly every time you wanted to receive a set of events. In v5, the new `receive` method takes a user callback to process events and any resulting errors. This way, you call the method once and it continues to process incoming events until you stop it.
- Additionally, we have a method `receive_batch` which behaves the same as `receive`, but calls the user callback with a batch of events instead of single events.
- The same methods can be used whether you want to receive from a single partition or from all partitions.
In v1:
```python
client = EventHubClient(address)
receiver = client.add_receiver(consumer_group, partition)
client.run()
batch = receiver.receive()
client.stop()
```
In v5:
```python
# Receive
def on_event(partition_context, event):
    print("Received event from partition: {}.".format(partition_context.partition_id))
consumer_client = EventHubConsumerClient.from_connection_string(conn_str, consumer_group, eventhub_name=eh_name)
with consumer_client:
    consumer_client.receive(on_event=on_event, partition_id=partition_id)
# Receive batch
def on_event_batch(partition_context, event_batch):
    print("Partition {}, Received count: {}".format(partition_context.partition_id, len(event_batch)))
consumer_client = EventHubConsumerClient.from_connection_string(conn_str, consumer_group, eventhub_name=eh_name)
with consumer_client:
    consumer_client.receive_batch(on_event_batch=on_event_batch, partition_id=partition_id)
```
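Existing v1 code that is built around pulling a list of events can be adapted to the callback model by having the callback place events on a queue that the rest of the program drains. This is only one possible bridging approach, sketched under assumptions: `make_queue_callback` and `drain` are hypothetical helper names, and the queue size is arbitrary.

```python
import queue


def make_queue_callback(max_pending=1000):
    """Hypothetical helper: return an `on_event` callback and the queue it fills."""
    pending = queue.Queue(maxsize=max_pending)

    def on_event(partition_context, event):
        # The callback can be invoked with None when `max_wait_time` is set
        # and no event arrived in time, so skip those calls.
        if event is None:
            return
        # Blocks when the queue is full, which slows down the receiving side.
        pending.put((partition_context.partition_id, event))

    return on_event, pending


def drain(pending, max_events):
    """Hypothetical helper: pull up to `max_events` items without blocking."""
    items = []
    while len(items) < max_events:
        try:
            items.append(pending.get_nowait())
        except queue.Empty:
            break
    return items
```

Because `receive` blocks until the client is closed, a design like this would run `consumer_client.receive(on_event=on_event)` in a separate thread while the main thread calls `drain`.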
### Migrating code from `EventProcessorHost` to `EventHubConsumerClient` for receiving events
In v1, `EventProcessorHost` allowed you to balance the load between multiple instances of
your program when receiving events.
In v5, `EventHubConsumerClient` allows you to do the same with the `receive()` method if you
pass a `CheckpointStore` to the constructor.
So in v1:
```python
import logging
import asyncio
import os
from azure.eventprocessorhost import (
    AbstractEventProcessor,
    AzureStorageCheckpointLeaseManager,
    EventHubConfig,
    EventProcessorHost,
    EPHOptions)
logger = logging.getLogger("azure.eventhub")
class EventProcessor(AbstractEventProcessor):
    def __init__(self, params=None):
        super().__init__(params)
        self._msg_counter = 0
    async def open_async(self, context):
        logger.info("Connection established {}".format(context.partition_id))
    async def close_async(self, context, reason):
        logger.info("Connection closed (reason {}, id {})".format(
            reason,
            context.partition_id))
    async def process_events_async(self, context, messages):
        self._msg_counter += len(messages)
        logger.info("Partition id {}, Events processed {}".format(context.partition_id, self._msg_counter))
        await context.checkpoint_async()
    async def process_error_async(self, context, error):
        logger.error("Event Processor Error {!r}".format(error))
# Storage Account Credentials
STORAGE_ACCOUNT_NAME = os.environ.get('AZURE_STORAGE_ACCOUNT')
STORAGE_KEY = os.environ.get('AZURE_STORAGE_ACCESS_KEY')
LEASE_CONTAINER_NAME = "leases"
NAMESPACE = os.environ.get('EVENT_HUB_NAMESPACE')
EVENTHUB = os.environ.get('EVENT_HUB_NAME')
USER = os.environ.get('EVENT_HUB_SAS_POLICY')
KEY = os.environ.get('EVENT_HUB_SAS_KEY')
# Eventhub config and storage manager
eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$Default")
eh_options = EPHOptions()
eh_options.debug_trace = False
storage_manager = AzureStorageCheckpointLeaseManager(
    STORAGE_ACCOUNT_NAME, STORAGE_KEY, LEASE_CONTAINER_NAME)
# Event loop and host
loop = asyncio.get_event_loop()
host = EventProcessorHost(
    EventProcessor,
    eh_config,
    storage_manager,
    ep_params=["param1","param2"],
    eph_options=eh_options,
    loop=loop)
try:
    loop.run_until_complete(host.open_async())
finally:
    loop.run_until_complete(host.close_async())
    loop.stop()
```
And in v5:
```python
import asyncio
import os
import logging
from collections import defaultdict
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
logging.basicConfig(level=logging.INFO)
CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]
BLOB_CONTAINER_NAME = "your-blob-container-name"
logger = logging.getLogger("azure.eventhub")
events_processed = defaultdict(int)
async def on_event(partition_context, event):
    partition_id = partition_context.partition_id
    events_processed[partition_id] += 1
    logger.info("Partition id {}, Events processed {}".format(partition_id, events_processed[partition_id]))
    await partition_context.update_checkpoint(event)
async def on_partition_initialize(context):
    logger.info("Partition {} initialized".format(context.partition_id))
async def on_partition_close(context, reason):
    logger.info("Partition {} has closed, reason {}".format(context.partition_id, reason))
async def on_error(context, error):
    if context:
        logger.error("Partition {} has a partition related error {!r}.".format(context.partition_id, error))
    else:
        logger.error("Receiving event has a non-partition error {!r}".format(error))
async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME)
    client = EventHubConsumerClient.from_connection_string(
        CONNECTION_STR,
        consumer_group="$Default",
        checkpoint_store=checkpoint_store,
    )
    async with client:
        await client.receive(
            on_event,
            on_error=on_error,  # optional
            on_partition_initialize=on_partition_initialize,  # optional
            on_partition_close=on_partition_close,  # optional
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
```
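The sample above checkpoints after every event, which means one storage write per event. A common variation is to checkpoint every N events per partition. The sketch below shows one way to do that with the same asynchronous callback signature. `make_on_event` and `checkpoint_every` are hypothetical names introduced for illustration.

```python
from collections import defaultdict


def make_on_event(checkpoint_every=50):
    """Hypothetical helper: build an async `on_event` that checkpoints every N events."""
    seen = defaultdict(int)  # events processed so far, keyed by partition id

    async def on_event(partition_context, event):
        partition_id = partition_context.partition_id
        seen[partition_id] += 1
        # Only write a checkpoint on every `checkpoint_every`-th event of a partition.
        if seen[partition_id] % checkpoint_every == 0:
            await partition_context.update_checkpoint(event)

    return on_event
```

The trade-off is that events processed since the last checkpoint may be delivered again after a restart, so the processing logic should tolerate duplicates.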
> **Note:** v1 checkpoints are not compatible with v5 checkpoints.
If pointed at the same blob, consumption will begin at the first message.
The v1 checkpoint JSON in the respective blobs can be manually converted (per partition) if needed.
In v1, checkpoints (`sequence_number` and `offset`) are stored as JSON, along with ownership information,
in the content of the blob. In v5, checkpoints are kept in the metadata of a blob, and the metadata is composed of name-value pairs.
Please check [update_checkpoint](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_blobstoragecs.py#L231-L250) in v5 for implementation details.
The following code snippet can be used to migrate checkpoint data from the legacy format. This snippet assumes that the default prefix configuration for the `EventProcessorHost` was used. If a custom prefix was configured, this code will need to be adjusted to account for the difference in format.
```python
import os
import json
from azure.storage.blob import BlobServiceClient, ContainerClient
EVENT_HUB_HOSTNAME = os.environ["EVENT_HUB_HOSTNAME"]  # <your-eventhub-namespace>.servicebus.windows.net
EVENT_HUB_NAME = os.environ["EVENT_HUB_NAME"]
EVENT_HUB_CONSUMER_GROUP = "$Default"  # Name of Event Hub consumer group
STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]
BLOB_CONTAINER_NAME = "your-blob-container-name"  # Blob container to upload updated checkpoint information to.
LEGACY_BLOB_CONTAINER_NAME = "your-legacy-blob-container-name" # Please make sure the legacy blob container resource exists.
def readLegacyCheckpoints(
    storage_connection_str, legacy_blob_container_name, consumer_group
):
    container_client = ContainerClient.from_connection_string(
        storage_connection_str, legacy_blob_container_name
    )
    legacy_checkpoints = []
    # Read and process legacy checkpoints in blobs in container.
    for blob in container_client.list_blobs():
        blob_client = container_client.get_blob_client(blob)
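        # Read the whole blob before parsing so that a JSON document split
        # across several download chunks is still parsed correctly.
        content = blob_client.download_blob().readall()
        legacy_checkpoints.append(json.loads(content.decode("UTF-8")))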
    return legacy_checkpoints
if __name__ == "__main__":
    legacy_checkpoints = readLegacyCheckpoints(
        STORAGE_CONNECTION_STR, LEGACY_BLOB_CONTAINER_NAME, EVENT_HUB_CONSUMER_GROUP
    )
    # The checkpoint blobs require a specific naming scheme to be valid for use with the
    # V5 CheckpointStore.
    prefix = "{}/{}/{}/checkpoint/".format(
        EVENT_HUB_HOSTNAME.lower(),
        EVENT_HUB_NAME.lower(),
        EVENT_HUB_CONSUMER_GROUP.lower(),
    )
    # Create the storage client to write the migrated checkpoints.  This example
    # assumes that the connection string grants the appropriate permissions to create a
    # container in the storage account.
    blob_service_client = BlobServiceClient.from_connection_string(
        STORAGE_CONNECTION_STR
    )
    container_client = blob_service_client.get_container_client(BLOB_CONTAINER_NAME)
    try:
        # Create container if it doesn't already exist.
        container_client.create_container()
    except Exception:
        # Assume the container already exists and continue.
        pass
    # Translate each legacy checkpoint, storing offset and sequence data into correct
    # blob to align with V5 BlobCheckpointStore.
    for checkpoint in legacy_checkpoints:
        metadata = {
            "offset": str(checkpoint["offset"]),
            "sequencenumber": str(checkpoint["sequence_number"]),
        }
        name = "{}{}".format(prefix, checkpoint["partition_id"])
        container_client.upload_blob(name, data="", metadata=metadata)
```
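The translation step in the snippet above can also be isolated into a pure function, which makes it possible to check the resulting blob names and metadata before writing anything to storage. This is a minimal sketch that mirrors the naming scheme and metadata keys used above. `to_v5_checkpoint` is a hypothetical helper name, and the sample values in the usage line are made up.

```python
def to_v5_checkpoint(legacy_checkpoint, fully_qualified_namespace, eventhub_name, consumer_group):
    """Hypothetical helper: map one legacy checkpoint dict to (blob_name, metadata)."""
    blob_name = "{}/{}/{}/checkpoint/{}".format(
        fully_qualified_namespace.lower(),
        eventhub_name.lower(),
        consumer_group.lower(),
        legacy_checkpoint["partition_id"],
    )
    # Blob metadata values must be strings.
    metadata = {
        "offset": str(legacy_checkpoint["offset"]),
        "sequencenumber": str(legacy_checkpoint["sequence_number"]),
    }
    return blob_name, metadata


# Usage with made-up sample values:
name, metadata = to_v5_checkpoint(
    {"partition_id": "0", "offset": "4096", "sequence_number": 12},
    "MyNamespace.servicebus.windows.net",
    "MyHub",
    "$Default",
)
```

The returned pair can be passed to `container_client.upload_blob(name, data="", metadata=metadata)` as in the snippet above.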
## Additional samples
More examples can be found at [Samples for azure-eventhub](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/eventhub/azure-eventhub/samples).