1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from sql.operators import Equal
from trytond.transaction import Transaction, without_check_access
from .modelsql import Exclude, ModelSQL
from .modelstorage import ModelStorage
class ModelSingleton(ModelStorage):
"""
Define a singleton model in Tryton.
"""
@classmethod
def __setup__(cls):
super().__setup__()
# Cache disable because it is used as a read by the client
cls.__rpc__['default_get'].cache = None
if issubclass(cls, ModelSQL):
table = cls.__table__()
cls._sql_constraints.append(
('singleton', Exclude(table, (table.id * 0, Equal)),
'ir.msg_singleton'))
@classmethod
def get_singleton(cls):
'''
Return the instance of the unique record if there is one.
'''
singletons = super(ModelSingleton, cls).search([], limit=1)
if singletons:
return singletons[0]
@classmethod
def create(cls, vlist):
assert len(vlist) == 1
singleton = cls.get_singleton()
if not singleton:
if issubclass(cls, ModelSQL):
cls.lock()
return super(ModelSingleton, cls).create(vlist)
cls.write([singleton], vlist[0])
return [singleton]
@classmethod
def read(cls, ids, fields_names):
singleton = cls.get_singleton()
if not singleton:
fname_no_rec_name = [
f for f in fields_names
if '.' not in f and not f.startswith('_')]
res = cls.default_get(fname_no_rec_name,
with_rec_name=len(fname_no_rec_name) != len(fields_names))
for field_name in fields_names:
if field_name not in res:
res[field_name] = None
res['id'] = ids[0]
res['_write'] = True
res['_delete'] = True
return [res]
res = super(ModelSingleton, cls).read([singleton.id], fields_names)
res[0]['id'] = ids[0]
return res
@classmethod
def write(cls, records, values, *args):
singleton = cls.get_singleton()
if not singleton:
with without_check_access():
singleton, = cls.create([values])
actions = (records, {}) + args
else:
actions = (records, values) + args
args = []
for values in actions[1:None:2]:
args.extend(([singleton], values))
super(ModelSingleton, cls).write(*args)
# Clean local cache of original records
for record in sum(actions[0:None:2], []):
record._local_cache.pop(record.id, None)
# Clean transaction cache of all ids
for cache in Transaction().cache.values():
if cls.__name__ in cache:
cache[cls.__name__].clear()
@classmethod
def delete(cls, records):
singleton = cls.get_singleton()
if singleton:
super(ModelSingleton, cls).delete([singleton])
# Clean transaction cache of all ids
for cache in Transaction().cache.values():
if cls.__name__ in cache:
cache[cls.__name__].clear()
@classmethod
def copy(cls, records, default=None):
if default:
cls.write(records, default)
return records
@classmethod
def search(cls, domain, offset=0, limit=None, order=None, count=False):
res = super(ModelSingleton, cls).search(domain, offset=offset,
limit=limit, order=order, count=count)
if not res and not domain:
if count:
return 1
return [cls(1)]
return res
@classmethod
def default_get(cls, fields_names, with_rec_name=True):
if '_timestamp' in fields_names:
fields_names = list(fields_names)
fields_names.remove('_timestamp')
default = super(ModelSingleton, cls).default_get(fields_names,
with_rec_name=with_rec_name)
singleton = cls.get_singleton()
if singleton:
if with_rec_name:
fields_names = fields_names[:]
for field in fields_names[:]:
if cls._fields[field]._type in ('many2one',):
fields_names.append(field + '.rec_name')
default, = cls.read([singleton.id], fields_names=fields_names)
del default['id']
return default
|