# Guide for migrating azure-eventhub to v5 from v1

This guide is intended to assist in the migration to `azure-eventhub` v5 from v1. It will focus on side-by-side comparisons for similar operations between the two packages.

Familiarity with the `azure-eventhub` v1 package is assumed. For those new to the Event Hubs client library for Python, please refer to the [README for `azure-eventhub`](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/eventhub/azure-eventhub/README.md) rather than this guide.

## Table of contents

* [Migration benefits](#migration-benefits)
    - [Cross Service SDK improvements](#cross-service-sdk-improvements)
    - [New features](#new-features)
* [Important changes](#important-changes)
    - [Client hierarchy](#client-hierarchy)
    - [Client constructors](#client-constructors)
    - [Sending events](#sending-events)
    - [Receiving events](#receiving-events)
    - [Migrating code from EventProcessorHost to EventHubConsumerClient for receiving events](#migrating-code-from-eventprocessorhost-to-eventhubconsumerclient-for-receiving-events)
* [Additional samples](#additional-samples)

## Migration benefits

A natural question to ask when considering whether or not to adopt a new version or library is what the benefits of doing so would be. As Azure has matured and been embraced by a more diverse group of developers, we have been focused on learning the patterns and practices to best support developer productivity and to understand the gaps that the Python client libraries have.

There were several areas of consistent feedback expressed across the Azure client library ecosystem. One of the most important is that the client libraries for different Azure services have not had a consistent approach to organization, naming, and API structure. Additionally, many developers have felt that the learning curve was difficult, and the APIs did not offer a good, approachable, and consistent onboarding story for those learning Azure or exploring a specific Azure service.

To try and improve the development experience across Azure services, a set of uniform [design guidelines](https://azure.github.io/azure-sdk/general_introduction.html) was created for all languages to drive a consistent experience with established API patterns for all services. A set of [Python-specific guidelines](https://azure.github.io/azure-sdk/python/guidelines/index.html) was also introduced to ensure that Python clients have a natural and idiomatic feel with respect to the Python ecosystem. Further details are available in the guidelines for those interested.

### Cross Service SDK improvements

The modern Event Hubs client library also provides the ability to share in some of the cross-service improvements made to the Azure development experience, such as:
- using the new [`azure-identity`](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/README.md) library to share a single authentication approach between clients
- a unified logging and diagnostics pipeline offering a common view of the activities across each of the client libraries (see the logging sketch after this list)
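
For example, all of the modern client libraries log through Python's standard `logging` module. The following is a minimal sketch, assuming the connection details live in environment variables, that surfaces the library's logs and, via `logging_enable=True`, detailed network traces:

```python
import logging
import os
import sys

from azure.eventhub import EventHubProducerClient

# The client libraries emit logs through named loggers; "azure.eventhub"
# covers this package.
handler = logging.StreamHandler(stream=sys.stdout)
logger = logging.getLogger("azure.eventhub")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# logging_enable=True additionally captures detailed network traces.
producer_client = EventHubProducerClient.from_connection_string(
    os.environ["EVENT_HUB_CONN_STR"],        # assumed environment variable
    eventhub_name=os.environ["EVENT_HUB_NAME"],
    logging_enable=True,
)
```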

### New features

We have a variety of new features in version 5 of the Event Hubs library.

- Ability to create a batch of events with the `EventHubProducerClient.create_batch()` and `EventDataBatch.add()` APIs, giving you control over how events are grouped for sending.
- Ability to configure the retry policy used by operations on the clients (see the retry sketch after this list).
- Authentication with AAD credentials using [`azure-identity`](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/README.md).
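
As an illustration of the configurable retry policy, the client constructors accept retry-related keyword arguments. A minimal sketch, again assuming connection details in environment variables (the specific values shown here are arbitrary):

```python
import os

from azure.eventhub import EventHubProducerClient

# Retry behavior is configured per client via keyword arguments.
producer_client = EventHubProducerClient.from_connection_string(
    os.environ["EVENT_HUB_CONN_STR"],
    eventhub_name=os.environ["EVENT_HUB_NAME"],
    retry_total=3,             # maximum number of retry attempts
    retry_backoff_factor=0.8,  # base delay, in seconds, between attempts
    retry_backoff_max=120,     # upper bound, in seconds, on the delay
)
```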

Refer to the [changelog](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/eventhub/azure-eventhub/CHANGELOG.md) for more new features, changes and bug fixes.

## Important changes

### Client hierarchy

In the interest of simplifying the API surface, we've made two distinct clients: the `EventHubProducerClient` for sending events and the `EventHubConsumerClient` for receiving events. This is in contrast to the single `EventHubClient` that was used to create senders and receivers.
We've also merged the functionality from `EventProcessorHost` into `EventHubConsumerClient`.

#### Approachability
By having a single entry point for sending, the `EventHubProducerClient` makes the API easier to discover: you can explore all of the available features for sending events through the methods of a single client, rather than searching through documentation or the namespace for the types you can instantiate.

Similarly, the `EventHubConsumerClient` is the single entry point for receiving of any kind within Event Hubs (from a single partition, from all partitions, or with load balancing and checkpointing), so all of the available features for receiving events are likewise discoverable through the methods of a single client.

#### Consistency
Methods for sending and receiving now have similar names, signatures, and locations.
This provides consistency and predictability across the features of the library.

### Client constructors

While we continue to support connection strings when constructing a client, the key differences between the two versions are:
- In v5, we now support the use of Azure Active Directory for authentication.
The new [`azure-identity`](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/identity/azure-identity/README.md) library allows us
to share a single authentication solution between clients of different Azure services.
- The option to construct a client using an address of the form `amqps://<SAS-policy-name>:<SAS-key>@<fully-qualified-namespace>/<eventhub-name>` is no longer supported in v5. This address is not readily available in the Azure portal or in any tooling and so was subject to human error. We instead recommend using the connection string if you want to use a SAS policy.

In v1:
```python
# Authenticate with address
eventhub_client = EventHubClient(address)

# Authenticate with connection string
eventhub_client = EventHubClient.from_connection_string(conn_str)

# Authenticate the EventProcessorHost and StorageCheckpointLeaseManager
eh_config = EventHubConfig(eh_namespace, eventhub_name, user, key, consumer_group="$default")
storage_manager = AzureStorageCheckpointLeaseManager(storage_account_name, storage_key, lease_container_name)
host = EventProcessorHost(EventProcessor, eh_config, storage_manager)
```
In v5:
```python
# Address is no longer used for authentication.

# Authenticate with connection string
producer_client = EventHubProducerClient.from_connection_string(conn_str)
consumer_client = EventHubConsumerClient.from_connection_string(conn_str, consumer_group='$Default')
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_conn_str, container_name)
consumer_client_with_checkpoint_store = EventHubConsumerClient.from_connection_string(conn_str, consumer_group='$Default', checkpoint_store=checkpoint_store)

# Authenticate with Active Directory
from azure.identity import EnvironmentCredential
producer_client = EventHubProducerClient(fully_qualified_namespace, eventhub_name, credential=EnvironmentCredential())
consumer_client = EventHubConsumerClient(fully_qualified_namespace, eventhub_name, consumer_group='$Default', credential=EnvironmentCredential())
checkpoint_store = BlobCheckpointStore(blob_account_url, container_name, credential=EnvironmentCredential())
consumer_client_with_checkpoint_store = EventHubConsumerClient(fully_qualified_namespace, eventhub_name, consumer_group='$Default', credential=EnvironmentCredential(), checkpoint_store=checkpoint_store)
```
### Sending events

- The `run` and `stop` methods were previously used because the single `EventHubClient` controlled the lifecycle for all senders and receivers. In v5 there is no `run` or `stop`; the `EventHubProducerClient` controls its own lifecycle and can be closed with `close()` or used as a context manager.
- The `add_sender` method is no longer used to create sender clients. Instead, the `EventHubProducerClient` is used for sending events.
- The `send` method, which sent a single event per call, is removed in favor of `send_batch` to encourage sending events in batches for better throughput.
- The new `send_batch` method takes a list of `EventData` objects that is batched into a single message by the client before sending.
- The above approach fails if the list of events exceeds the size limit of a single message. To stay safely within the size limit, use an `EventDataBatch` object: add `EventData` objects to it until the size limit is reached, then send it using the same `send_batch` method.

In v1:
```python
client = EventHubClient(address)
sender = client.add_sender()
client.run()
sender.send(EventData('Single message'))
client.stop()
```

In v5:
```python
producer_client = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)

# Send a list of EventData. This can fail if the list exceeds the size limit.
event_data_batch = [EventData('Single message')]
producer_client.send_batch(event_data_batch)

# Send an EventDataBatch. Multiple messages are sent safely by using `create_batch` to create a batch object.
# `add` raises a ValueError if adding an event would push the batch over the maximum batch size.
event_data_batch = producer_client.create_batch()
event_data_batch.add(EventData('Single message'))
producer_client.send_batch(event_data_batch)
```
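
Because `add` raises a `ValueError` once the batch is full, a common pattern for sending an arbitrary number of events is to fill a batch until it overflows, send it, and start a new one. A minimal sketch building on the snippet above (it assumes any single event fits within an empty batch):

```python
events_to_send = [EventData("Message {}".format(i)) for i in range(1000)]

event_data_batch = producer_client.create_batch()
for event in events_to_send:
    try:
        event_data_batch.add(event)
    except ValueError:
        # Batch is full: send it and start a new one with the event that didn't fit.
        producer_client.send_batch(event_data_batch)
        event_data_batch = producer_client.create_batch()
        event_data_batch.add(event)
# Send whatever remains in the final, partially filled batch.
if len(event_data_batch) > 0:
    producer_client.send_batch(event_data_batch)
```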

### Receiving events

- The `run` and `stop` methods were previously used because the single `EventHubClient` controlled the lifecycle for all senders and receivers. In v5 there is no `run` or `stop`; the `EventHubConsumerClient` controls its own lifecycle and can be closed with `close()` or used as a context manager.
- The `add_receiver` method is no longer used to create receiver clients. Instead, the `EventHubConsumerClient` is used for receiving events.
- In v1, the `receive` method returned a list of `EventData`, and you called it repeatedly each time you wanted to receive a set of events. In v5, the new `receive` method takes user callbacks to process events and any resulting errors. You call the method once, and it continues to process incoming events until you stop it.
- Additionally, we have a `receive_batch` method, which behaves like `receive` but calls the user callback with a batch of events instead of a single event.
- The same methods can be used whether you want to receive from a single partition or from all partitions.

In v1:
```python
client = EventHubClient(address)
receiver = client.add_receiver(consumer_group, partition)
client.run()
batch = receiver.receive()
client.stop()
```

In v5:
```python
# Receive
def on_event(partition_context, event):
    print("Received event from partition: {}.".format(partition_context.partition_id))

consumer_client = EventHubConsumerClient.from_connection_string(conn_str, consumer_group, eventhub_name=eh_name)
with consumer_client:
    consumer_client.receive(on_event=on_event, partition_id=partition_id)

# Receive batch
def on_event_batch(partition_context, event_batch):
    print("Partition {}, Received count: {}".format(partition_context.partition_id, len(event_batch)))

consumer_client = EventHubConsumerClient.from_connection_string(conn_str, consumer_group, eventhub_name=eh_name)
with consumer_client:
    consumer_client.receive_batch(on_event_batch=on_event_batch, partition_id=partition_id)
```
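
Note that `receive` and `receive_batch` block the calling thread until the client is closed. A minimal sketch of one way to stop, reusing the placeholders from the snippet above and closing the client on Ctrl-C (calling `close()` from another thread works as well):

```python
consumer_client = EventHubConsumerClient.from_connection_string(conn_str, consumer_group, eventhub_name=eh_name)
try:
    with consumer_client:
        # Omitting partition_id receives from all partitions.
        consumer_client.receive(on_event=on_event)
except KeyboardInterrupt:
    print("Receiving has stopped.")
```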
### Migrating code from `EventProcessorHost` to `EventHubConsumerClient` for receiving events

In v1, `EventProcessorHost` allowed you to balance the load between multiple instances of your program when receiving events.

In v5, `EventHubConsumerClient` allows you to do the same with the `receive()` method if you pass a `CheckpointStore` to the constructor.

So in v1:
```python
import logging
import asyncio
import os
from azure.eventprocessorhost import (
    AbstractEventProcessor,
    AzureStorageCheckpointLeaseManager,
    EventHubConfig,
    EventProcessorHost,
    EPHOptions)
logger = logging.getLogger("azure.eventhub")
class EventProcessor(AbstractEventProcessor):
    def __init__(self, params=None):
        super().__init__(params)
        self._msg_counter = 0
    async def open_async(self, context):
        logger.info("Connection established {}".format(context.partition_id))
    async def close_async(self, context, reason):
        logger.info("Connection closed (reason {}, id {})".format(
            reason,
            context.partition_id))
    async def process_events_async(self, context, messages):
        self._msg_counter += len(messages)
        logger.info("Partition id {}, Events processed {}".format(context.partition_id, self._msg_counter))
        await context.checkpoint_async()
    async def process_error_async(self, context, error):
        logger.error("Event Processor Error {!r}".format(error))
# Storage Account Credentials
STORAGE_ACCOUNT_NAME = os.environ.get('AZURE_STORAGE_ACCOUNT')
STORAGE_KEY = os.environ.get('AZURE_STORAGE_ACCESS_KEY')
LEASE_CONTAINER_NAME = "leases"
NAMESPACE = os.environ.get('EVENT_HUB_NAMESPACE')
EVENTHUB = os.environ.get('EVENT_HUB_NAME')
USER = os.environ.get('EVENT_HUB_SAS_POLICY')
KEY = os.environ.get('EVENT_HUB_SAS_KEY')
# Eventhub config and storage manager
eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$Default")
eh_options = EPHOptions()
eh_options.debug_trace = False
storage_manager = AzureStorageCheckpointLeaseManager(
    STORAGE_ACCOUNT_NAME, STORAGE_KEY, LEASE_CONTAINER_NAME)
# Event loop and host
loop = asyncio.get_event_loop()
host = EventProcessorHost(
    EventProcessor,
    eh_config,
    storage_manager,
    ep_params=["param1","param2"],
    eph_options=eh_options,
    loop=loop)
try:
    loop.run_until_complete(host.open_async())
finally:
    loop.run_until_complete(host.close_async())
    loop.stop()
```

And in v5:
```python
import asyncio
import os
import logging
from collections import defaultdict
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
logging.basicConfig(level=logging.INFO)
CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]
BLOB_CONTAINER_NAME = "your-blob-container-name"
logger = logging.getLogger("azure.eventhub")
events_processed = defaultdict(int)
async def on_event(partition_context, event):
    partition_id = partition_context.partition_id
    events_processed[partition_id] += 1
    logger.info("Partition id {}, Events processed {}".format(partition_id, events_processed[partition_id]))
    await partition_context.update_checkpoint(event)
async def on_partition_initialize(context):
    logger.info("Partition {} initialized".format(context.partition_id))
async def on_partition_close(context, reason):
    logger.info("Partition {} has closed, reason {})".format(context.partition_id, reason))
async def on_error(context, error):
    if context:
        logger.error("Partition {} has a partition related error {!r}.".format(context.partition_id, error))
    else:
        logger.error("Receiving event has a non-partition error {!r}".format(error))
async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME)
    client = EventHubConsumerClient.from_connection_string(
        CONNECTION_STR,
        consumer_group="$Default",
        checkpoint_store=checkpoint_store,
    )
    async with client:
        await client.receive(
            on_event,
            on_error=on_error,  # optional
            on_partition_initialize=on_partition_initialize,  # optional
            on_partition_close=on_partition_close,  # optional
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
```

> **Note:** V1 checkpoints are not compatible with V5 checkpoints.
If pointed at the same blob, consumption will begin at the first message.
V1 checkpoint json in the respective blobs can be manually converted (per-partition) if needed.
In V1 checkpoints (sequence_number and offset) are stored in the format of json along with ownership information
as the content of the blob, while in V5, checkpoints are kept in the metadata of a blob and the metadata is composed of name-value pairs.
Please check [update_checkpoint](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/eventhub/azure-eventhub-checkpointstoreblob/azure/eventhub/extensions/checkpointstoreblob/_blobstoragecs.py#L231-L250) in V5 for implementation detail.

The following code snippet can be used to migrate checkpoint data from the legacy format. This snippet assumes that the default prefix configuration for the `EventProcessorHost` was used. If a custom prefix was configured, this code will need to be adjusted to account for the difference in format.
```python
import os
import json
from azure.core.exceptions import ResourceExistsError
from azure.storage.blob import BlobServiceClient, ContainerClient

EVENT_HUB_HOSTNAME = os.environ["EVENT_HUB_HOSTNAME"]  # <your-eventhub-namespace>.servicebus.windows.net
EVENT_HUB_NAME = os.environ["EVENT_HUB_NAME"]
EVENT_HUB_CONSUMER_GROUP = "$Default"  # Name of Event Hub consumer group

STORAGE_CONNECTION_STR = os.environ["AZURE_STORAGE_CONN_STR"]
BLOB_CONTAINER_NAME = "your-blob-container-name"  # Blob container to upload updated checkpoint information to.
LEGACY_BLOB_CONTAINER_NAME = "your-legacy-blob-container-name" # Please make sure the legacy blob container resource exists.


def read_legacy_checkpoints(
    storage_connection_str, legacy_blob_container_name, consumer_group
):
    container_client = ContainerClient.from_connection_string(
        storage_connection_str, legacy_blob_container_name
    )
    legacy_checkpoints = []

    # Read and process legacy checkpoints in blobs in container.
    for blob in container_client.list_blobs():
        blob_client = container_client.get_blob_client(blob)
        stream = blob_client.download_blob()
        for chunk in stream.chunks():
            legacy_checkpoints.append(json.loads(chunk.decode("UTF-8")))
    return legacy_checkpoints


if __name__ == "__main__":
    legacy_checkpoints = read_legacy_checkpoints(
        STORAGE_CONNECTION_STR, LEGACY_BLOB_CONTAINER_NAME, EVENT_HUB_CONSUMER_GROUP
    )

    # The checkpoint blobs require a specific naming scheme to be valid for use with the
    # V5 CheckpointStore.
    prefix = "{}/{}/{}/checkpoint/".format(
        EVENT_HUB_HOSTNAME.lower(),
        EVENT_HUB_NAME.lower(),
        EVENT_HUB_CONSUMER_GROUP.lower(),
    )

    # Create the storage client to write the migrated checkpoints.  This example
    # assumes that the connection string grants the appropriate permissions to create a
    # container in the storage account.
    blob_service_client = BlobServiceClient.from_connection_string(
        STORAGE_CONNECTION_STR
    )
    container_client = blob_service_client.get_container_client(BLOB_CONTAINER_NAME)
    try:
        # Create container if it doesn't already exist.
        container_client.create_container()
    except ResourceExistsError:
        pass

    # Translate each legacy checkpoint, storing offset and sequence data into correct
    # blob to align with V5 BlobCheckpointStore.
    for checkpoint in legacy_checkpoints:
        metadata = {
            "offset": str(checkpoint["offset"]),
            "sequencenumber": str(checkpoint["sequence_number"]),
        }
        name = "{}{}".format(prefix, checkpoint["partition_id"])
        container_client.upload_blob(name, data="", metadata=metadata)
```

## Additional samples

More examples can be found at [Samples for azure-eventhub](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/eventhub/azure-eventhub/samples).