File: hmm.py

package info (click to toggle)
sphinxtrain 5.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 32,572 kB
  • sloc: ansic: 94,052; perl: 8,939; python: 6,702; cpp: 2,044; makefile: 6
file content (416 lines) | stat: -rw-r--r-- 13,762 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
# Copyright (c) 2007 Carnegie Mellon University
#
# You may copy and modify this freely under the same terms as
# Sphinx-III
"""Hidden Markov Model objects for training/decoding.

This module provides a basic HMM object and factory classes for
building HMMs from triphones and from sentences.
"""

__author__ = "David Huggins-Daines <dhdaines@gmail.com>"
__version__ = "$Revision$"

import numpy
import bisect


class HMMGraph(object):
    """
    A word or sentence HMM, represented as a directed acyclic graph
    of phoneme HMMs.  This object implements the same interface as
    individual HMMs for training, evaluation, and decoding.
    """
    def __init__(self, *hmms):
        self.hmms = []
        self.offsets = [0]
        self.hmmmap = {}
        if hmms:
            self.append(*hmms)

    def append(self, *hmms):
        """
        Append a list of HMMs or tuples of alternative HMMs (more
        interesting graph structures are possible, but not through
        this interface, yet).

        Each argument is either a single cmusphinx.hmm.HMM object or a
        tuple of multiple HMM objects.  In the latter case, the
        multiple HMMs will be added as alternatives with equal
        transition probabilities into them.
        """
        for h1, h2 in zip(hmms[:-1], hmms[1:]):
            if isinstance(h1, tuple):
                for h in h1:
                    self.hmms.append(h)
                    self.hmmmap[h] = self.offsets[-1]
                    self.offsets.append(self.offsets[-1] + len(h))
                    h.link(h2)
            else:
                self.hmms.append(h1)
                self.hmmmap[h1] = self.offsets[-1]
                self.offsets.append(self.offsets[-1] + len(h1))
                h1.link(h2)
        if isinstance(hmms[-1], tuple):
            for h in hmms[-1]:
                self.hmms.append(h)
                self.hmmmap[h] = self.offsets[-1]
                self.offsets.append(self.offsets[-1] + len(h))
            self.offsets.pop()  # Remove the extra offset
        else:
            self.hmms.append(hmms[-1])
            self.hmmmap[hmms[-1]] = self.offsets[-1]

    def get_hmm_idx(self, idx):
        """
        Get HMM and offset from state ID.

        @param idx: State ID in this graph
        @type idx: int
        @return: HMM object and ID of first state in HMM
        @rtype: (cmusphinx.hmm.HMM, int)
        """
        i = bisect.bisect(self.offsets, idx)
        return self.hmms[i - 1], idx - self.offsets[i - 1]

    def senid(self, idx):
        """
        Get senone ID from state ID.

        @param idx: State ID in this graph
        @type idx: int
        @return: Senone ID for this state
        @rtype: int
        """
        hmm, offset = self.get_hmm_idx(idx)
        return hmm[offset]

    def tprob(self, i, j):
        """
        Get transition probability from state i to state j.

        @param i: State ID for source state.
        @type i: int
        @param j: State ID for destination state.
        @type j: int
        @return: Transition log-probability (base e) from i to j.
        @rtype: float
        """
        ihmm, ioff = self.get_hmm_idx(i)
        jhmm, joff = self.get_hmm_idx(j)
        if ihmm == jhmm:
            return ihmm[ioff, joff]
        elif ioff == len(ihmm) - 1 and joff == 0 and jhmm in ihmm.links:
            return ihmm.links[jhmm]
        else:
            return 0

    def __getitem__(self, key):
        """
        Index this object.

        @param key: Either a single value, in which case the senone ID
                    for the given state ID is returned, or a tuple
                    (i,j), in which case the transition probability
                    from i to j is returned.
        """
        if isinstance(key, tuple):
            return self.tprob(*key)
        else:
            return self.senid(key)

    def iter_senones(self):
        """
        Iterate over senone IDs (in arbitrary order).

        @return: A generator over all senone IDs in this HMMGraph.
        @rtype: generator(int)
        """
        for h in self.hmms:
            for s in h.iter_senones():
                yield s

    def senones(self):
        """
        Return all senone IDs (in arbitrary order).

        @return: All senones in this HMMGraph
        @rtype: (int)
        """
        return tuple(self.iter_senones())

    def __len__(self):
        """Number of states in this HMM graph."""
        return sum([len(h) for h in self.hmms])

    def iter_statepairs(self):
        """
        Iterate over state pairs with non-zero transition
        probabilities.

        @return: A generator over state pairs in this HMMGraph.
                 Transitions out of non-emitting states are returned
                 first, followed by transitions between emitting
                 states, then finally transitions into non-emitting
                 states.
        @rtype: generator((int,int))
        """
        # Transitions out of non-emitting states come first (see
        # below)
        for hmm, offset in zip(self.hmms, self.offsets):
            for ohmm in hmm.links:
                # Transition from final state of this one to first
                # state of successor HMM
                yield offset + len(hmm) - 1, self.hmmmap[ohmm]
        # Transitions into non-emitting states come last (this should
        # happen automatically since they are always last in each
        # state sequence)
        for hmm, offset in zip(self.hmms, self.offsets):
            for i, j in hmm.iter_statepairs():
                yield i + offset, j + offset


