1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
# Copyright 2013 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from webob import exc
from neutron_lib._i18n import _
from neutron_lib.api import validators
from neutron_lib.exceptions import allowedaddresspairs as exceptions
def _validate_allowed_address_pairs(address_pairs, valid_values=None):
    """Validates a list of allowed address pair dicts.

    Validation herein requires the caller to have registered the
    max_allowed_address_pair oslo config option in the global CONF prior
    to having this validator used.

    Each address pair may carry only the keys 'mac_address' (optional) and
    'ip_address' (required). The 'ip_address' value is validated as a subnet
    when it contains a '/', otherwise as a single IP address.

    :param address_pairs: A list of address pair dicts to validate.
    :param valid_values: Not used.
    :returns: None if every address pair is valid. If an element of the
        list fails validators.validate_dict, the message produced by that
        validator is returned rather than raised.
    :raises: AllowedAddressPairExhausted if the address pairs requested
        exceeds cfg.CONF.max_allowed_address_pair.
        AllowedAddressPairsMissingIP if any address pair dict is missing
        an IP address. DuplicateAddressPairInRequest if the same
        (mac_address, ip_address) combination appears more than once in
        the list. Otherwise a HTTPBadRequest is raised if address_pairs
        is not a list or any of the address pairs are invalid.
    """
    if not isinstance(address_pairs, list):
        raise exc.HTTPBadRequest(
            _("Allowed address pairs must be a list."))

    max_pairs = cfg.CONF.max_allowed_address_pair
    if len(address_pairs) > max_pairs:
        raise exceptions.AllowedAddressPairExhausted(quota=max_pairs)

    allowed_keys = {'mac_address', 'ip_address'}
    seen_pairs = set()

    for address_pair in address_pairs:
        msg = validators.validate_dict(address_pair)
        if msg:
            # Deliberately returned, not raised: existing callers of this
            # validator already receive the message this way.
            return msg

        # mac_address is optional, if not set we use the mac on the port
        if 'mac_address' in address_pair:
            msg = validators.validate_mac_address(address_pair['mac_address'])
            if msg:
                raise exc.HTTPBadRequest(msg)

        if 'ip_address' not in address_pair:
            raise exceptions.AllowedAddressPairsMissingIP()

        mac = address_pair.get('mac_address')
        ip_address = address_pair['ip_address']

        # Duplicates are detected on the (mac, ip) combination; a missing
        # mac is represented as None in the key.
        pair_key = (mac, ip_address)
        if pair_key in seen_pairs:
            raise exceptions.DuplicateAddressPairInRequest(
                mac_address=mac, ip_address=ip_address)
        seen_pairs.add(pair_key)

        invalid_attrs = set(address_pair) - allowed_keys
        if invalid_attrs:
            # Sorted so the error message is deterministic when several
            # unknown attributes are present.
            msg = (_("Unrecognized attribute(s) '%s'") %
                   ', '.join(sorted(invalid_attrs)))
            raise exc.HTTPBadRequest(msg)

        try:
            is_cidr = '/' in ip_address
        except TypeError:
            # ip_address is not a container (e.g. None or an int), so the
            # membership test itself fails. Treat it as a plain address and
            # hand it to the IP validator below instead of letting the
            # TypeError escape.
            # NOTE(review): assumes validators.validate_ip_address returns
            # an error message for non-string input -- confirm.
            is_cidr = False

        if is_cidr:
            msg = validators.validate_subnet(ip_address)
        else:
            msg = validators.validate_ip_address(ip_address)
        if msg:
            raise exc.HTTPBadRequest(msg)
|