1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
|
"""
Portable file locking utilities.
Based partially on an example by Jonathan Feignberg in the Python
Cookbook [1] (licensed under the Python Software License) and a ctypes port by
Anatoly Techtonik for Roundup [2] (license [3]).
[1] http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/65203
[2] https://sourceforge.net/p/roundup/code/ci/default/tree/roundup/backends/portalocker.py
[3] https://sourceforge.net/p/roundup/code/ci/default/tree/COPYING.txt
Example Usage::
>>> from django.core.files import locks
>>> with open('./file', 'wb') as f:
... locks.lock(f, locks.LOCK_EX)
... f.write('Django')
"""
import os
__all__ = ('LOCK_EX', 'LOCK_SH', 'LOCK_NB', 'lock', 'unlock')
def _fd(f):
"""Get a filedescriptor from something which could be a file or an fd."""
return f.fileno() if hasattr(f, 'fileno') else f
if os.name == 'nt':
import msvcrt
from ctypes import (
POINTER, Structure, Union, byref, c_int64, c_ulong, c_void_p, sizeof,
windll,
)
from ctypes.wintypes import BOOL, DWORD, HANDLE
LOCK_SH = 0 # the default
LOCK_NB = 0x1 # LOCKFILE_FAIL_IMMEDIATELY
LOCK_EX = 0x2 # LOCKFILE_EXCLUSIVE_LOCK
# --- Adapted from the pyserial project ---
# detect size of ULONG_PTR
if sizeof(c_ulong) != sizeof(c_void_p):
ULONG_PTR = c_int64
else:
ULONG_PTR = c_ulong
PVOID = c_void_p
# --- Union inside Structure by stackoverflow:3480240 ---
class _OFFSET(Structure):
_fields_ = [
('Offset', DWORD),
('OffsetHigh', DWORD)]
class _OFFSET_UNION(Union):
_anonymous_ = ['_offset']
_fields_ = [
('_offset', _OFFSET),
('Pointer', PVOID)]
class OVERLAPPED(Structure):
_anonymous_ = ['_offset_union']
_fields_ = [
('Internal', ULONG_PTR),
('InternalHigh', ULONG_PTR),
('_offset_union', _OFFSET_UNION),
('hEvent', HANDLE)]
LPOVERLAPPED = POINTER(OVERLAPPED)
# --- Define function prototypes for extra safety ---
LockFileEx = windll.kernel32.LockFileEx
LockFileEx.restype = BOOL
LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
UnlockFileEx = windll.kernel32.UnlockFileEx
UnlockFileEx.restype = BOOL
UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]
def lock(f, flags):
hfile = msvcrt.get_osfhandle(_fd(f))
overlapped = OVERLAPPED()
ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped))
return bool(ret)
def unlock(f):
hfile = msvcrt.get_osfhandle(_fd(f))
overlapped = OVERLAPPED()
ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped))
return bool(ret)
else:
try:
import fcntl
LOCK_SH = fcntl.LOCK_SH # shared lock
LOCK_NB = fcntl.LOCK_NB # non-blocking
LOCK_EX = fcntl.LOCK_EX
except (ImportError, AttributeError):
# File locking is not supported.
LOCK_EX = LOCK_SH = LOCK_NB = 0
# Dummy functions that don't do anything.
def lock(f, flags):
# File is not locked
return False
def unlock(f):
# File is unlocked
return True
else:
def lock(f, flags):
try:
fcntl.flock(_fd(f), flags)
return True
except BlockingIOError:
return False
def unlock(f):
fcntl.flock(_fd(f), fcntl.LOCK_UN)
return True
|