1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
import os
import re
import subprocess
from stat import S_ISLNK
from .helpers import posix_acl_use_stored_uid_gid, user2uid, group2gid, safe_decode, safe_encode
API_VERSION = 3
cdef extern from "sys/types.h":
int ACL_TYPE_ACCESS
int ACL_TYPE_DEFAULT
cdef extern from "sys/acl.h":
ctypedef struct _acl_t:
pass
ctypedef _acl_t *acl_t
int acl_free(void *obj)
acl_t acl_get_file(const char *path, int type)
acl_t acl_set_file(const char *path, int type, acl_t acl)
acl_t acl_from_text(const char *buf)
char *acl_to_text(acl_t acl, ssize_t *len)
cdef extern from "acl/libacl.h":
int acl_extended_file(const char *path)
_comment_re = re.compile(' *#.*', re.M)
def acl_use_local_uid_gid(acl):
"""Replace the user/group field with the local uid/gid if possible
"""
entries = []
for entry in safe_decode(acl).split('\n'):
if entry:
fields = entry.split(':')
if fields[0] == 'user' and fields[1]:
fields[1] = str(user2uid(fields[1], fields[3]))
elif fields[0] == 'group' and fields[1]:
fields[1] = str(group2gid(fields[1], fields[3]))
entries.append(':'.join(fields[:3]))
return safe_encode('\n'.join(entries))
cdef acl_append_numeric_ids(acl):
"""Extend the "POSIX 1003.1e draft standard 17" format with an additional uid/gid field
"""
entries = []
for entry in _comment_re.sub('', safe_decode(acl)).split('\n'):
if entry:
type, name, permission = entry.split(':')
if name and type == 'user':
entries.append(':'.join([type, name, permission, str(user2uid(name, name))]))
elif name and type == 'group':
entries.append(':'.join([type, name, permission, str(group2gid(name, name))]))
else:
entries.append(entry)
return safe_encode('\n'.join(entries))
cdef acl_numeric_ids(acl):
"""Replace the "POSIX 1003.1e draft standard 17" user/group field with uid/gid
"""
entries = []
for entry in _comment_re.sub('', safe_decode(acl)).split('\n'):
if entry:
type, name, permission = entry.split(':')
if name and type == 'user':
uid = str(user2uid(name, name))
entries.append(':'.join([type, uid, permission, uid]))
elif name and type == 'group':
gid = str(group2gid(name, name))
entries.append(':'.join([type, gid, permission, gid]))
else:
entries.append(entry)
return safe_encode('\n'.join(entries))
def acl_get(path, item, st, numeric_owner=False):
"""Saves ACL Entries
If `numeric_owner` is True the user/group field is not preserved only uid/gid
"""
cdef acl_t default_acl = NULL
cdef acl_t access_acl = NULL
cdef char *default_text = NULL
cdef char *access_text = NULL
p = <bytes>os.fsencode(path)
if S_ISLNK(st.st_mode) or acl_extended_file(p) <= 0:
return
if numeric_owner:
converter = acl_numeric_ids
else:
converter = acl_append_numeric_ids
try:
access_acl = acl_get_file(p, ACL_TYPE_ACCESS)
if access_acl:
access_text = acl_to_text(access_acl, NULL)
if access_text:
item[b'acl_access'] = converter(access_text)
default_acl = acl_get_file(p, ACL_TYPE_DEFAULT)
if default_acl:
default_text = acl_to_text(default_acl, NULL)
if default_text:
item[b'acl_default'] = converter(default_text)
finally:
acl_free(default_text)
acl_free(default_acl)
acl_free(access_text)
acl_free(access_acl)
def acl_set(path, item, numeric_owner=False):
"""Restore ACL Entries
If `numeric_owner` is True the stored uid/gid is used instead
of the user/group names
"""
cdef acl_t access_acl = NULL
cdef acl_t default_acl = NULL
p = <bytes>os.fsencode(path)
if numeric_owner:
converter = posix_acl_use_stored_uid_gid
else:
converter = acl_use_local_uid_gid
access_text = item.get(b'acl_access')
default_text = item.get(b'acl_default')
if access_text:
try:
access_acl = acl_from_text(<bytes>converter(access_text))
if access_acl:
acl_set_file(p, ACL_TYPE_ACCESS, access_acl)
finally:
acl_free(access_acl)
if default_text:
try:
default_acl = acl_from_text(<bytes>converter(default_text))
if default_acl:
acl_set_file(p, ACL_TYPE_DEFAULT, default_acl)
finally:
acl_free(default_acl)
def umount(mountpoint):
return subprocess.call(['fusermount', '-u', mountpoint])
|