Splitting logic into cleaner code #27

Open · wants to merge 7 commits into master
1 change: 1 addition & 0 deletions README.md
100644 → 100755
@@ -15,6 +15,7 @@ Whisper also requires the command-line tool [`ffmpeg`](https://ffmpeg.org/) to b
```
# on Ubuntu or Debian
sudo apt update && sudo apt install ffmpeg
sudo apt-get install libasound-dev portaudio19-dev libportaudio2 libportaudiocpp0

# on Arch Linux
sudo pacman -S ffmpeg
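# The PortAudio libraries added above for Debian are presumably needed on
# Arch too (an assumption, not part of this diff): sudo pacman -S portaudio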
```
14 changes: 14 additions & 0 deletions audio_util.py
@@ -0,0 +1,14 @@
class AudioUtil:

    @staticmethod
    def write_temp_audio_file(temp_file, wav_data):
        # Write WAV data to the temporary file as bytes.
        with open(temp_file, 'w+b') as f:
            f.write(wav_data.read())

    @staticmethod
    def concat_data_to_current_audio(last_sample, data_queue):
        # Drain the thread-safe queue, appending each pending chunk to the running sample.
        while not data_queue.empty():
            data = data_queue.get()
            last_sample += data
        return last_sample
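A minimal sketch of how these helpers are meant to be wired together; the queue contents and file name below are illustrative stand-ins, not part of the diff:

```python
# Hypothetical usage of AudioUtil (illustrative only).
import io
from queue import Queue
from tempfile import NamedTemporaryFile

from audio_util import AudioUtil

data_queue = Queue()
data_queue.put(b'\x00\x01')  # raw audio chunks, as pushed by a recording callback
data_queue.put(b'\x02\x03')

# Drain the queue, appending every pending chunk to the running sample.
sample = AudioUtil.concat_data_to_current_audio(bytes(), data_queue)

# Persist WAV bytes (a stand-in BytesIO here) to a temp file for transcription.
temp_file = NamedTemporaryFile().name
AudioUtil.write_temp_audio_file(temp_file, io.BytesIO(sample))
```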
Empty file modified demo.gif
100644 → 100755
Empty file modified requirements.txt
100644 → 100755
Empty file.
74 changes: 74 additions & 0 deletions system_configuration.py
@@ -0,0 +1,74 @@
import argparse
import speech_recognition as sr

from sys import platform

class ParserValues:
    model: str
    non_english: bool
    energy_threshold: int
    record_timeout: float
    silence_timeout: float
    default_microphone: str

    def __init__(self, model, non_english, energy_threshold, record_timeout, silence_timeout, default_microphone):
        self.model = model
        self.non_english = non_english
        self.energy_threshold = energy_threshold
        self.record_timeout = record_timeout
        self.silence_timeout = silence_timeout
        self.default_microphone = default_microphone

    @classmethod
    def parser_validation(cls, parser):
        parser.add_argument("--model", default="medium", help="Model to use",
                            choices=["tiny", "base", "small", "medium", "large"])
        parser.add_argument("--non_english", action='store_true',
                            help="Don't use the English model.")
        parser.add_argument("--energy_threshold", default=1000,
                            help="Energy level for mic to detect.", type=int)
        parser.add_argument("--record_timeout", default=2,
                            help="How real-time the recording is, in seconds.", type=float)
        parser.add_argument("--silence_timeout", default=3,
                            help="How much empty space between recordings before we "
                                 "consider it a new line in the transcription.", type=float)
        if 'linux' in platform:
            parser.add_argument("--default_microphone", default='pulse',
                                help="Default microphone name for SpeechRecognition. "
                                     "Run this with 'list' to view available Microphones.", type=str)
        args = parser.parse_args()
        return args

    @classmethod
    def fromSystemArguments(cls):
        parser = argparse.ArgumentParser()
        args = cls.parser_validation(parser)
        return cls(
            model=args.model,
            non_english=args.non_english,
            energy_threshold=args.energy_threshold,
            record_timeout=args.record_timeout,
            silence_timeout=args.silence_timeout,
            # --default_microphone is only registered on Linux, so fall back
            # to None elsewhere instead of raising AttributeError.
            default_microphone=getattr(args, 'default_microphone', None)
        )

class AudioDeviceConfiguration:

    @staticmethod
    def get_microphone_device_index(mic_name):
        # If this is not a Linux system, return None (use the default device).
        if 'linux' not in platform:
            return None
        # If the user asked for the list, print it and exit the program.
        if not mic_name or mic_name == 'list':
            print("Available microphone devices are: ")
            for index, name in enumerate(sr.Microphone.list_microphone_names()):
                print(f"Microphone with name \"{name}\" found")
            exit()
        # Otherwise return the index of the first matching microphone, or None.
        device_index = None
        for index, name in enumerate(sr.Microphone.list_microphone_names()):
            if mic_name in name:
                device_index = index
                break
        return device_index
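Taken together, these two classes absorb the argument parsing and the Linux microphone lookup that previously lived in main(). A sketch of how they are consumed, mirroring the transcribe_demo.py changes below (shown here only for illustration):

```python
# Illustrative wiring of the configuration helpers.
import speech_recognition as sr

from system_configuration import ParserValues, AudioDeviceConfiguration

args = ParserValues.fromSystemArguments()

# None on non-Linux systems, which makes sr.Microphone pick the default device.
device_index = AudioDeviceConfiguration.get_microphone_device_index(args.default_microphone)
source = sr.Microphone(sample_rate=16000, device_index=device_index)
```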
278 changes: 144 additions & 134 deletions transcribe_demo.py
100644 → 100755
@@ -1,6 +1,5 @@
#! python3.7

import argparse
import io
import os
import speech_recognition as sr
@@ -11,142 +10,153 @@
from queue import Queue
from tempfile import NamedTemporaryFile
from time import sleep
from sys import platform


def main():
parser = argparse.ArgumentParser()
parser.add_argument("--model", default="medium", help="Model to use",
choices=["tiny", "base", "small", "medium", "large"])
parser.add_argument("--non_english", action='store_true',
help="Don't use the english model.")
parser.add_argument("--energy_threshold", default=1000,
help="Energy level for mic to detect.", type=int)
parser.add_argument("--record_timeout", default=2,
help="How real time the recording is in seconds.", type=float)
parser.add_argument("--phrase_timeout", default=3,
help="How much empty space between recordings before we "
"consider it a new line in the transcription.", type=float)
if 'linux' in platform:
parser.add_argument("--default_microphone", default='pulse',
help="Default microphone name for SpeechRecognition. "
"Run this with 'list' to view available Microphones.", type=str)
args = parser.parse_args()

# The last time a recording was retrieved from the queue.
phrase_time = None
# Current raw audio bytes.
last_sample = bytes()
# Thread safe Queue for passing data from the threaded recording callback.
data_queue = Queue()
# We use SpeechRecognizer to record our audio because it has a nice feature where it can detect when speech ends.
recorder = sr.Recognizer()
recorder.energy_threshold = args.energy_threshold
# Definitely do this: dynamic energy compensation lowers the energy threshold dramatically, to a point where the SpeechRecognizer never stops recording.
recorder.dynamic_energy_threshold = False

# Important for linux users.
# Prevents permanent application hang and crash by using the wrong Microphone
if 'linux' in platform:
mic_name = args.default_microphone
if not mic_name or mic_name == 'list':
print("Available microphone devices are: ")
for index, name in enumerate(sr.Microphone.list_microphone_names()):
print(f"Microphone with name \"{name}\" found")
return
else:
for index, name in enumerate(sr.Microphone.list_microphone_names()):
if mic_name in name:
source = sr.Microphone(sample_rate=16000, device_index=index)
break
else:
source = sr.Microphone(sample_rate=16000)

# Load / Download model
model = args.model
if args.model != "large" and not args.non_english:
model = model + ".en"
audio_model = whisper.load_model(model)

record_timeout = args.record_timeout
phrase_timeout = args.phrase_timeout

temp_file = NamedTemporaryFile().name
transcription = ['']

with source:
recorder.adjust_for_ambient_noise(source)

def record_callback(_, audio:sr.AudioData) -> None:
"""
Threaded callback function to receive audio data when recordings finish.
audio: An AudioData containing the recorded bytes.
"""
# Grab the raw bytes and push it into the thread safe queue.
data = audio.get_raw_data()
data_queue.put(data)

# Create a background thread that will pass us raw audio bytes.
# We could do this manually but SpeechRecognizer provides a nice helper.
recorder.listen_in_background(source, record_callback, phrase_time_limit=record_timeout)

# Cue the user that we're ready to go.
print("Model loaded.\n")

while True:
try:
now = datetime.utcnow()
# Pull raw recorded audio from the queue.
if not data_queue.empty():
phrase_complete = False
# If enough time has passed between recordings, consider the phrase complete.
# Clear the current working audio buffer to start over with the new data.
if phrase_time and now - phrase_time > timedelta(seconds=phrase_timeout):
last_sample = bytes()
phrase_complete = True
# This is the last time we received new audio data from the queue.
phrase_time = now

# Concatenate our current audio data with the latest audio data.
while not data_queue.empty():
data = data_queue.get()
last_sample += data

# Use AudioData to convert the raw data to wav data.
audio_data = sr.AudioData(last_sample, source.SAMPLE_RATE, source.SAMPLE_WIDTH)
wav_data = io.BytesIO(audio_data.get_wav_data())

# Write wav data to the temporary file as bytes.
with open(temp_file, 'w+b') as f:
f.write(wav_data.read())

# Read the transcription.
result = audio_model.transcribe(temp_file, fp16=torch.cuda.is_available())
text = result['text'].strip()

# If we detected a pause between recordings, add a new item to our transcription.
# Otherwise edit the existing one.
if phrase_complete:
transcription.append(text)
else:
transcription[-1] = text
from system_configuration import ParserValues, AudioDeviceConfiguration
from audio_util import AudioUtil

class SpeechHandler:
    def __init__(self):
        self.args = ParserValues.fromSystemArguments()
        # The last time a recording was retrieved from the queue.
        self.phrase_time = None

        # Current raw audio bytes.
        self.last_sample = bytes()

        # Thread-safe Queue for passing data from the threaded recording callback.
        self.data_queue = Queue()

        # We use SpeechRecognizer to record our audio because it has a nice feature where it can detect when speech ends.
        self.recorder = sr.Recognizer()

        # Definitely do this: dynamic energy compensation lowers the energy threshold dramatically, to a point where the SpeechRecognizer never stops recording.
        self.recorder.dynamic_energy_threshold = False

        # Important for Linux users.
        # Prevents a permanent application hang and crash caused by using the wrong microphone.
        self.device_index = AudioDeviceConfiguration.get_microphone_device_index(self.args.default_microphone)

        # Get a name for the temporary file.
        self.temp_file = NamedTemporaryFile().name

        # Load / Download model
        self.audio_model = self.load_model(self.args)

        # Set values according to the args.
        self.recorder.energy_threshold = self.args.energy_threshold
        self.record_timeout = self.args.record_timeout
        self.silence_timeout = self.args.silence_timeout
        self.transcription = ['']

        self.generate_audio_source()

        # Cue the user that we're ready to go.
        print("Model loaded.\n")

    def load_model(self, args):
        ONLY_ENGLISH = False
        model = args.model
        # Note: with ONLY_ENGLISH set to False, the ".en" suffix is never applied.
        if args.model != "large" and not args.non_english and ONLY_ENGLISH:
            model = model + ".en"
        return whisper.load_model(model)

    def generate_audio_source(self):
        self.source = sr.Microphone(sample_rate=16000, device_index=self.device_index)
        with self.source:
            self.recorder.adjust_for_ambient_noise(self.source)

        def record_callback(_, audio: sr.AudioData) -> None:
            """
            Threaded callback function to receive audio data when recordings finish.
            audio: An AudioData containing the recorded bytes.
            """
            # Grab the raw bytes and push them into the thread-safe queue.
            data = audio.get_raw_data()
            self.data_queue.put(data)

        # Create a background thread that will pass us raw audio bytes.
        # We could do this manually but SpeechRecognizer provides a nice helper.
        self.recorder.listen_in_background(self.source, record_callback, phrase_time_limit=self.record_timeout)

    def read_complete_audio(self):
        # Read the transcription.
        result = self.audio_model.transcribe(self.temp_file, fp16=torch.cuda.is_available())
        self.transcription = self.result_transcription_handler(result, True)
        self.show_transcription()

    def execute(self):
        is_speaking = False
        while True:
            try:
                # Pull raw recorded audio from the queue.
                if not self.data_queue.empty():
                    self.show_hearing()
                    # If enough time has passed between recordings, consider the phrase complete.
                    # Clear the current working audio buffer to start over with the new data.
                    has_silence_timeout = self.silence_time_is_up()
                    if has_silence_timeout:
                        self.last_sample = bytes()

                    # This is the last time we received new audio data from the queue.
                    is_speaking = True
                    self.phrase_time = datetime.utcnow()

                    # Concatenate our current audio data with the latest audio data.
                    self.last_sample = AudioUtil.concat_data_to_current_audio(self.last_sample, self.data_queue)

                    # Use AudioData to convert the raw data to wav data.
                    audio_data = sr.AudioData(self.last_sample, self.source.SAMPLE_RATE, self.source.SAMPLE_WIDTH)
                    wav_data = io.BytesIO(audio_data.get_wav_data())

                    # Write wav data to the temporary file as bytes.
                    AudioUtil.write_temp_audio_file(self.temp_file, wav_data)

                    # Clear the console to reprint the updated transcription.
                    self.show_transcription()
                elif is_speaking and self.silence_time_is_up():
                    self.read_complete_audio()
                    is_speaking = False
            except KeyboardInterrupt:
                break
            # Infinite loops are bad for processors, must sleep.
            sleep(0.25)

        print("\n\nTranscription:")
        for line in self.transcription:
            print(line)

    def silence_time_is_up(self):
        # The timer only starts once audio has been received at least once.
        if self.phrase_time is None:
            return False
        elapsed_time_delta = datetime.utcnow() - self.phrase_time
        return elapsed_time_delta > timedelta(seconds=self.silence_timeout)

    def result_transcription_handler(self, result, has_silence_timeout):
        text = result['text'].strip()
        if not text:
            return self.transcription
        # If we detected a pause between recordings, add a new item to our transcription.
        # Otherwise edit the existing one.
        if has_silence_timeout:
            self.transcription.append(text)
        else:
            self.transcription[-1] = text
        return self.transcription

    def show_transcription(self):
        # Clear the console to reprint the updated transcription.
        os.system('cls' if os.name == 'nt' else 'clear')
        for line in self.transcription:
            print(line)
        # Flush stdout.
        print('', end='', flush=True)

print("\n\nTranscription:")
for line in transcription:
print(line)
    def show_hearing(self):
        # Clear the console and show a simple "listening" indicator.
        os.system('cls' if os.name == 'nt' else 'clear')
        print("Listening...")
        print('', end='', flush=True)


if __name__ == "__main__":
    speechHandler = SpeechHandler()
    speechHandler.execute()
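The silence timeout is the core of the phrase-splitting behaviour, so here is the same check as a standalone function: a sketch using the class's own datetime/timedelta approach, not code from this PR:

```python
# Standalone version of SpeechHandler.silence_time_is_up, for illustration.
from datetime import datetime, timedelta

def silence_time_is_up(phrase_time, silence_timeout):
    # Before any audio arrives there is nothing to time out.
    if phrase_time is None:
        return False
    return datetime.utcnow() - phrase_time > timedelta(seconds=silence_timeout)

# Four seconds since the last chunk with a 3-second timeout: phrase complete.
assert silence_time_is_up(datetime.utcnow() - timedelta(seconds=4), 3)
# Audio just arrived: still within the same phrase.
assert not silence_time_is_up(datetime.utcnow(), 3)
```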