# -*- coding: utf-8 -*-
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import array
from hashlib import md5
import os
import platform
import sys
import threading
PY2 = sys.version_info.major == 2
WIN = platform.system() == 'Windows'
if WIN:
datadir = os.path.join(os.environ['APPDATA'], 'azure-datalake-store')
else:
datadir = os.sep.join([os.path.expanduser("~"), '.config', 'azure-datalake-store'])
try:
    os.makedirs(datadir)
except OSError:
    # the directory may already exist or may not be creatable; ignore either way
    pass
def ensure_writable(b):
    """ On Python 2, convert an ``array.array`` to its byte-string contents;
    any other input is returned unchanged.
    """
    if PY2 and isinstance(b, array.array):
        return b.tostring()
    return b
def write_stdout(data):
""" Write bytes or strings to standard output
"""
try:
sys.stdout.buffer.write(data)
except AttributeError:
sys.stdout.write(data.decode('ascii', 'replace'))
def read_block(f, offset, length, delimiter=None):
""" Read a block of bytes from a file
Parameters
----------
    f: file object
        a file object that supports seek, tell and read.
offset: int
Byte offset to start read
length: int
Maximum number of bytes to read
delimiter: bytes (optional)
Ensure reading stops at delimiter bytestring
    If using the ``delimiter=`` keyword argument we ensure that the read
    stops at the last complete delimiter found at or before the location
    ``offset + length``. For ADL, if no delimiter is found and the data
requested is > 4MB an exception is raised, since a single record cannot
exceed 4MB and be guaranteed to land contiguously in ADL.
The bytestring returned WILL include the
terminating delimiter string.
Examples
--------
>>> from io import BytesIO # doctest: +SKIP
>>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300') # doctest: +SKIP
>>> read_block(f, 0, 13) # doctest: +SKIP
b'Alice, 100\\nBo'
>>> read_block(f, 0, 13, delimiter=b'\\n') # doctest: +SKIP
b'Alice, 100\\n'
    >>> read_block(f, 10, 10, delimiter=b'\\n')  # doctest: +SKIP
    b'\\nBob, 200\\n'
>>> f = BytesIO(bytearray(2**22)) # doctest: +SKIP
    >>> read_block(f, 0, 2**22, delimiter=b'\\n')  # doctest: +SKIP
    Traceback (most recent call last):
        ...
    IndexError: No delimiter found within max record size of 4194304 bytes. Transfer without specifying a delimiter (as binary) instead.
"""
    f.seek(offset)
    data = f.read(length)
    if delimiter:
        # max record size is 4MB
        max_record = 2**22
        if length > max_record:
            raise IndexError('Records larger than ' + str(max_record) + ' bytes are not supported. The length requested was: ' + str(length) + ' bytes')
        # get the last index of the delimiter if it exists
        try:
            last_delim_index = len(data) - 1 - data[::-1].index(delimiter)
            # this ensures the length includes all of the last delimiter (in the event that it is more than one character)
            length = last_delim_index + len(delimiter)
            return data[0:length]
        except ValueError:
            # TODO: Before delimiters can be supported through the ADLUploader logic, the number of chunks being uploaded
            # needs to be visible to this method, since it needs to throw if:
            # 1. We cannot find a delimiter in <= 4MB of data
            # 2. If the remaining size is less than 4MB but there are multiple chunks that need to be stitched together,
            #    since the delimiter could be split across chunks.
            # 3. If delimiters are specified, there must be logic during segment determination that ensures all chunks
            #    terminate at the end of a record (on a new line), even if that makes the chunk < 256MB.
            if length >= max_record:
                raise IndexError('No delimiter found within max record size of ' + str(max_record) + ' bytes. Transfer without specifying a delimiter (as binary) instead.')
    return data
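# Illustrative usage sketch (not part of the library): pulling newline-delimited
# records out of a local file with read_block. The file name 'records.csv' and
# the 1024-byte block size are hypothetical values chosen for the example.
#
#     with open('records.csv', 'rb') as fobj:
#         block = read_block(fobj, 0, 1024, delimiter=b'\n')
#         # 'block' ends with the last complete line found in the first 1024 bytes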
def tokenize(*args, **kwargs):
""" Deterministic token
>>> tokenize('Hello') == tokenize('Hello')
True
"""
if kwargs:
args = args + (kwargs,)
return md5(str(tuple(args)).encode()).hexdigest()
def commonprefix(paths):
""" Find common directory for all paths
Python's ``os.path.commonprefix`` will not return a valid directory path in
some cases, so we wrote this convenience method.
Examples
--------
    >>> # os.path.commonprefix returns '/disk1/fooba'
>>> commonprefix(['/disk1/foobar', '/disk1/foobaz'])
'/disk1'
>>> commonprefix(['a/b/c', 'a/b/d', 'a/c/d'])
'a'
>>> commonprefix(['a/b/c', 'd/e/f', 'g/h/i'])
''
"""
return os.path.dirname(os.path.commonprefix(paths))
def clamp(n, smallest, largest):
""" Limit a value to a given range
This is equivalent to smallest <= n <= largest.
Examples
--------
>>> clamp(0, 1, 100)
1
>>> clamp(42, 2, 128)
42
>>> clamp(1024, 1, 32)
32
"""
return max(smallest, min(n, largest))
class CountUpDownLatch:
"""CountUpDownLatch provides a thread safe implementation of Up Down latch
"""
def __init__(self):
self.lock = threading.Condition()
self.val = 0
self.total = 0
def increment(self):
self.lock.acquire()
self.val += 1
self.total += 1
self.lock.release()
def decrement(self):
self.lock.acquire()
self.val -= 1
if self.val <= 0:
self.lock.notify_all()
self.lock.release()
def total_processed(self):
self.lock.acquire()
temp = self.total
self.lock.release()
return temp
def is_zero(self):
self.lock.acquire()
while self.val > 0:
self.lock.wait()
self.lock.release()
return True
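# Illustrative usage sketch for CountUpDownLatch (not part of the library's
# public API): a coordinator increments the latch once per queued task, worker
# threads decrement it as tasks finish, and is_zero() blocks until the count
# drains back to zero. The task count and sleep duration are arbitrary example
# values; this runs only when the module is executed directly.
if __name__ == '__main__':
    import time

    latch = CountUpDownLatch()

    def _do_work(task_id):
        time.sleep(0.01)  # simulate some work for the hypothetical task
        latch.decrement()

    for i in range(5):
        latch.increment()
        threading.Thread(target=_do_work, args=(i,)).start()

    latch.is_zero()  # blocks until all five workers have decremented
    print('processed %d tasks' % latch.total_processed())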