1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
import sys
import uuid
from io import BytesIO
from os import path
import pytest
from azure.kusto.ingest import FileDescriptor, BlobDescriptor, StreamDescriptor
class TestDescriptors:
    """Tests for FileDescriptor, BlobDescriptor and StreamDescriptor."""

    # this is the size with LF line endings
    uncompressed_size = 1569
    # this is the size with CRLF line endings
    uncompressed_size_2 = 1578
    # arbitrary caller-supplied size; the tests check it is reported back unchanged
    mock_size = 10

    INVALID_UUID = "12345"
    TEST_UUID_STR = "5bcc12b7-e35c-4c76-a40a-2d89e6c2c7dd"
    # derived from TEST_UUID_STR so the string and UUID forms cannot drift apart
    TEST_UUID = uuid.UUID(TEST_UUID_STR, version=4)

    @staticmethod
    def _input_path(file_name):
        """Return the absolute path of ``file_name`` in the ``input`` directory next to this module."""
        return path.join(path.dirname(path.abspath(__file__)), "input", file_name)

    def _check_opened_file(self, file_name, size, should_compress, expected_sizes, expected_suffix):
        """Open a FileDescriptor for an input file and verify its size, stream name and stream state.

        :param file_name: name of the file inside the ``input`` directory.
        :param size: size passed to the FileDescriptor constructor (the tests pass 0 for "not provided").
        :param should_compress: flag forwarded to ``FileDescriptor.open``.
        :param expected_sizes: tuple of acceptable values for ``descriptor.size`` once opened.
        :param expected_suffix: suffix that ``descriptor.stream_name`` must end with once opened.
        """
        descriptor = FileDescriptor(self._input_path(file_name), size)
        with descriptor.open(should_compress) as stream:
            assert descriptor.size in expected_sizes
            assert descriptor.stream_name.endswith(expected_suffix)
            # readable() is only checked on Python 3
            if sys.version_info[0] >= 3:
                assert stream.readable()
            assert stream.tell() == 0

        # leaving the ``with`` block must close the stream
        assert stream.closed is True

    def _check_source_id(self, descriptor_type, source):
        """Verify source_id handling of ``descriptor_type`` constructed from ``source``.

        Covers four cases: no source_id given, a UUID string, a ``uuid.UUID`` instance,
        and an invalid string (which must raise ``ValueError``).

        :param descriptor_type: descriptor class under test.
        :param source: first positional constructor argument (path, blob or stream).
        """
        descriptor = descriptor_type(source)
        assert descriptor.source_id
        assert descriptor.source_id != self.TEST_UUID
        # Parsing raises ValueError unless the id is a well-formed UUID string. Note that
        # version=4 makes uuid.UUID set the version bits rather than validate them.
        assert uuid.UUID(str(descriptor.source_id), version=4)

        descriptor = descriptor_type(source, source_id=self.TEST_UUID_STR)
        assert descriptor.source_id == self.TEST_UUID

        descriptor = descriptor_type(source, source_id=self.TEST_UUID)
        assert descriptor.source_id == self.TEST_UUID

        with pytest.raises(ValueError):
            descriptor_type(source, source_id=self.INVALID_UUID)

    def test_unzipped_file_with_size(self):
        """Tests FileDescriptor with size and unzipped file, compressed on open."""
        self._check_opened_file("dataset.csv", self.mock_size, True, (self.mock_size,), ".csv.gz")

    def test_unzipped_file_without_size(self):
        """Tests FileDescriptor without size and unzipped file, compressed on open."""
        # TODO: since we don't know if the file is opened on CRLF system or an LF system, allow both sizes
        # a more robust approach would be to open the file and check
        self._check_opened_file("dataset.csv", 0, True, (self.uncompressed_size, self.uncompressed_size_2), ".csv.gz")

    def test_zipped_file_with_size(self):
        """Tests FileDescriptor with size and gzipped file."""
        self._check_opened_file("dataset.csv.gz", self.mock_size, False, (self.mock_size,), ".csv.gz")

    def test_gzip_file_without_size(self):
        """Tests FileDescriptor without size and gzipped file."""
        self._check_opened_file("dataset.csv.gz", 0, False, (self.uncompressed_size,), ".csv.gz")

    def test_zip_file_without_size(self):
        """Tests FileDescriptor without size and zip archive."""
        # the zip archive contains 2 copies of the source file
        self._check_opened_file("dataset.csv.zip", 0, False, (self.uncompressed_size * 2,), ".csv.zip")

    def test_unzipped_file_dont_compress(self):
        """Tests FileDescriptor with size and unzipped file, opened without compression."""
        self._check_opened_file("dataset.csv", self.mock_size, False, (self.mock_size,), ".csv")

    def test_uuid_stream_descriptor(self):
        """Tests source_id handling of StreamDescriptor."""
        self._check_source_id(StreamDescriptor, BytesIO(b"dummy"))

    def test_uuid_file_descriptor(self):
        """Tests source_id handling of FileDescriptor."""
        self._check_source_id(FileDescriptor, "dummy")

    def test_uuid_blob_descriptor(self):
        """Tests source_id handling of BlobDescriptor."""
        self._check_source_id(BlobDescriptor, "dummy")
|