File: test_avro_async.py

package info (click to toggle)
python-azure 20201208%2Bgit-6
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,437,920 kB
  • sloc: python: 4,287,452; javascript: 269; makefile: 198; sh: 187; xml: 106
file content (171 lines) | stat: -rw-r--r-- 6,722 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171

# coding: utf-8
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import asyncio
import inspect
import os
from io import BytesIO

import pytest
import unittest
from azure.storage.blob._shared.avro.avro_io_async import AsyncDatumReader
from azure.storage.blob._shared.avro.datafile_async import AsyncDataFileReader

from .test_avro import SCHEMAS_TO_VALIDATE

CODECS_TO_VALIDATE = ['null']

CHANGE_FEED_RECORD = {
    'data': {
        'api': 'PutBlob',
        'blobPropertiesUpdated': None,
        'blobType': 'BlockBlob',
        'clientRequestId': '75b6c460-fcd0-11e9-87e2-85def057dae9',
        'contentLength': 12,
        'contentType': 'text/plain',
        'etag': '0x8D75EF45A3B8617',
        'previousInfo': None,
        'requestId': 'bb219c8e-401e-0028-1fdd-90f393000000',
        'sequencer': '000000000000000000000000000017140000000000000fcc',
        'snapshot': None,
        'storageDiagnostics': {'bid': 'd3053fa1-a006-0042-00dd-902bbb000000',
                               'seq': '(5908,134,4044,0)',
                               'sid': '5aaf98bf-f1d8-dd76-2dd2-9b60c689538d'},
        'url': ''},
    'eventTime': '2019-11-01T17:53:07.5106080Z',
    'eventType': 'BlobCreated',
    'id': 'bb219c8e-401e-0028-1fdd-90f393069ae4',
    'schemaVersion': 3,
    'subject': '/blobServices/default/containers/test/blobs/sdf.txt',
    'topic': '/subscriptions/ba45b233-e2ef-4169-8808-49eb0d8eba0d/resourceGroups/XClient/providers/Microsoft.Storage/storageAccounts/seanchangefeedstage'}


class AsyncBufferedReaderWrapper:
    def __init__(self, reader):
        self._reader = reader

    async def seek(self, offset, whence=0):
        self._reader.seek(offset, whence)

    async def read(self, size=None):
        return self._reader.read(size)

    def close(self):
        self._reader.close()


class AvroReaderTestsAsync(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        test_file_path = inspect.getfile(cls)
        cls._samples_dir_root = os.path.join(os.path.dirname(test_file_path), 'samples')

    @pytest.mark.asyncio
    async def _test_reader(self):
        correct = 0
        nitems = 10
        for iexample, (writer_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
            for codec in CODECS_TO_VALIDATE:
                file_path = os.path.join(AvroReaderTestsAsync._samples_dir_root, 'test_' + codec + '_' + str(iexample) + '.avro')
                with open(file_path, 'rb') as reader:
                    datum_reader = AsyncDatumReader()
                    async_reader = AsyncBufferedReaderWrapper(reader)
                    async with await AsyncDataFileReader(async_reader, datum_reader).init() as dfr:
                        round_trip_data = []
                        async for x in dfr:
                            round_trip_data.append(x)
                        if ([datum] * nitems) == round_trip_data:
                            correct += 1
        self.assertEqual(
            correct,
            len(CODECS_TO_VALIDATE) * len(SCHEMAS_TO_VALIDATE))

    def test_reader(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self._test_reader())

    @pytest.mark.asyncio
    async def _test_change_feed(self):
        file_path = os.path.join(AvroReaderTestsAsync._samples_dir_root, 'changeFeed.avro')
        with open(file_path, 'rb') as reader:
            datum_reader = AsyncDatumReader()
            async_reader = AsyncBufferedReaderWrapper(reader)
            async with await AsyncDataFileReader(async_reader, datum_reader).init() as dfr:
                data = []
                async for x in dfr:
                    data.append(x)
                self.assertEqual(1, len(data))
                expected_record = CHANGE_FEED_RECORD
                self.assertEqual(expected_record, data[0])

    def test_change_feed(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self._test_change_feed())

    @pytest.mark.asyncio
    async def _test_with_hearder_reader(self):
        # Note: only when the data stream doesn't have header, we need header stream to help
        file_path = os.path.join(AvroReaderTestsAsync._samples_dir_root, 'changeFeed.avro')
        # this data stream has header
        full_data_stream = _HeaderStream()
        with open(file_path, 'rb') as reader:
            full_data = reader.read()
            await full_data_stream.write(full_data)
        # This initialization helps find the position after the first sync_marker
        async with await AsyncDataFileReader(full_data_stream, AsyncDatumReader()).init():
            position_after_sync_marker = await full_data_stream.tell()

        # construct the partial data stream which doesn't have header
        partial_data_stream = _HeaderStream()
        with open(file_path, 'rb') as reader:
            reader.seek(position_after_sync_marker)
            await partial_data_stream.write(reader.read())

        header_stream = _HeaderStream()
        with open(file_path, 'rb') as reader:
            header_data = reader.read()
            await header_stream.write(header_data)

        records = []
        df_reader = AsyncDataFileReader(partial_data_stream, AsyncDatumReader(), header_reader=header_stream)
        df_reader = await df_reader.init()
        async for record in df_reader:
            records.append(record)
        self.assertEqual(CHANGE_FEED_RECORD, records[0])
        self.assertIsNot(partial_data_stream.object_position, 0)

    def test_with_hearder_reader(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self._test_with_hearder_reader())

class _HeaderStream(object):
    def __init__(self):
        self._bytes_stream = BytesIO()
        self.object_position = 0
        self.block_count = 0
        self.event_index = 0

    async def seek(self, *args, **kwargs):
        return self._bytes_stream.seek(*args, **kwargs)

    async def read(self, *args, **kwargs):
        return self._bytes_stream.read(*args, **kwargs)

    async def write(self, *args, **kwargs):
        return self._bytes_stream.write(*args, **kwargs)

    async def tell(self, *args, **kwargs):
        return self._bytes_stream.tell(*args, **kwargs)

    async def track_object_position(self):
        self.object_position = self.tell()

    async def set_object_index(self, event_index):
        self.event_index = event_index

    async def close(self):
        self._bytes_stream.close()