#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import pytest
import time
import os
import uuid
import warnings
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.eventhub.extensions.checkpointstoreblob._vendor.storage.blob import BlobServiceClient
# Names of the environment variables that hold storage connection strings.
# The live tests in this module are parametrized over this list, so each test
# runs once per entry.
STORAGE_ENV_KEYS = [
    "AZURE_STORAGE_CONN_STR",
    "AZURE_STORAGE_DATA_LAKE_ENABLED_CONN_STR",
]
def get_live_storage_blob_client(conn_str_env_key):
    """Create a uniquely named blob container for a live test.

    The storage connection string is read from the environment variable
    named by ``conn_str_env_key``; a container whose name is a fresh UUID is
    then created through ``BlobServiceClient``.

    :param str conn_str_env_key: Name of the environment variable that holds
     the storage connection string.
    :return: ``(storage_connection_str, container_name)`` for the container
     that was just created.
    :rtype: tuple

    If the variable is missing or the container cannot be created, the
    calling test is skipped via ``pytest.skip`` instead of failing.
    """
    try:
        storage_connection_str = os.environ[conn_str_env_key]
        container_name = str(uuid.uuid4())
        blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str)
        blob_service_client.create_container(container_name)
        return storage_connection_str, container_name
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that KeyboardInterrupt
        # and SystemExit still propagate instead of being reported as a skip.
        pytest.skip("Storage blob client can't be created")
def remove_live_storage_blob_client(storage_connection_str, container_name):
    """Delete the blob container created for a live test (best effort).

    :param str storage_connection_str: Connection string of the storage
     account that owns the container.
    :param str container_name: Name of the container to delete.

    Teardown failures are reported as a ``UserWarning`` rather than raised,
    so a cleanup problem does not mask the outcome of the test itself.
    """
    try:
        blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str)
        blob_service_client.delete_container(container_name)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that KeyboardInterrupt
        # and SystemExit are not silently downgraded to a warning.
        warnings.warn(UserWarning("storage container teardown failed"))
def _claim_and_list_ownership(storage_connection_str, container_name):
    """Claim ownership of eight partitions and verify the store lists them.

    :param str storage_connection_str: Storage connection string used to
     build the ``BlobCheckpointStore``.
    :param str container_name: Name of the blob container backing the store.

    The scenario asserts that no ownership is listed before claiming, that
    each claimed entry keeps its partition id but is not equal to the dict
    that was submitted, and that all eight entries are listed afterwards.
    """
    ownership_cnt = 8
    # Identifies the namespace / event hub / consumer group used by every
    # call in this scenario.
    scope = {
        'fully_qualified_namespace': 'test_namespace',
        'eventhub_name': 'eventhub',
        'consumer_group': '$default',
    }

    store = BlobCheckpointStore.from_connection_string(
        storage_connection_str, container_name)
    with store:
        # Nothing has been claimed yet, so the listing must be empty.
        assert len(store.list_ownership(**scope)) == 0

        # One ownership request per partition id "0".."7".
        requested = [
            dict(
                scope,
                owner_id='ownerid',
                partition_id=str(partition),
                last_modified_time=time.time(),
                offset="1",
                sequence_number="1",
            )
            for partition in range(ownership_cnt)
        ]
        claimed = store.claim_ownership(requested)

        for partition in range(ownership_cnt):
            expected_id = str(partition)
            assert requested[partition]['partition_id'] == expected_id
            assert claimed[partition]['partition_id'] == expected_id
            # The returned entry must not compare equal to the submitted one.
            assert requested[partition] != claimed[partition]

        assert len(store.list_ownership(**scope)) == ownership_cnt
def _update_checkpoint(storage_connection_str, container_name):
    """Write a checkpoint for eight partitions and verify they are listed.

    :param str storage_connection_str: Storage connection string used to
     build the ``BlobCheckpointStore``.
    :param str container_name: Name of the blob container backing the store.

    Every checkpoint is written with offset ``'2'`` and sequence number
    ``20``; the scenario asserts that eight checkpoints are listed back and
    that each carries those same two values.
    """
    partition_cnt = 8
    # Identifies the namespace / event hub / consumer group used by every
    # call in this scenario.
    scope = {
        'fully_qualified_namespace': 'test_namespace',
        'eventhub_name': 'eventhub',
        'consumer_group': '$default',
    }

    store = BlobCheckpointStore.from_connection_string(
        storage_connection_str, container_name)
    with store:
        for partition in range(partition_cnt):
            store.update_checkpoint(dict(
                scope,
                partition_id=str(partition),
                offset='2',
                sequence_number=20,
            ))

        stored = store.list_checkpoints(**scope)
        assert len(stored) == partition_cnt
        for entry in stored:
            assert entry['offset'] == '2'
            assert entry['sequence_number'] == 20
@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
@pytest.mark.liveTest
def test_claim_and_list_ownership(conn_str_env_key):
    """Live test: run the claim/list ownership scenario in a temporary container.

    :param str conn_str_env_key: Name of the environment variable that holds
     the storage connection string (supplied by the parametrization).
    """
    conn_str, container = get_live_storage_blob_client(conn_str_env_key)
    try:
        _claim_and_list_ownership(conn_str, container)
    finally:
        # Always attempt to remove the temporary container, pass or fail.
        remove_live_storage_blob_client(conn_str, container)
@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
@pytest.mark.liveTest
def test_update_checkpoint(conn_str_env_key):
    """Live test: run the update/list checkpoint scenario in a temporary container.

    :param str conn_str_env_key: Name of the environment variable that holds
     the storage connection string (supplied by the parametrization).
    """
    conn_str, container = get_live_storage_blob_client(conn_str_env_key)
    try:
        _update_checkpoint(conn_str, container)
    finally:
        # Always attempt to remove the temporary container, pass or fail.
        remove_live_storage_blob_client(conn_str, container)
|