1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import time
import asyncio
from collections import defaultdict
from ._test_base import _EventHubProcessorTest
class ProcessEventsBatchTest(_EventHubProcessorTest):
def __init__(self, arguments):
super().__init__(arguments)
self._partition_event_count = defaultdict(int)
def process_events_sync(self, partition_context, events):
try:
if events:
if self.args.processing_delay:
delay_in_seconds = self.args.processing_delay / 1000
if self.args.processing_delay_strategy == "sleep":
time.sleep(delay_in_seconds)
elif self.args.processing_delay_strategy == "spin":
starttime = time.time()
while (time.time() - starttime) < delay_in_seconds:
pass
# Consume properties and body.
_ = [(list(e.body), str(e)) for e in events]
if self.args.checkpoint_interval:
self._partition_event_count[partition_context.partition_id] += len(events)
if self._partition_event_count[partition_context.partition_id] >= self.args.checkpoint_interval:
partition_context.update_checkpoint()
self._partition_event_count[partition_context.partition_id] = 0
for e in events:
self.event_raised_sync()
except Exception as e:
self.error_raised_sync(e)
async def process_events_async(self, partition_context, events):
try:
if events:
if self.args.processing_delay:
delay_in_seconds = self.args.processing_delay / 1000
if self.args.processing_delay_strategy == "sleep":
await asyncio.sleep(delay_in_seconds)
elif self.args.processing_delay_strategy == "spin":
starttime = time.time()
while (time.time() - starttime) < delay_in_seconds:
pass
# Consume properties and body.
_ = [(list(e.body), str(e)) for e in events]
if self.args.checkpoint_interval:
self._partition_event_count[partition_context.partition_id] += len(events)
if self._partition_event_count[partition_context.partition_id] >= self.args.checkpoint_interval:
await partition_context.update_checkpoint()
self._partition_event_count[partition_context.partition_id] = 0
for e in events:
await self.event_raised_async()
except Exception as e:
await self.error_raised_async(e)
def process_error_sync(self, _, error):
print(error)
self.error_raised_sync(error)
async def process_error_async(self, _, error):
print(error)
await self.error_raised_async(error)
def start_events_sync(self) -> None:
"""
Start the process for receiving events.
"""
self.consumer.receive_batch(
on_event_batch=self.process_events_sync,
max_batch_size=self.args.max_batch_size,
on_error=self.process_error_sync,
max_wait_time=self.args.max_wait_time,
prefetch=self.args.prefetch_count,
starting_position="-1", # "-1" is from the beginning of the partition.
)
async def start_events_async(self) -> None:
"""
Start the process for receiving events.
"""
await self.async_consumer.receive_batch(
on_event_batch=self.process_events_async,
max_batch_size=self.args.max_batch_size,
on_error=self.process_error_async,
max_wait_time=self.args.max_wait_time,
prefetch=self.args.prefetch_count,
starting_position="-1", # "-1" is from the beginning of the partition.
)
@staticmethod
def add_arguments(parser):
super(ProcessEventsBatchTest, ProcessEventsBatchTest).add_arguments(parser)
parser.add_argument(
"--max-batch-size",
nargs="?",
type=int,
help="Maximum number of events to process in a single batch. Defaults to 100.",
default=100,
)
|