1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
import copy
from abc import ABC, abstractmethod # pylint: disable=[no-name-in-module]
from typing import Any, List, Literal, Optional, Tuple, Union
from knot_resolver.utils.modeling.base_schema import BaseSchema, map_object
from knot_resolver.utils.modeling.json_pointer import json_ptr_resolve
class PatchError(Exception):
    """Signals that a JSON patch operation could not be applied to the data."""
class Op(BaseSchema, ABC):
    """
    Abstract base for the patch operations defined in this module.
    """

    @abstractmethod
    def eval(self, fakeroot: Any) -> Any:
        """
        Apply the operation to the given fakeroot and return the resulting fakeroot.
        """

    def _resolve_ptr(self, fakeroot: Any, ptr: str) -> Tuple[Any, Any, Union[str, int, None]]:
        """
        Resolve the JSON pointer ``ptr`` against the data stored in ``fakeroot["root"]``.

        Returns the ``(parent, obj, token)`` triple obtained from ``json_ptr_resolve``.
        When the resolver reports no parent, the fakeroot itself and the key
        ``"root"`` are substituted, so the returned token is never ``None``.
        """
        container, target, key = json_ptr_resolve(fakeroot["root"], ptr)

        # The lookup ran on the bare data. If it yielded no parent, the fake
        # root node is the container and the data sits under its "root" key.
        # NOTE(review): presumably this happens when ptr addresses the document root - confirm.
        if container is None:
            container, key = fakeroot, "root"

        assert key is not None
        return container, target, key
class AddOp(Op):
    """
    The ``add`` patch operation: stores ``value`` at the location given by ``path``.
    """

    op: Literal["add"]
    path: str
    value: Any

    def eval(self, fakeroot: Any) -> Any:
        """
        Insert ``self.value`` into the container resolved from ``self.path``.

        A dict parent gets the value assigned under the resolved token. A list
        parent gets it appended when the token is ``"-"`` and inserted at the
        (integer) token position otherwise. Returns the fakeroot.
        """
        container, _, key = self._resolve_ptr(fakeroot, self.path)

        if isinstance(container, dict):
            container[key] = self.value
            return fakeroot

        if not isinstance(container, list):
            raise AssertionError("never happens")

        # "-" stands for the position after the last list element
        if key == "-":
            container.append(self.value)
            return fakeroot

        assert isinstance(key, int)
        container.insert(key, self.value)
        return fakeroot
class RemoveOp(Op):
    """
    The ``remove`` patch operation: deletes the entry located at ``path``.
    """

    op: Literal["remove"]
    path: str

    def eval(self, fakeroot: Any) -> Any:
        """
        Delete the resolved token from its parent container and return the fakeroot.
        """
        container, _, key = self._resolve_ptr(fakeroot, self.path)
        del container[key]
        return fakeroot
class ReplaceOp(Op):
    """
    The ``replace`` patch operation: overwrites the value located at ``path``.
    """

    op: Literal["replace"]
    path: str
    # The replacement may be any JSON value (object, array, number, ...), not
    # only a string. This matches the `value` field of AddOp and TestOp.
    value: Any

    def eval(self, fakeroot: Any) -> Any:
        """
        Overwrite the value found at ``self.path`` with ``self.value``.

        Returns the fakeroot.

        Raises:
            PatchError: when the value currently found at ``self.path`` is ``None``.
        """
        parent, obj, token = self._resolve_ptr(fakeroot, self.path)

        # refuse to replace something that resolves to None
        if obj is None:
            raise PatchError("the value you are trying to replace is null")

        parent[token] = self.value
        return fakeroot
class MoveOp(Op):
    """
    The ``move`` patch operation: removes the value at ``source`` and adds it at ``path``.
    """

    op: Literal["move"]
    source: str
    path: str

    def _source(self, source):
        """
        Extract the ``from`` property of the given operation object as a string.

        NOTE(review): presumably invoked by BaseSchema to populate the ``source``
        field from the raw operation object - confirm against BaseSchema.

        Raises:
            ValueError: when the ``from`` property is missing.
        """
        if "from" not in source:
            raise ValueError("missing property 'from' in 'move' JSON patch operation")
        return str(source["from"])

    def eval(self, fakeroot: Any) -> Any:
        """
        Move the value found at ``self.source`` to ``self.path``.

        Returns the fakeroot produced by the remove and the following add.

        Raises:
            PatchError: when ``self.path`` equals ``self.source`` or lies inside it.
        """
        # Compare on pointer-token boundaries. A plain string prefix test would
        # also reject unrelated siblings, e.g. moving "/a" to "/ab".
        if self.path == self.source or self.path.startswith(f"{self.source}/"):
            raise PatchError("can't move value into itself")

        _parent, obj, _token = self._resolve_ptr(fakeroot, self.source)
        newobj = copy.deepcopy(obj)

        # a move is a remove from the source followed by an add at the target
        fakeroot = RemoveOp({"op": "remove", "path": self.source}).eval(fakeroot)
        return AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
