1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
|
# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import warnings
from sql import Cast, Expression, Literal, Null, Query
from sql.functions import Position, Substring
from trytond.pool import Pool
from trytond.pyson import PYSONEncoder
from trytond.rpc import RPC
from trytond.tools import cached_property
from trytond.transaction import (
Transaction, inactive_records, without_check_access)
from .field import (
Field, context_validate, domain_method, domain_validate,
instantiate_context, search_order_validate)
from .selection import SelectionMixin
class Reference(SelectionMixin, Field):
'''
Define a reference field (``str``).
'''
_type = 'reference'
_sql_type = 'VARCHAR'
def __init__(self, string='', selection=None, sort=True,
selection_change_with=None, translate=True, search_order=None,
search_context=None, help='', help_selection=None, required=False,
readonly=False, domain=None, states=None,
on_change=None, on_change_with=None, depends=None, context=None,
loading='lazy', datetime_field=None):
'''
:param selection: A list or a function name that returns a list.
The list must be a list of tuples. First member is an internal name
of model and the second is the user name of model.
:param sort: A boolean to sort or not the selections.
:param datetime_field: The name of the field that contains the datetime
value to read the target records.
:param search_order: The order to use when searching for a record
:param search_context: The context to use when searching for a record
'''
super(Reference, self).__init__(string=string, help=help,
required=required, readonly=readonly, domain=domain, states=states,
on_change=on_change, on_change_with=on_change_with,
depends=depends, context=context, loading=loading)
self.datetime_field = datetime_field
if hasattr(selection, 'copy'):
self.selection = selection.copy()
else:
self.selection = selection
self.selection_change_with = set()
if selection_change_with:
warnings.warn('selection_change_with argument is deprecated, '
'use the depends decorator',
DeprecationWarning, stacklevel=2)
self.selection_change_with.update(selection_change_with)
self.sort = sort
self.translate_selection = translate
self.help_selection = help_selection
self.__search_order = None
self.search_order = search_order
self.__search_context = None
self.search_context = search_context or {}
__init__.__doc__ += Field.__init__.__doc__
@property
def domain(self):
return self.__domain
@domain.setter
def domain(self, value):
if value is None:
value = {}
assert isinstance(value, dict)
for domain in value.values():
domain_validate(domain)
self.__domain = value
@property
def search_order(self):
return self.__search_order
@search_order.setter
def search_order(self, value):
if value is None:
value = {}
assert isinstance(value, dict)
for order in value.values():
search_order_validate(order)
self.__search_order = value
@property
def search_context(self):
return self.__search_context
@search_context.setter
def search_context(self, value):
context_validate(value)
self.__search_context = value
@cached_property
def display_depends(self):
depends = super().display_depends
if self.datetime_field:
depends.add(self.datetime_field)
return depends
def set_rpc(self, model):
super(Reference, self).set_rpc(model)
if not isinstance(self.selection, (list, tuple)):
assert hasattr(model, self.selection), \
'Missing %s on model %s' % (self.selection, model.__name__)
instantiate = 0 if self.selection_change_with else None
cache = dict(days=1) if instantiate is None else None
model.__rpc__.setdefault(
self.selection, RPC(instantiate=instantiate, cache=cache))
def get(self, ids, model, name, values=None):
'''
Replace removed reference id by None.
'''
pool = Pool()
if values is None:
values = {}
res = {}
for i in values:
res[i['id']] = i[name]
ref_to_check = {}
for i in ids:
if not (i in res):
res[i] = None
continue
if not res[i]:
continue
ref_model, ref_id = res[i].split(',', 1)
if not ref_model:
continue
try:
ref_id = int(ref_id)
except ValueError:
continue
if ref_id < 0:
continue
res[i] = ref_model + ',' + str(ref_id)
ref_to_check.setdefault(ref_model, (set(), []))
ref_to_check[ref_model][0].add(ref_id)
ref_to_check[ref_model][1].append(i)
# Check if reference ids still exist
with inactive_records(), without_check_access():
for ref_model, (ref_ids, ids) in ref_to_check.items():
try:
pool.get(ref_model)
except KeyError:
res.update(dict((i, None) for i in ids))
continue
Ref = pool.get(ref_model)
refs = Ref.search([
('id', 'in', list(ref_ids)),
], order=[])
refs = list(map(str, refs))
for i in ids:
if res[i] not in refs:
res[i] = None
return res
def __set__(self, inst, value):
from ..model import Model
if not isinstance(value, (Model, type(None))):
if isinstance(value, str):
target, value = value.split(',')
else:
target, value = value
Target = Pool().get(target)
ctx = instantiate_context(self, inst)
with Transaction().set_context(ctx):
if isinstance(value, dict):
value = Target(**value)
else:
value = Target(value)
super(Reference, self).__set__(inst, value)
def sql_format(self, value):
if value and not isinstance(value, (str, Query, Expression)):
try:
value = '%s,%s' % tuple(value)
except TypeError:
value = str(value)
return super(Reference, self).sql_format(value)
def sql_id(self, column, Model):
"Return SQL expression for the id part of the field"
return Cast(Substring(
column, Position(',', column) + Literal(1)),
Model.id.sql_type().base)
@inactive_records
@domain_method
def convert_domain(self, domain, tables, Model):
if '.' not in domain[0]:
return super(Reference, self).convert_domain(domain, tables, Model)
pool = Pool()
name, operator, value, target = domain[:4]
Target = pool.get(target)
table, _ = tables[None]
name, target_name = name.split('.', 1)
assert name == self.name
column = self.sql_column(table)
target_domain = [(target_name,) + tuple(domain[1:3])
+ tuple(domain[4:])]
if 'active' in Target._fields:
target_domain.append(('active', 'in', [True, False]))
query = Target.search(target_domain, order=[], query=True)
expression = (
self.sql_id(column, Model).in_(query)
& column.like(target + ',%'))
if ((operator.startswith('not') and not (
operator == 'not in'
and None in value))
or (operator == '!=' and value is not None)):
expression |= column == Null
return expression
def definition(self, model, language):
encoder = PYSONEncoder()
definition = super().definition(model, language)
definition['datetime_field'] = self.datetime_field
definition['search_context'] = encoder.encode(self.search_context)
definition['search_order'] = encoder.encode(self.search_order)
return definition
|