File: test_transfer.py

package info (click to toggle)
azure-data-lake-store-python 1.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 31,952 kB
  • sloc: python: 4,332; makefile: 192
file content (117 lines) | stat: -rw-r--r-- 3,968 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# -*- coding: utf-8 -*-
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

import os
import time

from azure.datalake.store.core import AzureDLPath
from azure.datalake.store.transfer import ADLTransferClient
from tests.testing import azure, posix


def test_shutdown(azure):
    """shutdown() must interrupt an in-flight transfer and leave it 'finished'."""
    def transfer(adlfs, src, dst, offset, size, blocksize, buffersize, retries=5, shutdown_event=None):
        # Busy-wait until the client signals shutdown, then report the whole
        # chunk as successfully written (size transferred, no exception).
        if shutdown_event is not None:
            while not shutdown_event.is_set():
                time.sleep(0.1)
        return size, None

    client = ADLTransferClient(azure, transfer=transfer, chunksize=1,
                               chunked=False)
    client.submit('foo', 'bar', 16)
    client.run(monitor=False)
    client.shutdown()

    # The interrupted transfer should still be recorded as complete.
    assert client.progress[0].state == 'finished'


def test_submit_and_run(azure):
    """Submit two files and verify chunk bookkeeping before and after run()."""
    def transfer(adlfs, src, dst, offset, size, blocksize, buffersize, shutdown_event=None):
        # Trivial transfer: claim the whole chunk was written successfully.
        return size, None

    client = ADLTransferClient(azure, transfer=transfer, chunksize=8,
                               chunked=False)

    client.submit('foo', 'bar', 16)
    client.submit('abc', '123', 8)

    nfiles = len(client.progress)
    assert nfiles == 2
    # BUG FIX: the previous assertion, `len([client.progress[i].chunks ...])`,
    # was vacuous — the comprehension always has one entry per file, so its
    # length is always truthy. Assert what was actually intended: every
    # submitted file has at least one chunk.
    assert all(client.progress[i].chunks for i in range(nfiles))

    # Before run(), every file and every chunk is pending.
    assert all([client.progress[i].state == 'pending' for i in range(nfiles)])
    assert all([chunk.state == 'pending' for f in client.progress
                                         for chunk in f.chunks])

    # 16 bytes at chunksize=8 -> two chunks for 'bar'; 8 bytes -> one for '123'.
    expected = {('bar', 0), ('bar', 8), ('123', 0)}
    assert {(chunk.name, chunk.offset) for f in client.progress
                                       for chunk in f.chunks} == expected

    client.run()

    # After run(), everything finished and each chunk transferred fully.
    assert all([client.progress[i].state == 'finished' for i in range(nfiles)])
    assert all([chunk.state == 'finished' for f in client.progress
                                          for chunk in f.chunks])
    assert all([chunk.expected == chunk.actual for f in client.progress
                                               for chunk in f.chunks])


def test_update_progress(azure):
    """
    Upload a 32-byte file in 8-byte chunks and check that the progress
    callback fires once per chunk with a running total.
    """
    observed = []

    def record_progress(progress, total):
        observed.append((progress, total))

    def transfer(adlfs, src, dst, offset, size, blocksize, buffersize, shutdown_event=None):
        # Trivial transfer: report the whole chunk as written.
        return size, None

    client = ADLTransferClient(azure, transfer=transfer, chunksize=8,
                               chunked=True, progress_callback=record_progress)

    client.submit('foo', AzureDLPath('bar'), 32)
    client.run()

    # Four chunks -> four callbacks, each advancing by one chunksize.
    assert observed == [(8, 32), (16, 32), (24, 32), (32, 32)]


def test_merge(azure):
    """A chunked upload must invoke merge() with one segment per chunk."""
    calls = []

    def merge(adlfs, outfile, files, shutdown_event=None, overwrite=False):
        # Record the segment list passed in for later inspection.
        calls.append(files)

    def transfer(adlfs, src, dst, offset, size, blocksize, buffersize, shutdown_event=None):
        # Trivial transfer: report the whole chunk as written.
        return size, None

    class XLoaderMock(object):
        # Minimal stand-in for the uploader parent; merge() reads _overwrite.
        _overwrite = False

    file_size = 32
    chunk_size = 8
    client = ADLTransferClient(azure, parent=XLoaderMock(), transfer=transfer, merge=merge,
                               chunksize=chunk_size, chunked=True)

    client.submit('foo', AzureDLPath('bar'), file_size)
    client.run()

    # IDIOM FIX: use integer division — the chunk count is an int, and
    # comparing against the float from `/` only worked incidentally.
    assert len(calls[0]) == file_size // chunk_size


def test_temporary_path(azure):
    """Chunk files for 'bar' must land in the 'bar.segments' staging directory."""
    def transfer(adlfs, src, dst, offset, size, blocksize, buffersize):
        # Trivial transfer: report the whole chunk as written.
        return size, None

    client = ADLTransferClient(azure, transfer=transfer, chunksize=8,
                               unique_temporary=False)
    client.submit('foo', AzureDLPath('bar'), 16)

    first_chunk = client.progress[0].chunks[0]
    segment_dir = os.path.dirname(posix(first_chunk.name))
    assert segment_dir == 'bar.segments'