1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
|
# vim:et sw=4 sts=4 sw=4
#
# ibus-table - The Tables engine for IBus
#
# Copyright (c) 2008-2009 Yu Yuwei <acevery@gmail.com>
# Copyright (c) 2009-2014 Caius "kaio" CHANCE <me@kaio.net>
# Copyright (c) 2012-2015, 2021-2022 Mike FABIAN <mfabian@redhat.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>
'''
TabEngine Factory
'''
from typing import Dict
from typing import Optional
from typing import Callable
import os
import re
import logging
from gettext import dgettext
def _(a: str) -> str:
    '''Look up the translation of `a` in the "ibus-table" gettext domain.'''
    return dgettext("ibus-table", a)

def N_(a: str) -> str: # pylint: disable=invalid-name
    '''Return `a` unchanged.

    No-op counterpart of `_()`; by gettext convention such a wrapper
    only marks a string so it can be translated later.
    '''
    return a
from gi import require_version # type: ignore
# pylint: disable=wrong-import-position
require_version('IBus', '1.0')
from gi.repository import IBus # type: ignore
# pylint: enable=wrong-import-position
import table
import tabsqlitedb
# Module-wide logger, registered under the name 'ibus-table'.
LOGGER = logging.getLogger('ibus-table')
# Debug verbosity. EngineFactory.__init__ overwrites this from the
# IBUS_TABLE_DEBUG_LEVEL environment variable; debug messages in this
# module are only emitted when the value is greater than 1.
DEBUG_LEVEL: int = 0
class EngineFactory(IBus.Factory): # type: ignore
    """Table IM Engine Factory

    Creates `table.TabEngine` objects on request and keeps one
    `tabsqlitedb.TabSqliteDb` per table name in `self.dbdict`, so that
    all engines created for the same table use the same database object.
    """

    # Template for the D-Bus object path of an engine: %s is replaced by
    # the sanitised table name and a running engine id is appended.
    _ENGINE_BASE_PATH = "/com/redhat/IBus/engines/table/%s/engine/"
    # Every character matched here is replaced by '_' in the object path.
    # Compiled once instead of on every do_create_engine() call.
    _INVALID_PATH_CHARS = re.compile(r'[^a-zA-Z0-9_/]')
    # File name extension of the table databases.
    _DB_SUFFIX = '.db'

    def __init__(self, bus: IBus.Bus, db: str = '') -> None:
        """Initialise the factory.

        :param bus: The IBus bus; its connection is handed to the
                    IBus.Factory base class.
        :param db: Optional full path to the sql database of a single
                   table. If given, the database is opened immediately
                   and registered under the table name derived from the
                   file name. If empty, databases are opened on demand
                   in do_create_engine().
        """
        global DEBUG_LEVEL # pylint: disable=global-statement
        try:
            # An unset variable falls back to '0'; an unparsable value
            # raises ValueError and is treated as 0 as well.
            DEBUG_LEVEL = int(os.getenv('IBUS_TABLE_DEBUG_LEVEL', '0'))
        except (TypeError, ValueError):
            DEBUG_LEVEL = 0
        if DEBUG_LEVEL > 1:
            LOGGER.debug('EngineFactory.__init__(bus=%s, db=%s)\n', bus, db)
        self.db: Optional[tabsqlitedb.TabSqliteDb] = None
        self.dbdict: Dict[str, tabsqlitedb.TabSqliteDb] = {}
        # Always defined, so the attribute can be read even when the
        # factory was created without a fixed database.
        self.dbusname: str = ''
        # db is the full path to the sql database
        if db:
            self.dbusname = self._table_name(db)
            udb = self.dbusname + '-user.db'
            self.db = tabsqlitedb.TabSqliteDb(filename=db, user_db=udb)
            self.db.db.commit()
            self.dbdict = {self.dbusname: self.db}

        # init factory
        self.bus = bus
        super().__init__(connection=bus.get_connection(),
                         object_path=IBus.PATH_FACTORY)
        # Running counter appended to the object path of each new engine.
        self.engine_id = 0
        self.engine_path = ''

    @classmethod
    def _table_name(cls, db_path: str) -> str:
        """Return the table name for a database path.

        The table name is the base name of `db_path` with a trailing
        '.db' removed. Only the extension at the very end is stripped;
        a '.db' occurring elsewhere in the file name is kept.
        """
        name = os.path.basename(db_path)
        if name.endswith(cls._DB_SUFFIX):
            name = name[:-len(cls._DB_SUFFIX)]
        return name

    @staticmethod
    def _open_table_db(engine_name: str) -> tabsqlitedb.TabSqliteDb:
        """Open the database for the table called `engine_name`.

        The system database '<engine_name>.db' is searched in
        '$IBUS_TABLE_LOCATION/tables' if that environment variable is
        set and non-empty, otherwise in '/usr/share/ibus-table/tables'.
        If no such file exists there, '~/.ibus/byo-tables' is used
        instead. The user database is named '<engine_name>-user.db'.
        """
        db_dir = '/usr/share/ibus-table/tables'
        table_location = os.getenv('IBUS_TABLE_LOCATION')
        if table_location:
            db_dir = os.path.join(table_location, 'tables')
        db = os.path.join(db_dir, engine_name + '.db')
        if not os.path.exists(db):
            byo_db_dir = os.path.expanduser('~/.ibus/byo-tables')
            db = os.path.join(byo_db_dir, engine_name + '.db')
        sq_db = tabsqlitedb.TabSqliteDb(
            filename=db, user_db=engine_name + '-user.db')
        sq_db.db.commit()
        return sq_db

    def do_create_engine( # pylint: disable=arguments-differ
            self, engine_name: str) -> table.TabEngine:
        """Create a new engine for the table `engine_name`.

        :param engine_name: Name of the table, optionally prefixed
                            with 'table:' (the prefix is removed).
        :return: The new `table.TabEngine`.
        :raises Exception: If the engine could not be created. The
                           original error is logged and chained as the
                           cause of the raised exception.
        """
        if DEBUG_LEVEL > 1:
            LOGGER.debug(
                'EngineFactory.do_create_engine(engine_name=%s)\n',
                engine_name)
        engine_name = re.sub(r'^table:', '', engine_name)
        self.engine_path = (
            self._ENGINE_BASE_PATH
            % self._INVALID_PATH_CHARS.sub('_', engine_name))
        try:
            # Without a fixed database, open the one for this table on
            # first use and remember it in self.dbdict.
            if not self.db and engine_name not in self.dbdict:
                self.dbdict[engine_name] = self._open_table_db(engine_name)
            engine = table.TabEngine(self.bus,
                                     self.engine_path + str(self.engine_id),
                                     self.dbdict[engine_name])
            self.engine_id += 1
            return engine
        except Exception as error: # pylint: disable=broad-except
            LOGGER.exception(
                'Failed to create engine %s: %s: %s',
                engine_name, error.__class__.__name__, error)
            # pylint: disable=broad-exception-raised
            raise Exception(
                'Failed to create engine %s' % engine_name) from error

    def do_destroy(self) -> None: # pylint: disable=arguments-differ
        '''Destructor, which finishes pending work for the input method

        Calls sync_usrdb() on every database in self.dbdict and then
        destroys the factory itself.
        '''
        if DEBUG_LEVEL > 1:
            LOGGER.debug('EngineFactory.do_destroy()\n')
        # we need to sync the temp userdb in memory to the user_db on disk
        for key, database in self.dbdict.items():
            LOGGER.info('Syncing %s %s', key, database)
            database.sync_usrdb()
        super().destroy()
|