# This file is part of Tryton. The COPYRIGHT file at the toplevel of this
# repository contains the full copyright notices and license terms.
import json
from functools import partial
from sql import Cast, CombiningQuery, Literal, Null, Select, operators
from trytond import backend
from trytond.pool import Pool
from trytond.protocols.jsonrpc import JSONDecoder, JSONEncoder
from trytond.tools import grouped_slice
from trytond.tools.immutabledict import ImmutableDict
from trytond.transaction import Transaction
from .field import SQL_OPERATORS, Field, domain_method, order_method
# Canonical JSON serializer: keys are sorted, separators carry no whitespace
# and non-ASCII characters are written as-is instead of being escaped.
dumps = partial(
    json.dumps,
    cls=JSONEncoder,
    ensure_ascii=False,
    separators=(',', ':'),
    sort_keys=True)
class Dict(Field):
    """Define dict field.

    The value is a mapping kept in a JSON column.  Records expose it as an
    ImmutableDict.  Domain clauses and orders may address a single entry of
    the mapping with the ``<field name>.<key>`` notation.
    """
    _type = 'dict'
    _sql_type = 'JSON'
    _py_type = dict

    def __init__(self, schema_model, string='', help='', required=False,
            readonly=False, domain=None, states=None,
            on_change=None, on_change_with=None, depends=None,
            context=None, loading='lazy'):
        """Initialise the field.

        :param schema_model: the name of the model describing the keys; it
            is resolved through the pool by TranslatedDict.

        All the other parameters are forwarded unchanged to the parent
        constructor.
        """
        super().__init__(string, help, required, readonly, domain,
            states, on_change, on_change_with, depends, context, loading)
        self.schema_model = schema_model
        # Route 'ilike' searches through database.unaccent (see
        # _domain_column and _domain_value)
        self.search_unaccented = True

    def get(self, ids, model, name, values=None):
        """Return a dict mapping each id of ids to the value of the field.

        :param ids: the ids to fill in; each one defaults to None.
        :param model: not used by this implementation.
        :param name: the key under which the raw value is read in each row.
        :param values: an iterable of rows (mappings with at least 'id' and
            name); None is treated as no row.

        Rows with an empty value keep None, the others get an ImmutableDict
        in which every list is replaced by a tuple.
        """
        dicts = dict.fromkeys(ids)
        for value in values or []:
            data = value[name]
            if not data:
                continue
            # If stored as JSON conversion is done on backend
            if isinstance(data, str):
                data = json.loads(data, object_hook=JSONDecoder())
            # Build a new mapping instead of rewriting the row's own dict in
            # place so the caller's values are left untouched
            dicts[value['id']] = ImmutableDict({
                    key: tuple(val) if isinstance(val, list) else val
                    for key, val in data.items()})
        return dicts

    def sql_format(self, value):
        """Return value formatted for storage.

        After the parent formatting, a dict is serialised with dumps:
        entries whose value is None are dropped and, when schema_model is
        set, list/tuple values are dropped if empty or stored as a sorted
        list without duplicates otherwise.  Any other value is returned as
        formatted by the parent.
        """
        value = super().sql_format(value)
        if isinstance(value, dict):
            d = {}
            for k, v in value.items():
                if v is None:
                    continue
                if self.schema_model and isinstance(v, (list, tuple)):
                    if not v:
                        continue
                    v = sorted(set(v))
                d[k] = v
            value = dumps(d)
        return value

    def __set__(self, inst, value):
        "Store value on inst, wrapping any non-empty value in ImmutableDict"
        if value:
            value = ImmutableDict(value)
        super().__set__(inst, value)

    def translated(self, name=None, type_='values'):
        """Return a descriptor for the translated value of the field.

        :param name: the name of the field; defaults to self.name.
        :param type_: 'keys' or 'values', passed to TranslatedDict.
        :raises ValueError: if no name is given and self.name is None.
        """
        if name is None:
            name = self.name
        if name is None:
            raise ValueError('Missing name argument')
        return TranslatedDict(name, type_)

    def _domain_column(self, operator, column, key=None):
        """Return the SQL expression extracting key from the JSON column.

        For operators ending with 'like' the extracted value is cast to the
        database VARCHAR type; for those ending with 'ilike' it is also
        passed through database.unaccent when search_unaccented is set.
        """
        database = Transaction().database
        column = database.json_get(
            super()._domain_column(operator, column), key)
        if operator.endswith('like'):
            column = Cast(column, database.sql_type('VARCHAR').base)
            if self.search_unaccented and operator.endswith('ilike'):
                column = database.unaccent(column)
        return column

    def _domain_value(self, operator, value):
        """Return the right-hand side of a domain clause ready for SQL.

        Sub-queries are returned unchanged.  For operators ending with 'in'
        a list with each element serialised by dumps is returned.  Otherwise
        the value is serialised as a whole and, for 'ilike' operators with
        search_unaccented set, passed through database.unaccent.
        """
        if backend.name == 'sqlite' and isinstance(value, bool):
            # json_extract returns 0 for JSON false and 1 for JSON true
            value = int(value)
        if isinstance(value, (Select, CombiningQuery)):
            return value
        if self.schema_model and isinstance(value, (list, tuple)):
            # Same normalisation as the one applied to list/tuple entries by
            # sql_format
            value = sorted(set(value))
        if operator.endswith('in'):
            return [dumps(v) for v in value]
        else:
            value = dumps(value)
            if self.search_unaccented and operator.endswith('ilike'):
                database = Transaction().database
                value = database.unaccent(value)
            return value

    def _domain_add_null(self, column, operator, value, expression):
        """Complete expression with the handling of SQL NULL.

        On top of the parent behaviour, when value is None and the operator
        ends with '=': for '=' the rows for which column is NULL are also
        accepted, for any other such operator they are excluded.
        """
        expression = super()._domain_add_null(
            column, operator, value, expression)
        if value is None and operator.endswith('='):
            if operator == '=':
                expression |= (column == Null)
            else:
                expression &= (column != Null)
        return expression

    @domain_method
    def convert_domain(self, domain, tables, Model):
        """Return the SQL expression for a domain clause.

        A clause on the whole field (no '.' in its name) is delegated to the
        parent.  A clause named ``<field name>.<key>`` is evaluated against
        the entry stored under key, preferring the JSON operators of the
        database and falling back to a plain comparison of the extracted
        value when they raise NotImplementedError.
        """
        name, operator, value = domain[:3]
        if '.' not in name:
            return super().convert_domain(domain, tables, Model)
        database = Transaction().database
        table, _ = tables[None]
        name, key = name.split('.', 1)
        Operator = SQL_OPERATORS[operator]
        raw_column = self.sql_column(table)
        column = self._domain_column(operator, raw_column, key)
        # Generic comparison, kept whenever no custom operator applies
        expression = Operator(column, self._domain_value(operator, value))
        if operator in {'=', '!='}:
            # Try to use custom operators in case there is indexes
            try:
                if value is None:
                    # None is searched as the absence of the key
                    expression = database.json_key_exists(
                        raw_column, key)
                    if operator == '=':
                        expression = operators.Not(expression)
                # we compare on multi-selection by doing an equality check and
                # not a contain check
                elif not isinstance(value, (list, tuple)):
                    expression = database.json_contains(
                        raw_column, dumps({key: value}))
                    if operator == '!=':
                        # Different means: the key is set to another value
                        expression = operators.Not(expression)
                        expression &= database.json_key_exists(
                            raw_column, key)
                # This path returns without going through _domain_add_null
                return expression
            except NotImplementedError:
                pass
        elif operator.endswith('in'):
            # Try to use custom operators in case there is indexes
            if not value:
                # Nothing is in an empty list, everything is not in it
                expression = Literal(operator.startswith('not'))
            else:
                op = '!=' if operator.startswith('not') else '='
                try:
                    in_expr = Literal(False)
                    for v in value:
                        in_expr |= database.json_contains(
                            self._domain_column(op, raw_column, key),
                            dumps(v))
                    if operator.startswith('not'):
                        in_expr = ~in_expr
                    expression = in_expr
                except NotImplementedError:
                    pass
        expression = self._domain_add_null(column, operator, value, expression)
        return expression

    @order_method
    def convert_order(self, name, tables, Model):
        """Return the list of SQL expressions to order by name.

        ``<field name>.<key>`` orders by the value extracted under key; a
        name without key is delegated to the parent.
        """
        fname, _, key = name.partition('.')
        if not key:
            return super().convert_order(fname, tables, Model)
        database = Transaction().database
        table, _ = tables[None]
        column = self.sql_column(table)
        return [database.json_get(column, key)]

    def definition(self, model, language):
        "Return the parent definition of the field completed by schema_model"
        definition = super().definition(model, language)
        definition['schema_model'] = self.schema_model
        return definition
