1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
from django.core.files.uploadedfile import UploadedFile
from django.utils.datastructures import MultiValueDict
from .exceptions import NoFileStorageConfigured
class BaseStorage:
    """Base class holding the state of a multi-step form wizard.

    State is kept in ``self.data``, a dict with four entries keyed by the
    ``*_key`` class attributes: the current step, per-step form data,
    per-step uploaded-file metadata, and free-form extra data.

    ``__init__`` does not create ``self.data``; ``init_data`` (or,
    presumably, a subclass) has to populate it before the other methods
    are used.
    """

    # Keys used inside ``self.data``.
    step_key = 'step'
    step_data_key = 'step_data'
    step_files_key = 'step_files'
    extra_data_key = 'extra_data'

    def __init__(self, prefix, request=None, file_storage=None):
        """Remember the request and file storage and derive the prefix.

        ``file_storage`` is the object used to ``save``, ``open`` and
        ``delete`` temporary uploads; it may be left as ``None`` as long
        as no files are handled.
        """
        self.prefix = 'wizard_%s' % prefix
        self.request = request
        self.file_storage = file_storage
        # Cache of opened upload objects, keyed by ``(step, field)``.
        self._files = {}
        # Temporary file names to delete in ``update_response``.
        self._tmp_files = []

    def init_data(self):
        """Replace ``self.data`` with an empty wizard state."""
        self.data = {
            self.step_key: None,
            self.step_data_key: {},
            self.step_files_key: {},
            self.extra_data_key: {},
        }

    def reset(self):
        """Queue all stored temporary files for deletion and clear the state."""
        # Store unused temporary file names in order to delete them
        # at the end of the response cycle through a callback attached in
        # `update_response`.
        wizard_files = self.data[self.step_files_key]
        for step_files in wizard_files.values():
            for step_file in step_files.values():
                self._tmp_files.append(step_file['tmp_name'])
        self.init_data()

    def _get_current_step(self):
        return self.data[self.step_key]

    def _set_current_step(self, step):
        self.data[self.step_key] = step

    @property
    def current_step(self):
        """The stored current step (read through ``_get_current_step``)."""
        return self._get_current_step()

    @current_step.setter
    def current_step(self, value):
        self._set_current_step(value)

    def _get_extra_data(self):
        return self.data[self.extra_data_key]

    def _set_extra_data(self, extra_data):
        self.data[self.extra_data_key] = extra_data

    @property
    def extra_data(self):
        """The stored extra data (read through ``_get_extra_data``)."""
        return self._get_extra_data()

    @extra_data.setter
    def extra_data(self, value):
        self._set_extra_data(value)

    def get_step_data(self, step):
        """Return the data stored for ``step`` as a ``MultiValueDict``.

        Returns ``None`` when nothing is stored for that step.
        """
        # When reading the serialized data, upconvert it to a MultiValueDict,
        # some serializers (json) don't preserve the type of the object.
        values = self.data[self.step_data_key].get(step, None)
        if values is not None:
            values = MultiValueDict(values)
        return values

    def set_step_data(self, step, cleaned_data):
        """Store ``cleaned_data`` as the data of ``step``."""
        # If the value is a MultiValueDict, convert it to a regular dict of the
        # underlying contents. Some serializers call the public API on it (as
        # opposed to the underlying dict methods), in which case the content
        # can be truncated (__getitem__ returns only the first item).
        if isinstance(cleaned_data, MultiValueDict):
            cleaned_data = dict(cleaned_data.lists())
        self.data[self.step_data_key][step] = cleaned_data

    @property
    def current_step_data(self):
        """Shortcut for ``get_step_data(self.current_step)``."""
        return self.get_step_data(self.current_step)

    def get_step_files(self, step):
        """Return ``{field: UploadedFile}`` for ``step``, or ``None`` if empty.

        Each file is opened from ``file_storage`` on first access and then
        served from the ``self._files`` cache.

        Raises ``NoFileStorageConfigured`` when file metadata is stored for
        the step but no ``file_storage`` is set.
        """
        wizard_files = self.data[self.step_files_key].get(step, {})

        if wizard_files and not self.file_storage:
            raise NoFileStorageConfigured(
                "You need to define 'file_storage' in your "
                "wizard view in order to handle file uploads.")

        files = {}
        for field, field_dict in wizard_files.items():
            # Work on a copy: ``tmp_name`` must stay in the stored metadata.
            field_dict = field_dict.copy()
            tmp_name = field_dict.pop('tmp_name')
            cache_key = (step, field)
            if cache_key not in self._files:
                self._files[cache_key] = UploadedFile(
                    file=self.file_storage.open(tmp_name), **field_dict)
            files[field] = self._files[cache_key]
        return files or None

    def set_step_files(self, step, files):
        """Save ``files`` to ``file_storage`` and record their metadata.

        ``files`` maps field names to objects exposing ``name``,
        ``content_type``, ``size`` and ``charset``; a falsy value only
        ensures that an (empty) entry exists for ``step``.

        When a field already had a file stored, the superseded temporary
        file is queued for deletion in ``update_response`` and any cached
        handle for it is closed and evicted, so the next
        ``get_step_files`` call opens the new file.

        Raises ``NoFileStorageConfigured`` when files are given but no
        ``file_storage`` is set.
        """
        if files and not self.file_storage:
            raise NoFileStorageConfigured(
                "You need to define 'file_storage' in your "
                "wizard view in order to handle file uploads.")

        if step not in self.data[self.step_files_key]:
            self.data[self.step_files_key][step] = {}

        for field, field_file in (files or {}).items():
            tmp_filename = self.file_storage.save(field_file.name, field_file)

            previous = self.data[self.step_files_key][step].get(field)
            if previous is not None:
                old_tmp_name = previous.get('tmp_name')
                # Skip the queue if the storage reused the same name,
                # otherwise the file just saved would be deleted.
                if old_tmp_name is not None and old_tmp_name != tmp_filename:
                    self._tmp_files.append(old_tmp_name)

            # Drop the stale handle only after ``save`` has consumed the
            # upload, in case the caller passed the cached object itself.
            stale = self._files.pop((step, field), None)
            if stale is not None and not stale.closed:
                stale.close()

            file_dict = {
                'tmp_name': tmp_filename,
                'name': field_file.name,
                'content_type': field_file.content_type,
                'size': field_file.size,
                'charset': field_file.charset
            }
            self.data[self.step_files_key][step][field] = file_dict

    @property
    def current_step_files(self):
        """Shortcut for ``get_step_files(self.current_step)``."""
        return self.get_step_files(self.current_step)

    def update_response(self, response):
        """Close opened uploads and delete queued temporary files.

        If ``response`` has a ``render`` attribute the cleanup is registered
        through ``add_post_render_callback``; otherwise it runs immediately.
        """
        def post_render_callback(response):
            for file in self._files.values():
                if not file.closed:
                    file.close()
            for tmp_file in self._tmp_files:
                self.file_storage.delete(tmp_file)
            # Empty the queue so a repeated callback does not try to
            # delete the same files again.
            self._tmp_files.clear()

        if hasattr(response, 'render'):
            response.add_post_render_callback(post_render_callback)
        else:
            post_render_callback(response)
|