1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
|
from abc import abstractmethod
from typing import Any, Optional, Union
from moto.dynamodb.exceptions import (
IncorrectDataType,
IncorrectOperandType,
MockValidationException,
ProvidedKeyDoesNotExist,
)
from moto.dynamodb.models.dynamo_type import (
DDBType,
DDBTypeConversion,
DynamoType,
Item,
)
from moto.dynamodb.parsing.ast_nodes import ( # type: ignore
DDBTypedValue,
ExpressionAttribute,
ExpressionAttributeName,
ExpressionSelector,
Node,
UpdateExpressionAddAction,
UpdateExpressionDeleteAction,
UpdateExpressionPath,
UpdateExpressionRemoveAction,
UpdateExpressionSetAction,
)
from moto.dynamodb.parsing.validators import ExpressionPathResolver
class NodeExecutor:
def __init__(self, ast_node: Node, expression_attribute_names: dict[str, str]):
self.node = ast_node
self.expression_attribute_names = expression_attribute_names
@abstractmethod
def execute(self, item: Item) -> None:
pass
def get_item_part_for_path_nodes(
self, item: Item, path_nodes: list[Node]
) -> Union[DynamoType, dict[str, Any]]:
"""
For a list of path nodes travers the item by following the path_nodes
Args:
item(Item):
path_nodes(list):
Returns:
"""
if len(path_nodes) == 0:
return item.attrs
else:
return ExpressionPathResolver(
self.expression_attribute_names
).resolve_expression_path_nodes_to_dynamo_type(item, path_nodes)
def get_item_before_end_of_path(
self, item: Item
) -> Union[DynamoType, dict[str, Any]]:
"""
Get the part ot the item where the item will perform the action. For most actions this should be the parent. As
that element will need to be modified by the action.
Args:
item(Item):
Returns:
DynamoType or dict: The path to be set
"""
return self.get_item_part_for_path_nodes(
item, self.get_path_expression_nodes()[:-1]
)
def get_item_at_end_of_path(self, item: Item) -> Union[DynamoType, dict[str, Any]]:
"""
For a DELETE the path points at the stringset so we need to evaluate the full path.
Args:
item(Item):
Returns:
DynamoType or dict: The path to be set
"""
return self.get_item_part_for_path_nodes(item, self.get_path_expression_nodes())
# Get the part ot the item where the item will perform the action. For most actions this should be the parent. As
# that element will need to be modified by the action.
get_item_part_in_which_to_perform_action = get_item_before_end_of_path
def get_path_expression_nodes(self) -> list[Node]:
update_expression_path = self.node.children[0]
assert isinstance(update_expression_path, UpdateExpressionPath)
return update_expression_path.children
def get_element_to_action(self) -> Node:
return self.get_path_expression_nodes()[-1]
def get_action_value(self) -> DynamoType:
"""
Returns:
DynamoType: The value to be set
"""
ddb_typed_value = self.node.children[1]
assert isinstance(ddb_typed_value, DDBTypedValue)
dynamo_type_value = ddb_typed_value.children[0]
assert isinstance(dynamo_type_value, DynamoType)
return dynamo_type_value
class SetExecutor(NodeExecutor):
def execute(self, item: Item) -> None:
self.set(
item_part_to_modify_with_set=self.get_item_part_in_which_to_perform_action(
item
),
element_to_set=self.get_element_to_action(),
value_to_set=self.get_action_value(),
expression_attribute_names=self.expression_attribute_names,
)
@classmethod
def set( # type: ignore[misc]
cls,
item_part_to_modify_with_set: Union[DynamoType, dict[str, Any]],
element_to_set: Any,
value_to_set: Any,
expression_attribute_names: dict[str, str],
) -> None:
if isinstance(element_to_set, ExpressionAttribute):
attribute_name = element_to_set.get_attribute_name()
item_part_to_modify_with_set[attribute_name] = value_to_set
elif isinstance(element_to_set, ExpressionSelector):
index = element_to_set.get_index()
item_part_to_modify_with_set[index] = value_to_set
elif isinstance(element_to_set, ExpressionAttributeName):
attribute_name = expression_attribute_names[
element_to_set.get_attribute_name_placeholder()
]
item_part_to_modify_with_set[attribute_name] = value_to_set
else:
raise NotImplementedError(
f"Moto does not support setting {type(element_to_set)} yet"
)
class DeleteExecutor(NodeExecutor):
operator = "operator: DELETE"
def execute(self, item: Item) -> None:
string_set_to_remove = self.get_action_value()
assert isinstance(string_set_to_remove, DynamoType)
if not string_set_to_remove.is_set():
raise IncorrectOperandType(
self.operator,
DDBTypeConversion.get_human_type(string_set_to_remove.type),
)
try:
string_set = self.get_item_at_end_of_path(item)
except ProvidedKeyDoesNotExist:
# Deleting a non-empty key, or from a non-empty set, is not a problem
return
assert isinstance(string_set, DynamoType)
if string_set.type != string_set_to_remove.type:
raise IncorrectDataType()
# String set is currently implemented as a list
string_set_list = string_set.value
stringset_to_remove_list = string_set_to_remove.value
for value in stringset_to_remove_list:
try:
string_set_list.remove(value)
except (KeyError, ValueError):
# DynamoDB does not mind if value is not present
pass
# DynamoDB does not support empty sets. If we've deleted
# the last item in the set, we have to remove the attribute.
if not string_set_list:
element = self.get_element_to_action()
if isinstance(element, ExpressionAttributeName):
attribute_name = self.expression_attribute_names[
element.get_attribute_name_placeholder()
]
elif isinstance(element, ExpressionAttribute):
attribute_name = element.get_attribute_name()
else:
raise NotImplementedError(
f"Moto does not support deleting {type(element)} yet"
)
container = self.get_item_before_end_of_path(item)
del container[attribute_name] # type: ignore[union-attr]
class RemoveExecutor(NodeExecutor):
def execute(self, item: Item) -> None:
element_to_remove = self.get_element_to_action()
if isinstance(element_to_remove, ExpressionAttribute):
attribute_name = element_to_remove.get_attribute_name()
self.get_item_part_in_which_to_perform_action(item).pop(
attribute_name, None
)
elif isinstance(element_to_remove, ExpressionAttributeName):
attribute_name = self.expression_attribute_names[
element_to_remove.get_attribute_name_placeholder()
]
self.get_item_part_in_which_to_perform_action(item).pop(
attribute_name, None
)
elif isinstance(element_to_remove, ExpressionSelector):
index = element_to_remove.get_index()
try:
self.get_item_part_in_which_to_perform_action(item).pop(index)
except IndexError:
# DynamoDB does not care that index is out of bounds, it will just do nothing.
pass
else:
raise NotImplementedError(
f"Moto does not support setting {type(element_to_remove)} yet"
)
class AddExecutor(NodeExecutor):
def execute(self, item: Item) -> None:
value_to_add = self.get_action_value()
if isinstance(value_to_add, DynamoType):
if value_to_add.is_set():
if len(value_to_add.value) == 0:
raise MockValidationException(
"ExpressionAttributeValues contains invalid value: One or more parameter values were invalid: An string set may not be empty"
)
try:
current_string_set = self.get_item_at_end_of_path(item)
except ProvidedKeyDoesNotExist:
current_string_set = DynamoType({value_to_add.type: []})
SetExecutor.set(
item_part_to_modify_with_set=self.get_item_before_end_of_path(
item
),
element_to_set=self.get_element_to_action(),
value_to_set=current_string_set,
expression_attribute_names=self.expression_attribute_names,
)
assert isinstance(current_string_set, DynamoType)
if not current_string_set.type == value_to_add.type:
raise IncorrectDataType()
# Sets are implemented as list
for value in value_to_add.value:
if value in current_string_set.value:
continue
else:
current_string_set.value.append(value)
elif value_to_add.type == DDBType.NUMBER:
try:
existing_value = self.get_item_at_end_of_path(item)
except ProvidedKeyDoesNotExist:
existing_value = DynamoType({DDBType.NUMBER: "0"})
assert isinstance(existing_value, DynamoType)
if not existing_value.type == DDBType.NUMBER:
raise IncorrectDataType()
new_value = existing_value + value_to_add
SetExecutor.set(
item_part_to_modify_with_set=self.get_item_before_end_of_path(item),
element_to_set=self.get_element_to_action(),
value_to_set=new_value,
expression_attribute_names=self.expression_attribute_names,
)
else:
raise IncorrectDataType()
class UpdateExpressionExecutor:
execution_map = {
UpdateExpressionSetAction: SetExecutor,
UpdateExpressionAddAction: AddExecutor,
UpdateExpressionRemoveAction: RemoveExecutor,
UpdateExpressionDeleteAction: DeleteExecutor,
}
def __init__(
self, update_ast: Node, item: Item, expression_attribute_names: dict[str, str]
):
self.update_ast = update_ast
self.item = item
self.expression_attribute_names = expression_attribute_names
def execute(self, node: Optional[Node] = None) -> None:
"""
As explained in moto.dynamodb.parsing.expressions.NestableExpressionParserMixin._create_node the order of nodes
in the AST can be translated of the order of statements in the expression. As such we can start at the root node
and process the nodes 1-by-1. If no specific execution for the node type is defined we can execute the children
in order since it will be a container node that is expandable and left child will be first in the statement.
Note that, if `normalize()` is called before, the list of children will be flattened and sorted (if appropriate).
Args:
node(Node):
Returns:
None
"""
if node is None:
node = self.update_ast
node_executor = self.get_specific_execution(node)
if node_executor is None:
for n in node.children:
self.execute(n)
else:
node_executor(node, self.expression_attribute_names).execute(self.item)
def get_specific_execution(self, node: Node) -> Optional[type[NodeExecutor]]:
for node_class in self.execution_map:
if isinstance(node, node_class):
return self.execution_map[node_class]
return None
|