File: mvc_model.py

package info (click to toggle)
lirc 0.10.2-0.10
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 8,268 kB
  • sloc: ansic: 26,981; cpp: 9,187; sh: 5,875; python: 4,507; makefile: 1,049; xml: 63
file content (650 lines) | stat: -rw-r--r-- 22,640 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
''' Simple lirc setup tool - model part. '''

import configparser
import glob
import os
import os.path
import shutil
import subprocess
import sys
import tempfile
import urllib.error          # pylint: disable=no-name-in-module,F0401,E0611
import urllib.request        # pylint: disable=no-name-in-module,F0401,E0611

from lirc.database import Database
from lirc.database import Config
import config
import util

# Local cache file holding the list of available remotes, see
# Model.get_remotes_list().
REMOTES_LIST = os.path.expanduser('~/.cache/remotes.list')
# Location the remotes list is downloaded from when not yet cached.
REMOTES_LIST_URL = "http://lirc-remotes.sourceforge.net/remotes.list"
# Default results directory, used by parse_options() when no directory
# is given on the command line.
RESULTS_DIR = 'lirc-setup.conf.d'
# Name of the modprobe configuration file written into the results dir.
_MODPROBE_PATH = "lirc-modprobe.conf"
# Existing options file read as starting point for the generated one.
_OPTIONS_PATH = "/etc/lirc/lirc_options.conf"
# Base location which remote configuration file paths are appended to
# when downloading.
_REMOTES_BASE_URI = "http://sf.net/p/lirc-remotes/code/ci/master/tree/remotes"
# Marker value stored as lircd_conf when the user installs it manually.
_MANUAL_REMOTE_INSTALL = '(manual install)'

# True when LIRC_DEBUG is set (to any value) in the environment.
_DEBUG = 'LIRC_DEBUG' in os.environ

# Template for the README written into the results directory; the
# {sysconfdir} placeholder is filled in by write_results().
README = """
This is some configuration files created by lirc-setup. To install them,
try the following commands:

sudo cp lirc_options.conf {sysconfdir}/lirc/lirc_options.conf
sudo cp lirc-modprobe.conf /etc/modprobe.d
sudo cp *.lircd.conf {sysconfdir}/lirc/lircd.conf.d
sudo cp lircmd.conf {sysconfdir}/lirc/lircmd.conf

Of course, if you already have a working configuration don't forget to
make backup copies as required! Note that all files are not always present.
"""

# Shell command template starting lircd followed by irw, formatted by
# Model.get_irwcheck(). The trailing backslashes join the lines, so the
# template forms a single command line.
IRW_CMD = """
{lircd} --driver={driver} --device={device} --pidfile={wd}/lircd.pid \
--logfile={wd}/lircd.log --output={wd}/lircd.socket --debug=debug \
{plugindir} {lircd_conf}; {irw} {wd}/lircd.socket
"""


def _hasitem(dict_, key_):
    ''' Return the value stored for key_ if the key exists, else False.

    The result is meant to be used for its truth value: it is falsy
    when the key is missing or maps to a null/empty value.
    '''
    if key_ not in dict_:
        return False
    return dict_[key_]


def _here(path):
    ''' Return path joined to the directory containing this module. '''
    module_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(module_dir, path)


def parse_options():
    ''' Return a dict of options built from the command line.

    With no argument 'results_dir' is the absolute path of RESULTS_DIR,
    with one argument the absolute path of that argument. Any other
    argument count gives an empty dict.
    '''
    argc = len(sys.argv)
    if argc not in (1, 2):
        return {}
    chosen = sys.argv[1] if argc == 2 else RESULTS_DIR
    return {'results_dir': os.path.abspath(chosen)}


