File: utils.py

package info (click to toggle)
django-cacheops 7.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 404 kB
  • sloc: python: 3,189; sh: 7; makefile: 4
file content (161 lines) | stat: -rw-r--r-- 5,008 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import re
import json
import inspect
import sys
from funcy import memoize, compose, wraps, any, any_fn, select_values, mapcat

from django.db import models
from django.http import HttpRequest

from .conf import model_profile


def model_family(model):
    """
    Returns the set of non-abstract models sharing a database table with the given one.

    Events on one family member should affect the others. The family is collected
    by walking the subclass tree of every table-owning base of the model, which
    brings in proxy models, including subclasses, superclasses and siblings.
    Two descendants of an abstract model are not family - they cannot affect each other.
    An abstract model has no table and hence an empty family.
    """
    if model._meta.abstract:
        return set()

    @memoize
    def descendants(cls):
        # NOTE: multitable submodels get listed here too, we just don't care.
        #       Cacheops doesn't support them anyway.
        found = {cls}
        for sub in cls.__subclasses__():
            found |= descendants(sub)
        return found

    def owns_table(base):
        # Only real model classes qualify, the models.Model root itself doesn't
        if not issubclass(base, models.Model) or base is models.Model:
            return False
        return not (base._meta.proxy or base._meta.abstract)

    relatives = set()
    for base in model.__mro__:
        if owns_table(base):
            relatives |= descendants(base)

    return {member for member in relatives if not member._meta.abstract}

@memoize
def family_has_profile(cls):
    """
    Tells whether model_profile() gives a truthy result for any model
    in the family of the given one. The answer is memoized per class.
    """
    relatives = model_family(cls)
    return any(model_profile, relatives)


class MonkeyProxy:
    """
    Plain attribute holder, monkey_mix() stores original methods on its instance.
    """

def monkey_mix(cls, mixin):
    """
    Patches methods of a mixin onto an existing class.
    No real multi-inheritance is involved, class attributes are simply replaced.
    The originals being replaced are kept in `_no_monkey` proxy, so that
    mixin methods can still call them:

    class SomeMixin(object):
        def do_smth(self, arg):
            ... do smth else before
            self._no_monkey.do_smth(self, arg)
            ... do smth else after

    Only plain functions and method descriptors defined directly on the mixin are copied.
    A class can be mixed only once.
    """
    assert not hasattr(cls, '_no_monkey'), 'Multiple monkey mix not supported'
    originals = MonkeyProxy()
    cls._no_monkey = originals

    # Collect everything up front, before the class is touched
    patches = [
        (name, attr) for name, attr in mixin.__dict__.items()
        if inspect.isfunction(attr) or inspect.ismethoddescriptor(attr)
    ]

    for name, attr in patches:
        # Save whatever the class exposed under this name before overriding it
        if hasattr(cls, name):
            setattr(originals, name, getattr(cls, name))
        setattr(cls, name, attr)


@memoize
def stamp_fields(model):
    """
    Returns md5 hex digest of a serialized description of model fields.

    Each field is described by the first two items of its deconstruct() result
    plus its attname and column. Descriptions are sorted before serializing.
    """
    def describe(field):
        deconstructed = field.deconstruct()
        name, class_name = deconstructed[0], deconstructed[1]
        return name, class_name, field.attname, field.column

    descriptions = sorted(describe(field) for field in model._meta.fields)
    return md5hex(str(descriptions))


### Cache keys calculation

def obj_key(obj):
    """
    Turns an object into something json can serialize, to be used as a cache key factor.

    - model instance gives 'app_label.model_name.pk' string,
    - anything with build_absolute_uri() gives the result of calling it,
    - plain function gives a list of its module, name and, usually, first line number,
    - everything else goes through str().
    """
    if isinstance(obj, models.Model):
        meta = obj._meta
        return '%s.%s.%s' % (meta.app_label, meta.model_name, obj.pk)

    if hasattr(obj, 'build_absolute_uri'):
        # Only vary HttpRequest by uri
        return obj.build_absolute_uri()

    if not inspect.isfunction(obj):
        return str(obj)

    factors = [obj.__module__, obj.__name__]
    # Line number is skipped when function's module sets CACHEOPS_DEBUG,
    # really useful while code is still in development
    if hasattr(obj, '__code__') and not obj.__globals__.get('CACHEOPS_DEBUG'):
        factors.append(obj.__code__.co_firstlineno)
    return factors

