# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from collections import namedtuple
from sql import For
# Module-level placeholders; this generic module binds no exception class.
# NOTE(review): presumably rebound by the concrete backend modules -- confirm.
DatabaseIntegrityError = None
DatabaseOperationalError = None

# Pair of SQL type descriptors exposed as the fields ``base`` and ``type``.
SQLType = namedtuple(typename='SQLType', field_names='base type')
class DatabaseInterface(object):
    '''
    Define generic interface for database connection

    Methods that raise NotImplementedError have no generic implementation
    at this level.  The ``has_*`` methods return False, i.e. the generic
    interface does not claim any optional capability.
    '''
    # No flavor is defined by the generic interface.
    flavor = None
    # NOTE(review): presumably the maximum number of values accepted in a
    # single SQL "IN" clause -- confirm against the callers.
    IN_MAX = 1000

    def __new__(cls, name=''):
        '''
        Create the instance.

        ``name`` is accepted so the signature matches ``__init__`` but it
        is deliberately not forwarded to ``object.__new__``.
        '''
        return object.__new__(cls)

    def __init__(self, name=''):
        '''
        Store the database name on the instance as ``self.name``.
        '''
        self.name = name

    # Connection handling: nothing is implemented at this level.

    def connect(self):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def get_connection(self, autocommit=False, readonly=False):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def put_connection(self, connection, close=False):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def close(self):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    # Database life cycle.

    @classmethod
    def create(cls, connection, database_name):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    @classmethod
    def drop(cls, connection, database_name):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def list(self, hostname=None):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def init(self):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def test(self, hostname=None):
        '''
        Test if it is a Tryton database.

        :raises NotImplementedError: always at this level.
        '''
        raise NotImplementedError

    # Table id helpers: the generic versions do nothing and return None.

    def nextid(self, connection, table, count=1):
        '''
        Do nothing and return None.
        '''
        pass

    def setnextid(self, connection, table, value):
        '''
        Do nothing and return None.
        '''
        pass

    def currid(self, connection, table):
        '''
        Do nothing and return None.
        '''
        pass

    # Locking.

    @classmethod
    def lock(cls, connection, table):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    @classmethod
    def lock_records(cls, connection, table, ids):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def lock_id(self, id, timeout=None):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    # Capabilities.

    def has_constraint(self, constraint):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def has_returning(self):
        '''
        Return False at this level.
        '''
        return False

    def has_multirow_insert(self):
        '''
        Return False at this level.
        '''
        return False

    def has_select_for(self):
        '''
        Return False at this level.
        '''
        return False

    def get_select_for_skip_locked(self):
        '''
        Return ``sql.For``, the class imported at the top of this module.
        '''
        return For

    def has_window_functions(self):
        '''
        Return False at this level.
        '''
        return False

    def has_unaccent(self):
        '''
        Return False at this level.
        '''
        return False

    def has_unaccent_indexable(self):
        '''
        Return False at this level.
        '''
        return False

    def unaccent(self, value):
        '''
        Return ``value`` unchanged.
        '''
        return value

    def has_similarity(self):
        '''
        Return False at this level.
        '''
        return False

    def similarity(self, column, value):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    # Full text search.

    def has_search_full_text(self):
        '''
        Return False at this level.
        '''
        return False

    def format_full_text(self, *documents, language=None):
        '''
        Return the documents joined with newlines.

        ``language`` is accepted but ignored by this implementation.
        '''
        return '\n'.join(documents)

    def format_full_text_query(self, query, language=None):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def search_full_text(self, document, query):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def rank_full_text(self, document, query, normalize=None):
        '''
        Return the expression that ranks query on document

        :raises NotImplementedError: always at this level.
        '''
        raise NotImplementedError

    # Sequences: every sequence_* method returns None when has_sequence()
    # is false and raises NotImplementedError otherwise.

    @classmethod
    def has_sequence(cls):
        '''
        Return False at this level.
        '''
        return False

    def sequence_exist(self, connection, name):
        '''
        Return None without sequence support, else not implemented.

        :raises NotImplementedError: when ``has_sequence()`` is true.
        '''
        if not self.has_sequence():
            return
        raise NotImplementedError

    def sequence_create(
            self, connection, name, number_increment=1, start_value=1):
        '''
        Return None without sequence support, else not implemented.

        :raises NotImplementedError: when ``has_sequence()`` is true.
        '''
        if not self.has_sequence():
            return
        raise NotImplementedError

    def sequence_update(
            self, connection, name, number_increment=1, start_value=1):
        '''
        Return None without sequence support, else not implemented.

        :raises NotImplementedError: when ``has_sequence()`` is true.
        '''
        if not self.has_sequence():
            return
        raise NotImplementedError

    def sequence_rename(self, connection, old_name, new_name):
        '''
        Return None without sequence support, else not implemented.

        :raises NotImplementedError: when ``has_sequence()`` is true.
        '''
        if not self.has_sequence():
            return
        raise NotImplementedError

    def sequence_delete(self, connection, name):
        '''
        Return None without sequence support, else not implemented.

        :raises NotImplementedError: when ``has_sequence()`` is true.
        '''
        if not self.has_sequence():
            return
        raise NotImplementedError

    def sequence_next_number(self, connection, name):
        '''
        Return None without sequence support, else not implemented.

        :raises NotImplementedError: when ``has_sequence()`` is true.
        '''
        if not self.has_sequence():
            return
        raise NotImplementedError

    def has_channel(self):
        '''
        Return False at this level.
        '''
        return False

    # SQL type helpers: the generic versions do nothing and return None.

    def sql_type(self, type_):
        '''
        Do nothing and return None.
        '''
        pass

    def sql_format(self, type_, value):
        '''
        Do nothing and return None.
        '''
        pass

    # JSON operators.

    def json_get(self, column, key=None):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def json_key_exists(self, column, key):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def json_any_keys_exist(self, column, keys):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def json_all_keys_exist(self, column, keys):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError

    def json_contains(self, column, json):
        '''
        Not implemented by the generic interface.

        :raises NotImplementedError: always.
        '''
        raise NotImplementedError
|