def find_rc(lirc):
    ''' Return the first /sys/class/rc/rc* directory containing an entry
    named as the basename of the lirc device, or None if there is none.
    '''
    node = os.path.basename(lirc)
    candidates = (
        rc_dir for rc_dir in glob.glob('/sys/class/rc/rc*')
        if os.path.exists(os.path.join(rc_dir, node))
    )
    return next(candidates, None)


def download_file(view, url, path):
    ''' Download location url to the file path.

    Parameters:
      - view: object whose show_warning(title, text) is invoked when
        the download fails.
      - url: location to retrieve.
      - path: local file receiving the data.

    Failures are reported through the view instead of being raised.
    '''
    try:
        urllib.request.urlretrieve(url, path)
    except (urllib.error.URLError, OSError) as ex:
        # HTTPError is a URLError subclass; also covers unreachable
        # hosts and problems writing the local file.
        text = "Cannot download %s : %s" % (url, str(ex))
        view.show_warning('Download error', text)


def get_bundled_remotes(model):
    ''' Return the recommended remote ids for the model's current config.

    The list starts with the configured lircd_conf (if any) followed by
    the remotes the database associates with the selected driver. It
    may be empty.
    '''
    explicit = model.config.lircd_conf
    found = [explicit] if explicit else []
    by_driver = model.db.remotes_by_driver(model.config.driver)
    if by_driver:
        found.extend(by_driver)
    return found


def get_dmesg_help(device):
    ''' Return the dmesg output lines mentioning the device.

    A line matches if it contains the basename of the device or the
    basename of the rc directory find_rc() reports for it.
    '''
    output = subprocess.check_output('dmesg').decode('utf-8')
    rc_dir = find_rc(device)
    # Without a rc directory, use a token not expected in any dmesg line.
    rc_name = os.path.basename(rc_dir) if rc_dir else 'fjdsk@$'
    dev_name = os.path.basename(device)
    matches = []
    for line in output.split('\n'):
        if dev_name in line or rc_name in line:
            matches.append(line)
    return matches


def list_ttys():
    ''' Return /dev paths for the ttys having a driver on this host.

    A tty counts when its /sys/class/tty entry has a device/driver path.
    '''
    # The original code carried an untested, disabled filter for entries
    # bound to the 'serial5250' driver; it is not applied here either.
    devices = []
    for sys_entry in glob.glob("/sys/class/tty/*"):
        if os.path.exists(os.path.join(sys_entry, 'device', 'driver')):
            devices.append("/dev/" + os.path.basename(sys_entry))
    return devices


