1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835
|
import base64
import datetime
import decimal
import inspect
import logging
import re
import six
import sys
import uuid
import weakref
try:
import ipaddress
except ImportError:
import ipaddr as ipaddress
from wsme import exc
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

#: The 'str' (python 2) or 'bytes' (python 3) type.
#: Its use should be restricted to
#: pure ascii strings as the protocols will generally not
#: be able to send non-unicode strings.
#: To transmit binary strings, use the :class:`binary` type
bytes = six.binary_type

#: Unicode string.
text = six.text_type
class ArrayType(object):
    """Datatype describing a list whose items all share one item type.

    Two instances compare (and hash) equal when their item types are equal.
    """

    def __init__(self, item_type):
        # Complex types are only referenced weakly; any other item type is
        # stored as given.
        if iscomplex(item_type):
            self._item_type = weakref.ref(item_type)
        else:
            self._item_type = item_type

    def __hash__(self):
        return hash(self.item_type)

    def __eq__(self, other):
        return isinstance(other, ArrayType) \
            and self.item_type == other.item_type

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; define it
        # explicitly so both operators always agree.
        return not self.__eq__(other)

    def sample(self):
        """Return a one-item list built from the item type.

        Calls the item type's ``sample`` attribute when it has one,
        otherwise calls the item type itself with no argument.
        """
        return [getattr(self.item_type, 'sample', self.item_type)()]

    @property
    def item_type(self):
        """The item type, dereferenced if it is held as a weak reference."""
        if isinstance(self._item_type, weakref.ref):
            return self._item_type()
        else:
            return self._item_type

    def validate(self, value):
        """Validate every item of ``value`` against the item type.

        :param value: A list, or None.
        :returns: None when ``value`` is None, otherwise a new list holding
                  the result of :func:`validate_value` for each item.
        :raises ValueError: If ``value`` is neither None nor a list.
        """
        if value is None:
            return
        if not isinstance(value, list):
            raise ValueError("Wrong type. Expected '[%s]', got '%s'" % (
                self.item_type, type(value)
            ))
        return [
            validate_value(self.item_type, item)
            for item in value
        ]
class DictType(object):
    """Datatype describing a dict with one key type and one value type.

    Two instances compare (and hash) equal when both their key types and
    their value types are equal, so equivalent dict types are stored only
    once in a set.
    """

    def __init__(self, key_type, value_type):
        if key_type not in pod_types:
            raise ValueError("Dictionnaries key can only be a pod type")
        self.key_type = key_type
        # Complex types are only referenced weakly; any other value type is
        # stored as given.
        if iscomplex(value_type):
            self._value_type = weakref.ref(value_type)
        else:
            self._value_type = value_type

    def __hash__(self):
        return hash((self.key_type, self.value_type))

    def __eq__(self, other):
        # Mirrors ArrayType.__eq__ and stays consistent with __hash__.
        return isinstance(other, DictType) \
            and self.key_type == other.key_type \
            and self.value_type == other.value_type

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``.
        return not self.__eq__(other)

    def sample(self):
        """Return a one-entry dict built from the key and value types.

        For each of them, the ``sample`` attribute is called when present,
        otherwise the type itself is called with no argument.
        """
        key = getattr(self.key_type, 'sample', self.key_type)()
        value = getattr(self.value_type, 'sample', self.value_type)()
        return {key: value}

    @property
    def value_type(self):
        """The value type, dereferenced if it is held as a weak reference."""
        if isinstance(self._value_type, weakref.ref):
            return self._value_type()
        else:
            return self._value_type

    def validate(self, value):
        """Validate every key and value of ``value``.

        :param value: A dict.
        :returns: A new dict holding the result of :func:`validate_value`
                  for each key and each value.
        :raises ValueError: If ``value`` is not a dict.
        """
        if not isinstance(value, dict):
            raise ValueError("Wrong type. Expected '{%s: %s}', got '%s'" % (
                self.key_type, self.value_type, type(value)
            ))
        return dict((
            (
                validate_value(self.key_type, key),
                validate_value(self.value_type, v)
            ) for key, v in value.items()
        ))
