1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
|
"""Base class and common code for :py:mod:`dirq` package.
It is used internally by :py:mod:`dirq` modules and should not
be used elsewhere.
Author
------
Konstantin Skaburskas <konstantin.skaburskas@gmail.com>
License and Copyright
---------------------
ASL 2.0
Copyright (C) CERN 2011-2021
"""
import dirq
import codecs
import errno
import inspect
import os
import random
import re
import sys
import time
from dirq.utils import VALID_STR_TYPES
# Package metadata is taken from the top-level dirq package so that this
# module reports the same author, version and release date.
__author__, __version__, __date__ = dirq.AUTHOR, dirq.VERSION, dirq.DATE
# Unanchored building blocks: an intermediate directory name is 8 lowercase
# hexadecimal digits and an element name is 14 (the format produced by
# _name()).
__DIRECTORY_REGEXP = '[0-9a-f]{8}'
__ELEMENT_REGEXP = '[0-9a-f]{14}'

# Anchored, compiled patterns matching, respectively, a whole directory name,
# a whole element name, and a "directory/element" relative path.
# NOTE(review): '$' also matches just before a trailing newline; fullmatch()
# or '\Z' would be stricter if that ever matters.
_DIRECTORY_REGEXP = re.compile('^(' + __DIRECTORY_REGEXP + ')$')
_ELEMENT_REGEXP = re.compile('^(' + __ELEMENT_REGEXP + ')$')
_DIRELT_REGEXP = re.compile(
    '^' + __DIRECTORY_REGEXP + '/' + __ELEMENT_REGEXP + '$')

# When true, _warn() writes its messages to stdout; off by default.
WARN = False
def _warn(text):
    """Print a warning.

    When the module-level WARN flag is true, write *text* to stdout followed
    by the module name and the line number of the caller, then flush.
    Otherwise do nothing.
    """
    if not WARN:
        return
    # f_back is the frame of whoever called _warn(), so the reported line
    # points at the call site rather than at this function.
    caller_line = inspect.currentframe().f_back.f_lineno
    sys.stdout.write('%s, at %s line %s\n' % (text, __name__, caller_line))
    sys.stdout.flush()
def _name(rndhex):
    """
    Return a candidate name for a new element, made of:
    * 8 hexadecimal digits: whole seconds since the Epoch
    * 5 hexadecimal digits: the microseconds within that second
    * 1 hexadecimal digit: *rndhex*, to further reduce name collisions
      between processes (callers pass a value in 0..15)
    Properties:
    * fixed size (14 hexadecimal digits) while rndhex is a single digit
      and the seconds fit in 8 digits
    * likely to be unique (with high-probability)
    * can be lexically sorted
    * increasing for a given process, as long as the system clock does
      not go backwards
    * reasonably compact
    * matching _ELEMENT_REGEXP
    """
    timestamp = time.time()
    whole_seconds = int(timestamp)
    micros = int((timestamp - whole_seconds) * 1000000)
    return "%08x%05x%01x" % (whole_seconds, micros, rndhex)
def _directory_contents(path, missingok=True):
    """Get the contents of a directory as a list of names, without . and ..

    Arguments:
        path
            the directory to list
        missingok
            when true (the default), a directory that does not exist
            (anymore) is not an error and an empty list is returned

    Raise:
        OSError - can't list directory (any failure other than a missing
                  directory tolerated by *missingok*)
    """
    try:
        return os.listdir(path)
    except OSError:
        error = sys.exc_info()[1]
        # Only a vanished directory is tolerated; permission problems,
        # "not a directory", etc. are real errors and must be reported.
        if missingok and error.errno == errno.ENOENT:
            # RACE: this path does not exist (anymore)
            return []
        raise OSError("cannot listdir(%s): %s" % (path, error))
def _wrapped_makedirs(path):
    """Call os.makedirs(path) and report the outcome instead of raising
    OSError. Used by _special_mkdir().

    Return a (created, error_message) tuple:
    * (True, None)  - the directory (and any missing parents) was created
    * (False, None) - the directory was already there (EEXIST on an existing
                      directory, or EISDIR)
    * (False, msg)  - any other OSError; msg describes the failure
    """
    try:
        os.makedirs(path)
    except OSError:
        error = sys.exc_info()[1]
        already_there = (
            (error.errno == errno.EEXIST and os.path.isdir(path))
            or error.errno == errno.EISDIR
        )
        if already_there:
            return (False, None)
        return (False, "cannot mkdir(%s): %s" % (path, error))
    return (True, None)
