1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
|
from ipaddress import IPv4Address, IPv6Address, AddressValueError
from .. import errors
from .exceptions import ColumnTypeMismatchException
from .stringcolumn import ByteFixedString
from .intcolumn import UInt32Column
class IPv4Column(UInt32Column):
ch_type = "IPv4"
py_types = (str, IPv4Address, int)
def __init__(self, types_check=False, **kwargs):
# UIntColumn overrides before_write_item and check_item
# in its __init__ when types_check is True so we force
# __init__ without it then add the appropriate check method for IPv4
super(UInt32Column, self).__init__(types_check=False, **kwargs)
self.types_check_enabled = types_check
if types_check:
def check_item(value):
if isinstance(value, int) and value < 0:
raise ColumnTypeMismatchException(value)
if not isinstance(value, IPv4Address):
try:
value = IPv4Address(value)
except AddressValueError:
# Cannot parse input in a valid IPv4
raise ColumnTypeMismatchException(value)
self.check_item = check_item
def after_read_items(self, items, nulls_map=None):
if nulls_map is None:
return tuple(IPv4Address(item) for item in items)
else:
return tuple(
(None if is_null else IPv4Address(items[i]))
for i, is_null in enumerate(nulls_map)
)
def before_write_items(self, items, nulls_map=None):
null_value = self.null_value
for i, item in enumerate(items):
if nulls_map and nulls_map[i]:
items[i] = null_value
continue
# allow Ipv4 in integer, string or IPv4Address object
try:
if isinstance(item, int):
continue
if not isinstance(item, IPv4Address):
item = IPv4Address(item)
items[i] = int(item)
except AddressValueError:
raise errors.CannotParseDomainError(
"Cannot parse IPv4 '{}'".format(item)
)
class IPv6Column(ByteFixedString):
ch_type = "IPv6"
py_types = (str, IPv6Address, bytes)
def __init__(self, types_check=False, **kwargs):
super(IPv6Column, self).__init__(16, types_check=types_check, **kwargs)
if types_check:
def check_item(value):
if isinstance(value, bytes) and len(value) != 16:
raise ColumnTypeMismatchException(value)
if not isinstance(value, IPv6Address):
try:
value = IPv6Address(value)
except AddressValueError:
# Cannot parse input in a valid IPv6
raise ColumnTypeMismatchException(value)
self.check_item = check_item
def after_read_items(self, items, nulls_map=None):
if nulls_map is None:
return tuple(IPv6Address(item) for item in items)
else:
return tuple(
(None if is_null else IPv6Address(items[i]))
for i, is_null in enumerate(nulls_map)
)
def before_write_items(self, items, nulls_map=None):
null_value = self.null_value
for i, item in enumerate(items):
if nulls_map and nulls_map[i]:
items[i] = null_value
continue
# allow Ipv6 in bytes or python IPv6Address
# this is raw bytes (not encoded) in order to fit FixedString(16)
try:
if isinstance(item, bytes):
continue
if not isinstance(item, IPv6Address):
item = IPv6Address(item)
items[i] = item.packed
except AddressValueError:
raise errors.CannotParseDomainError(
"Cannot parse IPv6 '{}'".format(item)
)
|