class UserType(object):
    """Base class for user-defined types.

    Every hook defined here returns its argument untouched; subclasses
    override the ones they need.
    """

    #: Underlying type; None on this base class.
    basetype = None
    #: Type name; None on this base class.
    name = None

    def validate(self, value):
        """Return ``value`` unchanged."""
        return value

    def tobasetype(self, value):
        """Return ``value`` unchanged."""
        return value

    def frombasetype(self, value):
        """Return ``value`` unchanged."""
        return value
def isusertype(class_):
    """Return True if ``class_`` is an *instance* of :class:`UserType`."""
    is_user_type = isinstance(class_, UserType)
    return is_user_type
class BinaryType(UserType):
    """
    A user type that use base64 strings to carry binary data.
    """
    basetype = bytes
    name = 'binary'

    def tobasetype(self, value):
        """Return ``value`` base64-encoded, or None if ``value`` is None."""
        if value is None:
            return None
        # base64.encodestring() was removed in Python 3.9. Prefer
        # encodebytes() and fall back to the old name where it is missing
        # (Python 2).
        encode = getattr(base64, 'encodebytes', None) or base64.encodestring
        return encode(value)

    def frombasetype(self, value):
        """Return ``value`` base64-decoded, or None if ``value`` is None."""
        if value is None:
            return None
        # Same compatibility concern as in tobasetype().
        decode = getattr(base64, 'decodebytes', None) or base64.decodestring
        return decode(value)


#: The binary almost-native type
binary = BinaryType()
class IntegerType(UserType):
    """
    A simple integer type. Can validate a value range.

    :param minimum: Possible minimum value
    :param maximum: Possible maximum value

    Example::

        Price = IntegerType(minimum=1)

    """
    basetype = int
    name = "integer"

    def __init__(self, minimum=None, maximum=None):
        self.minimum = minimum
        self.maximum = maximum

    @staticmethod
    def frombasetype(value):
        """Convert ``value`` with ``int()``; None is passed through."""
        if value is None:
            return None
        return int(value)

    def validate(self, value):
        """Return ``value`` if it lies within the configured bounds.

        :raises ValueError: If ``value`` is below ``minimum`` or above
                            ``maximum`` (a bound set to None is not checked).
        """
        lower = self.minimum
        if lower is not None and value < lower:
            raise ValueError(
                'Value should be greater or equal to %s' % lower)

        upper = self.maximum
        if upper is not None and value > upper:
            raise ValueError(
                'Value should be lower or equal to %s' % upper)

        return value
class StringType(UserType):
    """
    A simple string type. Can validate a length and a pattern.

    :param min_length: Possible minimum length
    :param max_length: Possible maximum length
    :param pattern: Possible string pattern

    Example::

        Name = StringType(min_length=1, pattern='^[a-zA-Z ]*$')

    """
    basetype = six.string_types
    name = "string"

    def __init__(self, min_length=None, max_length=None, pattern=None):
        self.min_length = min_length
        self.max_length = max_length
        # A pattern given as a string is compiled once here; anything else
        # (None or an already compiled pattern) is stored as given.
        if isinstance(pattern, six.string_types):
            self.pattern = re.compile(pattern)
        else:
            self.pattern = pattern

    def validate(self, value):
        """Return ``value`` if it satisfies every configured constraint.

        :raises ValueError: If ``value`` is not a string, is shorter than
                            ``min_length``, longer than ``max_length``, or
                            is not matched by ``pattern.search``.
        """
        if not isinstance(value, self.basetype):
            error = 'Value should be string'
            raise ValueError(error)

        if self.min_length is not None and len(value) < self.min_length:
            error = 'Value should have a minimum character requirement of %s' \
                    % self.min_length
            raise ValueError(error)

        if self.max_length is not None and len(value) > self.max_length:
            error = 'Value should have a maximum character requirement of %s' \
                    % self.max_length
            raise ValueError(error)

        if self.pattern is not None and not self.pattern.search(value):
            # Report the regular expression source rather than the compiled
            # pattern object; fall back to the object itself if it has no
            # ``pattern`` attribute.
            shown = getattr(self.pattern, 'pattern', self.pattern)
            error = 'Value should match the pattern %s' % (shown,)
            raise ValueError(error)

        return value
