1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
|
import os
from datetime import datetime, timezone
from urllib.parse import urljoin
from django.conf import settings
from django.core.files import File, locks
from django.core.files.move import file_move_safe
from django.core.signals import setting_changed
from django.utils._os import safe_join
from django.utils.deconstruct import deconstructible
from django.utils.encoding import filepath_to_uri
from django.utils.functional import cached_property
from .base import Storage
from .mixins import StorageSettingsMixin
@deconstructible(path="django.core.files.storage.FileSystemStorage")
class FileSystemStorage(Storage, StorageSettingsMixin):
    """
    Storage backend that keeps files on the local filesystem.

    Every constructor argument is optional. The matching property resolves it
    against a Django setting (MEDIA_ROOT, MEDIA_URL, FILE_UPLOAD_PERMISSIONS,
    FILE_UPLOAD_DIRECTORY_PERMISSIONS) through ``_value_or_setting()``.
    """

    # O_CREAT together with O_EXCL makes os.open() fail with an OSError when
    # the target already exists, so an existing file is never overwritten.
    # O_BINARY is only added on platforms whose ``os`` module defines it.
    OS_OPEN_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)

    def __init__(
        self,
        location=None,
        base_url=None,
        file_permissions_mode=None,
        directory_permissions_mode=None,
    ):
        """
        Remember the explicitly supplied options and register
        ``_clear_cached_properties`` as a ``setting_changed`` receiver.
        """
        self._directory_permissions_mode = directory_permissions_mode
        self._file_permissions_mode = file_permissions_mode
        self._base_url = base_url
        self._location = location
        setting_changed.connect(self._clear_cached_properties)

    @cached_property
    def base_location(self):
        """Storage root as configured: the ``location`` argument or MEDIA_ROOT."""
        return self._value_or_setting(self._location, settings.MEDIA_ROOT)

    @cached_property
    def location(self):
        """Absolute form of ``base_location``."""
        root = self.base_location
        return os.path.abspath(root)

    @cached_property
    def base_url(self):
        """
        URL prefix for stored files: the ``base_url`` argument or MEDIA_URL.

        An explicitly supplied base URL that lacks a trailing slash gets one
        appended before it is resolved.
        """
        needs_slash = self._base_url is not None and not self._base_url.endswith("/")
        if needs_slash:
            self._base_url += "/"
        return self._value_or_setting(self._base_url, settings.MEDIA_URL)

    @cached_property
    def file_permissions_mode(self):
        """Mode applied to saved files, or FILE_UPLOAD_PERMISSIONS."""
        return self._value_or_setting(
            self._file_permissions_mode,
            settings.FILE_UPLOAD_PERMISSIONS,
        )

    @cached_property
    def directory_permissions_mode(self):
        """Mode for created directories, or FILE_UPLOAD_DIRECTORY_PERMISSIONS."""
        return self._value_or_setting(
            self._directory_permissions_mode,
            settings.FILE_UPLOAD_DIRECTORY_PERMISSIONS,
        )

    def _open(self, name, mode="rb"):
        """Open the stored file ``name`` in ``mode`` and wrap it in a ``File``."""
        handle = open(self.path(name), mode)
        return File(handle)

    def _save(self, name, content):
        """
        Write ``content`` under ``name`` and return the name actually used.

        The returned name is relative to the storage location and always uses
        forward slashes. Raise FileExistsError when the target directory path
        exists but is not a directory.
        """
        full_path = self.path(name)

        # Make sure the parent directory (and any missing ancestors) exists.
        directory = os.path.dirname(full_path)
        try:
            dir_mode = self.directory_permissions_mode
            if dir_mode is None:
                os.makedirs(directory, exist_ok=True)
            else:
                # os.makedirs() does not apply its "mode" argument to
                # intermediate directories, so narrow the umask temporarily.
                previous_umask = os.umask(0o777 & ~dir_mode)
                try:
                    os.makedirs(directory, dir_mode, exist_ok=True)
                finally:
                    os.umask(previous_umask)
        except FileExistsError:
            raise FileExistsError("%s exists and is not a directory." % directory)

        # get_available_name() and the write below are not atomic: two threads
        # can be handed the same name. Creation is therefore attempted
        # exclusively, and on a collision a new name is requested and the
        # attempt is repeated.
        stored = False
        while not stored:
            try:
                if not hasattr(content, "temporary_file_path"):
                    # Ordinary upload: stream it into a newly created file.
                    # os.open() masks this mode with the current umask.
                    descriptor = os.open(full_path, self.OS_OPEN_FLAGS, 0o666)
                    stream = None
                    try:
                        locks.lock(descriptor, locks.LOCK_EX)
                        for chunk in content.chunks():
                            if stream is None:
                                # The first chunk's type selects binary or
                                # text mode for the whole file.
                                is_binary = isinstance(chunk, bytes)
                                stream = os.fdopen(
                                    descriptor, "wb" if is_binary else "wt"
                                )
                            stream.write(chunk)
                    finally:
                        locks.unlock(descriptor)
                        if stream is None:
                            # No chunk was written, so the raw descriptor
                            # was never wrapped and must be closed directly.
                            os.close(descriptor)
                        else:
                            stream.close()
                else:
                    # The content already lives at a path that can be moved.
                    file_move_safe(content.temporary_file_path(), full_path)
            except FileExistsError:
                # The name was taken in the meantime; pick another one.
                name = self.get_available_name(name)
                full_path = self.path(name)
            else:
                stored = True

        file_mode = self.file_permissions_mode
        if file_mode is not None:
            os.chmod(full_path, file_mode)

        # The reported name is always relative to the storage root.
        relative_name = os.path.relpath(full_path, self.location)

        # Give the saved file the same gid as the storage root.
        self._ensure_location_group_id(full_path)

        # Names are stored with forward slashes, even on Windows.
        return str(relative_name).replace("\\", "/")

    def _ensure_location_group_id(self, full_path):
        """
        On POSIX, give ``full_path`` the group id of the storage location.

        A PermissionError from os.chown() is ignored, which makes this a
        best-effort adjustment.
        """
        if os.name != "posix":
            return
        location_gid = os.stat(self.location).st_gid
        if os.stat(full_path).st_gid == location_gid:
            return
        try:
            os.chown(full_path, uid=-1, gid=location_gid)
        except PermissionError:
            pass

    def delete(self, name):
        """
        Remove the file or directory stored as ``name``.

        Raise ValueError for an empty name. A target that is already gone is
        silently accepted.
        """
        if not name:
            raise ValueError("The name must be given to delete().")
        target = self.path(name)
        try:
            remover = os.rmdir if os.path.isdir(target) else os.remove
            remover(target)
        except FileNotFoundError:
            # The file or directory was removed concurrently.
            pass

    def exists(self, name):
        """Return whether ``name`` exists, as reported by os.path.lexists()."""
        target = self.path(name)
        return os.path.lexists(target)

    def listdir(self, path):
        """Return a ``(directories, files)`` pair of entry names under ``path``."""
        directories = []
        files = []
        with os.scandir(self.path(path)) as entries:
            for entry in entries:
                bucket = directories if entry.is_dir() else files
                bucket.append(entry.name)
        return directories, files

    def path(self, name):
        """Return ``name`` joined onto the storage location with safe_join()."""
        return safe_join(self.location, name)

    def size(self, name):
        """Return the size of the stored file ``name`` from os.path.getsize()."""
        target = self.path(name)
        return os.path.getsize(target)

    def url(self, name):
        """
        Return the URL for ``name``, built on ``base_url``.

        Raise ValueError when no base URL is available.
        """
        if self.base_url is None:
            raise ValueError("This file is not accessible via a URL.")
        relative_url = filepath_to_uri(name)
        if relative_url is not None:
            # Leading slashes are dropped before joining onto the base URL.
            relative_url = relative_url.lstrip("/")
        return urljoin(self.base_url, relative_url)

    def _datetime_from_timestamp(self, ts):
        """
        Convert the POSIX timestamp ``ts`` to a datetime.

        With USE_TZ enabled the result is aware and in UTC; otherwise it is
        naive and in the local timezone.
        """
        if settings.USE_TZ:
            return datetime.fromtimestamp(ts, tz=timezone.utc)
        return datetime.fromtimestamp(ts, tz=None)

    def get_accessed_time(self, name):
        """Return the os.path.getatime() value of ``name`` as a datetime."""
        timestamp = os.path.getatime(self.path(name))
        return self._datetime_from_timestamp(timestamp)

    def get_created_time(self, name):
        """Return the os.path.getctime() value of ``name`` as a datetime."""
        timestamp = os.path.getctime(self.path(name))
        return self._datetime_from_timestamp(timestamp)

    def get_modified_time(self, name):
        """Return the os.path.getmtime() value of ``name`` as a datetime."""
        timestamp = os.path.getmtime(self.path(name))
        return self._datetime_from_timestamp(timestamp)
|