def _special_mkdir(path, umask=None):
    """
    Recursively create directories specified in path:
    * return true on success
    * return false if the directory already exists
    * die in case of any other error

    Arguments:
        path
            the directory to create, together with any missing parents
        umask
            if not None, installed as the process umask while the
            directories are created, then the previous umask is restored

    Raise:
        OSError - can't make directory
    """
    if umask is None:
        result, error = _wrapped_makedirs(path)
    else:
        oldumask = os.umask(umask)
        try:
            result, error = _wrapped_makedirs(path)
        finally:
            # Always restore the caller's umask, even if something other
            # than OSError (e.g. TypeError for a bad path) escapes.
            # NOTE: the umask is process-wide, so this is not thread-safe.
            os.umask(oldumask)
    if error is None:
        return result
    raise OSError(error)
def _special_rmdir(path):
    """
    Delete a directory:
    * return true on success
    * return false if the path does not exist (anymore)
    * die in case of any other error
    Raise:
        OSError - can't delete given directory
    """
    try:
        os.rmdir(path)
    except OSError:
        # Only OSError carries a meaningful errno; anything else (e.g. a
        # TypeError for a bad path argument) propagates unchanged.
        error = sys.exc_info()[1]
        if error.errno != errno.ENOENT:
            raise OSError("cannot rmdir(%s): %s" % (path, error))
        # RACE: this path does not exist (anymore)
        return False
    return True
def _file_read(path, utf8):
    """Read and return the whole content of a file.

    Arguments:
        path
            the file to read
        utf8
            if true, decode the content as UTF-8 and return text;
            otherwise return the raw bytes

    Raise:
        OSError - problems opening/closing file
        IOError - file read error
    """
    try:
        if utf8:
            fileh = codecs.open(path, 'r', "utf8")
        else:
            fileh = open(path, 'rb')
    except Exception:
        error = sys.exc_info()[1]
        raise OSError("cannot open %s: %s" % (path, error))
    try:
        data = fileh.read()
    except Exception:
        error = sys.exc_info()[1]
        # Do not leak the handle when the read fails (e.g. invalid UTF-8).
        # A close failure here is secondary to the read error reported below.
        try:
            fileh.close()
        except Exception:
            pass
        raise IOError("cannot read %s: %s" % (path, error))
    try:
        fileh.close()
    except Exception:
        error = sys.exc_info()[1]
        raise OSError("cannot close %s: %s" % (path, error))
    return data
def _file_create(path, umask=None, utf8=False):
    """Create a new file defined by 'path' and return a file handle open
    for writing.

    Arguments:
        path
            the file to create; it must not exist yet
        umask
            if not None, installed as the process umask while the file is
            created, then the previous umask is restored
        utf8
            if true, the returned handle encodes written text as UTF-8;
            otherwise it is a plain binary handle

    Raises:
        OSError - if file exists (errno EEXIST) or cannot be created
    """
    # O_EXCL makes the existence check and the creation a single atomic
    # step for both modes, so two processes can never both "create" (and
    # truncate) the same element file. O_BINARY only exists on Windows,
    # where it prevents newline translation of the written data.
    flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, 'O_BINARY', 0)
    if umask is not None:
        oldumask = os.umask(umask)
    try:
        # 438 == 0o666, further reduced by the umask in effect.
        fileh = os.fdopen(os.open(path, flags, 438), 'wb')
    finally:
        # Restore the umask on every path, including the EEXIST error.
        # NOTE: the umask is process-wide, so this is not thread-safe.
        if umask is not None:
            os.umask(oldumask)
    if utf8:
        fileh = codecs.getwriter('utf8')(fileh)
    return fileh
def _file_write(path, utf8, umask, data):
    """Create a new file and write data to it.

    Arguments:
        path
            the file to create (must not exist yet, see _file_create())
        utf8
            if true, data is written through a UTF-8 encoding handle
        umask
            umask to use while creating the file (None: keep current)
        data
            the content to write

    Raise:
        OSError - problems opening/closing file
        IOError - file write error

    Note:
        if the write fails, the (partially written) file is left on disk;
        this function does not remove it.
    """
    fileh = _file_create(path, umask=umask, utf8=utf8)
    try:
        fileh.write(data)
    except Exception:
        error = sys.exc_info()[1]
        # Do not leak the handle when the write fails; a close failure here
        # is secondary to the write error reported below.
        try:
            fileh.close()
        except Exception:
            pass
        raise IOError("cannot write to %s: %s" % (path, error))
    try:
        fileh.close()
    except Exception:
        error = sys.exc_info()[1]
        raise OSError("cannot close %s: %s" % (path, error))
