File: test_storage_blob_partition_manager.py

package info (click to toggle)
python-azure 20230112%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 749,544 kB
  • sloc: python: 6,815,827; javascript: 287; makefile: 195; xml: 109; sh: 105
file content (132 lines) | stat: -rw-r--r-- 5,134 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import pytest
import time
import os
import uuid
import warnings

from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.eventhub.extensions.checkpointstoreblob._vendor.storage.blob import BlobServiceClient


STORAGE_ENV_KEYS = [
    "AZURE_STORAGE_CONN_STR",
    "AZURE_STORAGE_DATA_LAKE_ENABLED_CONN_STR"
]


def get_live_storage_blob_client(conn_str_env_key):
    try:
        storage_connection_str = os.environ[conn_str_env_key]
        container_name = str(uuid.uuid4())
        blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str)
        blob_service_client.create_container(container_name)
        return storage_connection_str, container_name
    except:
        pytest.skip("Storage blob client can't be created")


def remove_live_storage_blob_client(storage_connection_str, container_name):
    try:
        blob_service_client = BlobServiceClient.from_connection_string(storage_connection_str)
        blob_service_client.delete_container(container_name)
    except:
        warnings.warn(UserWarning("storage container teardown failed"))


def _claim_and_list_ownership(storage_connection_str, container_name):
    fully_qualified_namespace = 'test_namespace'
    eventhub_name = 'eventhub'
    consumer_group = '$default'
    ownership_cnt = 8

    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str, container_name)
    with checkpoint_store:
        ownership_list = checkpoint_store.list_ownership(
            fully_qualified_namespace=fully_qualified_namespace,
            eventhub_name=eventhub_name,
            consumer_group=consumer_group)
        assert len(ownership_list) == 0

        ownership_list = []

        for i in range(ownership_cnt):
            ownership = {}
            ownership['fully_qualified_namespace'] = fully_qualified_namespace
            ownership['eventhub_name'] = eventhub_name
            ownership['consumer_group'] = consumer_group
            ownership['owner_id'] = 'ownerid'
            ownership['partition_id'] = str(i)
            ownership['last_modified_time'] = time.time()
            ownership["offset"] = "1"
            ownership["sequence_number"] = "1"
            ownership_list.append(ownership)

        claimed_ownership = checkpoint_store.claim_ownership(ownership_list)

        for i in range(ownership_cnt):
            assert ownership_list[i]['partition_id'] == str(i)
            assert claimed_ownership[i]['partition_id'] == str(i)
            assert ownership_list[i] != claimed_ownership[i]

        ownership_list = checkpoint_store.list_ownership(
            fully_qualified_namespace=fully_qualified_namespace,
            eventhub_name=eventhub_name,
            consumer_group=consumer_group)
        assert len(ownership_list) == ownership_cnt


def _update_checkpoint(storage_connection_str, container_name):
    fully_qualified_namespace = 'test_namespace'
    eventhub_name = 'eventhub'
    consumer_group = '$default'
    partition_cnt = 8

    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str, container_name)
    with checkpoint_store:
        for i in range(partition_cnt):
            checkpoint = {
                'fully_qualified_namespace': fully_qualified_namespace,
                'eventhub_name': eventhub_name,
                'consumer_group': consumer_group,
                'partition_id': str(i),
                'offset': '2',
                'sequence_number': 20
            }
            checkpoint_store.update_checkpoint(checkpoint)

        checkpoint_list = checkpoint_store.list_checkpoints(
            fully_qualified_namespace=fully_qualified_namespace,
            eventhub_name=eventhub_name,
            consumer_group=consumer_group)
        assert len(checkpoint_list) == partition_cnt
        for checkpoint in checkpoint_list:
            assert checkpoint['offset'] == '2'
            assert checkpoint['sequence_number'] == 20


@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
@pytest.mark.liveTest
def test_claim_and_list_ownership(conn_str_env_key):
    storage_connection_str, container_name = get_live_storage_blob_client(conn_str_env_key)
    try:
        _claim_and_list_ownership(storage_connection_str, container_name)
    finally:
        remove_live_storage_blob_client(storage_connection_str, container_name)


@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
@pytest.mark.liveTest
def test_update_checkpoint(conn_str_env_key):
    storage_connection_str, container_name = get_live_storage_blob_client(conn_str_env_key)
    try:
        _update_checkpoint(storage_connection_str, container_name)
    finally:
        remove_live_storage_blob_client(storage_connection_str, container_name)