def write_results(model, result_dir, view):
    ''' Write the set of new configuration files into results directory.

    Parameters:
      - model: provides model.config (device, driver, lircd_conf,
        lircmd_conf, modprobe, modinit) and model.db.kernel_drivers.
      - result_dir: directory receiving the files, created if missing.
      - view: its show_warning()/show_error() report download problems.

    Writes lirc_options.conf, the modprobe file (when there is a
    modprobe configuration), the lircd/lircmd configuration files and
    a README. Returns a log text describing what has been done.
    '''
    # pylint: disable=too-many-branches

    def write_modprobe(log):
        ''' Possibly write the modprobe configuration file. '''
        if not model.config.modprobe:
            return log

        modprobe = '# Generated by lirc-setup\n'
        if isinstance(model.config.modprobe, list):
            modprobe += "\n".join(model.config.modprobe) + '\n'
        else:
            modprobe += model.config.modprobe + '\n'
        path = os.path.join(result_dir, _MODPROBE_PATH)
        with open(path, 'w') as f:
            f.write(modprobe)
        # Wrapped in a tuple so any modprobe value is formatted as one item.
        log += 'Info: modprobe.d modprobe config: %s \n' \
            % (model.config.modprobe,)
        return log

    def write_modinit(options, log):
        """ Write the modinit section to config file and log. """
        if not options.has_section('modinit'):
            options.add_section('modinit')
        options.set('modinit', 'code', model.config.modinit)
        log += "%-12s: %s\n" % ('kernel setup', model.config.modinit)
        return log

    def write_options(options, log):
        ''' Update options and write the new lirc_options.conf. '''
        if not options.has_section('lircd'):
            options.add_section('lircd')
        if not model.config.lircd_conf:
            log += 'Warning: No lircd.conf found, required by lircd.\n'
        # Collect the value lines first so the surrounding markup is only
        # emitted, and always balanced, when there is something to show.
        details = ''
        for opt in ['device', 'lircd_conf', 'lircmd_conf', 'driver']:
            value = getattr(model.config, opt)
            if not value:
                continue
            if opt == 'driver':
                value = value['id']
                # Kernel drivers are all handled by the 'default' driver.
                if value in model.db.kernel_drivers:
                    value = 'default'
            if opt != 'device':
                value = os.path.basename(value)
            options.set('lircd', opt, value)
            details += "%-12s: %s\n" % (opt, value)
        if model.config.modinit:
            details = write_modinit(options, details)
        if details:
            log += 'Info: new values in lirc_options.conf\n'
            log += '<small><tt>' + details + '</tt></small>'
        path = os.path.join(result_dir, 'lirc_options.conf')
        with open(path, 'w') as f:
            f.write("# Generated by lirc-setup\n")
            options.write(f)
        return log

    def get_configfiles(log):
        ''' Copy or download lircd.conf and perhaps lircmd.conf. '''

        def error(ex, uri):
            ''' Handle download error. '''
            text = "Cannot download %s : %s" % (uri, str(ex))
            view.show_error("Download error", text)

        if not model.config.lircd_conf \
                or model.config.lircd_conf == _MANUAL_REMOTE_INSTALL:
            text = "No lircd.conf defined, skipping"
            view.show_warning("Download error", text)
            return log
        for item in ['lircd_conf', 'lircmd_conf']:
            src = getattr(model.config, item)
            if not src:
                continue
            if os.path.exists(src):
                # Local file: place it with the other results.
                try:
                    shutil.copy2(src, result_dir)
                except shutil.SameFileError:
                    pass    # Already in place, nothing to copy.
                log += 'Info: Copied %s to %s\n' % (str(src), result_dir)
                continue
            uri = os.path.join(_REMOTES_BASE_URI, src)
            path = os.path.join(result_dir, os.path.basename(src))
            try:
                urllib.request.urlretrieve(uri + '?format=raw', path)
            except (urllib.error.URLError, OSError) as ex:
                # HTTPError is a URLError; also covers unreachable hosts.
                error(ex, uri)
            else:
                log += 'Info: Downloaded %s to %s\n' % (str(uri), str(path))
        return log

    options = configparser.RawConfigParser()
    options.read(_OPTIONS_PATH)
    log = 'Writing installation files in %s\n' % result_dir
    os.makedirs(result_dir, exist_ok=True)
    log = write_options(options, log)
    log = write_modprobe(log)
    log = get_configfiles(log)
    path = os.path.join(result_dir, 'README')
    with open(path, 'w') as f:
        f.write(README.format(sysconfdir=config.SYSCONFDIR))
    return log


def check_resultsdir(dirpath):
    ''' Check that dirpath is usable as results directory.

    A missing directory is created. An existing one must be an empty,
    writeable directory.

    Returns an error message, or an empty string if dirpath is ok.
    '''
    if not os.path.exists(dirpath):
        try:
            os.makedirs(dirpath)
        except OSError as err:
            return "Cannot create directory %s: %s" % (dirpath, err)
        return ''
    if not os.path.isdir(dirpath):
        # os.listdir() would raise on e. g., a plain file.
        return "%s exists and is not a directory. Please remove it or" \
            " use another directory." % dirpath
    if os.listdir(dirpath):
        return "Directory %s is in the way. Please remove it or use" \
            " another directory." % dirpath
    if os.access(dirpath, os.W_OK):
        return ''
    return "Directory %s is not writeable" % dirpath


