File: iso-ir-multiple

package info (click to toggle)
libhinoko 1.0.3-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 544 kB
  • sloc: ansic: 2,557; python: 693; makefile: 9; javascript: 5
file content (86 lines) | stat: -rwxr-xr-x 2,495 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env python3

from threading import Timer
from struct import unpack

import gi
gi.require_versions({'GLib': '2.0', 'Hinoko': '1.0'})
from gi.repository import GLib, Hinoko

class IsoIrMultiple(Hinoko.FwIsoIrMultiple):
    def new(self, path, bytes_per_packet, syt_interval):
        channels = [i for i in range(4)]
        self.allocate(path, channels)

        channels = self.get_property('channels')
        bytes_per_chunk = len(channels) * bytes_per_packet * syt_interval

        self.map_buffer(4096, 32)

    def destroy(self):
        self.unmap_buffer()
        self.release()

    def begin(self, dispatcher, duration):
        _, self.__src = self.create_source()
        self.__src.attach(dispatcher.get_context())
        self.__dispatcher = dispatcher

        # Roughly finish event loop.
        self.__timer = Timer(duration, lambda dispatcher: dispatcher.quit(),
                             args=(dispatcher,))

        tags = Hinoko.FwIsoCtxMatchFlag.TAG0 | \
               Hinoko.FwIsoCtxMatchFlag.TAG1 | \
               Hinoko.FwIsoCtxMatchFlag.TAG2 | \
               Hinoko.FwIsoCtxMatchFlag.TAG3

        self.__timer.start()
        self.start(None, 0, tags, 4)

    def finish(self):
        self.__src.destroy()
        self.stop()

    def do_stopped(self, error):
        self.__dispatcher.quit()
        if error:
            self.__timer.cancel()
            print(error)

    @staticmethod
    def __ohci1394_tstamp_to_isoc_cycle(tstamp):
        sec = (tstamp & 0x0000e000) >> 13
        cycle = tstamp & 0x00001fff
        return (sec, cycle)

    def do_interrupted(self, count):
        for i in range(count):
            data = self.get_payload(i)

            iso_header = unpack('<I', data[0:4])[0]

            frames = unpack('>{0}I'.format(len(data[4:-4]) // 4), data[4:-4])

            tstamp = unpack('<I', data[-4:])[0]
            sec, cycle = self.__ohci1394_tstamp_to_isoc_cycle(tstamp)

            print('{0:d},{1:d},{2:08x},{3[0]:08x},{3[1]:08x},{4}'.format(sec, cycle, iso_header, frames, i))
            for i, frame in enumerate(frames):
                print('    {0:2d}: {1:08x}'.format(i, frame))

# 2 CIP headers + 19 quadlet per data block * 8 data block + iso header + tstamp
bytes_per_packet = (2 + 19 * 8 + 2) * 4

ctx = IsoIrMultiple()
ctx.new('/dev/fw0', bytes_per_packet, 8)

dispatcher = GLib.MainLoop.new(None, False)

ctx.begin(dispatcher, 4)
dispatcher.run()
ctx.finish()
ctx.destroy()

del dispatcher
del ctx