class IPv4AddressType(UserType):
    """
    A simple IPv4 type.
    """
    basetype = six.string_types
    name = "ipv4address"

    @staticmethod
    def validate(value):
        """Return ``value`` if it parses as an IPv4 address.

        :raises ValueError: If ``ipaddress.IPv4Address`` rejects ``value``.
        """
        try:
            ipaddress.IPv4Address(value)
        except ipaddress.AddressValueError:
            error = 'Value should be IPv4 format'
            raise ValueError(error)
        # validate_value() uses the return value as the validated value, so
        # the address must be handed back rather than None.
        return value
class IPv6AddressType(UserType):
    """
    A simple IPv6 type.
    """
    basetype = six.string_types
    name = "ipv6address"

    @staticmethod
    def validate(value):
        """Return ``value`` if it parses as an IPv6 address.

        :raises ValueError: If ``ipaddress.IPv6Address`` rejects ``value``.
        """
        try:
            ipaddress.IPv6Address(value)
        except ipaddress.AddressValueError:
            error = 'Value should be IPv6 format'
            raise ValueError(error)
        # validate_value() uses the return value as the validated value, so
        # the address must be handed back rather than None.
        return value
class UuidType(UserType):
    """
    A simple UUID type.

    This type allows not only UUID having dashes but also UUID not
    having dashes. For example, '6a0a707c-45ef-4758-b533-e55adddba8ce'
    and '6a0a707c45ef4758b533e55adddba8ce' are distinguished as valid.
    """
    basetype = six.string_types
    name = "uuid"

    @staticmethod
    def validate(value):
        """Return ``value`` if ``uuid.UUID`` accepts it.

        :raises ValueError: If ``uuid.UUID(value)`` fails with TypeError,
                            ValueError or AttributeError.
        """
        try:
            uuid.UUID(value)
        except (TypeError, ValueError, AttributeError):
            error = 'Value should be UUID format'
            raise ValueError(error)
        # validate_value() uses the return value as the validated value, so
        # the UUID string must be handed back rather than None.
        return value
class Enum(UserType):
    """
    A simple enumeration type. Can be based on any non-complex type.

    :param basetype: The actual data type
    :param values: A set of possible values

    If nullable, 'None' should be added the values set.

    Example::

        Gender = Enum(str, 'male', 'female')
        Specie = Enum(str, 'cat', 'dog')

    """

    def __init__(self, basetype, *values, **kw):
        self.basetype = basetype
        self.values = set(values)
        # An explicit ``name`` keyword wins; otherwise the name is derived
        # from the values in the order they were passed.
        label = kw.pop('name', None)
        if label is None:
            listed = ', '.join(map(six.text_type, values))
            label = "Enum(%s)" % listed
        self.name = label

    def validate(self, value):
        """Return ``value`` if it is one of the allowed values.

        :raises ValueError: If ``value`` is not in ``self.values``.
        """
        if value in self.values:
            return value
        allowed = ', '.join(six.text_type(v) for v in self.values)
        raise ValueError("Value should be one of: %s" % allowed)

    def tobasetype(self, value):
        """Return ``value`` unchanged."""
        return value

    def frombasetype(self, value):
        """Return ``value`` unchanged."""
        return value