def check_modules(cf):
    ''' Return a text describing the kernel setup required by config cf,
    or an empty string when cf has no (or an empty) modinit value.
    '''
    if 'modinit' in cf and cf.modinit:
        return "Required kernel setup: " + cf.modinit
    return ''


class DeviceListModel(object):
    ''' The list of devices corresponding to the device: wildcard in config.

    Abstract base class: subclasses implement list_devices() which fills
    in label_by_device, a dict mapping each device to its label.
    '''
    AUTO_CONFIG = "atilibusb"   # A driver configuration with device: auto.

    def __init__(self, driver):
        ''' Store the configuration and list the matching devices.

        driver is a configuration object whose driver attribute is a
        dict with (at least) the keys 'id' and 'device_hint'.
        '''
        self.driver_id = driver.driver['id']
        self.config = driver
        self.device_pattern = driver.driver['device_hint']
        self.label_by_device = {}
        self.list_devices()
        # Guard against a list_devices() leaving a null value behind.
        if not self.label_by_device:
            self.label_by_device = {}

    def list_devices(self):
        ''' List all available devices, must be overridden.

        Raises NotImplementedError when not overridden; unlike an
        assert this is also effective when running with -O.
        '''
        raise NotImplementedError('Invalid call to abstract list_devices()')

    def is_direct_installable(self):
        ''' Return True if this can be installed without a dialog,
        i. e., if there is exactly one device to choose from.
        '''
        return len(self.label_by_device) == 1

    def is_empty(self):
        ''' Return True if there is no matching device.'''
        return not self.label_by_device


class GenericDeviceListModel(DeviceListModel):
    ''' Device list built by globbing the configured device pattern. '''

    def list_devices(self):
        ''' Set label_by_device to the paths matching self.config.device,
        each path being its own label.
        '''
        pattern = self.config.device
        self.label_by_device = {path: path for path in glob.glob(pattern)}


class SerialDeviceListModel(DeviceListModel):
    ''' Device list offering the host's ttys to a userspace serial driver. '''

    def list_devices(self):
        ''' Set label_by_device to the ttys found by list_ttys(). The
        label is the device, with the rc directory appended in
        parentheses when find_rc() locates one.
        '''
        labels = {}
        for tty in list_ttys():
            rc_dir = find_rc(tty)
            labels[tty] = "%s (%s)" % (tty, rc_dir) if rc_dir else tty
        self.label_by_device = labels


class DrvctlDeviceListModel(DeviceListModel):
    '''A device list for a driver supporting drvctl enumeration. '''

    def list_devices(self):
        ''' Set label_by_device from the output of
        'mode2 --driver <id> --list-devices'.

        The first word on each output line is the device, the complete
        line (whitespace normalized) its label. If the command fails or
        prints nothing a single 'dev_null' placeholder is stored.

        Raises FileNotFoundError if no mode2 executable can be found.
        '''
        self.label_by_device = {}
        # Prefer a mode2 in the source tree next to this file, fall back
        # to the installed one.
        trypath = os.path.abspath(_here("../mode2"))
        if not os.path.exists(trypath):
            trypath = os.path.join(config.BINDIR, "mode2")
            plugindir = None
        else:
            plugindir = "../../plugins/.libs"
        if not os.path.exists(trypath):
            raise FileNotFoundError(
                "Cannot find mode2 executable: " + trypath)
        cmd = [trypath, "--driver", self.driver_id, "--list-devices"]
        if plugindir:
            cmd.extend(["-U", plugindir])
        try:
            result = subprocess.check_output(cmd, universal_newlines=True)
        except (OSError, subprocess.CalledProcessError):
            sys.stderr.write("Error invoking: " + " ".join(cmd) + "\n")
            result = None
        if not result:
            self.label_by_device['dev_null'] = 'No devices found'
            return
        for line in result.split("\n"):
            words = line.split(None, 1)
            if not words:
                continue
            # For a single word the joined label is the device itself.
            self.label_by_device[words[0]] = " ".join(words)