class HMM(object):
    """Class representing a single HMM"""
    def __init__(self, sseq, tmat, name=None):
        self.sseq = sseq
        self.tmat = tmat
        self.name = name
        self.links = {}

    def senid(self, idx):
        """
        Get senone ID from state ID.

        @param idx: State ID in this graph
        @type idx: int
        @return: Senone ID for this state
        @rtype: int
        """
        return self.sseq[idx]

    def tprob(self, i, j):
        """
        Get transition probability from state i to state j.

        @param i: State ID for source state.
        @type i: int
        @param j: State ID for destination state.
        @type j: int
        @return: Transition log-probability (base e) from i to j.
        @rtype: float
        """
        return self.tmat[i, j]

    def link(self, others, prob=1.0):
        """
        Add a link to one or more HMMs with total probability prob.
        If others is a tuple, prob will be divided uniformly among
        them (for the time being this is the only way).
        """
        if isinstance(others, tuple):
            for o in others:
                self.link(o, prob / len(others))
        else:
            self.links[others] = prob

    def __getitem__(self, key):
        """
        Index this object.

        @param key: Either a single value, in which case the senone ID
                    for the given state ID is returned, or a tuple
                    (i,j), in which case the transition probability
                    from i to j is returned.
        """
        if isinstance(key, tuple):
            return self.tprob(*key)
        else:
            return self.senid(key)

    def iter_senones(self):
        """
        Iterate over senone IDs (in arbitrary order).

        @return: A generator over all senone IDs in this HMM.
        @rtype: generator(int)
        """
        for s in self.sseq:
            if s != -1:
                yield s

    def senones(self):
        """
        Return all senone IDs (in arbitrary order).

        @return: All senones in this HMM.
        @rtype: (int)
        """
        return tuple(self.iter_senones())

    def __len__(self):
        """Number of states in this HMM."""
        return len(self.sseq)

    def iter_statepairs(self):
        """
        Iterate over state pairs with non-zero transition
        probabilities.

        @return: A generator over state pairs in this HMM.
                 Transitions between emitting states are returned
                 first, followed by transitions into non-emitting
                 states.
        @rtype: generator((int,int))
        """
        return iter(numpy.transpose(self.tmat.nonzero()))


def forward_evaluate(hmm, gmms, alpha=None):
    """
    Calculate the forward variable \\alpha over an HMM or HMMGraph
    for a frame of observations.  The forward variable is defined as::

      \\alpha_t(j) = P(o_1, ..., o_j, q_t = j | \\lambda)
      \\alpha_0(0) = 1.0
      \\alpha_t(j) = \\sum_i \\alpha_{t-1}(i) a_{ij} b_j(o_t)

    Or, for non-emitting states j_N::

      \\alpha_t(j_N) = \\sum_i \\alpha_{t}(i) a_{ij_N}

    Note that non-emitting states transition from the current frame,
    and thus we need to fully calculate \\alpha_{t}(i) for all their
    predecessors before calculating their alpha values.  In other
    words we need to make sure that transitions *into* non-emitting
    states are ordered *after* all others.

    @param hmm: HMM or HMMGraph to evaluate forward variable in
    @param gmms: Collection of GMM scores for current frame, indexed
                 by senone ID.
    @param alpha: List of arrays of previous frames' alpha variables,
                  or None to create a new one.
    @type alpha: [numpy.ndarray]
    @return: Updated list of alpha variables
    @rtype: [numpy.ndarray]
    """
    if alpha is None:
        alpha = numpy.zeros(len(hmm))
        alpha[0] = 1.  # Assume unique initial state
    new_alpha = numpy.zeros(len(alpha))
    for i, j in hmm.iter_statepairs():
        if hmm[j] == -1:
            new_alpha[j] += new_alpha[i] * hmm[i, j]
        else:
            new_alpha[j] += alpha[i] * hmm[i, j] * gmms[hmm[j]]
    return new_alpha


