1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
|
from django.conf import settings
from django.contrib.messages import constants, utils
from django.utils.functional import SimpleLazyObject
# Lookup consulted by ``Message.level_tag`` to turn a numeric level into its
# tag string. Wrapped in SimpleLazyObject so ``utils.get_level_tags`` is
# evaluated on first access rather than when this module is imported.
LEVEL_TAGS = SimpleLazyObject(utils.get_level_tags)
class Message:
    """
    A single message, holding a numeric level, the message content and
    optional extra tags.

    Instances can be kept in any of the supported storage classes (typically
    session- or cookie-based) and rendered in a view or template.
    """

    def __init__(self, level, message, extra_tags=None):
        # The level is always normalised to an int; the other two values are
        # kept exactly as given until ``_prepare()`` is called.
        self.level = int(level)
        self.message = message
        self.extra_tags = extra_tags

    def _prepare(self):
        """
        Get the message ready for serialization: coerce ``message`` and
        ``extra_tags`` to str, since either may be a lazy translation.
        A ``None`` ``extra_tags`` is left as ``None``.
        """
        self.message = str(self.message)
        if self.extra_tags is not None:
            self.extra_tags = str(self.extra_tags)
        else:
            self.extra_tags = None

    def __eq__(self, other):
        # Only ``level`` and ``message`` take part in equality; ``extra_tags``
        # is deliberately not compared here.
        if isinstance(other, Message):
            same_level = self.level == other.level
            return same_level and self.message == other.message
        return NotImplemented

    def __str__(self):
        return str(self.message)

    def __repr__(self):
        # ``extra_tags`` is only shown when it is truthy.
        if self.extra_tags:
            suffix = f", extra_tags={self.extra_tags!r}"
        else:
            suffix = ""
        return f"Message(level={self.level}, message={self.message!r}{suffix})"

    @property
    def tags(self):
        """
        Return the extra tags followed by the level tag, separated by a
        single space; empty parts are left out.
        """
        return " ".join(filter(None, (self.extra_tags, self.level_tag)))

    @property
    def level_tag(self):
        """
        Return the tag registered for this message's level in ``LEVEL_TAGS``,
        or an empty string when the level has no entry.
        """
        return LEVEL_TAGS.get(self.level, "")
class BaseStorage:
    """
    This is the base backend for temporary message storage.

    This is not a complete class; to be a usable storage backend, it must be
    subclassed and the two methods ``_get`` and ``_store`` overridden.
    """

    def __init__(self, request, *args, **kwargs):
        self.request = request
        # Messages added via ``add()`` that have not yet been merged into the
        # loaded list (that merge happens in ``__iter__``).
        self._queued_messages = []
        # Becomes True once the storage has been iterated.
        self.used = False
        # Becomes True once ``add()`` has queued at least one message.
        self.added_new = False
        super().__init__(*args, **kwargs)

    def __len__(self):
        return len(self._loaded_messages) + len(self._queued_messages)

    def __iter__(self):
        """
        Iterate over all messages, marking the storage as used and moving
        any queued messages onto the end of the loaded list first.
        """
        self.used = True
        if self._queued_messages:
            self._loaded_messages.extend(self._queued_messages)
            self._queued_messages = []
        return iter(self._loaded_messages)

    def __contains__(self, item):
        return item in self._loaded_messages or item in self._queued_messages

    def __repr__(self):
        return f"<{self.__class__.__qualname__}: request={self.request!r}>"

    @property
    def _loaded_messages(self):
        """
        Return a list of loaded messages, retrieving them first if they have
        not been loaded yet.
        """
        if not hasattr(self, "_loaded_data"):
            # The ``all_retrieved`` flag returned by ``_get()`` is not needed
            # here; a falsy ``messages`` value is normalised to an empty list.
            messages, _all_retrieved = self._get()
            self._loaded_data = messages or []
        return self._loaded_data

    def _get(self, *args, **kwargs):
        """
        Retrieve a list of stored messages. Return a tuple of the messages
        and a flag indicating whether or not all the messages originally
        intended to be stored in this storage were, in fact, stored and
        retrieved; e.g., ``(messages, all_retrieved)``.

        **This method must be implemented by a subclass.**

        If it is possible to tell if the backend was not used (as opposed to
        just containing no messages) then ``None`` should be returned in
        place of ``messages``.
        """
        raise NotImplementedError(
            "subclasses of BaseStorage must provide a _get() method"
        )

    def _store(self, messages, response, *args, **kwargs):
        """
        Store a list of messages and return a list of any messages which could
        not be stored.

        One type of object must be able to be stored, ``Message``.

        **This method must be implemented by a subclass.**
        """
        raise NotImplementedError(
            "subclasses of BaseStorage must provide a _store() method"
        )

    def _prepare_messages(self, messages):
        """
        Prepare a list of messages for storage by calling ``_prepare()`` on
        each of them.
        """
        for message in messages:
            message._prepare()

    def update(self, response):
        """
        Store all unread messages.

        If the backend has yet to be iterated, store previously stored messages
        again. Otherwise, only store messages added after the last iteration.

        Return whatever ``_store()`` returns, or ``None`` when the storage was
        neither iterated nor added to and nothing is stored.
        """
        self._prepare_messages(self._queued_messages)
        if self.used:
            return self._store(self._queued_messages, response)
        if self.added_new:
            messages = self._loaded_messages + self._queued_messages
            return self._store(messages, response)
        return None

    def add(self, level, message, extra_tags=""):
        """
        Queue a message to be stored.

        The message is only queued if it contained something and its level is
        not less than the recording level (``self.level``).
        """
        if not message:
            return
        # Check that the message level is not less than the recording level.
        level = int(level)
        if level < self.level:
            return
        # Add the message.
        self.added_new = True
        message = Message(level, message, extra_tags=extra_tags)
        self._queued_messages.append(message)

    def _get_level(self):
        """
        Return the minimum recorded level.

        The default level is the ``MESSAGE_LEVEL`` setting. If this is
        not found, the ``INFO`` level is used.
        """
        if not hasattr(self, "_level"):
            self._level = getattr(settings, "MESSAGE_LEVEL", constants.INFO)
        return self._level

    def _set_level(self, value=None):
        """
        Set a custom minimum recorded level.

        If set to ``None``, the default level will be used (see the
        ``_get_level`` method). This method also serves as the property's
        deleter, in which case ``value`` keeps its ``None`` default.
        """
        if value is None:
            # Reset to the default. When no level has been cached yet there is
            # nothing to remove; this must not fall through to ``int(None)``,
            # which would raise TypeError.
            if hasattr(self, "_level"):
                del self._level
        else:
            self._level = int(value)

    level = property(_get_level, _set_level, _set_level)
|