class TranslatedDict:
    """A descriptor for translated values of Dict field.

    Depending on type_, reading it on a record returns either the labels of
    the keys ('keys') or the value of the field with the selection values
    replaced by their labels ('values').
    """

    def __init__(self, name, type_):
        """Initialise the descriptor.

        :param name: the name of the Dict field to read on the record.
        :param type_: 'keys' or 'values'.
        :raises ValueError: if type_ is not one of the supported values.
        """
        # Explicit check instead of an assert: asserts are removed with
        # python -O and __get__ would then silently return None
        if type_ not in ('keys', 'values'):
            raise ValueError(
                "type_ must be 'keys' or 'values', not %r" % (type_,))
        self.name = name
        self.type_ = type_

    def __get__(self, inst, cls):
        """Return the translated keys or values of the field of inst.

        Accessed on the class (inst is None) the descriptor itself is
        returned.  An empty value is returned unchanged.
        """
        if inst is None:
            return self
        pool = Pool()
        schema_model = getattr(cls, self.name).schema_model
        SchemaModel = pool.get(schema_model)

        value = getattr(inst, self.name)
        if not value:
            return value

        # Only the keys of type selection are looked up to translate values
        domain = []
        if self.type_ == 'values':
            domain = [('type_', '=', 'selection')]

        records = []
        for key_names in grouped_slice(value.keys()):
            records.extend(SchemaModel.search([
                        ('name', 'in', key_names),
                        ] + domain))
        keys = SchemaModel.get_keys(records)

        if self.type_ == 'keys':
            return {k['name']: k['string'] for k in keys}
        elif self.type_ == 'values':
            trans = {k['name']: dict(k['selection']) for k in keys}
            # Values without a matching selection entry are kept as they are
            return {
                k: trans[k].get(v, v) if k in trans else v
                for k, v in value.items()}
|