1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
|
# -*- coding: utf-8
# This file is part of Cicero TTS.
# Cicero TTS: A Small, Fast and Free Text-To-Speech Engine.
# Copyright (C) 2003-2008 Nicolas Pitre <nico@cam.org>
# Copyright (C) 2003-2008 Stéphane Doyon <s.doyon@videotron.ca>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2.
# See the accompanying COPYING file for more details.
#
# This program comes with ABSOLUTELY NO WARRANTY.
# This module ties everything together. It handles the main loop and
# communication between components.
# profiling
from profiling import *
m = ProfMonitor('main.import')
from select import select
import phonetizer, mbrola_prog, sndoutput, version, config
import tracing
def trace(msg):
mod = 'main'
tracing.trace(mod+': '+msg)
~m
class SpeechServ:
def __init__(self, appFeedClass):
self.appfeed = appFeedClass()
self.phozer = None
self.mbrola = mbrola_prog.Mbrola()
self.samplingRate = self.mbrola.voiceRate()
self.expand = config.mbrola_t
self.sndoutput = sndoutput.SndOutput(self.samplingRate)
def roll(self):
wait = self.sndoutput.inputSleepTime()
if wait:
trace('select ignoring mbrola')
fds = [self.appfeed.selectfd()]
if self.phozer:
wait = min(wait, 0.15)
m = ProfMonitor('main.delibWait')
r,w,x = select(fds, [], fds, wait)
~m
else:
trace('selecting')
fds = [self.appfeed.selectfd(), self.mbrola.selectfd()]
if self.phozer:
t = 0.15
else:
t = None
r,w,x = select(fds, [], fds, t)
fds_desc = {
self.appfeed.selectfd(): 'appfeed',
self.mbrola.selectfd(): 'mbrola'}
trace('selected: [%s]'
% ','.join([fds_desc[f] for f in r+x]))
while self.appfeed.selectfd() in r+x:
action,input = self.appfeed.handle()
if action is 'M':
trace('resetting')
self.phozer = None
self.mbrola.reset()
self.sndoutput.reset()
elif action is 'S':
trace('new utterance: <%s>' % input)
if not self.phozer:
trace('new phozer')
self.phozer = phonetizer.Phonetizer(self.samplingRate)
self.phozer.feed(input)
wait = 0
elif action is 'T':
self.expand = input
if not self.appfeed.gotMore():
break
if self.phozer:
spokenSamples = self.sndoutput.spokenPos()
inx, finished = self.phozer.index(spokenSamples)
if inx is not None:
trace('send index %d' % inx)
self.appfeed.sendIndex(inx)
if finished:
trace('phozer says finished')
self.phozer = None
self.sndoutput.reset()
trace(profReport())
if self.mbrola.selectfd() in r+x:
trace('Getting from mbrola')
sound, nsamples = self.mbrola.get()
if sound and self.phozer:
trace('passing mbrola output')
self.sndoutput.give(sound)
if nsamples:
trace('mbrola finished')
self.phozer.produced(nsamples)
if self.phozer.isDone():
trace('phozer tells sndoutput that it\'s got '
'everything')
self.sndoutput.allGiven()
if not wait and self.phozer and self.mbrola.isFinished():
trace('Getting phos')
phos, dur = self.phozer.get(self.expand)
if phos:
trace('passing phonemes to mbrola')
self.mbrola.say(phos, dur,self.expand)
def run(self):
self.phozer = phonetizer.Phonetizer(self.samplingRate)
# FIXME: this introduces bogus indexes. They're filtered at
# the appFeed level but that's not so clean.
self.phozer.feed(version.banner)
while 1:
self.roll()
|