def backward_evaluate(hmm, gmms, beta=None):
    """
    Calculate the backward variable \\beta over an HMM or HMMGraph
    for a frame of observations.  The backward variable is defined as::

      \\beta_t(i) = P(o_{t+1}, ..., o_T | q_t = i, \\lambda)
      \\beta_T(i) = 1.0 for all final states i
      \\beta_t(i) = \\sum_j \\beta_{t+1}(j) a_{ij} b_j(o_t+1)

    Or, for non-emitting states i_N::

      \\beta_t(i_N) = \\sum_j\\beta_{t}(j) a_{i_Nj} b_j(o_{t})

    Since we only have access to one frame of emissions at a time,
    this means that we must calculate beta_{t+1}(i) for non-emitting
    states in the same pass as beta_{t}(i) for emitting states, but
    before any of them.

    By comparison with forward_evaluate, here we need to make sure
    that transitions *out* of non-emitting states are ordered
    *before* all others.  Luckily that is not in conflict with the
    needs of forward evaluation and a single iter_statepairs() will
    work for both.

    @param hmm: HMM or HMMGraph to evaluate backward variable in
    @param gmms: Collection of GMM scores for current frame, indexed
                 by senone ID.
    @param beta: List of arrays of following frames' beta variables,
                 or None to create a new one.
    @type beta: [numpy.ndarray]
    @return: Updated list of beta variables
    @rtype: [numpy.ndarray]
    """
    new_beta = numpy.zeros(len(hmm))
    # FIXME: For some reason these will break the sum(alpha * beta)
    # invariant if we include them in the beta array.  Also, we don't
    # want to modify the beta argument.  So we store them separately.
    nonemit_beta = numpy.zeros(len(hmm))
    if beta is None:
        beta = numpy.zeros(len(hmm))
        nonemit_beta[-1] = 1.  # FIXME: Assumes final state is non-emitting
    for i, j in hmm.iter_statepairs():
        if hmm[i] == -1:  # FIXME: Assumes that hmm[j] != -1
            nonemit_beta[i] += beta[j] * hmm[i, j] * gmms[hmm[j]]
        elif hmm[j] == -1:
            new_beta[i] += nonemit_beta[j] * hmm[i, j]
        else:
            new_beta[i] += beta[j] * hmm[i, j] * gmms[hmm[j]]
    return new_beta


class PhoneHMMFactory(object):
    """
    Create single phone (triphone, etc) HMMs.
    """
    def __init__(self, acmod):
        """
        Build a PhoneHMMFactory.

        @param acmod: Acoustic model containing HMM definitions
        @type acmod: cmusphinx.s3model.S3Model
        """
        self.acmod = acmod

    def create(self, ci, lc='-', rc='-', wpos=None):
        """
        Create an HMM for a triphone (ci, lc, rc, wpos)

        @param ci: Base (context-independent) phone name
        @type ci: string
        @param lc: Left context phone name (or '-' for none)
        @type lc: string
        @param rc: Right context phone name (or '-' for none)
        @type rc: string
        @param wpos: Word position, one of:

          - i: Word-internal phone
          - b: Word-initial phone
          - e: Word-final phone
          - s: Single-phone word (both initial and final)

        @type wpos: string
        """
        pid = self.acmod.mdef.phone_id(ci, lc, rc, wpos)
        ssid = self.acmod.mdef.pid2ssid(pid)
        return HMM(self.acmod.mdef.sseq[ssid],
                   self.acmod.tmat[self.acmod.mdef.pid2tmat(pid)],
                   (ci, lc, rc, wpos))


class SentenceHMMFactory(object):
    """Create sentence HMMs"""
    def __init__(self, acmod, dictionary):
        self.acmod = acmod
        self.dictionary = dictionary
        self.phone_factory = PhoneHMMFactory(acmod)

    def create(self, words):
        """
        Create a sentence HMM from a list of words.

        FIXME: not implemented yet!

        @param words: sequence of word names
        @type words: (string)
        """
        # Parse words into a phone sequence
        # Build phone HMMs and concatenate them