File: test_azure_mgmt_eventhub_consumergroup.py

package info (click to toggle)
python-azure 20181112%2Bgit-2
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 407,300 kB
  • sloc: python: 717,190; makefile: 201; sh: 76
file content (142 lines) | stat: -rw-r--r-- 8,436 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# coding: utf-8

#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import unittest
import time

from msrestazure.azure_exceptions import CloudError
import azure.mgmt.eventhub.models
from azure.mgmt.eventhub.models import EHNamespace
from azure.mgmt.eventhub.models import EHNamespace, Sku, SkuName, AuthorizationRule, AccessRights, AccessKeys, Eventhub, CaptureDescription, Destination, EncodingCaptureDescription, ConsumerGroup, ErrorResponseException, ErrorResponse
from azure.common.credentials import ServicePrincipalCredentials

from devtools_testutils import AzureMgmtTestCase, ResourceGroupPreparer

class MgmtEventHubTest(AzureMgmtTestCase):

    def setUp(self):
        super(MgmtEventHubTest, self).setUp()

        self.eventhub_client = self.create_mgmt_client(
            azure.mgmt.eventhub.EventHubManagementClient
        )

    @ResourceGroupPreparer()
    def test_eh_consumergroup_curd(self, resource_group, location):

        resource_group_name = resource_group.name

        # Create a Namespace
        namespace_name = "pythontestcaseeventhubnamespaceConsumerGroup"

        namespaceparameter = EHNamespace(location=location, tags={'tag1': 'value1', 'tag2': 'value2'}, sku=Sku(name=SkuName.standard))
        poller = self.eventhub_client.namespaces.create_or_update(resource_group_name, namespace_name,
                                                                  namespaceparameter)
        creatednamespace = poller.result()
        self.assertEqual(creatednamespace.name, namespace_name)

        #
        # # Get created Namespace
        #
        getnamespaceresponse = self.eventhub_client.namespaces.get(resource_group_name, namespace_name)
        self.assertEqual(getnamespaceresponse.name, namespace_name)

        # # Create a Eventhub
        eventhub_name = "testingpythontestcaseeventhubConsumerGroup"
        eventhubparameter = Eventhub(
            message_retention_in_days=4,
            partition_count=4,
            capture_description=CaptureDescription(
                enabled=True,
                encoding=EncodingCaptureDescription.avro,
                interval_in_seconds=120,
                size_limit_in_bytes=10485763,
                destination=Destination(
                    name="EventHubArchive.AzureBlockBlob",
                    storage_account_resource_id="/subscriptions/"+self.settings.SUBSCRIPTION_ID+"/resourceGroups/Default-Storage-SouthCentralUS/providers/Microsoft.ClassicStorage/storageAccounts/arjunteststorage",
                    blob_container="container",
                    archive_name_format="{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}")
            )
        )
        createdeventhubresponse = self.eventhub_client.event_hubs.create_or_update(resource_group_name, namespace_name,
                                                                                   eventhub_name, eventhubparameter)
        self.assertEqual(createdeventhubresponse.name,eventhub_name)
        self.assertEqual(createdeventhubresponse.capture_description.interval_in_seconds, 120)

        # Get the created eventhub
        geteventhubresponse = self.eventhub_client.event_hubs.get(resource_group_name, namespace_name, eventhub_name)
        self.assertEqual(geteventhubresponse.name, eventhub_name)
        self.assertEqual(geteventhubresponse.capture_description.interval_in_seconds, 120)
        #Get the List of eventhub by namespaces
        getlistbynamespaceeventhubresponse = list(self.eventhub_client.event_hubs.list_by_namespace(resource_group_name,
                                                                                                    namespace_name))
        self.assertGreater(len(getlistbynamespaceeventhubresponse), 0)

        # # update the Created eventhub
        eventhubupdateparameter = Eventhub(
            message_retention_in_days=6,
            partition_count=4,
            capture_description=CaptureDescription(
                enabled=True,
                encoding=EncodingCaptureDescription.avro,
                interval_in_seconds=130,
                size_limit_in_bytes=10485900,
                destination=Destination(
                    name="EventHubArchive.AzureBlockBlob",
                    storage_account_resource_id="/subscriptions/"+self.settings.SUBSCRIPTION_ID+"/resourceGroups/Default-Storage-SouthCentralUS/providers/Microsoft.ClassicStorage/storageAccounts/arjunteststorage",
                    blob_container="container",
                    archive_name_format="{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}")
            )
        )

        updateeventhubresponse = self.eventhub_client.event_hubs.create_or_update(resource_group_name, namespace_name,
                                                                                   eventhub_name, eventhubupdateparameter)
        self.assertEqual(updateeventhubresponse.name, eventhub_name)
        self.assertEqual(updateeventhubresponse.capture_description.interval_in_seconds, 130)
        self.assertEqual(updateeventhubresponse.message_retention_in_days, 6)
        self.assertEqual(updateeventhubresponse.capture_description.size_limit_in_bytes, 10485900)

        # Create ConsumerGroup
        consumergroup_name = "testingpythontestcaseconsumergroup"
        createconsumergroupresponse = self.eventhub_client.consumer_groups.create_or_update(resource_group_name, namespace_name,
                                                                                  eventhub_name, consumergroup_name,
                                                                                            "Testing the User Metadata")

        self.assertEqual(createconsumergroupresponse.name, consumergroup_name)
        self.assertEqual(createconsumergroupresponse.user_metadata, "Testing the User Metadata")

        # Get the Created Consumer group
        getconsumergroupresponse = self.eventhub_client.consumer_groups.get(resource_group_name,
                                                                                           namespace_name,
                                                                                           eventhub_name,
                                                                                           consumergroup_name)
        # update the Created Consumer group
        updateconsumergroupresponse = self.eventhub_client.consumer_groups.create_or_update(resource_group_name,
                                                                                           namespace_name,
                                                                                           eventhub_name,
                                                                                           consumergroup_name,
                                                                                            "update user matadata for testing")
        self.assertEqual(updateconsumergroupresponse.name, consumergroup_name)
        self.assertEqual(updateconsumergroupresponse.user_metadata, "update user matadata for testing")

        # delete the consumergroup
        deleteconsumergroupresponse = self.eventhub_client.consumer_groups.delete(resource_group_name,
                                                                                            namespace_name,
                                                                                            eventhub_name,
                                                                                            consumergroup_name)
        # Delete the created eventhub
        geteventhubresponse = self.eventhub_client.event_hubs.delete(resource_group_name, namespace_name, eventhub_name)

        # Delete the create namespace
        try:
            deletenamespace = self.eventhub_client.namespaces.delete(resource_group_name, namespace_name).result()
        except CloudError as ErrorResponse:
            self.assertTrue("not found" in ErrorResponse.message)

#------------------------------------------------------------------------------
if __name__ == '__main__':
    unittest.main()