class CopyOp(Op):
    """
    The ``copy`` patch operation: adds a deep copy of the value at ``source`` to ``path``.
    """

    op: Literal["copy"]
    source: str
    path: str

    def _source(self, source):
        """
        Return the ``from`` property of the given operation object as a string.

        NOTE(review): presumably invoked by BaseSchema to populate the ``source``
        field from the raw operation object - confirm against BaseSchema.

        Raises ValueError when the ``from`` property is missing.
        """
        if "from" in source:
            return str(source["from"])
        raise ValueError("missing property 'from' in 'copy' JSON patch operation")

    def eval(self, fakeroot: Any) -> Any:
        """
        Deep-copy the value found at ``self.source`` and add the copy at ``self.path``.

        Returns the fakeroot produced by the add.
        """
        resolved = self._resolve_ptr(fakeroot, self.source)
        duplicate = copy.deepcopy(resolved[1])
        add_op = AddOp({"path": self.path, "value": duplicate, "op": "add"})
        return add_op.eval(fakeroot)
class TestOp(Op):
    """
    The ``test`` patch operation: checks that the value at ``path`` equals ``value``.
    """

    op: Literal["test"]
    path: str
    value: Any

    def eval(self, fakeroot: Any) -> Any:
        """
        Compare the value found at ``self.path`` with ``self.value``.

        Returns the fakeroot untouched; raises PatchError when the values differ.
        """
        _, actual, _ = self._resolve_ptr(fakeroot, self.path)

        if actual != self.value:
            raise PatchError("test failed")

        return fakeroot
def query(
    original: Any, method: Literal["get", "delete", "put", "patch"], ptr: str, payload: Any
) -> Tuple[Any, Optional[Any]]:
    """
    Run a single query against a deep copy of ``original``.

    ``original`` itself is never modified. The result is a pair
    ``(data, selected)``: ``data`` is the (possibly modified) copy and
    ``selected`` is the object picked by ``ptr`` for ``"get"``, ``None`` for
    every other method.

    Methods:
        get:    return whatever ``ptr`` selects
        delete: remove the entry ``ptr`` points to
        put:    store ``payload`` at ``ptr``; the token ``"-"`` on a list appends
        patch:  treat ``payload`` as a list of patch operations and apply them in order

    Raises ValueError when a patch operation fails with PatchError; the message
    carries the index of the failing step.
    """

    # The original is treated as immutable, so all work is done on a deep copy.
    # The copy is hung under a fake root node to make the root itself addressable.
    fakeroot = {"root": copy.deepcopy(original)}

    if method == "get":
        _, selected, _ = json_ptr_resolve(fakeroot, f"/root{ptr}")
        return fakeroot["root"], selected

    if method == "delete":
        fakeroot = RemoveOp({"op": "remove", "path": ptr}).eval(fakeroot)
        return fakeroot["root"], None

    if method == "put":
        container, _, key = json_ptr_resolve(fakeroot, f"/root{ptr}")
        assert container is not None  # we know this due to the fakeroot
        appending = isinstance(container, list) and key == "-"
        if appending:
            container.append(payload)
        else:
            container[key] = payload
        return fakeroot["root"], None

    if method == "patch":
        operations_type = List[Union[AddOp, RemoveOp, MoveOp, CopyOp, TestOp, ReplaceOp]]
        operations = map_object(operations_type, payload)

        for step, operation in enumerate(operations):
            try:
                fakeroot = operation.eval(fakeroot)
            except PatchError as e:
                raise ValueError(f"json patch transaction failed on step {step}") from e

        return fakeroot["root"], None

    raise AssertionError("invalid operation, never happens")
|