class UnsetType(object):
    """Type of the :data:`Unset` marker.

    Instances are always falsy and are represented as ``Unset``.
    """

    def __bool__(self):
        return False

    # Python 2 looks up ``__nonzero__`` instead of ``__bool__``. Defining
    # both avoids comparing ``sys.version`` strings, which orders versions
    # lexically and would misclassify e.g. a "10.x" interpreter.
    __nonzero__ = __bool__

    def __repr__(self):
        return 'Unset'


Unset = UnsetType()
#: A special type that corresponds to the host framework request object.
#: It can only be used in the function parameters, and if so the request object
#: of the host framework will be passed to the function.
HostRequest = object()

#: Plain data types: the integer types plus bytes, text, float and bool.
pod_types = six.integer_types + (
    bytes, text, float, bool)
#: Date and time types.
dt_types = (datetime.date, datetime.time, datetime.datetime)
#: Additional types: the :data:`binary` instance and ``decimal.Decimal``.
extra_types = (binary, decimal.Decimal)
#: Every type above, concatenated into a single tuple.
native_types = pod_types + dt_types + extra_types

# The types for which we allow promotion to certain numbers.
_promotable_types = six.integer_types + (text, bytes)
def iscomplex(datatype):
    """Return True if ``datatype`` is a class with its own ``_wsme_attributes``.

    Only the class's own ``__dict__`` is consulted, so an attribute that is
    merely inherited from a parent class does not count.
    """
    if not inspect.isclass(datatype):
        return False
    return '_wsme_attributes' in datatype.__dict__
def isarray(datatype):
    """Return True if ``datatype`` is an :class:`ArrayType` instance."""
    is_array = isinstance(datatype, ArrayType)
    return is_array
def isdict(datatype):
    """Return True if ``datatype`` is a :class:`DictType` instance."""
    is_dict = isinstance(datatype, DictType)
    return is_dict
def validate_value(datatype, value):
    """Validate ``value`` against ``datatype`` and return the accepted value.

    ``Unset`` and None are returned untouched. A ``datatype`` written as a
    list or a dict is first turned into an :class:`ArrayType` or
    :class:`DictType`. A datatype exposing a ``validate`` attribute is
    delegated to; otherwise a few conversions are attempted before a final
    ``isinstance`` check.

    :raises ValueError: If the (possibly converted) value is not an instance
                        of ``datatype``.
    """
    if value in (Unset, None):
        return value

    # Shorthand notations: [item_type] and {key_type: value_type}.
    if isinstance(datatype, list):
        datatype = ArrayType(datatype[0])
    elif isinstance(datatype, dict):
        datatype = DictType(*list(datatype.items())[0])

    # A datatype that knows how to validate gets the final word.
    if hasattr(datatype, 'validate'):
        return datatype.validate(value)

    # Builtin types: attempt a conversion, then check the resulting type.
    received = type(value)
    if datatype in six.integer_types or datatype is float:
        if received in _promotable_types:
            try:
                value = datatype(value)
            except ValueError:
                # Not convertible: the isinstance check below reports the
                # type mismatch.
                pass
    elif datatype is text and isinstance(value, bytes):
        value = value.decode()
    elif datatype is bytes and isinstance(value, text):
        value = value.encode()

    if isinstance(value, datatype):
        return value

    # The message reports the type the value had before any conversion.
    raise ValueError(
        "Wrong type. Expected '%s', got '%s'" % (datatype, received))
class wsproperty(property):
    """
    A specialised :class:`property` to define typed-property on complex types.

    Example::

        class MyComplexType(wsme.types.Base):
            def get_aint(self):
                return self._aint

            def set_aint(self, value):
                assert value < 10  # Dummy input validation
                self._aint = value

            aint = wsproperty(int, get_aint, set_aint, mandatory=True)

    :param datatype: The property data type
    :param fget: Getter function
    :param fset: Optional setter function
    :param mandatory: True if the property is mandatory
    :param doc: Optional docstring for this property
    :param name: Optional public name of the property
    """

    def __init__(self, datatype, fget, fset=None,
                 mandatory=False, doc=None, name=None):
        property.__init__(self, fget, fset)
        # Honour the ``doc`` argument. It is set on the instance because the
        # docstring of this subclass would otherwise shadow it.
        if doc is not None:
            self.__doc__ = doc
        #: The property name in the parent python class
        self.key = None
        #: The attribute name on the public of the api.
        #: Defaults to :attr:`key`
        self.name = name
        #: property data type
        self.datatype = datatype
        #: True if the property is mandatory
        self.mandatory = mandatory
