1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
|
"""Common helper classes for flow Key-Value parsing."""
import functools
import re
from ovs.flow.decoders import decode_default
class ParseError(RuntimeError):
"""Exception raised when an error occurs during parsing."""
pass
class KeyMetadata(object):
"""Class for keeping key metadata.
Attributes:
kpos (int): The position of the keyword in the parent string.
vpos (int): The position of the value in the parent string.
kstring (string): The keyword string as found in the flow string.
vstring (string): The value as found in the flow string.
delim (string): Optional, the string used as delimiter between the key
and the value.
end_delim (string): Optional, the string used as end delimiter
"""
def __init__(self, kpos, vpos, kstring, vstring, delim="", end_delim=""):
"""Constructor."""
self.kpos = kpos
self.vpos = vpos
self.kstring = kstring
self.vstring = vstring
self.delim = delim
self.end_delim = end_delim
def __str__(self):
return "key: [{},{}), val:[{}, {})".format(
self.kpos,
self.kpos + len(self.kstring),
self.vpos,
self.vpos + len(self.vstring),
)
def __repr__(self):
return "{}('{}')".format(self.__class__.__name__, self)
class KeyValue(object):
"""Class for keeping key-value data.
Attributes:
key (str): The key string.
value (any): The value data.
meta (KeyMetadata): The key metadata.
"""
def __init__(self, key, value, meta=None):
"""Constructor."""
self.key = key
self.value = value
self.meta = meta
def __str__(self):
return "{}: {} ({})".format(self.key, str(self.value), str(self.meta))
def __repr__(self):
return "{}('{}')".format(self.__class__.__name__, self)
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.key == other.key and self.value == other.value
else:
return False
def __ne__(self, other):
return not self.__eq__(other)
class KVDecoders(object):
"""KVDecoders class is used by KVParser to select how to decode the value
of a specific keyword.
A decoder is simply a function that accepts a value string and returns
the value objects to be stored.
The returned value may be of any type.
Decoders may return a KeyValue instance to indicate that the keyword should
also be modified to match the one provided in the returned KeyValue.
The decoder to be used will be selected using the key as an index. If not
found, the default decoder will be used. If free keys are found (i.e:
keys without a value), the default_free decoder will be used. For that
reason, the default_free decoder, must return both the key and value to be
stored.
Globally defined "strict" variable controls what to do when decoders do not
contain a valid decoder for a key and a default function is not provided.
If set to True (default), a ParseError is raised.
If set to False, the value will be decoded as a string.
Args:
decoders (dict): Optional; A dictionary of decoders indexed by keyword.
default (callable): Optional; A function to use if a match is not
found in configured decoders. If not provided, the default behavior
depends on "strict". The function must accept a the key and a value
and return the decoded (key, value) tuple back.
default_free (callable): Optional; The decoder used if a match is not
found in configured decoders and it's a free value (e.g:
a value without a key) Defaults to returning the free value as
keyword and "True" as value.
The callable must accept a string and return a key-value pair.
"""
strict = True
def __init__(self, decoders=None, default=None, default_free=None,
ignore_case=False):
if not decoders:
self._decoders = dict()
elif ignore_case:
self._decoders = {k.lower(): v for k, v in decoders.items()}
else:
self._decoders = decoders
self._default = default
self._default_free = default_free or self._default_free_decoder
self._ignore_case = ignore_case
def decode(self, keyword, value_str):
"""Decode a keyword and value.
Args:
keyword (str): The keyword whose value is to be decoded.
value_str (str): The value string.
Returns:
The key (str) and value(any) to be stored.
"""
decoder = None
if self._ignore_case:
decoder = self._decoders.get(keyword.lower())
else:
decoder = self._decoders.get(keyword)
if decoder:
result = decoder(value_str)
if isinstance(result, KeyValue):
keyword = result.key
value = result.value
else:
value = result
return keyword, value
else:
if value_str:
if self._default:
return self._default(keyword, value_str)
if self.strict:
raise ParseError(
"Cannot parse key {}: No decoder found".format(keyword)
)
return keyword, decode_default(value_str)
return self._default_free(keyword)
@staticmethod
def _default_free_decoder(key):
"""Default decoder for free keywords."""
return key, True
delim_pattern = re.compile(r"(\(|=|:|,|\n|\r|\t)")
parenthesis = re.compile(r"(\(|\))")
end_pattern = re.compile(r"( |,|\n|\r|\t)")
separators = (" ", ",")
end_of_string = (",", "\n", "\t", "\r", "")
class KVParser(object):
"""KVParser parses a string looking for key-value pairs.
Args:
string (str): The string to parse.
decoders (KVDecoders): Optional; the KVDecoders instance to use.
"""
def __init__(self, string, decoders=None):
"""Constructor."""
self._decoders = decoders or KVDecoders()
self._keyval = list()
self._string = string
def keys(self):
return list(kv.key for kv in self._keyval)
def kv(self):
return self._keyval
def __iter__(self):
return iter(self._keyval)
def parse(self):
"""Parse the key-value pairs in string.
The input string is assumed to contain a list of comma (or space)
separated key-value pairs.
Key-values pairs can have multiple different delimiters, eg:
"key1:value1,key2=value2,key3(value3)".
Also, we can stumble upon a "free" keywords, e.g:
"key1=value1,key2=value2,free_keyword".
We consider this as keys without a value.
So, to parse the string we do the following until the end of the
string is found:
1 - Skip any leading comma's or spaces.
2 - Find the next delimiter (or end_of_string character).
3 - Depending on the delimiter, obtain the key and the value.
For instance, if the delimiter is "(", find the next matching
")".
4 - Use the KVDecoders to decode the key-value.
5 - Store the KeyValue object with the corresponding metadata.
Raises:
ParseError if any parsing error occurs.
"""
kpos = 0
while kpos < len(self._string) and self._string[kpos] != "\n":
keyword = ""
delimiter = ""
rest = ""
# 1. Skip separator characters.
if self._string[kpos] in separators:
kpos += 1
continue
# 2. Find the next delimiter or end of string character.
try:
keyword, delimiter, rest = delim_pattern.split(
self._string[kpos:], 1
)
except ValueError:
keyword = self._string[kpos:] # Free keyword
# 3. Extract the value from the rest of the string.
value_str = ""
vpos = kpos + len(keyword) + 1
end_delimiter = ""
if delimiter in ("=", ":"):
# If the delimiter is ':' or '=', the end of the value is the
# end of the string or a ', '.
value_parts = end_pattern.split(rest, 1)
value_str = value_parts[0]
next_kpos = vpos + len(value_str)
elif delimiter == "(":
# Find matching ")".
level = 1
index = 0
value_parts = parenthesis.split(rest)
for val in value_parts:
if val == "(":
level += 1
elif val == ")":
level -= 1
index += len(val)
if level == 0:
break
if level != 0:
raise ParseError(
"Error parsing string {}: "
"Failed to find matching ')' in {}".format(
self._string, rest
)
)
value_str = rest[: index - 1]
next_kpos = vpos + len(value_str) + 1
end_delimiter = ")"
# Exceptionally, if after the () we find -> {}, do not treat
# the content of the parenthesis as the value, consider
# ({})->{} as the string value.
if index < len(rest) - 2 and rest[index : index + 2] == "->":
extra_val = rest[index + 2 :].split(",")[0]
value_str = "({})->{}".format(value_str, extra_val)
# remove the first "(".
vpos -= 1
next_kpos = vpos + len(value_str)
end_delimiter = ""
elif delimiter in end_of_string:
# Key without a value.
next_kpos = kpos + len(keyword)
vpos = -1
# 4. Use KVDecoders to decode the key-value.
try:
key, val = self._decoders.decode(keyword, value_str)
except Exception as e:
raise ParseError(
"Error parsing key-value ({}, {})".format(
keyword, value_str
)
) from e
# Store the KeyValue object with the corresponding metadata.
meta = KeyMetadata(
kpos=kpos,
vpos=vpos,
kstring=keyword,
vstring=value_str,
delim=delimiter,
end_delim=end_delimiter,
)
self._keyval.append(KeyValue(key, val, meta))
kpos = next_kpos
def decode_nested_kv(decoders, value):
"""A key-value decoder that extracts nested key-value pairs and returns
them in a dictionary.
Args:
decoders (KVDecoders): The KVDecoders to use.
value (str): The value string to decode.
"""
if not value:
# Mark as flag
return True
parser = KVParser(value, decoders)
parser.parse()
return {kv.key: kv.value for kv in parser.kv()}
def decode_nested_kv_list(decoders, value):
"""A key-value decoder that extracts nested key-value pairs and returns
them in a list of dictionary.
Args:
decoders (KVDecoders): The KVDecoders to use.
value (str): The value string to decode.
"""
if not value:
# Mark as flag
return True
parser = KVParser(value, decoders)
parser.parse()
return [{kv.key: kv.value} for kv in parser.kv()]
def nested_kv_decoder(decoders=None, is_list=False):
"""Helper function that creates a nested kv decoder with given
KVDecoders."""
if is_list:
return functools.partial(decode_nested_kv_list, decoders)
return functools.partial(decode_nested_kv, decoders)
|