class UdpPortDeviceList(DeviceListModel):
    ''' Placeholder device list offering only the udp driver port. '''

    def __init__(self, model):
        ''' Initiate using the database config having the id 'udp'. '''
        udp_config = model.db.find_config('id', 'udp')
        super().__init__(udp_config)

    def list_devices(self):
        ''' Set label_by_device to the single, default port entry. '''
        self.label_by_device = {'8765': 'default port 8765'}

    def is_direct_installable(self):
        ''' Never installable without a dialog. '''
        return False

    def is_empty(self):
        ''' Never empty: the default port is always listed. '''
        return False


class AutoDeviceList(DeviceListModel):
    ''' Placeholder device list for entries using the 'auto' device. '''

    def __init__(self, model):
        ''' Initiate using the database config with id AUTO_CONFIG. '''
        auto_config = model.db.find_config('id', self.AUTO_CONFIG)
        super().__init__(auto_config)

    def list_devices(self):
        ''' Set label_by_device to the single 'auto' entry. '''
        self.label_by_device = {'auto': 'automatically probed device'}

    def is_direct_installable(self):
        ''' Always installable without a dialog. '''
        return True

    def is_empty(self):
        ''' Never empty: the 'auto' entry is always listed. '''
        return False


def device_list_factory(driver, model):
    ''' Return the DeviceListModel matching the driver's device_hint.

    driver is a driver dict or a driver id looked up in
    model.db.drivers. A dict lacking 'device_hint' gets it filled in
    from the database entry for its 'id'.
    '''
    if isinstance(driver, str):
        driver = model.db.drivers[driver]
    if 'device_hint' not in driver:
        db_entry = model.db.drivers[driver['id']]
        driver['device_hint'] = db_entry['device_hint']
    hint = driver['device_hint']
    print("device hint: " + hint)

    if hint.startswith('/dev/tty'):
        return SerialDeviceListModel(driver)
    # Hints naming a special list, each constructed from the model.
    model_based = {
        'auto': AutoDeviceList,
        'udp_port': UdpPortDeviceList,
        'drvctl': DrvctlDeviceListModel,
    }
    if hint in model_based:
        return model_based[hint](model)
    return GenericDeviceListModel(driver)