class wsattr(object):
    """
    Complex type attribute definition.

    Example::

        class MyComplexType(wsme.types.Base):
            optionalvalue = int
            mandatoryvalue = wsattr(int, mandatory=True)
            named_value = wsattr(int, name='named.value')

    After inspection, the non-wsattr attributes will be replaced, and
    the above class will be equivalent to::

        class MyComplexType(wsme.types.Base):
            optionalvalue = wsattr(int)
            mandatoryvalue = wsattr(int, mandatory=True)

    """

    def __init__(self, datatype, mandatory=False, name=None, default=Unset,
                 readonly=False):
        #: The attribute name in the parent python class.
        #: Set by :func:`inspect_class`
        self.key = None  # will be set by class inspection
        #: The attribute name on the public of the api.
        #: Defaults to :attr:`key`
        self.name = name
        # Wrapped in a one-item tuple to mark the datatype as not yet
        # resolved; see _get_datatype().
        self._datatype = (datatype,)
        #: True if the attribute is mandatory
        self.mandatory = mandatory
        #: Default value. The attribute will return this instead
        #: of :data:`Unset` if no value has been set.
        self.default = default
        #: If True value cannot be set from json/xml input data
        self.readonly = readonly
        # Set by inspect_class() to a weak reference to the owning class.
        self.complextype = None

    def _get_dataholder(self, instance):
        """Return the data holder of ``instance``, creating it if needed."""
        holder = getattr(instance, '_wsme_dataholder', None)
        if holder is not None:
            return holder
        holder = instance._wsme_DataHolderClass()
        instance._wsme_dataholder = holder
        return holder

    def __get__(self, instance, owner):
        # Accessed on the class: return the descriptor itself.
        if instance is None:
            return self
        holder = self._get_dataholder(instance)
        return getattr(holder, self.key, self.default)

    def __set__(self, instance, value):
        try:
            value = validate_value(self.datatype, value)
        except ValueError as e:
            raise exc.InvalidInput(self.name, value, six.text_type(e))
        holder = self._get_dataholder(instance)
        if value is not Unset:
            setattr(holder, self.key, value)
        elif hasattr(holder, self.key):
            # Assigning Unset removes any stored value.
            delattr(holder, self.key)

    def __delete__(self, instance):
        # Deleting is the same as assigning Unset.
        self.__set__(instance, Unset)

    def _get_datatype(self):
        # Still wrapped in the tuple from __init__: resolve it through the
        # registry of the owning class and remember the result.
        if isinstance(self._datatype, tuple):
            owner_registry = self.complextype().__registry__
            self._datatype = owner_registry.resolve_type(self._datatype[0])

        current = self._datatype
        if isinstance(current, weakref.ref):
            return current()
        if isinstance(current, list):
            # Dereference any weak references found among the items.
            return [
                entry() if isinstance(entry, weakref.ref) else entry
                for entry in current
            ]
        return current

    def _set_datatype(self, datatype):
        self._datatype = datatype

    #: attribute data type. Can be either an actual type,
    #: or a type name, in which case the actual type will be
    #: determined when needed (generally just before scanning the api).
    datatype = property(_get_datatype, _set_datatype)
def iswsattr(attr):
    """Return True if ``attr`` may be treated as a complex type attribute.

    Functions and methods are rejected, and so are properties that are not
    :class:`wsproperty` instances. Anything else is accepted.
    """
    if inspect.isfunction(attr) or inspect.ismethod(attr):
        return False
    plain_property = (
        isinstance(attr, property) and not isinstance(attr, wsproperty))
    return not plain_property
