Skip to content

Commit

Permalink
fix: drop deprecated import
Browse files Browse the repository at this point in the history
https://github.com/OpenVoiceOS/ovos_skill_installer has been archived for a LONG time
  • Loading branch information
JarbasAl authored Nov 20, 2024
1 parent 432bdd0 commit 56f3e81
Showing 1 changed file with 110 additions and 3 deletions.
113 changes: 110 additions & 3 deletions ovos_stt_plugin_vosk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import json
import os
import shutil
import tarfile
import zipfile
from os import makedirs
from os.path import join, isdir, exists
from queue import Queue
from tempfile import mkstemp
from time import sleep
from os.path import join, exists

import requests
from ovos_plugin_manager.templates.stt import STT, StreamThread, StreamingSTT
from ovos_skill_installer import download_extract_zip, download_extract_tar
from ovos_utils.log import LOG
from ovos_utils.network_utils import is_connected
from ovos_utils.xdg_utils import xdg_data_home
from queue import Queue
from speech_recognition import AudioData
from vosk import Model as KaldiModel, KaldiRecognizer

Expand Down Expand Up @@ -238,3 +245,103 @@ def create_streaming_thread(self):
return VoskKaldiStreamThread(
self.queue, self.lang, self.model, self.verbose
)
def download(url, file=None, session=None):
"""
Pass file as a filename, open file object, or None to return the request bytes
Args:
url (str): URL of file to download
file (Union[str, io, None]): One of the following:
- Filename of output file
- File opened in binary write mode
- None: Return raw bytes instead
Returns:
Union[bytes, None]: Bytes of file if file is None
"""

if isinstance(file, str):
file = open(file, 'wb')
try:
if session:
content = session.get(url).content
else:
content = requests.get(url).content
if file:
file.write(content)
else:
return content
finally:
if file:
file.close()


def download_extract_tar(tar_url, folder, tar_filename='',
skill_folder_name=None, session=None):
"""
Download and extract the tar at the url to the given folder
Args:
tar_url (str): URL of tar file to download
folder (str): Location of parent directory to extract to. Doesn't have to exist
tar_filename (str): Location to download tar. Default is to a temp file
skill_folder_name (str): rename extracted skill folder to this
"""
try:
makedirs(folder)
except OSError:
if not isdir(folder):
raise
if not tar_filename:
fd, tar_filename = mkstemp('.tar.gz')
download(tar_url, os.fdopen(fd, 'wb'), session=session)
else:
download(tar_url, tar_filename, session=session)

with tarfile.open(tar_filename) as tar:
tar.extractall(path=folder)

if skill_folder_name:
with tarfile.open(tar_filename) as tar:
for p in tar.getnames():
original_folder = p.split("/")[0]
break
original_folder = join(folder, original_folder)
final_folder = join(folder, skill_folder_name)
shutil.move(original_folder, final_folder)


def download_extract_zip(zip_url, folder, zip_filename="",
skill_folder_name=None, session=None):
"""
Download and extract the zip at the url to the given folder
Args:
zip_url (str): URL of zip file to download
folder (str): Location of parent directory to extract to. Doesn't have to exist
zip_filename (str): Location to download zip. Default is to a temp file
skill_folder_name (str): rename extracted skill folder to this
"""
try:
makedirs(folder)
except OSError:
if not isdir(folder):
raise
if not zip_filename:
fd, zip_filename = mkstemp('.tar.gz')
download(zip_url, os.fdopen(fd, 'wb'), session=session)
else:
download(zip_url, zip_filename, session=session)

with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
zip_ref.extractall(folder)

if skill_folder_name:
with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
for p in zip_ref.namelist():
original_folder = p.split("/")[0]
break

original_folder = join(folder, original_folder)
final_folder = join(folder, skill_folder_name)
shutil.move(original_folder, final_folder)

0 comments on commit 56f3e81

Please sign in to comment.