class QueueBase(object):
    """Common state and iteration logic shared by the directory queues.

    Attributes set by the constructor:
    * path - the queue toplevel directory
    * umask - umask used when creating files/directories (None: inherit)
    * rndhex - hexadecimal digit (0-15) used when generating names
    * id - queue identifier: the path on win32, "st_dev:st_ino" elsewhere
    * dirs, elts - iterator cache of intermediate directory names and of
      pending element names

    Sub-classes must implement _build_elements().
    """

    def __init__(self, path, umask=None, rndhex=None):
        """
        Arguments:
            path
                the queue toplevel directory
            umask
                the umask to use when creating files and directories
                (default: use the running process' umask)
            rndhex
                the hexadecimal digit to use in names
                (default: randomly chosen)
        Raise:
            TypeError - wrong input data types provided
            OSError - can't create directory structure
        """
        # iterator cache, see _reset() and __next__()
        self.dirs, self.elts = [], []
        self._next_exception = False

        if type(path) not in VALID_STR_TYPES:
            raise TypeError("'path' should be str or unicode")
        self.path = path

        if not (umask is None or isinstance(umask, int)):
            raise TypeError("'umask' should be integer")
        self.umask = umask

        if not (rndhex is None or isinstance(rndhex, int)):
            raise TypeError("'rndhex' should be integer")
        self.rndhex = random.randint(0, 15) if rndhex is None else rndhex % 16

        # make sure the top level directory exists
        _special_mkdir(path, self.umask)

        # remember an identifier for this queue
        if sys.platform == 'win32':
            self.id = self.path
        else:
            info = os.stat(self.path)
            self.id = '%s:%s' % (info.st_dev, info.st_ino)

    def __iter__(self):
        """Restart the scan and return self as an iterator over element
        names. In this mode, running out of elements raises StopIteration
        (once) instead of returning ''.
        """
        self._reset()
        self._next_exception = True
        return self

    def names(self):
        """Return iterator over element names (alias of __iter__()).
        """
        return self.__iter__()

    def copy(self):
        """Return a deep copy of this object whose iterator cache
        (dirs and elts) has been emptied.
        """
        from copy import deepcopy
        clone = deepcopy(self)
        clone.dirs, clone.elts = [], []
        return clone

    def _reset(self):
        """Re-scan the intermediate directories (names matching
        _DIRECTORY_REGEXP, kept sorted) and drop the cached element names.
        Raise:
            OSError - can't list directories
        """
        self.dirs = []
        self.dirs.extend(entry for entry in _directory_contents(self.path)
                         if _DIRECTORY_REGEXP.match(entry))
        self.dirs.sort()
        self.elts = []

    def first(self):
        """Restart the scan and return whatever next() returns, i.e. the
        first element name.
        Raise:
            OSError - can't list directories
        """
        self._reset()
        return self.next()

    def _build_elements(self):
        """Expected to refill self.elts with more element names and return
        a true value when it found some. Must be implemented by sub-classes.
        """
        raise NotImplementedError('Implement in sub-class.')

    def __next__(self):
        """Return the name of the next element, refilling the cache via
        _build_elements() when it is empty.

        When nothing is left: if iteration was started through __iter__(),
        raise StopIteration (and clear that mode); otherwise return ''.
        Return:
            name of the next element in the queue
        Raise:
            StopIteration - when used as Python iterator via
                            __iter__() method
        """
        if self.elts or self._build_elements():
            return self.elts.pop(0)
        if not self._next_exception:
            return ''
        self._next_exception = False
        raise StopIteration

    next = __next__

    def touch(self, ename):
        """Update the access/modification times of an element directory to
        indicate that it is still being used.
        Note:
            this is only really useful for locked elements but we allow it
            for all.
        Raises:
            EnvironmentError - on any IOError, OSError in utime()
        NOTE: this may not work on OSes with directories implemented not as
        files (eg. Windows). See doc for os.utime().
        """
        element_path = '%s/%s' % (self.path, ename)
        try:
            os.utime(element_path, None)
        except (IOError, OSError):
            error = sys.exc_info()[1]
            raise EnvironmentError("cannot utime(%s, None): %s" %
                                   (element_path, error))
|