def get_cache_key(*factors):
    """
    Returns md5 hex digest of the given factors serialized to json.

    Dict keys are sorted on serialization, values json can't handle natively
    are converted with obj_key().
    """
    serialized = json.dumps(factors, sort_keys=True, default=obj_key)
    return md5hex(serialized)

def cached_view_fab(_cached):
    """
    Makes a view caching decorator factory out of a function caching one.

    The `_cached(*dargs, **dkwargs)` call should return a decorator for a function.
    The resulting `cached_view(*dargs, **dkwargs)` decorates a view so that
    GET and HEAD requests go through the cached version, while all other
    request methods call the view directly.
    """
    def force_render(response):
        # Responses having callable render() are rendered before being cached
        render = getattr(response, 'render', None)
        if callable(render):
            render()
        return response

    def cached_view(*dargs, **dkwargs):
        def decorator(func):
            rendering_view = compose(force_render, func)
            cached_func = _cached(*dargs, **dkwargs)(rendering_view)

            @wraps(func)
            def wrapper(request, *args, **kwargs):
                assert isinstance(request, HttpRequest),                            \
                       "A view should be passed with HttpRequest as first argument"
                # Only GET and HEAD requests are served via cache
                if request.method in ('GET', 'HEAD'):
                    handler = cached_func
                else:
                    handler = func
                return handler(request, *args, **kwargs)

            # Expose cache controls when the underlying decorator provides them
            if hasattr(cached_func, 'invalidate'):
                wrapper.invalidate = cached_func.invalidate
                wrapper.key = cached_func.key

            return wrapper
        return decorator
    return cached_view


### Whitespace handling for template tags

from django.utils.safestring import mark_safe

# Replacements carefully_strip_whitespace() substitutes for a whitespace run
# found between '>' and '<'; both are passed through mark_safe().
NEWLINE_BETWEEN_TAGS = mark_safe('>\n<')
SPACE_BETWEEN_TAGS = mark_safe('> <')

def carefully_strip_whitespace(text):
    """
    Collapses runs of two or more whitespace characters between '>' and '<'.

    A run containing a newline becomes a single newline, any other one
    becomes a single space. A lone whitespace character is left as is.
    """
    def collapse(match):
        if '\n' in match.group(0):
            return NEWLINE_BETWEEN_TAGS
        return SPACE_BETWEEN_TAGS

    return re.sub(r'>\s{2,}<', collapse, text)


### hashing helpers

import hashlib


class md5:
    """
    Thin wrapper around hashlib.md5 accepting text.

    Strings are encoded to UTF-8 before hashing, bytes-like data
    (bytes, bytearray, memoryview) is passed to hashlib as is.
    The underlying hashlib object is available as `.md5` attribute.
    """
    def __init__(self, s=None):
        """
        Creates a hasher, optionally feeding it with initial data `s`.
        """
        # set usedforsecurity for FIPS compliance
        kwargs = {'usedforsecurity': False} if sys.version_info >= (3, 9) else {}
        self.md5 = hashlib.md5(**kwargs)
        if s is not None:
            self.update(s)

    def update(self, s):
        """
        Feeds more data to the hasher. Accepts str or bytes-like object.
        """
        # Only text needs encoding, calling .encode() on bytes would fail
        if isinstance(s, str):
            s = s.encode('utf-8')
        return self.md5.update(s)

    def hexdigest(self):
        """
        Returns the digest of all the data fed so far as a hex string.
        """
        return self.md5.hexdigest()


def md5hex(s):
    """
    Returns md5 hex digest of the given value, a shortcut for md5(s).hexdigest().
    """
    hasher = md5(s)
    return hasher.hexdigest()