File: tokenizer.py

package info (click to toggle)
python-mido 1.3.3-0.2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 920 kB
  • sloc: python: 4,006; makefile: 127; sh: 4
file content (101 lines) | stat: -rw-r--r-- 2,886 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# SPDX-FileCopyrightText: 2017 Ole Martin Bjorndalen <ombdalen@gmail.com>
#
# SPDX-License-Identifier: MIT

from collections import deque
from numbers import Integral

from .messages.specs import SPEC_BY_STATUS, SYSEX_END, SYSEX_START


class Tokenizer:
    """
    Splits a MIDI byte stream into messages.

    Bytes are pushed in with ``feed()`` or ``feed_byte()``. Every
    completed message is queued as a list of byte values; iterating over
    the tokenizer hands the queued messages out and removes them from
    the queue.
    """
    def __init__(self, data=None):
        """Create a new decoder.

        If ``data`` is not None it is passed to ``feed()`` immediately.
        """
        # Status byte of the message being assembled. 0 means that no
        # message is in progress and data bytes are discarded.
        self._status = 0
        # Bytes collected so far for the message in progress.
        self._bytes = []
        # Total length at which the message in progress is complete.
        # Initialised here so the attribute always exists, not only
        # after the first multi-byte status byte has been seen.
        self._len = 0
        # Completed messages waiting to be consumed by ``__iter__``.
        self._messages = deque()
        # Not read anywhere in this class; kept so that existing code
        # touching the attribute keeps working.
        self._datalen = 0

        if data is not None:
            self.feed(data)

    def _feed_status_byte(self, status):
        """Handle one status byte (value 128..255)."""
        if status == SYSEX_END:
            # Only terminates a message if a sysex is in progress;
            # either way the parser is idle afterwards.
            if self._status == SYSEX_START:
                self._bytes.append(SYSEX_END)
                self._messages.append(self._bytes)

            self._status = 0
            return

        if 0xf8 <= status <= 0xff:
            # Realtime range. Inside a sysex message the realtime byte
            # is emitted on its own and the sysex keeps collecting data.
            # Inside any other message it aborts that message.
            if self._status != SYSEX_START:
                self._status = 0

            # Undefined realtime bytes are dropped silently.
            if status in SPEC_BY_STATUS:
                self._messages.append([status])
            return

        if status in SPEC_BY_STATUS:
            # Start of a new message; whatever was in progress is
            # discarded.
            spec = SPEC_BY_STATUS[status]

            if spec['length'] == 1:
                # No data bytes follow: the message is already complete.
                self._messages.append([status])
                self._status = 0
            else:
                self._status = status
                self._bytes = [status]
                self._len = spec['length']

        # Any other status byte is undefined and is ignored: parser
        # state is left untouched, so a message in progress continues.
        # (Undefined realtime bytes are handled above.)

    def _feed_data_byte(self, byte):
        """Handle one data byte (value 0..127)."""
        if not self._status:
            # Stray data byte outside of any message: ignore it.
            return

        self._bytes.append(byte)
        if len(self._bytes) == self._len:
            # Complete message. Going idle also guarantees that the
            # queued list is not appended to any more.
            self._messages.append(self._bytes)
            self._status = 0

    def feed_byte(self, byte):
        """Feed MIDI byte to the decoder.

        Takes an int in range [0..255].

        Raises TypeError if ``byte`` is not an integer and ValueError if
        it is outside the range [0..255].
        """
        if not isinstance(byte, Integral):
            raise TypeError('message byte must be integer')

        if not 0 <= byte <= 255:
            raise ValueError(f'invalid byte value {byte!r}')

        # Values up to 127 are data bytes, everything above is a status
        # byte.
        if byte <= 127:
            return self._feed_data_byte(byte)
        return self._feed_status_byte(byte)

    def feed(self, data):
        """Feed MIDI bytes to the decoder.

        Takes an iterable of ints in range [0..255].
        """
        for byte in data:
            self.feed_byte(byte)

    def __len__(self):
        """Return the number of parsed messages waiting to be read."""
        return len(self._messages)

    def __iter__(self):
        """Yield messages that have been parsed so far.

        Each message is removed from the internal queue as it is
        yielded.
        """
        while self._messages:
            yield self._messages.popleft()