1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
|
from copy import deepcopy
try:
from collections.abc import MutableSequence
except ImportError:
from collections import MutableSequence
from flask import current_app, request
from werkzeug.datastructures import MultiDict, FileStorage
from werkzeug import exceptions
import flask_restful
import decimal
class Namespace(dict):
def __getattr__(self, name):
try:
return self[name]
except KeyError:
raise AttributeError(name)
def __setattr__(self, name, value):
self[name] = value
_friendly_location = {
u'json': u'the JSON body',
u'form': u'the post body',
u'args': u'the query string',
u'values': u'the post body or the query string',
u'headers': u'the HTTP headers',
u'cookies': u'the request\'s cookies',
u'files': u'an uploaded file',
}
class Argument(object):
"""
:param name: Either a name or a list of option strings, e.g. foo or
-f, --foo.
:param default: The value produced if the argument is absent from the
request.
:param dest: The name of the attribute to be added to the object
returned by :meth:`~reqparse.RequestParser.parse_args()`.
:param bool required: Whether or not the argument may be omitted (optionals
only).
:param action: The basic type of action to be taken when this argument
is encountered in the request. Valid options are "store" and "append".
:param ignore: Whether to ignore cases where the argument fails type
conversion
:param type: The type to which the request argument should be
converted. If a type raises an exception, the message in the
error will be returned in the response. Defaults to :class:`unicode`
in python2 and :class:`str` in python3.
:param location: The attributes of the :class:`flask.Request` object
to source the arguments from (ex: headers, args, etc.), can be an
iterator. The last item listed takes precedence in the result set.
:param choices: A container of the allowable values for the argument.
:param help: A brief description of the argument, returned in the
response when the argument is invalid. May optionally contain
an "{error_msg}" interpolation token, which will be replaced with
the text of the error raised by the type converter.
:param bool case_sensitive: Whether argument values in the request are
case sensitive or not (this will convert all values to lowercase)
:param bool store_missing: Whether the arguments default value should
be stored if the argument is missing from the request.
:param bool trim: If enabled, trims whitespace around the argument.
:param bool nullable: If enabled, allows null value in argument.
"""
def __init__(self, name, default=None, dest=None, required=False,
ignore=False, type=str, location=('json', 'values',),
choices=(), action='store', help=None, operators=('=',),
case_sensitive=True, store_missing=True, trim=False,
nullable=True):
self.name = name
self.default = default
self.dest = dest
self.required = required
self.ignore = ignore
self.location = location
self.type = type
self.choices = choices
self.action = action
self.help = help
self.case_sensitive = case_sensitive
self.operators = operators
self.store_missing = store_missing
self.trim = trim
self.nullable = nullable
def __str__(self):
if len(self.choices) > 5:
choices = self.choices[0:3]
choices.append('...')
choices.append(self.choices[-1])
else:
choices = self.choices
return 'Name: {0}, type: {1}, choices: {2}'.format(self.name, self.type, choices)
def __repr__(self):
return "{0}('{1}', default={2}, dest={3}, required={4}, ignore={5}, location={6}, " \
"type=\"{7}\", choices={8}, action='{9}', help={10}, case_sensitive={11}, " \
"operators={12}, store_missing={13}, trim={14}, nullable={15})".format(
self.__class__.__name__, self.name, self.default, self.dest, self.required, self.ignore, self.location,
self.type, self.choices, self.action, self.help, self.case_sensitive,
self.operators, self.store_missing, self.trim, self.nullable)
def source(self, request):
"""Pulls values off the request in the provided location
:param request: The flask request object to parse arguments from
"""
if isinstance(self.location, str):
value = getattr(request, self.location, MultiDict())
if callable(value):
value = value()
if value is not None:
return value
else:
values = MultiDict()
for l in self.location:
value = getattr(request, l, None)
if callable(value):
value = value()
if value is not None:
values.update(value)
return values
return MultiDict()
def convert(self, value, op):
# Don't cast None
if value is None:
if self.nullable:
return None
else:
raise ValueError('Must not be null!')
# and check if we're expecting a filestorage and haven't overridden `type`
# (required because the below instantiation isn't valid for FileStorage)
elif isinstance(value, FileStorage) and self.type == FileStorage:
return value
try:
return self.type(value, self.name, op)
except TypeError:
try:
if self.type is decimal.Decimal:
return self.type(str(value))
else:
return self.type(value, self.name)
except TypeError:
return self.type(value)
def handle_validation_error(self, error, bundle_errors):
"""Called when an error is raised while parsing. Aborts the request
with a 400 status and an error message
:param error: the error that was raised
:param bundle_errors: do not abort when first error occurs, return a
dict with the name of the argument and the error message to be
bundled
"""
error_str = str(error)
error_msg = self.help.format(error_msg=error_str) if self.help else error_str
msg = {self.name: error_msg}
if current_app.config.get("BUNDLE_ERRORS", False) or bundle_errors:
return error, msg
flask_restful.abort(400, message=msg)
def parse(self, request, bundle_errors=False):
"""Parses argument value(s) from the request, converting according to
the argument's type.
:param request: The flask request object to parse arguments from
:param bundle_errors: Do not abort when first error occurs, return a
dict with the name of the argument and the error message to be
bundled
"""
source = self.source(request)
results = []
# Sentinels
_not_found = False
_found = True
for operator in self.operators:
name = self.name + operator.replace("=", "", 1)
if name in source:
# Account for MultiDict and regular dict
if hasattr(source, "getlist"):
values = source.getlist(name)
else:
values = source.get(name)
if not (isinstance(values, MutableSequence) and self.action == 'append'):
values = [values]
for value in values:
if hasattr(value, "strip") and self.trim:
value = value.strip()
if hasattr(value, "lower") and not self.case_sensitive:
value = value.lower()
if hasattr(self.choices, "__iter__"):
self.choices = [choice.lower()
for choice in self.choices]
try:
value = self.convert(value, operator)
except Exception as error:
if self.ignore:
continue
return self.handle_validation_error(error, bundle_errors)
if self.choices and value not in self.choices:
if current_app.config.get("BUNDLE_ERRORS", False) or bundle_errors:
return self.handle_validation_error(
ValueError(u"{0} is not a valid choice".format(
value)), bundle_errors)
self.handle_validation_error(
ValueError(u"{0} is not a valid choice".format(
value)), bundle_errors)
if name in request.unparsed_arguments:
request.unparsed_arguments.pop(name)
results.append(value)
if not results and self.required:
if isinstance(self.location, str):
error_msg = u"Missing required parameter in {0}".format(
_friendly_location.get(self.location, self.location)
)
else:
friendly_locations = [_friendly_location.get(loc, loc)
for loc in self.location]
error_msg = u"Missing required parameter in {0}".format(
' or '.join(friendly_locations)
)
if current_app.config.get("BUNDLE_ERRORS", False) or bundle_errors:
return self.handle_validation_error(ValueError(error_msg), bundle_errors)
self.handle_validation_error(ValueError(error_msg), bundle_errors)
if not results:
if callable(self.default):
return self.default(), _not_found
else:
return self.default, _not_found
if self.action == 'append':
return results, _found
if self.action == 'store' or len(results) == 1:
return results[0], _found
return results, _found
class RequestParser(object):
"""Enables adding and parsing of multiple arguments in the context of a
single request. Ex::
from flask_restful import reqparse
parser = reqparse.RequestParser()
parser.add_argument('foo')
parser.add_argument('int_bar', type=int)
args = parser.parse_args()
:param bool trim: If enabled, trims whitespace on all arguments in this
parser
:param bool bundle_errors: If enabled, do not abort when first error occurs,
return a dict with the name of the argument and the error message to be
bundled and return all validation errors
"""
def __init__(self, argument_class=Argument, namespace_class=Namespace,
trim=False, bundle_errors=False):
self.args = []
self.argument_class = argument_class
self.namespace_class = namespace_class
self.trim = trim
self.bundle_errors = bundle_errors
def add_argument(self, *args, **kwargs):
"""Adds an argument to be parsed.
Accepts either a single instance of Argument or arguments to be passed
into :class:`Argument`'s constructor.
See :class:`Argument`'s constructor for documentation on the
available options.
"""
if len(args) == 1 and isinstance(args[0], self.argument_class):
self.args.append(args[0])
else:
self.args.append(self.argument_class(*args, **kwargs))
# Do not know what other argument classes are out there
if self.trim and self.argument_class is Argument:
# enable trim for appended element
self.args[-1].trim = kwargs.get('trim', self.trim)
return self
def parse_args(self, req=None, strict=False, http_error_code=400):
"""Parse all arguments from the provided request and return the results
as a Namespace
:param req: Can be used to overwrite request from Flask
:param strict: if req includes args not in parser, throw 400 BadRequest exception
:param http_error_code: use custom error code for `flask_restful.abort()`
"""
if req is None:
req = request
namespace = self.namespace_class()
# A record of arguments not yet parsed; as each is found
# among self.args, it will be popped out
req.unparsed_arguments = dict(self.argument_class('').source(req)) if strict else {}
errors = {}
for arg in self.args:
value, found = arg.parse(req, self.bundle_errors)
if isinstance(value, ValueError):
errors.update(found)
found = None
if found or arg.store_missing:
namespace[arg.dest or arg.name] = value
if errors:
flask_restful.abort(http_error_code, message=errors)
if strict and req.unparsed_arguments:
raise exceptions.BadRequest('Unknown arguments: %s'
% ', '.join(req.unparsed_arguments.keys()))
return namespace
def copy(self):
""" Creates a copy of this RequestParser with the same set of arguments """
parser_copy = self.__class__(self.argument_class, self.namespace_class)
parser_copy.args = deepcopy(self.args)
parser_copy.trim = self.trim
parser_copy.bundle_errors = self.bundle_errors
return parser_copy
def replace_argument(self, name, *args, **kwargs):
""" Replace the argument matching the given name with a new version. """
new_arg = self.argument_class(name, *args, **kwargs)
for index, arg in enumerate(self.args[:]):
if new_arg.name == arg.name:
del self.args[index]
self.args.append(new_arg)
break
return self
def remove_argument(self, name):
""" Remove the argument matching the given name. """
for index, arg in enumerate(self.args[:]):
if name == arg.name:
del self.args[index]
break
return self
|