def sort_attributes(class_, attributes):
    """Sort a class attributes list in place.

    3 mechanisms are attempted :

    #.  Look for a _wsme_attr_order attribute on the class_. This allows
        defining an arbitrary order of the attributes (useful for
        generated types).

    #.  Access the object source code to find the declaration order.

    #.  Sort alphabetically.

    An empty ``attributes`` list is left untouched.
    """
    if len(attributes) == 0:
        return

    by_key = dict((a.key, a) for a in attributes)

    if hasattr(class_, '_wsme_attr_order'):
        ordered = class_._wsme_attr_order
    else:
        names = by_key.keys()
        ordered = []
        try:
            # Gather the source of every class in the MRO except object.
            source = []
            for klass in inspect.getmro(class_):
                if klass is not object:
                    source.extend(inspect.getsourcelines(klass)[0])

            # The text before the first '=' of a line, with spaces removed,
            # is taken as a candidate attribute name.
            for raw in source:
                compact = raw.strip().replace(" ", "")
                if '=' not in compact:
                    continue
                candidate = compact[:compact.index('=')]
                if candidate in names and candidate not in ordered:
                    ordered.append(candidate)

            # Names never seen in the source go last.
            if len(ordered) < len(names):
                ordered.extend(
                    [name for name in names if name not in ordered])
            assert len(ordered) == len(names)
        except (TypeError, IOError):
            # Source not available: fall back to alphabetical order.
            ordered = sorted(names)

    attributes[:] = [by_key[name] for name in ordered]
def inspect_class(class_):
    """Extract a list of (name, wsattr|wsproperty) for the given class_

    Every public, non-routine member accepted by :func:`iswsattr` becomes an
    attribute definition: existing wsattr/wsproperty objects are reused,
    anything else is wrapped with the class's ``__wsattrclass__`` (wsattr
    when the class does not define one). Each definition is written back
    onto ``class_`` and the resulting list is sorted with
    :func:`sort_attributes`.
    """
    collected = []
    for member_name, member in inspect.getmembers(class_, iswsattr):
        if member_name.startswith('_') or inspect.isroutine(member):
            continue

        if isinstance(member, (wsattr, wsproperty)):
            definition = member
        else:
            # Classes and list/dict shorthands that are not native types
            # are registered before being wrapped.
            needs_registration = member not in native_types and (
                inspect.isclass(member)
                or isinstance(member, (list, dict)))
            if needs_registration:
                register_type(member)
            wrapper = getattr(class_, '__wsattrclass__', wsattr)
            definition = wrapper(member)

        definition.key = member_name
        if definition.name is None:
            definition.name = member_name
        definition.complextype = weakref.ref(class_)
        collected.append(definition)
        setattr(class_, member_name, definition)

    sort_attributes(class_, collected)
    return collected
def list_attributes(class_):
    """
    Returns a list of a complex type attributes.

    :param class_: A class for which :func:`iscomplex` is true.
    :raises TypeError: If ``class_`` is not such a class.
    """
    if not iscomplex(class_):
        # The one-item tuple keeps the formatting correct even when
        # ``class_`` is itself a tuple.
        raise TypeError("%s is not a registered type" % (class_,))
    return class_._wsme_attributes
def make_dataholder(class_):
    """Build the slotted class that stores attribute values for ``class_``.

    The returned class has one slot per entry of ``class_._wsme_attributes``
    (named after the entry's ``key``) and is named after ``class_`` with a
    ``DataHolder`` suffix.
    """
    # The slot names are collected outside the class body so that no loop
    # variable leaks into the class namespace, which would misbehave if a
    # slot happened to share that variable's name.
    slot_names = []
    for attribute in class_._wsme_attributes:
        slot_names.append(attribute.key)

    class DataHolder(object):
        __slots__ = slot_names

    holder_name = class_.__name__ + 'DataHolder'
    DataHolder.__name__ = holder_name
    return DataHolder