class Model(object):
    ''' The basic model is the selected remote, driver and device.

    The selections are stored in self.config. Functions registered
    using add_listener() are called, without arguments, after changes.
    '''
    # pylint: disable=too-many-public-methods

    NO_SUCH_REMOTE = None
    NO_SUCH_DRIVER = 0
    DEVICE_ATTR = ['lircd_conf', 'lircmd_conf', 'driver',
                   'note', 'label', 'menu', 'id']
    DRIVER_ATTR = ['type', 'modinit', 'modprobe', 'device']

    def __init__(self):
        self.config = Config()
        self.db = Database()
        self.device_list = None
        self.remotes_index = None     # Cached result of get_remotes_list().
        self._listeners = []

    def reset(self):
        ''' Reset to pristine state, this also drops all listeners. '''
        Model.__init__(self)

    def _call_listeners(self):
        ''' Call all listeners. '''
        for listener in self._listeners:
            listener()

    def add_listener(self, listener):
        ''' Add function to be called when model changes.'''
        self._listeners.append(listener)

    def set_remote(self, remote):
        ''' Update the remote configuration file. '''
        self.config.lircd_conf = remote
        self._call_listeners()

    def clear_remote(self):
        ''' Unset the lircd_conf status. '''
        self.config.lircd_conf = ''
        self._call_listeners()

    def set_manual_remote(self):
        ''' Set the remote to 'no remote', i.e., manual install. '''
        self.set_remote(_MANUAL_REMOTE_INSTALL)

    def is_remote_manual(self):
        ''' Return True if lircd.conf should be installed manually. '''
        return self.config.lircd_conf == _MANUAL_REMOTE_INSTALL

    def set_config(self, cf):
        ''' Given a config dict (or the id of one in the database)
        update selected capture device and config.
        '''
        if isinstance(cf, str):
            cf = self.db.configs[cf]
        self.config.config = cf
        if 'modprobe' in cf:
            self.config.modprobe = cf['modprobe']
        self.set_driver(cf['driver'])
        self._call_listeners()

    def set_serial_parms(self, irq, iobase, device):
        ''' Set modprobe  + modinit options for serial port. irq and
        iobase are hexadecimal numbers text strings, device is
        com1..com4. With any other device only modprobe is updated.
        '''
        tty_by_com = {'com1': '/dev/ttyS0',
                      'com2': '/dev/ttyS1',
                      'com3': '/dev/ttyS2',
                      'com4': '/dev/ttyS3'}
        self.config.modprobe = \
            'lirc_serial: iobase=%s irq=%s\n' % (iobase, irq)
        tty = tty_by_com.get(device.lower()) if device else None
        if tty:
            self.config.modinit = \
                'setserial %s uart none; modprobe lirc_serial' % tty
            try:
                rc_dir = util.get_rcdir_by_device(self.config.device)
            except LookupError:
                pass    # No rc directory: nothing more to set up.
            else:
                self.config.modinit += \
                    '; echo lirc > %s/protocols' % rc_dir
        # modprobe has changed also for an unknown device: always notify.
        self._call_listeners()

    def set_lpt_parms(self, irq, iobase, device):
        ''' Set modprobe  + modinit options for parallel port. irq and
        iobase are hexadecimal numbers text strings, device is
        lpt1..lpt4. With any other device only modinit is updated.
        '''
        known_ports = ('lpt1', 'lpt2', 'lpt3', 'lpt4')
        self.config.modinit = 'modprobe lirc_parallel'
        if device and device.lower() in known_ports:
            self.config.modprobe = \
                'lirc_parallel: iobase=%s irq=%s\n' % (iobase, irq)
            try:
                rc_dir = util.get_rcdir_by_device(self.config.device)
            except LookupError:
                pass    # No rc directory: nothing more to set up.
            else:
                self.config.modinit += \
                    '; echo lirc > %s/protocols' % rc_dir
        # modinit has changed also for an unknown device: always notify.
        self._call_listeners()

    def set_driver(self, driver):
        ''' Update the selected driver dict; driver is the dict or the
        id of a driver in the database. Also selects the driver's
        device_hint as device and, if the database lists remotes for
        the driver, the first of them as lircd_conf.
        '''
        if isinstance(driver, str):
            driver = self.db.drivers[driver]
        self.config.driver = driver
        self.config.device = driver['device_hint']
        remotes = self.db.remotes_by_driver(driver)
        if remotes:
            self.config.lircd_conf = remotes[0]
        self._call_listeners()

    def clear_capture_device(self):
        ''' Indeed: unset the capture device. '''
        self.config.driver = {}
        self.config.device = ""
        self.config.config = {}
        # Reset the label before notifying so listeners see final state.
        self.config.label = ""
        self._call_listeners()

    def set_modinit(self, modinit):
        ''' Update the modinit part in config. '''
        self.config.modinit = modinit
        self._call_listeners()

    def set_device(self, device):
        ''' Update the device  + label part in config.'''
        self.config.device = device
        driver_id = self.config.driver['id']
        self.config.label = driver_id + ' on ' + device
        self._call_listeners()

    def get_remotes_list(self, view):
        ''' Download and return the directory file as a list of lines.

        The file is cached in REMOTES_LIST and the parsed, non-empty
        lines are kept in self.remotes_index. An empty list is returned
        (and nothing cached) if the file cannot be read, e. g. after a
        failed download.
        '''
        if not self.remotes_index:
            if not os.path.exists(REMOTES_LIST):
                cache_dir = os.path.dirname(REMOTES_LIST)
                if cache_dir:
                    # The download needs an existing directory.
                    os.makedirs(cache_dir, exist_ok=True)
                download_file(view, REMOTES_LIST_URL, REMOTES_LIST)
            try:
                with open(REMOTES_LIST) as f:
                    list_ = f.read()
            except OSError:
                return []
            self.remotes_index = [l for l in list_.split('\n') if l]
        return self.remotes_index

    def get_mode2(self):
        ''' Return mode2 command built from current driver and device. '''
        driver = self.config.driver['id']
        if driver in self.db.kernel_drivers:
            driver = 'default'
        cmd = ['mode2',
               '--driver=' + driver,
               '--device=' + self.config.device]
        # Use the tools in the source tree if the plugins are built there.
        if os.path.exists(_here('../../plugins/.libs')):
            cmd[0] = '../../tools/mode2'
            cmd.extend(['--plugindir=../../plugins/.libs'])
        return ' '.join(cmd)

    def get_irwcheck(self):
        ''' Return strings to run lircd + irw.

        Downloads the selected lircd_conf into a new temporary
        directory. Returns a tuple (True, command, kill command) or,
        if the download fails, (False, error message, None).
        '''
        wd = tempfile.mkdtemp(prefix='var-')
        driver = self.config.driver['id']
        if driver in self.db.kernel_drivers:
            driver = 'default'
        if os.path.exists(_here('../../plugins/.libs')):
            lircd = '../../daemons/lircd'
            irw = '../../tools/irw'
            plugindir = '--plugindir=../../plugins/.libs'
        else:
            lircd = 'lircd'
            irw = 'irw'
            plugindir = ''
        uri = os.path.join(_REMOTES_BASE_URI, self.config.lircd_conf)
        path = os.path.join(wd, os.path.basename(self.config.lircd_conf))
        try:
            urllib.request.urlretrieve(uri + '?format=raw', path)
        except (urllib.error.URLError, OSError) as ex:
            # No kill command is returned, so clean up here.
            shutil.rmtree(wd, ignore_errors=True)
            return False, str(ex), None
        cmd = IRW_CMD.format(wd=wd, irw=irw, lircd=lircd,
                             device=self.config.device,
                             driver=driver,
                             lircd_conf=path,
                             plugindir=plugindir)
        kill = "kill $(cat {wd}/lircd.pid); rm -rf {wd}".format(wd=wd)
        return True, cmd, kill

    def is_installable(self):
        ''' Do we have a configuration which can be installed? '''
        if not self.config.lircd_conf or not self.config.driver:
            return False
        conf = self.config.lircd_conf
        conf = conf.replace('(', '').replace(')', '').lower()
        return conf not in ['any', 'none']

    def is_testable(self):
        ''' Do we have a configuration which can be tested? '''
        if not self.is_installable():
            return False
        return 'manual' not in self.config.lircd_conf

    def get_bundled_driver(self, remote):
        ''' Return the bundled capture device file for remote,
        possibly None (no such driver)
        '''
        return self.db.driver_by_remote(remote)

    def driver_info(self):
        """ Current driver info,  possibly None. """
        if not self.config.driver:
            return None
        try:
            return self.config.driver['info']
        except KeyError:
            return None

    def has_lircd_conf(self):
        ''' Return True if there is a valid lircd_conf in config. '''
        cf = self.config.lircd_conf
        if not cf:
            return False
        cf = cf.replace('(', '').replace(')', '').replace(' ', '').lower()
        return cf not in ['', 'any', 'none']

    def has_label(self):
        ''' Test if there is a valid driver label in config. '''
        lbl = self.config.label
        if not lbl or lbl.lower().endswith('(none)'):
            return False
        return True


# vim: set expandtab ts=4 sw=4: