1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
import json
import hashlib
import os
import socket
import time
from importlib import import_module
from django.core.cache import caches
from django.core.files.base import ContentFile
from django.utils.encoding import force_str, smart_bytes
from django.utils.functional import SimpleLazyObject
from compressor.conf import settings
from compressor.storage import default_offline_manifest_storage
from compressor.utils import get_mod_func
# Cache-key callable resolved from settings.COMPRESS_CACHE_KEY_FUNCTION.
# Stays None until get_cachekey() imports it on first use.
_cachekey_func = None
def get_hexdigest(plaintext, length=None):
    """
    Return the SHA-256 hex digest of ``plaintext``.

    ``plaintext`` is converted to bytes with ``smart_bytes`` before hashing.
    When ``length`` is truthy only the first ``length`` characters of the
    digest are returned; otherwise the full digest is returned.
    """
    hexdigest = hashlib.sha256(smart_bytes(plaintext)).hexdigest()
    return hexdigest[:length] if length else hexdigest
def simple_cachekey(key):
    """Return ``key`` (coerced with ``force_str``) under the ``django_compressor.`` prefix."""
    key_text = force_str(key)
    return "django_compressor.%s" % (key_text,)
def socket_cachekey(key):
    """
    Return a cache key that also embeds this machine's hostname.

    The result has the form ``django_compressor.<hostname>.<key>``, where the
    hostname comes from ``socket.gethostname()`` and ``key`` is coerced with
    ``force_str``.
    """
    hostname = socket.gethostname()
    key_text = force_str(key)
    return "django_compressor.%s.%s" % (hostname, key_text)
def get_cachekey(*args, **kwargs):
    """
    Build a cache key with the function named by COMPRESS_CACHE_KEY_FUNCTION.

    The dotted path in ``settings.COMPRESS_CACHE_KEY_FUNCTION`` is split with
    ``get_mod_func`` and imported on the first call; the resulting callable is
    kept in the module-level ``_cachekey_func`` and reused afterwards. All
    positional and keyword arguments are forwarded unchanged, and the
    callable's return value is returned.

    Raises:
        ImportError: if the configured function cannot be resolved. The
            underlying exception is attached as ``__cause__``.
    """
    global _cachekey_func
    if _cachekey_func is None:
        try:
            mod_name, func_name = get_mod_func(settings.COMPRESS_CACHE_KEY_FUNCTION)
            _cachekey_func = getattr(import_module(mod_name), func_name)
        except (AttributeError, ImportError, TypeError) as e:
            # Chain explicitly so the traceback presents the original failure
            # as the direct cause of this configuration error.
            raise ImportError(
                "Couldn't import cache key function %s: %s"
                % (settings.COMPRESS_CACHE_KEY_FUNCTION, e)
            ) from e
    return _cachekey_func(*args, **kwargs)
def get_mtime_cachekey(filename):
    """Return the cache key under which the mtime of ``filename`` is stored."""
    filename_digest = get_hexdigest(filename)
    return get_cachekey("mtime.%s" % (filename_digest,))
def get_offline_hexdigest(render_template_string):
    """
    Return the hex digest of a rendered template string, ignoring STATIC_URL.

    Every occurrence of ``settings.STATIC_URL`` is removed before hashing so
    that the digest does not depend on the configured static URL.
    """
    # ``settings.STATIC_URL`` may be a string-alike object (e.g. one that adds
    # the ``SCRIPT_NAME`` WSGI param as a path prefix to the output URL), so
    # cast it to a real string before using it as the search term.
    # See https://code.djangoproject.com/ticket/25598.
    static_url = str(settings.STATIC_URL)
    url_independent = render_template_string.replace(static_url, "")
    return get_hexdigest(url_independent)
def get_offline_cachekey(source):
    """Return the ``offline.<digest>`` cache key for the given template source."""
    source_digest = get_offline_hexdigest(source)
    return get_cachekey("offline.%s" % (source_digest,))
# Module-level cache of the parsed offline manifest. None means "not loaded
# yet"; get_offline_manifest() fills it and flush_offline_manifest() resets it.
_offline_manifest = None
def get_offline_manifest():
    """
    Return the offline manifest, loading it from storage on first use.

    The manifest file named by ``settings.COMPRESS_OFFLINE_MANIFEST`` is read
    from ``default_offline_manifest_storage`` and parsed as UTF-8 JSON. If the
    file does not exist an empty dict is used. The result is kept in the
    module-level ``_offline_manifest`` and returned directly on later calls.
    """
    global _offline_manifest
    if _offline_manifest is not None:
        return _offline_manifest

    manifest_name = settings.COMPRESS_OFFLINE_MANIFEST
    if not default_offline_manifest_storage.exists(manifest_name):
        _offline_manifest = {}
    else:
        with default_offline_manifest_storage.open(manifest_name) as manifest_file:
            raw = manifest_file.read()
            _offline_manifest = json.loads(raw.decode("utf8"))
    return _offline_manifest
def flush_offline_manifest():
    """Reset the module-level ``_offline_manifest`` cache to ``None``."""
    global _offline_manifest
    _offline_manifest = None
def write_offline_manifest(manifest):
    """
    Serialize ``manifest`` to JSON and save it to the offline manifest storage.

    The JSON is indented by two spaces, encoded as UTF-8 and saved under
    ``settings.COMPRESS_OFFLINE_MANIFEST``. The in-memory manifest cache is
    flushed afterwards.
    """
    payload = json.dumps(manifest, indent=2).encode("utf8")
    default_offline_manifest_storage.save(
        settings.COMPRESS_OFFLINE_MANIFEST,
        ContentFile(payload),
    )
    # Drop the cached copy so the next reader picks up the new file.
    flush_offline_manifest()
def get_templatetag_cachekey(compressor, mode, kind):
    """
    Return the cache key for a template tag's output.

    The key is built from ``compressor.cachekey``, ``mode`` and ``kind`` as
    ``templatetag.<cachekey>.<mode>.<kind>`` and passed through
    ``get_cachekey``.
    """
    key_parts = (compressor.cachekey, mode, kind)
    return get_cachekey("templatetag.%s.%s.%s" % key_parts)
def get_mtime(filename):
    """
    Return the modification time of ``filename``.

    When ``settings.COMPRESS_MTIME_DELAY`` is truthy the value is served from
    the cache if present; on a miss it is read from disk and cached with that
    delay as the timeout. When the setting is falsy the file system is
    queried directly.
    """
    delay = settings.COMPRESS_MTIME_DELAY
    if not delay:
        return os.path.getmtime(filename)

    cache_key = get_mtime_cachekey(filename)
    cached_mtime = cache.get(cache_key)
    if cached_mtime is not None:
        return cached_mtime

    fresh_mtime = os.path.getmtime(filename)
    cache.set(cache_key, fresh_mtime, delay)
    return fresh_mtime
def get_hashed_mtime(filename, length=12):
    """
    Return a hex digest of the file's integer modification time.

    The path is resolved with ``os.path.realpath`` before the mtime lookup.
    The digest is cut to ``length`` characters by ``get_hexdigest``. Returns
    ``None`` if resolving the path or reading the mtime raises ``OSError``.
    """
    try:
        resolved_path = os.path.realpath(filename)
        whole_seconds = int(get_mtime(resolved_path))
    except OSError:
        return None
    return get_hexdigest(str(whole_seconds), length)
def get_hashed_content(filename, length=12):
    """
    Return a hex digest of the raw bytes of ``filename``.

    The path is resolved with ``os.path.realpath`` and the file is read in
    binary mode. The digest is cut to ``length`` characters by
    ``get_hexdigest``.

    Returns ``None`` if the file cannot be resolved, opened or read
    (``OSError``), matching ``get_hashed_mtime``.
    """
    try:
        # Opening and reading are the steps that fail for a missing or
        # unreadable file, so they belong inside the guarded section.
        with open(os.path.realpath(filename), "rb") as file:
            content = file.read()
    except OSError:
        return None
    # should we make sure that file is utf-8 encoded?
    # Hash outside the try so that hashing errors are never swallowed.
    return get_hexdigest(content, length)
def get_precompiler_cachekey(command, contents):
    """
    Return the SHA-1 hex digest identifying a precompiler invocation.

    The digest is computed over ``precompiler.<command>.<contents>`` after
    conversion to bytes with ``smart_bytes``.
    """
    raw_key = "precompiler.%s.%s" % (command, contents)
    return hashlib.sha1(smart_bytes(raw_key)).hexdigest()
def cache_get(key):
    """
    Return the value stored by ``cache_set`` under ``key``, or ``None``.

    Entries are ``(value, refresh_time, refreshed)`` tuples. ``None`` is
    returned when nothing is cached. It is also returned once for an entry
    that is past its refresh time and not yet marked refreshed; in that case
    the stale value is written back, marked refreshed, for
    ``settings.COMPRESS_MINT_DELAY`` seconds.
    """
    entry = cache.get(key)
    if entry is None:
        return None

    value, refresh_at, already_refreshed = entry
    past_refresh = time.time() > refresh_at
    if already_refreshed or not past_refresh:
        return value

    # Store the stale value while the cache
    # revalidates for another MINT_DELAY seconds.
    cache_set(key, value, refreshed=True, timeout=settings.COMPRESS_MINT_DELAY)
    return None
def cache_set(key, val, refreshed=False, timeout=None):
    """
    Store ``val`` under ``key`` together with its refresh metadata.

    ``timeout`` defaults to ``settings.COMPRESS_REBUILD_TIMEOUT`` when it is
    ``None``. The stored entry is ``(val, refresh_time, refreshed)``, where
    ``refresh_time`` is ``timeout`` seconds from now. The backend timeout is
    ``timeout`` plus ``settings.COMPRESS_MINT_DELAY``. Returns whatever
    ``cache.set`` returns.
    """
    soft_timeout = settings.COMPRESS_REBUILD_TIMEOUT if timeout is None else timeout
    refresh_deadline = soft_timeout + time.time()
    backend_timeout = soft_timeout + settings.COMPRESS_MINT_DELAY
    return cache.set(key, (val, refresh_deadline, refreshed), backend_timeout)
# Cache backend selected by settings.COMPRESS_CACHE_BACKEND. Wrapped in
# SimpleLazyObject so the ``caches`` lookup in the lambda is deferred rather
# than performed at import time.
cache = SimpleLazyObject(lambda: caches[settings.COMPRESS_CACHE_BACKEND])
|