class Registry(object):
    """Keeps track of the complex, array and dict types in use.

    Complex types are only referenced weakly; array and dict types are kept
    in sets.
    """

    def __init__(self):
        self._complex_types = []
        self.array_types = set()
        self.dict_types = set()

    @property
    def complex_types(self):
        """The registered complex types that are still alive."""
        # Dereference each weak reference exactly once, and test against
        # None (as lookup() does) rather than relying on truthiness.
        alive = (ref() for ref in self._complex_types)
        return [class_ for class_ in alive if class_ is not None]

    def register(self, class_):
        """
        Make sure a type is registered.

        It is automatically called by :class:`expose() <wsme.expose>`
        and :class:`validate() <wsme.validate>`.
        Unless you want to control when the class inspection is done there
        is no need to call it.

        :returns: ``class_`` itself, or the :class:`ArrayType` /
                  :class:`DictType` built from a list / dict shorthand.
        :raises ValueError: If a list or dict shorthand does not contain
                            exactly one item.
        """
        if class_ is None or \
                class_ in native_types or \
                isusertype(class_) or iscomplex(class_) or \
                isarray(class_) or isdict(class_):
            return class_

        if isinstance(class_, list):
            if len(class_) != 1:
                raise ValueError("Cannot register type %s" % repr(class_))
            dt = ArrayType(class_[0])
            self.register(dt.item_type)
            self.array_types.add(dt)
            return dt

        if isinstance(class_, dict):
            if len(class_) != 1:
                raise ValueError("Cannot register type %s" % repr(class_))
            dt = DictType(*list(class_.items())[0])
            self.register(dt.value_type)
            self.dict_types.add(dt)
            return dt

        # Setting the attribute before inspecting makes iscomplex() true for
        # this class, so a nested register() call on the same class returns
        # early instead of inspecting it again.
        class_._wsme_attributes = None
        class_._wsme_attributes = inspect_class(class_)
        class_._wsme_DataHolderClass = make_dataholder(class_)

        class_.__registry__ = self
        self._complex_types.append(weakref.ref(class_))
        return class_

    def reregister(self, class_):
        """Register a type which may already have been registered.
        """
        self._unregister(class_)
        return self.register(class_)

    def _unregister(self, class_):
        """Remove a previously registered type.
        """
        # Clear the existing attribute reference so it is rebuilt if
        # the class is registered again later. Only the class's own
        # attribute is removed: deleting one that is merely inherited
        # would raise AttributeError.
        if '_wsme_attributes' in getattr(class_, '__dict__', {}):
            del class_._wsme_attributes

        # FIXME(dhellmann): This method does not recurse through the
        # types like register() does. Should it?
        if isinstance(class_, list):
            at = ArrayType(class_[0])
            try:
                self.array_types.remove(at)
            except KeyError:
                pass
        elif isinstance(class_, dict):
            key_type, value_type = list(class_.items())[0]
            self.dict_types = set(
                dt for dt in self.dict_types
                if (dt.key_type, dt.value_type) != (key_type, value_type)
            )
        # We can't use remove() here because the items in
        # _complex_types are weakref objects pointing to the classes,
        # so we can't compare with them directly.
        self._complex_types = [
            ct for ct in self._complex_types
            if ct() is not class_
        ]

    def lookup(self, typename):
        """Find a registered complex type by name.

        :param typename: A class name, optionally prefixed by its module
                         name (``module.ClassName``).
        :returns: The first matching live complex type, or None.
        """
        # Let the logging module format the message only if it is emitted.
        log.debug('Lookup %s', typename)
        modname = None
        if '.' in typename:
            modname, typename = typename.rsplit('.', 1)
        for ct in self._complex_types:
            ct = ct()
            if ct is not None and typename == ct.__name__ and (
                    modname is None or modname == ct.__module__):
                return ct
        return None

    def resolve_type(self, type_):
        """Turn a type reference into an actual type.

        A string is looked up by name; list and dict shorthands become
        :class:`ArrayType` / :class:`DictType` with their item / value type
        resolved recursively; anything else goes through :meth:`register`.
        """
        if isinstance(type_, six.string_types):
            return self.lookup(type_)
        if isinstance(type_, list):
            type_ = ArrayType(type_[0])
        if isinstance(type_, dict):
            type_ = DictType(list(type_.keys())[0], list(type_.values())[0])
        if isinstance(type_, ArrayType):
            type_ = ArrayType(self.resolve_type(type_.item_type))
            self.array_types.add(type_)
        elif isinstance(type_, DictType):
            type_ = DictType(
                type_.key_type,
                self.resolve_type(type_.value_type)
            )
            self.dict_types.add(type_)
        else:
            type_ = self.register(type_)
        return type_


