#!/usr/bin/python3
DEBUG = False
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/var/lib/asterisk/agi-bin/google-cloud-credentials.json"
import sys
import logging  # imported unconditionally: the stream's __exit__ handler logs errors even when DEBUG is False
from asterisk.agi import *
from google.cloud import speech
from google.api_core import client_options
if DEBUG:
    logging.basicConfig(level=logging.DEBUG, filename='/var/lib/asterisk/agi-bin/debug_python.log', filemode='w', format='[%(levelname)s]: %(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
    logging.debug("Starting EAGI")
##### Start AGI connector
agi = AGI()
if DEBUG:
    agi.verbose("EAGI started")
##### Read command line arguments
# Use "default" for the default model, "enhanced" for the enhanced default model,
# or "phone_call" for the model fine-tuned on phone calls. Mandatory argument.
modelInput = sys.argv[1]  # be careful: some languages do not have a "phone_call" model: https://cloud.google.com/speech-to-text/docs/languages
model = None
enhancedModel = False
if modelInput == "enhanced":
    enhancedModel = True
elif modelInput == "phone_call":  # phone_call models only support single_utterance mode if use_enhanced is True: https://cloud.google.com/speech-to-text/docs/reference/rpc/google.cloud.speech.v1#google.cloud.speech.v1.StreamingRecognitionConfig
    model = modelInput
    enhancedModel = True
mainLanguage = sys.argv[2]  # main recognition language. Mandatory argument.
alternativeLanguages = []  # alternative recognition languages (max. 3). Optional argument.
if len(sys.argv[3:]) > 0 and model != "phone_call":  # alternative languages are not supported by the "phone_call" model: https://cloud.google.com/speech-to-text/docs/reference/rest/v1p1beta1/RecognitionConfig
    alternativeLanguages = sys.argv[3:]
if DEBUG:
    agi.verbose(str(enhancedModel) + " " + str(model) + " " + str(mainLanguage) + " " + str(alternativeLanguages))
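# For illustration (not part of the original script), an Asterisk dialplan would pass
# these arguments to the EAGI application roughly like this; the extension number and
# language codes below are hypothetical placeholders:
#   exten => 100,1,Answer()
#    same => n,EAGI(streaming-asr-google.eagi,enhanced,en-US,es-ES)
#    same => n,Verbose(1,Transcript: ${TRANSCRIPT})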
##### Google Response Parser
class Parser:
    """Parse Google responses, check whether the user stayed silent, and set an AGI variable with the result."""

    def __init__(self, stream) -> None:
        self.stream = stream

    def isEndOfSpeech(self, response):
        if str(response.speech_event_type) == "SpeechEventType.END_OF_SINGLE_UTTERANCE":
            if DEBUG:
                logging.debug("END_OF_SINGLE_UTTERANCE detected")
                logging.debug(str(self.stream.getRecordingTime()))
            return True
        return False

    def parseGoogleResponse(self, responses):
        """
        The responses argument is a generator that blocks until a response
        is provided by the server.
        """
        speechDetected = False
        for response in responses:
            if self.isEndOfSpeech(response):
                if DEBUG:
                    agi.verbose("End of speech detected")
                break
            if not response.results:
                continue
            result = response.results[0]
            if not result.alternatives:
                continue
            speechDetected = True
            interimResults = result.alternatives[0].transcript
            # reaching this point means interim results were returned, i.e. the user started saying something
        if speechDetected:  # get the final result
            response = next(responses)
            transcript = response.results[0].alternatives[0].transcript
            agi.set_variable("TRANSCRIPT", transcript)
            if DEBUG:
                logging.debug("Final transcript: " + transcript)
        else:
            agi.set_variable("TRANSCRIPT", "_SILENCE_")
            if DEBUG:
                logging.debug("No speech detected. User stayed silent")
##### Audio Streaming Generator
class AudioStream(object):
    """Opens the phone call audio as a generator yielding audio chunks."""

    def __init__(self):
        self.close = False
        self.samplerate = 8000
        self.chunk = 1024
        self.totalSignal = 0

    def __enter__(self):
        if DEBUG:
            agi.verbose("Opening audio stream")
        self.file = os.fdopen(3, 'rb')  # open file descriptor 3, on which EAGI delivers the phone call audio
        return self

    def __exit__(self, type=None, value=None, traceback=None):
        if DEBUG:
            logging.debug("Closing audio stream")
        if type is not None:  # error detected. Log to file because agi.verbose does not work anymore
            logging.error("Error while streaming")
            logging.error(value)
            logging.error(agi._got_sighup)  # on hangup, _got_sighup == True
        self.close = True
        os.close(3)  # close file descriptor 3

    def getRecordingTime(self):
        return self.totalSignal / 16076.8  # approximate bytes of audio per second

    def generator(self):
        while not self.close:
            audioData = self.file.read(self.chunk)
            self.totalSignal += self.chunk
            yield audioData
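# Note on the accounting above: file.read(self.chunk) may return fewer than
# self.chunk bytes, while totalSignal always advances by the full chunk size, so
# getRecordingTime is only an estimate. For 8 kHz 16-bit LINEAR16 audio the nominal
# rate is 8000 * 2 = 16000 bytes per second; the slightly larger 16076.8 divisor
# appears to be an empirically tuned value.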
##### Configure Google speech service
if DEBUG:
    agi.verbose("Configuring Google STT model")
# specify the Europe region
client_options = client_options.ClientOptions(
    api_endpoint="eu-speech.googleapis.com"
)
client = speech.SpeechClient(client_options=client_options)
# client = speech.SpeechClient()  # default (global) endpoint
# single_utterance only works with the normal model or with phone_call + enhanced model
# check on Google's language support page whether the desired language has a phone_call model (German does not)
config = speech.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
    sample_rate_hertz=8000,
    language_code=mainLanguage,
    alternative_language_codes=alternativeLanguages,
    model=model,
    use_enhanced=enhancedModel
)
streaming_config = speech.StreamingRecognitionConfig(config=config, interim_results=True, single_utterance=True)
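# How these two flags cooperate here: interim_results=True makes Google stream
# partial hypotheses while the caller is still talking (which is how the parser
# learns that speech started), and single_utterance=True makes Google emit
# END_OF_SINGLE_UTTERANCE once it detects the caller has stopped speaking,
# which is the event parseGoogleResponse uses to break out of its loop.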
##### Start to listen
if DEBUG:
    agi.verbose("Creating audio stream and starting speech recognition")
with AudioStream() as stream:
    audio_generator = stream.generator()
    requests = (
        speech.StreamingRecognizeRequest(audio_content=content)
        for content in audio_generator
    )  # requests is also a generator
    responses = client.streaming_recognize(streaming_config, requests)
    parser = Parser(stream)
    # the parser processes Google responses until the for loop in parseGoogleResponse breaks
    parser.parseGoogleResponse(responses)
if DEBUG:  # this point is reached only if there was no error during speech recognition.
    # If the user hangs up the call during this EAGI, the error is caught inside the __exit__ function and this agi.verbose won't be executed.
    # To continue the script even when an error occurred, return True from the __exit__ function.
    agi.verbose("Closed audio stream successfully. Finalising speech-to-text EAGI script")