# Default type registry
registry = Registry()
def register_type(class_):
    """Register ``class_`` with the default registry and return the result."""
    registered = registry.register(class_)
    return registered
class BaseMeta(type):
    """Metaclass that registers classes as they are created.

    A class whose first base is ``object`` (or that has no bases at all) is
    left alone; any other class gets a ``__registry__`` (the default
    registry unless the class body defines one) and is registered with it.
    """

    def __new__(cls, name, bases, dct):
        derived = bool(bases) and bases[0] is not object
        if derived and '__registry__' not in dct:
            dct['__registry__'] = registry
        return type.__new__(cls, name, bases, dct)

    def __init__(cls, name, bases, dct):
        if not bases or bases[0] is object:
            return
        cls.__registry__.register(cls)
class Base(six.with_metaclass(BaseMeta)):
    """Base type for complex types"""

    def __init__(self, **kw):
        # Keyword arguments that do not match an existing attribute are
        # silently ignored.
        for attr_name, attr_value in kw.items():
            if not hasattr(self, attr_name):
                continue
            setattr(self, attr_name, attr_value)
class File(Base):
    """A complex type that represents a file.

    In the particular case of protocol accepting form encoded data as
    input, File can be loaded from a form file field.
    """

    #: The file name
    filename = wsattr(text)
    #: Mime type of the content
    contenttype = wsattr(text)

    def _get_content(self):
        # Read the file object the first time the content is requested
        # and keep the result.
        should_read = self._content is None and self._file
        if should_read:
            self._content = self._file.read()
        return self._content

    def _set_content(self, value):
        # Setting the content explicitly drops any file object.
        self._content = value
        self._file = None

    #: File content
    content = wsproperty(binary, _get_content, _set_content)

    def __init__(self, filename=None, file=None, content=None,
                 contenttype=None, fieldstorage=None):
        self.filename = filename
        self.contenttype = contenttype
        self._file = file
        self._content = content

        if fieldstorage is None:
            return

        # A field storage carrying a file object overrides the file, the
        # file name and the content type; otherwise only its value is taken
        # as the content.
        if fieldstorage.file:
            self._file = fieldstorage.file
            self.filename = fieldstorage.filename
            self.contenttype = text(fieldstorage.type)
        else:
            self._content = fieldstorage.value

    @property
    def file(self):
        """A file object, built from the content when none was given."""
        must_wrap = self._file is None and self._content
        if must_wrap:
            self._file = six.BytesIO(self._content)
        return self._file
class DynamicBase(Base):
    """Base type for complex types for which all attributes are not
    defined when the class is constructed.

    This class is meant to be used as a base for types that have
    properties added after the main class is created, such as by
    loading plugins.
    """

    @classmethod
    def add_attributes(cls, **attrs):
        """Add more attributes

        The arguments should be valid Python attribute names
        associated with a type for the new attribute.

        Once the attributes are set, the class is registered again with
        its registry.
        """
        for attr_name, attr_type in attrs.items():
            setattr(cls, attr_name, attr_type)
        owner_registry = cls.__registry__
        owner_registry.reregister(cls)
|