diff --git a/api/enums.py b/api/enums.py index 2b9a833d..70f32e2c 100644 --- a/api/enums.py +++ b/api/enums.py @@ -130,6 +130,7 @@ class SummarizeProvider(Enum): class KeyboardRecordingType(Enum): SINGLE = "single" MACRO = "macro" + MACRO_ADVANCED = "macro_advanced" class WingmanProRegion(Enum): diff --git a/api/interface.py b/api/interface.py index 512692f8..56aa6fcc 100644 --- a/api/interface.py +++ b/api/interface.py @@ -333,9 +333,17 @@ class CommandKeyboardConfig(BaseModel): hotkey_codes: Optional[list[int]] = None """The hotkey codes. Can be a single key like 65 or a combination like 162+160+65. Optional.""" + hotkey_extended: Optional[bool] = None + """Whether the hotkey is an extended key. Optional.""" + hold: Optional[float] = None """The duration the key will be pressed in seconds. Optional.""" + press: Optional[bool] = None + """Whether to press the key. Optional.""" + + release: Optional[bool] = None + """Whether to release the key. Optional.""" class CommandMouseConfig(BaseModel): button: Optional[str] = None diff --git a/keyboard/keyboard/__init__.py b/keyboard/keyboard/__init__.py index ba55f582..3d9e2c86 100644 --- a/keyboard/keyboard/__init__.py +++ b/keyboard/keyboard/__init__.py @@ -506,6 +506,12 @@ def send(hotkey, do_press=True, do_release=True): _listener.is_replaying = False +def direct_event(scancode, event_type): + """ + Sends a key event directly to the OS, without any processing. + """ + _os_keyboard.direct_event(scancode, event_type) + # Alias. press_and_release = send diff --git a/keyboard/keyboard/_darwinkeyboard.py b/keyboard/keyboard/_darwinkeyboard.py index 31884d0a..6803cb68 100644 --- a/keyboard/keyboard/_darwinkeyboard.py +++ b/keyboard/keyboard/_darwinkeyboard.py @@ -472,6 +472,13 @@ def handler(self, proxy, e_type, event, refcon): def init(): key_controller = KeyController() +def direct_event(scan_code, event_type): + """ Sends a key event directly, without any processing """ + if event_type == 0 or event_type == 1: + key_controller.press(scan_code) + elif event_type == 2 or event_type == 3: + key_controller.release(scan_code) + def press(scan_code): """ Sends a 'down' event for the specified scan code """ key_controller.press(scan_code) diff --git a/keyboard/keyboard/_keyboard_event.py b/keyboard/keyboard/_keyboard_event.py index 4a3b4cf3..525d9588 100644 --- a/keyboard/keyboard/_keyboard_event.py +++ b/keyboard/keyboard/_keyboard_event.py @@ -20,20 +20,22 @@ class KeyboardEvent(object): device = None modifiers = None is_keypad = None + is_extended = None - def __init__(self, event_type, scan_code, name=None, time=None, device=None, modifiers=None, is_keypad=None): + def __init__(self, event_type, scan_code, name=None, time=None, device=None, modifiers=None, is_keypad=None, is_extended=None): self.event_type = event_type self.scan_code = scan_code self.time = now() if time is None else time self.device = device self.is_keypad = is_keypad self.modifiers = modifiers + self.is_extended = is_extended if name: self.name = normalize_name(name) def to_json(self, ensure_ascii=False): attrs = dict( - (attr, getattr(self, attr)) for attr in ['event_type', 'scan_code', 'name', 'time', 'device', 'is_keypad', 'modifiers'] + (attr, getattr(self, attr)) for attr in ['event_type', 'scan_code', 'name', 'time', 'device', 'is_keypad', 'modifiers', 'is_extended'] if not attr.startswith('_') ) return json.dumps(attrs, ensure_ascii=ensure_ascii) diff --git a/keyboard/keyboard/_nixkeyboard.py b/keyboard/keyboard/_nixkeyboard.py index 21e18977..dabaa74d 100644 --- a/keyboard/keyboard/_nixkeyboard.py +++ b/keyboard/keyboard/_nixkeyboard.py @@ -152,6 +152,9 @@ def listen(callback): is_keypad = scan_code in keypad_scan_codes callback(KeyboardEvent(event_type=event_type, scan_code=scan_code, name=name, time=time, device=device_id, is_keypad=is_keypad, modifiers=pressed_modifiers_tuple)) +def direct_event(scan_code, event_type): + write_event(scan_code, event_type == 0 or event_type == 1) + def write_event(scan_code, is_down): build_device() device.write_event(EV_KEY, scan_code, int(is_down)) diff --git a/keyboard/keyboard/_winkeyboard.py b/keyboard/keyboard/_winkeyboard.py index bd25f8d0..63b40eb8 100644 --- a/keyboard/keyboard/_winkeyboard.py +++ b/keyboard/keyboard/_winkeyboard.py @@ -5,16 +5,10 @@ and can introduce very unpythonic failure modes, such as segfaults and low level memory leaks. But it is also dependency-free, very performant well documented on Microsoft's website and scattered examples. - -# TODO: -- Keypad numbers still print as numbers even when numlock is off. -- No way to specify if user wants a keypad key or not in `map_char`. """ from __future__ import unicode_literals -import re import atexit import traceback -import pydirectinput from threading import Lock from collections import defaultdict @@ -26,23 +20,10 @@ except NameError: pass -# This part is just declaring Win32 API structures using ctypes. In C -# this would be simply #include "windows.h". - import ctypes -from ctypes import c_short, c_char, c_uint8, c_int32, c_int, c_uint, c_uint32, c_long, Structure, WINFUNCTYPE, POINTER -from ctypes.wintypes import WORD, DWORD, BOOL, HHOOK, MSG, LPWSTR, WCHAR, WPARAM, LPARAM, LONG, HMODULE, LPCWSTR, HINSTANCE, HWND -LPMSG = POINTER(MSG) -ULONG_PTR = POINTER(DWORD) - -kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) -GetModuleHandleW = kernel32.GetModuleHandleW -GetModuleHandleW.restype = HMODULE -GetModuleHandleW.argtypes = [LPCWSTR] +from ctypes import wintypes -#https://github.com/boppreh/mouse/issues/1 -#user32 = ctypes.windll.user32 -user32 = ctypes.WinDLL('user32', use_last_error = True) +LPMSG = ctypes.POINTER(wintypes.MSG) VK_PACKET = 0xE7 @@ -50,36 +31,55 @@ INPUT_KEYBOARD = 1 INPUT_HARDWARE = 2 +KEYEVENTF_KEYDOWN = 0x00 +KEYEVENTF_EXTENDED = 0x01 KEYEVENTF_KEYUP = 0x02 KEYEVENTF_UNICODE = 0x04 +KEYEVENTF_SCANCODE = 0x08 -class KBDLLHOOKSTRUCT(Structure): - _fields_ = [("vk_code", DWORD), - ("scan_code", DWORD), - ("flags", DWORD), - ("time", c_int), - ("dwExtraInfo", ULONG_PTR)] +# https://msdn.microsoft.com/en-us/library/windows/desktop/ms646307(v=vs.85).aspx +MAPVK_VK_TO_CHAR = 2 +MAPVK_VK_TO_VSC = 0 +MAPVK_VSC_TO_VK = 1 +MAPVK_VK_TO_VSC_EX = 4 +MAPVK_VSC_TO_VK_EX = 3 + +LLKHF_INJECTED = 0x00000010 + +user32 = ctypes.WinDLL('user32', use_last_error = True) +kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) +GetModuleHandleW = kernel32.GetModuleHandleW +GetModuleHandleW.restype = wintypes.HMODULE +GetModuleHandleW.argtypes = [wintypes.LPCWSTR] +wintypes.ULONG_PTR = wintypes.WPARAM + +class KBDLLHOOKSTRUCT(ctypes.Structure): + _fields_ = [("vk_code", wintypes.DWORD), + ("scan_code", wintypes.DWORD), + ("flags", wintypes.DWORD), + ("time", ctypes.c_int), + ("dwExtraInfo", wintypes.ULONG_PTR)] # Included for completeness. class MOUSEINPUT(ctypes.Structure): - _fields_ = (('dx', LONG), - ('dy', LONG), - ('mouseData', DWORD), - ('dwFlags', DWORD), - ('time', DWORD), - ('dwExtraInfo', ULONG_PTR)) + _fields_ = (('dx', wintypes.LONG), + ('dy', wintypes.LONG), + ('mouseData', wintypes.DWORD), + ('dwFlags', wintypes.DWORD), + ('time', wintypes.DWORD), + ('dwExtraInfo', wintypes.ULONG_PTR)) class KEYBDINPUT(ctypes.Structure): - _fields_ = (('wVk', WORD), - ('wScan', WORD), - ('dwFlags', DWORD), - ('time', DWORD), - ('dwExtraInfo', ULONG_PTR)) + _fields_ = (("wVk", wintypes.WORD), + ("wScan", wintypes.WORD), + ("dwFlags", wintypes.DWORD), + ("time", wintypes.DWORD), + ("dwExtraInfo", wintypes.ULONG_PTR)) class HARDWAREINPUT(ctypes.Structure): - _fields_ = (('uMsg', DWORD), - ('wParamL', WORD), - ('wParamH', WORD)) + _fields_ = (('uMsg', wintypes.DWORD), + ('wParamL', wintypes.WORD), + ('wParamH', wintypes.WORD)) class _INPUTunion(ctypes.Union): _fields_ = (('mi', MOUSEINPUT), @@ -87,84 +87,51 @@ class _INPUTunion(ctypes.Union): ('hi', HARDWAREINPUT)) class INPUT(ctypes.Structure): - _fields_ = (('type', DWORD), - ('union', _INPUTunion)) + class _INPUT(ctypes.Union): + _fields_ = (("ki", KEYBDINPUT), + ("mi", MOUSEINPUT), + ("hi", HARDWAREINPUT)) + _anonymous_ = ("_input",) + _fields_ = (("type", wintypes.DWORD), + ("_input", _INPUT)) -LowLevelKeyboardProc = WINFUNCTYPE(c_int, WPARAM, LPARAM, POINTER(KBDLLHOOKSTRUCT)) +LowLevelKeyboardProc = ctypes.WINFUNCTYPE(ctypes.c_int, wintypes.WPARAM, wintypes.LPARAM, ctypes.POINTER(KBDLLHOOKSTRUCT)) SetWindowsHookEx = user32.SetWindowsHookExW -SetWindowsHookEx.argtypes = [c_int, LowLevelKeyboardProc, HINSTANCE , DWORD] -SetWindowsHookEx.restype = HHOOK +SetWindowsHookEx.argtypes = [ctypes.c_int, LowLevelKeyboardProc, wintypes.HINSTANCE , wintypes.DWORD] +SetWindowsHookEx.restype = wintypes.HHOOK CallNextHookEx = user32.CallNextHookEx -#CallNextHookEx.argtypes = [c_int , c_int, c_int, POINTER(KBDLLHOOKSTRUCT)] -CallNextHookEx.restype = c_int +CallNextHookEx.restype = ctypes.c_int UnhookWindowsHookEx = user32.UnhookWindowsHookEx -UnhookWindowsHookEx.argtypes = [HHOOK] -UnhookWindowsHookEx.restype = BOOL +UnhookWindowsHookEx.argtypes = [wintypes.HHOOK] +UnhookWindowsHookEx.restype = wintypes.BOOL GetMessage = user32.GetMessageW -GetMessage.argtypes = [LPMSG, HWND, c_uint, c_uint] -GetMessage.restype = BOOL +GetMessage.argtypes = [LPMSG, wintypes.HWND, ctypes.c_uint, ctypes.c_uint] +GetMessage.restype = wintypes.BOOL TranslateMessage = user32.TranslateMessage TranslateMessage.argtypes = [LPMSG] -TranslateMessage.restype = BOOL +TranslateMessage.restype = wintypes.BOOL DispatchMessage = user32.DispatchMessageA DispatchMessage.argtypes = [LPMSG] - -keyboard_state_type = c_uint8 * 256 +keyboard_state_type = ctypes.c_uint8 * 256 GetKeyboardState = user32.GetKeyboardState GetKeyboardState.argtypes = [keyboard_state_type] -GetKeyboardState.restype = BOOL +GetKeyboardState.restype = wintypes.BOOL GetKeyNameText = user32.GetKeyNameTextW -GetKeyNameText.argtypes = [c_long, LPWSTR, c_int] -GetKeyNameText.restype = c_int - -MapVirtualKey = user32.MapVirtualKeyW -MapVirtualKey.argtypes = [c_uint, c_uint] -MapVirtualKey.restype = c_uint +GetKeyNameText.argtypes = [ctypes.c_long, wintypes.LPWSTR, ctypes.c_int] +GetKeyNameText.restype = ctypes.c_int ToUnicode = user32.ToUnicode -ToUnicode.argtypes = [c_uint, c_uint, keyboard_state_type, LPWSTR, c_int, c_uint] -ToUnicode.restype = c_int - -SendInput = user32.SendInput -SendInput.argtypes = [c_uint, POINTER(INPUT), c_int] -SendInput.restype = c_uint - -# https://msdn.microsoft.com/en-us/library/windows/desktop/ms646307(v=vs.85).aspx -MAPVK_VK_TO_CHAR = 2 -MAPVK_VK_TO_VSC = 0 -MAPVK_VSC_TO_VK = 1 -MAPVK_VK_TO_VSC_EX = 4 -MAPVK_VSC_TO_VK_EX = 3 - -VkKeyScan = user32.VkKeyScanW -VkKeyScan.argtypes = [WCHAR] -VkKeyScan.restype = c_short - -LLKHF_INJECTED = 0x00000010 - -WM_KEYDOWN = 0x0100 -WM_KEYUP = 0x0101 -WM_SYSKEYDOWN = 0x104 # Used for ALT key -WM_SYSKEYUP = 0x105 - - -# This marks the end of Win32 API declarations. The rest is ours. - -keyboard_event_types = { - WM_KEYDOWN: KEY_DOWN, - WM_KEYUP: KEY_UP, - WM_SYSKEYDOWN: KEY_DOWN, - WM_SYSKEYUP: KEY_UP, -} +ToUnicode.argtypes = [ctypes.c_uint, ctypes.c_uint, keyboard_state_type, wintypes.LPWSTR, ctypes.c_int, ctypes.c_uint] +ToUnicode.restype = ctypes.c_int # List taken from the official documentation, but stripped of the OEM-specific keys. # Keys are virtual key codes, values are pairs (name, is_keypad). @@ -364,10 +331,6 @@ def get_event_names(scan_code, vk, is_extended, modifiers): unicode_ret = ToUnicode(vk, scan_code, keyboard_state, unicode_buffer, len(unicode_buffer), 0) if unicode_ret and unicode_buffer.value: yield unicode_buffer.value - # unicode_ret == -1 -> is dead key - # ToUnicode has the side effect of setting global flags for dead keys. - # Therefore we need to call it twice to clear those flags. - # If your 6 and 7 keys are named "^6" and "^7", this is the reason. ToUnicode(vk, scan_code, keyboard_state, unicode_buffer, len(unicode_buffer), 0) name_ret = GetKeyNameText(scan_code << 16 | is_extended << 24, name_buffer, 1024) @@ -527,7 +490,7 @@ def process_key(event_type, vk, scan_code, is_extended): altgr_is_pressed = event_type == KEY_DOWN is_keypad = (scan_code, vk, is_extended) in keypad_keys - return callback(KeyboardEvent(event_type=event_type, scan_code=scan_code or -vk, name=name, is_keypad=is_keypad)) + return callback(KeyboardEvent(event_type=event_type, scan_code=scan_code or -vk, name=name, is_keypad=is_keypad, is_extended=is_extended)) def low_level_keyboard_handler(nCode, wParam, lParam): try: @@ -548,10 +511,10 @@ def low_level_keyboard_handler(nCode, wParam, lParam): return CallNextHookEx(None, nCode, wParam, lParam) - WH_KEYBOARD_LL = c_int(13) + WH_KEYBOARD_LL = ctypes.c_int(13) keyboard_callback = LowLevelKeyboardProc(low_level_keyboard_handler) handle = GetModuleHandleW(None) - thread_id = DWORD(0) + thread_id = wintypes.DWORD(0) keyboard_hook = SetWindowsHookEx(WH_KEYBOARD_LL, keyboard_callback, handle, thread_id) # Register to remove the hook when the interpreter exits. Unfortunately a @@ -575,26 +538,51 @@ def map_name(name): scan_code, vk, is_extended, modifiers = entry yield scan_code or -vk, modifiers +def direct_event(code, event_type): + _send_event(code, event_type) + def _send_event(code, event_type): - if code == 541 or code == 57400: - # Alt-gr is difficult to simulate. pydirectinput does work, so we are using it in this case. - if event_type == 0: - pydirectinput.keyDown("altright") - elif event_type == 2: - pydirectinput.keyUp("altright") - elif code > 0: - vk = scan_code_to_vk.get(code, 0) + def _send_event_new(code, vk, event_type, attach_input: bool = False): + # doesnt work without wVk + if not vk: + return 0 + + if attach_input: + fore_thread = user32.GetWindowThreadProcessId(user32.GetForegroundWindow(), None) + app_thread = kernel32.GetCurrentThreadId() + user32.AttachThreadInput(app_thread, fore_thread, True) + + i = INPUT(type=INPUT_KEYBOARD,ki=KEYBDINPUT(wVk=vk,wScan=code,dwFlags=event_type)) + result = user32.SendInput(1, ctypes.byref(i), ctypes.sizeof(i)) + + if attach_input: + user32.AttachThreadInput(app_thread, fore_thread, False) + + return result + + ## keybd_event does work but is deprecated + def _send_event_old(code, vk, event_type): user32.keybd_event(vk, code, event_type, 0) + + # alt gr is 541, but it's actually right alt (56, extended) + if code == 541: + code = 56 + event_type = event_type+1 + vk = 0 + if code < 0: + vk = -code else: - # Negative scan code is a way to indicate we don't have a scan code, - # and the value actually contains the Virtual key code. - user32.keybd_event(-code, 0, event_type, 0) + vk = scan_code_to_vk.get(code, 0) -def press(code): - _send_event(code, 0) + result = _send_event_new(code, vk, event_type, False) + if result == 0: + return _send_event_old(code, vk, event_type) + +def press(code, extended: bool = False): + _send_event(code, 0+int(extended)) -def release(code): - _send_event(code, 2) +def release(code, extended: bool = False): + _send_event(code, 2+int(extended)) def type_unicode(character): # This code and related structures are based on @@ -604,20 +592,17 @@ def type_unicode(character): releases = [] for i in range(0, len(surrogates), 2): higher, lower = surrogates[i:i+2] - structure = KEYBDINPUT(0, (lower << 8) + higher, KEYEVENTF_UNICODE, 0, None) - presses.append(INPUT(INPUT_KEYBOARD, _INPUTunion(ki=structure))) - structure = KEYBDINPUT(0, (lower << 8) + higher, KEYEVENTF_UNICODE | KEYEVENTF_KEYUP, 0, None) - releases.append(INPUT(INPUT_KEYBOARD, _INPUTunion(ki=structure))) + presses.append(INPUT(type=INPUT_KEYBOARD, ki=KEYBDINPUT(wVk=0, wScan=(lower << 8) + higher, dwFlags=KEYEVENTF_UNICODE))) + releases.append(INPUT(type=INPUT_KEYBOARD, ki=KEYBDINPUT(wVk=0, wScan=(lower << 8) + higher, dwFlags=KEYEVENTF_UNICODE | KEYEVENTF_KEYUP))) inputs = presses + releases nInputs = len(inputs) LPINPUT = INPUT * nInputs pInputs = LPINPUT(*inputs) - cbSize = c_int(ctypes.sizeof(INPUT)) - SendInput(nInputs, pInputs, cbSize) + cbSize = ctypes.c_int(ctypes.sizeof(INPUT)) + user32.SendInput(nInputs, pInputs, cbSize) if __name__ == '__main__': _setup_name_tables() import pprint pprint.pprint(to_name) - pprint.pprint(from_name) - #listen(lambda e: print(e.to_json()) or True) + pprint.pprint(from_name) \ No newline at end of file diff --git a/main.py b/main.py index 22efac61..15ebf951 100644 --- a/main.py +++ b/main.py @@ -178,6 +178,10 @@ def custom_openapi(): ) # if a class adds GET/POST endpoints, add them here: app.include_router(core.router) +app.include_router(core.config_service.router) +app.include_router(core.settings_service.router) +app.include_router(core.voice_service.router) + app.include_router(version_check.router) app.include_router(secret_keeper.router) @@ -199,7 +203,7 @@ async def websocket_endpoint(websocket: WebSocket): async def start_secrets(secrets: dict[str, Any]): secret_keeper.post_secrets(secrets) core.startup_errors = [] - await core.load_config() + await core.config_service.load_config() @app.get("/ping", tags=["main"]) @@ -208,9 +212,9 @@ async def ping(): async def async_main(host: str, port: int, sidecar: bool): - errors, config_info = await core.load_config() + await core.config_service.load_config() saved_secrets: list[str] = [] - for error in errors: + for error in core.tower_errors: if ( not sidecar # running standalone and error.error_type == WingmanInitializationErrorType.MISSING_SECRET diff --git a/services/audio_player.py b/services/audio_player.py index a94a4ec5..0e637c2a 100644 --- a/services/audio_player.py +++ b/services/audio_player.py @@ -117,13 +117,13 @@ def finished_callback(): await self.notify_playback_started(wingman_name) async def notify_playback_started(self, wingman_name: str): + await self.playback_events.publish("started", wingman_name) if callable(self.on_playback_started): - self.playback_events.publish("started", wingman_name) await self.on_playback_started(wingman_name) async def notify_playback_finished(self, wingman_name: str): + await self.playback_events.publish("finished", wingman_name) if callable(self.on_playback_finished): - self.playback_events.publish("finished", wingman_name) await self.on_playback_finished(wingman_name) def play_beep(self): diff --git a/services/command_handler.py b/services/command_handler.py index a4362913..67935931 100644 --- a/services/command_handler.py +++ b/services/command_handler.py @@ -1,5 +1,6 @@ import json import asyncio +import math from fastapi import WebSocket import keyboard.keyboard as keyboard from api.commands import ( @@ -48,8 +49,10 @@ async def dispatch(self, message, websocket: WebSocket): RecordMouseActionsCommand(**command), websocket ) elif command_name == "stop_recording": + # Get Enum from string + recording_type = KeyboardRecordingType(command["recording_type"]) await self.handle_stop_recording( - StopRecordingCommand(**command), websocket + StopRecordingCommand(**command), websocket, recording_type ) else: raise ValueError("Unknown command") @@ -109,13 +112,20 @@ async def handle_record_keyboard_actions( # Start timeout # self.timeout_task = WebSocketUser.ensure_async(self._start_timeout(10)) + self.recorded_keys = [] def _on_key_event(event): + if event.event_type == "down" and event.name == "esc": + WebSocketUser.ensure_async(self.handle_stop_recording(None, None, command.recording_type)) + if event.scan_code == 58 or event.scan_code == 70 or (event.scan_code == 69 and event.is_extended): + # let capslock, numlock or scrolllock through, as it changes following keypresses + keyboard.direct_event(event.scan_code, (0 if event.event_type == "down" else 2)+int(event.is_extended)) + self.recorded_keys.append(event) if command.recording_type == KeyboardRecordingType.SINGLE and self._is_hotkey_recording_finished(self.recorded_keys): - WebSocketUser.ensure_async(self.handle_stop_recording(None, None)) + WebSocketUser.ensure_async(self.handle_stop_recording(None, None, command.recording_type)) - self.hook_callback = keyboard.hook(_on_key_event) + self.hook_callback = keyboard.hook(_on_key_event, suppress=True) async def handle_record_mouse_actions( self, command: RecordMouseActionsCommand, websocket: WebSocket @@ -130,7 +140,7 @@ async def handle_record_mouse_actions( ) async def handle_stop_recording( - self, command: StopRecordingCommand, websocket: WebSocket + self, command: StopRecordingCommand, websocket: WebSocket, recording_type: KeyboardRecordingType = KeyboardRecordingType.SINGLE ): if self.hook_callback: keyboard.unhook(self.hook_callback) @@ -138,7 +148,7 @@ async def handle_stop_recording( if self.timeout_task: self.timeout_task.cancel() - actions = self._get_actions_from_recorded_keys(recorded_keys) + actions = self._get_actions_from_recorded_keys(recorded_keys) if recording_type == KeyboardRecordingType.MACRO_ADVANCED else self._get_actions_from_recorded_hotkey(recorded_keys) command = ActionsRecordedCommand(command="actions_recorded", actions=actions) await self.connection_manager.broadcast(command) @@ -159,6 +169,90 @@ async def _start_timeout(self, timeout): def _get_actions_from_recorded_keys(self, recorded): actions: list[CommandActionConfig] = [] + def add_action(name, code, extended, press, release, hold): + if(press or release): + hold = None # reduces yaml size + else: + hold = round(hold, 2) + if(hold < 0.1): + hold = 0.1 # 100ms min hold time + else: + hold = round(round(hold / 0.05) * 0.05, 3) + + if(not extended): + extended = None # reduces yaml size + + # add keyboard action + actions.append(CommandActionConfig(keyboard=CommandKeyboardConfig( + hotkey=name, + hotkey_codes=[code], + hotkey_extended=extended, + press=press, + release=release, + hold=hold) + )) + + def add_wait(duration): + if not duration: + return + duration = round(duration, 2) + if duration < 0.05: + duration = 0.05 # 50ms min wait time + else : + duration = round(round(duration / 0.05) * 0.05, 3) + actions.append(CommandActionConfig(wait=duration)) + + last_last_key_data = [] + last_key_data = [] + key_data = [] + + # Process recorded key events to calculate press durations and wait times + # We are trying to compress press and release events into one action. + # This reduces the amount of actions and increases readability of yaml files. + # Tradeoff is a bit confusing logic below. + for key in recorded: + key_data = [key.name.lower(), key.scan_code, bool(key.is_extended), key.event_type, key.time, 0] + + if(last_key_data): + if key_data[1] == last_key_data[1] and key_data[2] == last_key_data[2] and key_data[3] == last_key_data[3]: + # skip double actions + continue + key_data[5] = key_data[4] - last_key_data[4] # set time diff + + # check if last key was down event + if last_key_data and last_key_data[3] == "down": + # same key? + if key_data[1] == last_key_data[1] and key_data[2] == last_key_data[2]: + # write as compressed action + add_wait(last_key_data[5]) + add_action(key_data[0], key_data[1], key_data[2], None, None, key_data[5]) + else: + # write as separate action + add_wait(last_key_data[5]) + add_action(last_key_data[0], last_key_data[1], last_key_data[2], True, None, 0) + + if last_key_data and last_key_data[3] == "up": + if(last_last_key_data and last_last_key_data[1] != last_key_data[1] or last_last_key_data[2] != last_key_data[2]): + add_wait(last_key_data[5]) + add_action(last_key_data[0], last_key_data[1], last_key_data[2], None, True, 0) + + last_last_key_data = last_key_data + last_key_data = key_data + + # add last action + if key_data and key_data[3] == "down": + add_wait(key_data[5]) + add_action(key_data[0], key_data[1], key_data[2], True, None, 0) + elif key_data and last_key_data and last_last_key_data and key_data[3] == "up" and (last_last_key_data[1] != last_key_data[1] or last_last_key_data[2] != last_key_data[2]): + add_wait(key_data[5]) + add_action(key_data[0], key_data[1], key_data[2], None, True, 0) + + return actions + + def _get_actions_from_recorded_hotkey(self, recorded): + # legacy function used for single key recording + actions: list[CommandActionConfig] = [] + key_down_time = {} # Track initial down times for keys last_up_time = None # Track the last up time to measure durations of inactivity keys_pressed = [] # Track the keys currently pressed in the order they were pressed @@ -211,8 +305,8 @@ def _get_actions_from_recorded_keys(self, recorded): key_config = CommandActionConfig() key_config.keyboard = CommandKeyboardConfig(hotkey=hotkey_name) - key_config.keyboard.hotkey_codes = [key.scan_code for key in keys_pressed] + key_config.keyboard.hotkey_extended = bool(key.is_extended) if press_duration > 0.2 and len(keys_pressed) == 1: key_config.keyboard.hold = round(press_duration, 2) diff --git a/services/config_service.py b/services/config_service.py new file mode 100644 index 00000000..f79aea35 --- /dev/null +++ b/services/config_service.py @@ -0,0 +1,299 @@ +from typing import Optional +from fastapi import APIRouter +from api.interface import ( + ConfigDirInfo, + ConfigWithDirInfo, + ConfigsInfo, + NewWingmanTemplate, + WingmanConfig, + WingmanConfigFileInfo, + WingmanInitializationError, +) +from services.config_manager import ConfigManager +from services.printr import Printr +from services.pub_sub import PubSub + + +class ConfigService: + def __init__(self, config_manager: ConfigManager): + self.printr = Printr() + self.config_manager = config_manager + self.config_events = PubSub() + + self.current_config_dir: ConfigDirInfo = ( + self.config_manager.find_default_config() + ) + self.current_config = None + + self.router = APIRouter() + tags = ["config"] + self.router.add_api_route( + methods=["GET"], + path="/configs", + endpoint=self.get_config_dirs, + response_model=ConfigsInfo, + tags=tags, + ) + self.router.add_api_route( + methods=["GET"], + path="/configs/templates", + endpoint=self.get_config_templates, + response_model=list[ConfigDirInfo], + tags=tags, + ) + self.router.add_api_route( + methods=["GET"], + path="/config", + endpoint=self.get_config, + response_model=ConfigWithDirInfo, + tags=tags, + ) + self.router.add_api_route( + methods=["GET"], + path="/config-dir-path", + endpoint=self.get_config_dir_path, + response_model=str, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], path="/config", endpoint=self.load_config, tags=tags + ) + self.router.add_api_route( + methods=["DELETE"], path="/config", endpoint=self.delete_config, tags=tags + ) + self.router.add_api_route( + methods=["GET"], + path="/config/wingmen", + endpoint=self.get_wingmen_config_files, + response_model=list[WingmanConfigFileInfo], + tags=tags, + ) + self.router.add_api_route( + methods=["GET"], + path="/config/new-wingman", + endpoint=self.get_new_wingmen_template, + response_model=NewWingmanTemplate, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/config/new-wingman", + endpoint=self.add_new_wingman, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/config/wingman/default", + endpoint=self.set_default_wingman, + tags=tags, + ) + self.router.add_api_route( + methods=["DELETE"], + path="/config/wingman", + endpoint=self.delete_wingman_config, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/config/create", + endpoint=self.create_config, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/config/rename", + endpoint=self.rename_config, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/config/default", + endpoint=self.set_default_config, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/config/save-wingman", + endpoint=self.save_wingman_config, + tags=tags, + ) + + # GET /configs + def get_config_dirs(self): + return ConfigsInfo( + config_dirs=self.config_manager.get_config_dirs(), + current_config_dir=self.current_config_dir, + ) + + # GET /configs/templates + def get_config_templates(self): + return self.config_manager.get_template_dirs() + + # GET /config + async def get_config(self, config_name: Optional[str] = "") -> ConfigWithDirInfo: + if config_name and len(config_name) > 0: + config_dir = self.config_manager.get_config_dir(config_name) + + config_info = await self.load_config(config_dir) + + return config_info + + # GET /config-dir-path + def get_config_dir_path(self, config_name: Optional[str] = ""): + return self.config_manager.get_config_dir_path(config_name) + + # POST /config + async def load_config( + self, config_dir: Optional[ConfigDirInfo] = None + ) -> tuple[list[WingmanInitializationError], ConfigWithDirInfo]: + try: + loaded_config_dir, config = self.config_manager.load_config(config_dir) + except Exception as e: + self.printr.toast_error(str(e)) + raise e + + self.current_config_dir = loaded_config_dir + self.current_config = config + + config_dir_info = ConfigWithDirInfo(config=config, config_dir=loaded_config_dir) + await self.config_events.publish("config_loaded", config_dir_info) + + return config_dir_info + + # POST config/create + async def create_config( + self, config_name: str, template: Optional[ConfigDirInfo] = None + ): + new_dir = self.config_manager.create_config( + config_name=config_name, template=template + ) + await self.load_config(new_dir) + + # POST config/rename + async def rename_config(self, config_dir: ConfigDirInfo, new_name: str): + new_config_dir = self.config_manager.rename_config( + config_dir=config_dir, new_name=new_name + ) + if new_config_dir and config_dir.name == self.current_config_dir.name: + await self.load_config(new_config_dir) + + # POST config/default + def set_default_config(self, config_dir: ConfigDirInfo): + self.config_manager.set_default_config(config_dir=config_dir) + + # DELETE config + async def delete_config(self, config_dir: ConfigDirInfo): + self.config_manager.delete_config(config_dir=config_dir) + if config_dir.name == self.current_config_dir.name: + await self.load_config() + + # GET config/wingmen + async def get_wingmen_config_files(self, config_name: str): + config_dir = self.config_manager.get_config_dir(config_name) + return self.config_manager.get_wingmen_configs(config_dir) + + # DELETE config/wingman + async def delete_wingman_config( + self, config_dir: ConfigDirInfo, wingman_file: WingmanConfigFileInfo + ): + self.config_manager.delete_wingman_config(config_dir, wingman_file) + await self.load_config(config_dir) # refresh + + # GET config/new-wingman/ + async def get_new_wingmen_template(self): + return self.config_manager.get_new_wingman_template() + + # POST config/new-wingman + async def add_new_wingman( + self, config_dir: ConfigDirInfo, wingman_config: WingmanConfig, avatar: str + ): + wingman_file = WingmanConfigFileInfo( + name=wingman_config.name, + file=f"{wingman_config.name}.yaml", + is_deleted=False, + avatar=avatar, + ) + + await self.save_wingman_config( + config_dir=config_dir, + wingman_file=wingman_file, + wingman_config=wingman_config, + auto_recover=False, + ) + + # POST config/save-wingman + async def save_wingman_config( + self, + config_dir: ConfigDirInfo, + wingman_file: WingmanConfigFileInfo, + wingman_config: WingmanConfig, + auto_recover: bool = False, + silent: bool = False, + ): + self.config_manager.save_wingman_config( + config_dir=config_dir, + wingman_file=wingman_file, + wingman_config=wingman_config, + ) + try: + if not silent: + await self.load_config(config_dir) + self.printr.toast("Wingman saved successfully.") + except Exception: + error_message = "Invalid Wingman configuration." + if auto_recover: + deleted = self.config_manager.delete_wingman_config( + config_dir, wingman_file + ) + if deleted: + self.config_manager.create_configs_from_templates() + + await self.load_config(config_dir) + + restored_message = ( + "Deleted broken config (and restored default if there is a template for it)." + if deleted + else "" + ) + self.printr.toast_error(f"{error_message} {restored_message}") + else: + self.printr.toast_error(f"{error_message}") + + # POST config/wingman/default + async def set_default_wingman( + self, + config_dir: ConfigDirInfo, + wingman_name: str, + ): + _dir, config = self.config_manager.load_config(config_dir) + wingman_config_files = await self.get_wingmen_config_files(config_dir.name) + + # Check if the wingman_name is already the default + already_default = any( + ( + config.wingmen[file.name].name == wingman_name + and config.wingmen[file.name].is_voice_activation_default + ) + for file in wingman_config_files + ) + + for wingman_config_file in wingman_config_files: + wingman_config = config.wingmen[wingman_config_file.name] + + if already_default: + # If wingman_name is already default, undefault it + wingman_config.is_voice_activation_default = False + else: + # Set the new default + wingman_config.is_voice_activation_default = ( + wingman_config.name == wingman_name + ) + + await self.save_wingman_config( + config_dir=config_dir, + wingman_file=wingman_config_file, + wingman_config=wingman_config, + silent=True, + ) + + await self.load_config(config_dir) diff --git a/services/pub_sub.py b/services/pub_sub.py index 5a723224..0326069a 100644 --- a/services/pub_sub.py +++ b/services/pub_sub.py @@ -1,3 +1,6 @@ +import asyncio + + class PubSub: def __init__(self): self.subscribers = {} @@ -11,7 +14,10 @@ def unsubscribe(self, event_type, fn): if event_type in self.subscribers: self.subscribers[event_type].remove(fn) - def publish(self, event_type, data): + async def publish(self, event_type, data): if event_type in self.subscribers: for fn in self.subscribers[event_type]: - fn(data) + if asyncio.iscoroutinefunction(fn): + await fn(data) + else: + fn(data) diff --git a/services/settings_service.py b/services/settings_service.py new file mode 100644 index 00000000..02ab6789 --- /dev/null +++ b/services/settings_service.py @@ -0,0 +1,187 @@ +from typing import Optional +from fastapi import APIRouter +from api.enums import LogType, ToastType, VoiceActivationSttProvider, WingmanProRegion +from api.interface import ( + AudioSettings, + AzureSttConfig, + SettingsConfig, + WhispercppSttConfig, +) +from services.config_manager import ConfigManager +from services.config_service import ConfigService +from services.printr import Printr +from services.pub_sub import PubSub + + +class SettingsService: + def __init__(self, config_manager: ConfigManager, config_service: ConfigService): + self.printr = Printr() + self.config_manager = config_manager + self.config_service = config_service + self.settings = self.get_settings() + self.settings_events = PubSub() + + self.router = APIRouter() + tags = ["settings"] + + self.router.add_api_route( + methods=["GET"], + path="/settings", + endpoint=self.get_settings, + response_model=SettingsConfig, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/settings/audio-devices", + endpoint=self.set_audio_devices, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/settings/voice-activation", + endpoint=self.set_voice_activation, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/settings/mute-key", + endpoint=self.set_mute_key, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/settings/wingman-pro", + endpoint=self.set_wingman_pro_settings, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/settings/wingman-pro/make-default", + endpoint=self.set_wingman_pro_as_default, + tags=tags, + ) + + # GET /settings + def get_settings(self): + return self.config_manager.settings_config + + # POST /settings/audio-devices + async def set_audio_devices( + self, output_device: Optional[int] = None, input_device: Optional[int] = None + ): + self.config_manager.settings_config.audio = AudioSettings( + input=input_device, + output=output_device, + ) + + if self.config_manager.save_settings_config(): + self.printr.print( + "Audio devices updated.", toast=ToastType.NORMAL, color=LogType.POSITIVE + ) + await self.settings_events.publish( + "audio_devices_changed", (output_device, input_device) + ) + return output_device, input_device + + # POST /settings/voice-activation + async def set_voice_activation(self, is_enabled: bool): + self.config_manager.settings_config.voice_activation.enabled = is_enabled + + if self.config_manager.save_settings_config(): + self.printr.print( + f"Voice activation {'enabled' if is_enabled else 'disabled'}.", + toast=ToastType.NORMAL, + color=LogType.POSITIVE, + ) + await self.settings_events.publish("voice_activation_changed", is_enabled) + + # POST /settings/mute-key + def set_mute_key(self, key: str, keycodes: Optional[list[int]] = None): + self.config_manager.settings_config.voice_activation.mute_toggle_key = key + self.config_manager.settings_config.voice_activation.mute_toggle_key_codes = ( + keycodes + ) + + if self.config_manager.save_settings_config(): + self.printr.print( + "Mute key saved.", + toast=ToastType.NORMAL, + color=LogType.POSITIVE, + ) + + # POST /settings/wingman-pro + async def set_wingman_pro_settings( + self, + base_url: str, + region: WingmanProRegion, + stt_provider: VoiceActivationSttProvider, + azure: AzureSttConfig, + whispercpp: WhispercppSttConfig, + va_energy_threshold: float, + ): + self.config_manager.settings_config.wingman_pro.base_url = base_url + self.config_manager.settings_config.wingman_pro.region = region + + self.config_manager.settings_config.voice_activation.stt_provider = stt_provider + self.config_manager.settings_config.voice_activation.azure = azure + self.config_manager.settings_config.voice_activation.whispercpp = whispercpp + + old_va_threshold = ( + self.config_manager.settings_config.voice_activation.energy_threshold + ) + self.config_manager.settings_config.voice_activation.energy_threshold = ( + va_energy_threshold + ) + + if self.config_manager.save_settings_config(): + await self.config_service.load_config() + self.printr.print( + "Wingman Pro settings updated.", + toast=ToastType.NORMAL, + color=LogType.POSITIVE, + ) + if old_va_threshold != va_energy_threshold: + await self.settings_events.publish( + "va_treshold_changed", va_energy_threshold + ) + + # POST /settings/wingman-pro/make-default + async def set_wingman_pro_as_default(self, patch_existing_wingmen: bool): + self.config_manager.default_config.features.conversation_provider = ( + "wingman_pro" + ) + self.config_manager.default_config.features.summarize_provider = "wingman_pro" + self.config_manager.default_config.features.tts_provider = "wingman_pro" + self.config_manager.default_config.features.stt_provider = "wingman_pro" + + self.config_manager.save_defaults_config() + + if patch_existing_wingmen: + config_dirs = self.config_service.get_config_dirs() + for config_dir in config_dirs.config_dirs: + wingman_config_files = ( + await self.config_service.get_wingmen_config_files(config_dir.name) + ) + for wingman_config_file in wingman_config_files: + wingman_config = self.config_manager.load_wingman_config( + config_dir=config_dir, wingman_file=wingman_config_file + ) + if wingman_config: + wingman_config.features.conversation_provider = "wingman_pro" + wingman_config.features.summarize_provider = "wingman_pro" + wingman_config.features.tts_provider = "wingman_pro" + wingman_config.features.stt_provider = "wingman_pro" + + self.config_manager.save_wingman_config( + config_dir=config_dir, + wingman_file=wingman_config_file, + wingman_config=wingman_config, + ) + await self.config_service.load_config(self.config_service.current_config_dir) + + self.printr.print( + "Have fun using Wingman Pro!", + toast=ToastType.NORMAL, + color=LogType.POSITIVE, + ) diff --git a/services/system_manager.py b/services/system_manager.py index 34afb0d4..226f9e08 100644 --- a/services/system_manager.py +++ b/services/system_manager.py @@ -4,7 +4,7 @@ from packaging import version from api.interface import SystemCore, SystemInfo -LOCAL_VERSION = "1.0.0" +LOCAL_VERSION = "1.1.0" VERSION_ENDPOINT = "https://shipbit.de/wingman.json" diff --git a/services/tower.py b/services/tower.py index 2be6ae52..e7505abc 100644 --- a/services/tower.py +++ b/services/tower.py @@ -100,3 +100,9 @@ def get_wingman_from_text(self, text: str) -> Wingman | None: return wingman return None + + def get_wingman_by_name(self, wingman_name: str): + for wingman in self.wingmen: + if wingman.name == wingman_name: + return wingman + return None diff --git a/services/voice_service.py b/services/voice_service.py new file mode 100644 index 00000000..86c44d75 --- /dev/null +++ b/services/voice_service.py @@ -0,0 +1,240 @@ +from fastapi import APIRouter +from api.enums import AzureRegion, OpenAiTtsVoice +from api.interface import ( + AzureTtsConfig, + EdgeTtsConfig, + ElevenlabsConfig, + SoundConfig, + VoiceInfo, + XVASynthTtsConfig, +) +from providers.edge import Edge +from providers.elevenlabs import ElevenLabs +from providers.open_ai import OpenAi, OpenAiAzure +from providers.wingman_pro import WingmanPro +from providers.xvasynth import XVASynth +from services.audio_player import AudioPlayer +from services.config_manager import ConfigManager + + +class VoiceService: + def __init__(self, config_manager: ConfigManager, audio_player: AudioPlayer): + self.config_manager = config_manager + self.audio_player = audio_player + + self.router = APIRouter() + tags = ["voice"] + self.router.add_api_route( + methods=["GET"], + path="/voices/elevenlabs", + endpoint=self.get_elevenlabs_voices, + response_model=list[VoiceInfo], + tags=tags, + ) + self.router.add_api_route( + methods=["GET"], + path="/voices/azure", + endpoint=self.get_azure_voices, + response_model=list[VoiceInfo], + tags=tags, + ) + self.router.add_api_route( + methods=["GET"], + path="/voices/azure/wingman-pro", + endpoint=self.get_wingman_pro_azure_voices, + response_model=list[VoiceInfo], + tags=tags, + ) + + self.router.add_api_route( + methods=["POST"], + path="/voices/preview/openai", + endpoint=self.play_openai_tts, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/voices/preview/azure", + endpoint=self.play_azure_tts, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/voices/preview/elevenlabs", + endpoint=self.play_elevenlabs_tts, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/voices/preview/edgetts", + endpoint=self.play_edge_tts, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/voices/preview/xvasynth", + endpoint=self.play_xvasynth_tts, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/voices/preview/wingman-pro/azure", + endpoint=self.play_wingman_pro_azure, + tags=tags, + ) + self.router.add_api_route( + methods=["POST"], + path="/voices/preview/wingman-pro/openai", + endpoint=self.play_wingman_pro_openai, + tags=tags, + ) + + def __convert_azure_voice(self, voice): + # retrieved from Wingman Pro as serialized dict + if isinstance(voice, dict): + return VoiceInfo( + id=voice.get("short_name"), + name=voice.get("local_name"), + gender=voice.get("gender"), + locale=voice.get("locale"), + ) + # coming directly from Azure API as a voice object + else: + return VoiceInfo( + id=voice.short_name, + name=voice.local_name, + gender=voice.gender.name, + locale=voice.locale, + ) + + # GET /voices/elevenlabs + def get_elevenlabs_voices(self, api_key: str): + elevenlabs = ElevenLabs(api_key=api_key, wingman_name="") + voices = elevenlabs.get_available_voices() + convert = lambda voice: VoiceInfo(id=voice.voiceID, name=voice.name) + result = [convert(voice) for voice in voices] + + return result + + # GET /voices/azure + def get_azure_voices(self, api_key: str, region: AzureRegion, locale: str = ""): + azure = OpenAiAzure() + voices = azure.get_available_voices( + api_key=api_key, region=region.value, locale=locale + ) + result = [self.__convert_azure_voice(voice) for voice in voices] + return result + + # GET /voices/azure/wingman-pro + def get_wingman_pro_azure_voices(self, locale: str = ""): + wingman_pro = WingmanPro( + wingman_name="", settings=self.config_manager.settings_config.wingman_pro + ) + voices = wingman_pro.get_available_voices(locale=locale) + if not voices: + return [] + result = [self.__convert_azure_voice(voice) for voice in voices] + return result + + # POST /play/openai + async def play_openai_tts( + self, text: str, api_key: str, voice: OpenAiTtsVoice, sound_config: SoundConfig + ): + openai = OpenAi(api_key=api_key) + await openai.play_audio( + text=text, + voice=voice, + sound_config=sound_config, + audio_player=self.audio_player, + wingman_name="system", + ) + + # POST /play/azure + async def play_azure_tts( + self, text: str, api_key: str, config: AzureTtsConfig, sound_config: SoundConfig + ): + azure = OpenAiAzure() + await azure.play_audio( + text=text, + api_key=api_key, + config=config, + sound_config=sound_config, + audio_player=self.audio_player, + wingman_name="system", + ) + + # POST /play/elevenlabs + async def play_elevenlabs_tts( + self, + text: str, + api_key: str, + config: ElevenlabsConfig, + sound_config: SoundConfig, + ): + elevenlabs = ElevenLabs(api_key=api_key, wingman_name="") + await elevenlabs.play_audio( + text=text, + config=config, + sound_config=sound_config, + audio_player=self.audio_player, + wingman_name="system", + stream=False, + ) + + # POST /play/edgetts + async def play_edge_tts( + self, text: str, config: EdgeTtsConfig, sound_config: SoundConfig + ): + edge = Edge() + await edge.play_audio( + text=text, + config=config, + sound_config=sound_config, + audio_player=self.audio_player, + wingman_name="system", + ) + + # POST /play/xvasynth + async def play_xvasynth_tts( + self, text: str, config: XVASynthTtsConfig, sound_config: SoundConfig + ): + xvasynth = XVASynth(wingman_name="") + await xvasynth.play_audio( + text=text, + config=config, + sound_config=sound_config, + audio_player=self.audio_player, + wingman_name="system", + ) + + # POST /play/wingman-pro/azure + async def play_wingman_pro_azure( + self, text: str, config: AzureTtsConfig, sound_config: SoundConfig + ): + wingman_pro = WingmanPro( + wingman_name="system", + settings=self.config_manager.settings_config.wingman_pro, + ) + await wingman_pro.generate_azure_speech( + text=text, + config=config, + sound_config=sound_config, + audio_player=self.audio_player, + wingman_name="system", + ) + + # POST /play/wingman-pro/azure + async def play_wingman_pro_openai( + self, text: str, voice: OpenAiTtsVoice, sound_config: SoundConfig + ): + wingman_pro = WingmanPro( + wingman_name="system", + settings=self.config_manager.settings_config.wingman_pro, + ) + await wingman_pro.generate_openai_speech( + text=text, + voice=voice, + sound_config=sound_config, + audio_player=self.audio_player, + wingman_name="system", + ) diff --git a/wingman_core.py b/wingman_core.py index 8c66fadf..ba3f12ca 100644 --- a/wingman_core.py +++ b/wingman_core.py @@ -12,38 +12,23 @@ AzureRegion, CommandTag, LogType, - OpenAiTtsVoice, ToastType, VoiceActivationSttProvider, - WingmanProRegion, ) from api.interface import ( AudioDevice, - AudioSettings, AzureSttConfig, - AzureTtsConfig, - ConfigDirInfo, ConfigWithDirInfo, - ConfigsInfo, - EdgeTtsConfig, - ElevenlabsConfig, - NewWingmanTemplate, - SettingsConfig, - SoundConfig, - VoiceInfo, - WhispercppSttConfig, - WingmanConfig, WingmanConfigFileInfo, WingmanInitializationError, - XVASynthTtsConfig, ) -from providers.edge import Edge -from providers.elevenlabs import ElevenLabs -from providers.open_ai import OpenAi, OpenAiAzure +from providers.open_ai import OpenAi from providers.whispercpp import Whispercpp from providers.wingman_pro import WingmanPro -from providers.xvasynth import XVASynth from wingmen.wingman import Wingman +from services.voice_service import VoiceService +from services.settings_service import SettingsService +from services.config_service import ConfigService from services.audio_player import AudioPlayer from services.audio_recorder import AudioRecorder from services.config_manager import ConfigManager @@ -52,241 +37,58 @@ from services.tower import Tower from services.websocket_user import WebSocketUser -printr = Printr() - class WingmanCore(WebSocketUser): def __init__(self, config_manager: ConfigManager): - self.router = APIRouter() - self.router.add_api_route( - methods=["GET"], - path="/configs", - endpoint=self.get_config_dirs, - response_model=ConfigsInfo, - tags=["core"], - ) - self.router.add_api_route( - methods=["GET"], - path="/configs/templates", - endpoint=self.get_config_templates, - response_model=list[ConfigDirInfo], - tags=["core"], - ) - self.router.add_api_route( - methods=["GET"], - path="/config", - endpoint=self.get_config, - response_model=ConfigWithDirInfo, - tags=["core"], - ) - self.router.add_api_route( - methods=["GET"], - path="/config-dir-path", - endpoint=self.get_config_dir_path, - response_model=str, - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/config", - endpoint=self.load_config, - tags=["core"], - ) - self.router.add_api_route( - methods=["DELETE"], - path="/config", - endpoint=self.delete_config, - tags=["core"], - ) - self.router.add_api_route( - methods=["GET"], - path="/config/wingmen", - endpoint=self.get_wingmen_config_files, - response_model=list[WingmanConfigFileInfo], - tags=["core"], - ) - self.router.add_api_route( - methods=["GET"], - path="/config/new-wingman", - endpoint=self.get_new_wingmen_template, - response_model=NewWingmanTemplate, - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/config/new-wingman", - endpoint=self.add_new_wingman, - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/config/wingman/default", - endpoint=self.set_default_wingman, - tags=["core"], - ) - self.router.add_api_route( - methods=["DELETE"], - path="/config/wingman", - endpoint=self.delete_wingman_config, - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/config/create", - endpoint=self.create_config, - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/config/rename", - endpoint=self.rename_config, - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/config/default", - endpoint=self.set_default_config, - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/config/save-wingman", - endpoint=self.save_wingman_config, - tags=["core"], - ) + self.printr = Printr() - self.router.add_api_route( - methods=["GET"], - path="/settings", - endpoint=self.get_settings, - response_model=SettingsConfig, - tags=["core"], - ) + self.router = APIRouter() + tags = ["core"] self.router.add_api_route( methods=["GET"], path="/audio-devices", endpoint=self.get_audio_devices, response_model=list[AudioDevice], - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/settings/audio-devices", - endpoint=self.set_audio_devices, - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/settings/voice-activation", - endpoint=self.set_voice_activation, - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/settings/mute-key", - endpoint=self.set_mute_key, - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/settings/wingman-pro", - endpoint=self.set_wingman_pro_settings, - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/settings/wingman-pro/make-default", - endpoint=self.set_wingman_pro_as_default, - tags=["core"], + tags=tags, ) self.router.add_api_route( methods=["POST"], path="/voice-activation/mute", endpoint=self.start_voice_recognition, - tags=["core"], + tags=tags, ) - self.router.add_api_route( methods=["GET"], path="/startup-errors", endpoint=self.get_startup_errors, response_model=list[WingmanInitializationError], - tags=["core"], - ) - - self.router.add_api_route( - methods=["GET"], - path="/voices/elevenlabs", - endpoint=self.get_elevenlabs_voices, - response_model=list[VoiceInfo], - tags=["core"], - ) - self.router.add_api_route( - methods=["GET"], - path="/voices/azure", - endpoint=self.get_azure_voices, - response_model=list[VoiceInfo], - tags=["core"], - ) - self.router.add_api_route( - methods=["GET"], - path="/voices/azure/wingman-pro", - endpoint=self.get_wingman_pro_azure_voices, - response_model=list[VoiceInfo], - tags=["core"], - ) - - self.router.add_api_route( - methods=["POST"], - path="/play/openai", - endpoint=self.play_openai_tts, - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/play/azure", - endpoint=self.play_azure_tts, - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/play/elevenlabs", - endpoint=self.play_elevenlabs_tts, - tags=["core"], + tags=tags, ) self.router.add_api_route( methods=["POST"], - path="/play/edgetts", - endpoint=self.play_edge_tts, - tags=["core"], - ) - self.router.add_api_route( - methods=["POST"], - path="/play/xvasynth", - endpoint=self.play_xvasynth_tts, - tags=["core"], + path="/stop-playback", + endpoint=self.stop_playback, + tags=tags, ) self.router.add_api_route( methods=["POST"], - path="/play/wingman-pro/azure", - endpoint=self.play_wingman_pro_azure, - tags=["core"], + path="/send-text-to-wingman", + endpoint=self.send_text_to_wingman, + tags=tags, ) self.router.add_api_route( methods=["POST"], - path="/play/wingman-pro/openai", - endpoint=self.play_wingman_pro_openai, - tags=["core"], + path="/reset_conversation_history", + endpoint=self.reset_conversation_history, + tags=tags, ) - self.router.add_api_route( - methods=["POST"], - path="/stop-playback", - endpoint=self.stop_playback, - tags=["core"], + self.config_manager = config_manager + self.config_service = ConfigService(config_manager=config_manager) + self.config_service.config_events.subscribe( + "config_loaded", self.initialize_tower ) - self.config_manager = config_manager self.secret_keeper: SecretKeeper = SecretKeeper() self.event_queue = asyncio.Queue() @@ -298,14 +100,11 @@ def __init__(self, config_manager: ConfigManager): self.tower: Tower = None - self.current_config_dir: ConfigDirInfo = ( - self.config_manager.find_default_config() - ) - self.current_config = None self.active_recording = {"key": "", "wingman": None} self.is_started = False self.startup_errors: list[WingmanInitializationError] = [] + self.tower_errors: list[WingmanInitializationError] = [] self.azure_speech_recognizer: speechsdk.SpeechRecognizer = None self.is_listening = False @@ -314,22 +113,48 @@ def __init__(self, config_manager: ConfigManager): self.key_events = {} + self.settings_service = SettingsService( + config_manager=config_manager, config_service=self.config_service + ) + self.settings_service.settings_events.subscribe( + "audio_devices_changed", self.on_audio_devices_changed + ) + self.settings_service.settings_events.subscribe( + "voice_activation_changed", self.set_voice_activation + ) + self.settings_service.settings_events.subscribe( + "va_treshold_changed", self.on_va_treshold_changed + ) + + self.voice_service = VoiceService( + config_manager=self.config_manager, audio_player=self.audio_player + ) + # restore settings - self.settings = self.get_settings() self.audio_recorder = AudioRecorder( on_speech_recorded=self.on_audio_recorder_speech_recorded ) - if self.settings.audio: - input_device = self.settings.audio.input - output_device = self.settings.audio.output + if self.settings_service.settings.audio: + input_device = self.settings_service.settings.audio.input + output_device = self.settings_service.settings.audio.output sd.default.device = (input_device, output_device) self.audio_recorder.update_input_stream() async def startup(self): - if self.settings.voice_activation.enabled: + if self.settings_service.settings.voice_activation.enabled: await self.set_voice_activation(is_enabled=True) + async def initialize_tower(self, config_dir_info: ConfigWithDirInfo): + self.tower = Tower( + config=config_dir_info.config, audio_player=self.audio_player + ) + self.tower_errors = await self.tower.instantiate_wingmen( + self.config_manager.settings_config + ) + for error in self.tower_errors: + self.printr.toast_error(error.message) + def is_hotkey_pressed(self, hotkey: list[int] | str) -> bool: codes = [] @@ -347,8 +172,8 @@ def is_hotkey_pressed(self, hotkey: list[int] | str) -> bool: def on_press(self, key=None, button=None): is_mute_hotkey_pressed = self.is_hotkey_pressed( - self.settings.voice_activation.mute_toggle_key_codes - or self.settings.voice_activation.mute_toggle_key + self.settings_service.settings.voice_activation.mute_toggle_key_codes + or self.settings_service.settings.voice_activation.mute_toggle_key ) if is_mute_hotkey_pressed: self.toggle_voice_recognition() @@ -368,7 +193,10 @@ def on_press(self, key=None, button=None): self.active_recording = dict(key=button, wingman=wingman) self.was_listening_before_ptt = self.is_listening - if self.settings.voice_activation.enabled and self.is_listening: + if ( + self.settings_service.settings.voice_activation.enabled + and self.is_listening + ): self.start_voice_recognition(mute=True) self.audio_recorder.start_recording(wingman_name=wingman.name) @@ -386,7 +214,7 @@ def on_release(self, key=None, button=None): self.active_recording = {"key": "", "wingman": None} if ( - self.settings.voice_activation.enabled + self.settings_service.settings.voice_activation.enabled and not self.is_listening and self.was_listening_before_ptt ): @@ -437,17 +265,18 @@ def run_async_process(): finally: loop.close() - provider = self.settings.voice_activation.stt_provider + provider = self.settings_service.settings.voice_activation.stt_provider text = None if provider == VoiceActivationSttProvider.WINGMAN_PRO: wingman_pro = WingmanPro( - wingman_name="system", settings=self.settings.wingman_pro + wingman_name="system", + settings=self.settings_service.settings.wingman_pro, ) transcription = wingman_pro.transcribe_azure_speech( filename=recording_file, config=AzureSttConfig( - languages=self.settings.voice_activation.azure.languages, + languages=self.settings_service.settings.voice_activation.azure.languages, # unused as Wingman Pro sets this at API level - just for Pydantic: region=AzureRegion.WESTEUROPE, ), @@ -473,11 +302,11 @@ def filter_and_clean_text(text): whisperccp = Whispercpp(wingman_name="system") transcription = whisperccp.transcribe( filename=recording_file, - config=self.settings.voice_activation.whispercpp, + config=self.settings_service.settings.voice_activation.whispercpp, ) cleaned, text = filter_and_clean_text(transcription.text) if cleaned: - printr.print( + self.printr.print( f"Cleaned original transcription: {transcription.text}", server_only=True, color=LogType.SUBTLE, @@ -494,10 +323,26 @@ def filter_and_clean_text(text): play_thread = threading.Thread(target=run_async_process) play_thread.start() else: - printr.print( + self.printr.print( "ignored empty transcription - probably just noise.", server_only=True ) + async def on_audio_devices_changed(self, devices: tuple[int | None, int | None]): + # devices: [output_device, input_device] + sd.default.device = devices + self.audio_recorder.update_input_stream() + + async def set_voice_activation(self, is_enabled: bool): + if is_enabled: + if ( + self.settings_service.settings.voice_activation.stt_provider + == VoiceActivationSttProvider.AZURE + and not self.azure_speech_recognizer + ): + await self.__init_azure_voice_activation() + else: + self.azure_speech_recognizer = None + # called when Azure Speech Recognizer recognized voice def on_azure_voice_recognition(self, voice_event): def run_async_process(): @@ -515,7 +360,7 @@ def run_async_process(): play_thread.start() async def __init_azure_voice_activation(self): - if self.azure_speech_recognizer or not self.current_config: + if self.azure_speech_recognizer or not self.config_service.current_config: return key = await self.secret_keeper.retrieve( @@ -525,13 +370,12 @@ async def __init_azure_voice_activation(self): ) speech_config = speechsdk.SpeechConfig( - region=self.settings.voice_activation.azure.region.value, subscription=key + region=self.settings_service.settings.voice_activation.azure.region.value, + subscription=key, ) - auto_detect_source_language_config = ( - speechsdk.languageconfig.AutoDetectSourceLanguageConfig( - languages=self.settings.voice_activation.azure.languages - ) + auto_detect_source_language_config = speechsdk.languageconfig.AutoDetectSourceLanguageConfig( + languages=self.settings_service.settings.voice_activation.azure.languages ) self.azure_speech_recognizer = speechsdk.SpeechRecognizer( @@ -541,25 +385,28 @@ async def __init_azure_voice_activation(self): self.azure_speech_recognizer.recognized.connect(self.on_azure_voice_recognition) async def on_playback_started(self, wingman_name: str): - await printr.print_async( + await self.printr.print_async( text=f"Playback started ({wingman_name})", source_name=wingman_name, command_tag=CommandTag.PLAYBACK_STARTED, ) self.was_listening_before_playback = self.is_listening - if self.settings.voice_activation.enabled and self.is_listening: + if ( + self.settings_service.settings.voice_activation.enabled + and self.is_listening + ): self.start_voice_recognition(mute=True) async def on_playback_finished(self, wingman_name: str): - await printr.print_async( + await self.printr.print_async( text=f"Playback finished ({wingman_name})", source_name=wingman_name, command_tag=CommandTag.PLAYBACK_STOPPED, ) if ( - self.settings.voice_activation.enabled + self.settings_service.settings.voice_activation.enabled and not self.is_listening and self.was_listening_before_playback ): @@ -570,336 +417,12 @@ async def process_events(self): callback, wingman_name = await self.event_queue.get() await callback(wingman_name) - # GET /configs - def get_config_dirs(self): - return ConfigsInfo( - config_dirs=self.config_manager.get_config_dirs(), - current_config_dir=self.current_config_dir, - ) - - # GET /configs/templates - def get_config_templates(self): - return self.config_manager.get_template_dirs() - - # GET /config - async def get_config(self, config_name: Optional[str] = "") -> ConfigWithDirInfo: - if config_name and len(config_name) > 0: - config_dir = self.config_manager.get_config_dir(config_name) - - errors, config_info = await self.load_config(config_dir) - - for error in errors: - printr.toast_error(error.message) - - return config_info - - # GET /config-dir-path - def get_config_dir_path(self, config_name: Optional[str] = ""): - return self.config_manager.get_config_dir_path(config_name) - - # POST config - async def load_config( - self, config_dir: Optional[ConfigDirInfo] = None - ) -> tuple[list[WingmanInitializationError], ConfigWithDirInfo]: - try: - loaded_config_dir, config = self.config_manager.load_config(config_dir) - except Exception as e: - printr.toast_error(str(e)) - raise e - - self.current_config_dir = loaded_config_dir - self.current_config = config - self.tower = Tower(config=config, audio_player=self.audio_player) - - errors = await self.tower.instantiate_wingmen( - self.config_manager.settings_config - ) - - return errors, ConfigWithDirInfo(config=config, config_dir=loaded_config_dir) - - # POST config/create - async def create_config( - self, config_name: str, template: Optional[ConfigDirInfo] = None - ): - new_dir = self.config_manager.create_config( - config_name=config_name, template=template - ) - await self.load_config(new_dir) - - # POST config/rename - async def rename_config(self, config_dir: ConfigDirInfo, new_name: str): - new_config_dir = self.config_manager.rename_config( - config_dir=config_dir, new_name=new_name - ) - if new_config_dir and config_dir.name == self.current_config_dir.name: - await self.load_config(new_config_dir) - - # POST config/default - def set_default_config(self, config_dir: ConfigDirInfo): - self.config_manager.set_default_config(config_dir=config_dir) - - # DELETE config - async def delete_config(self, config_dir: ConfigDirInfo): - self.config_manager.delete_config(config_dir=config_dir) - if config_dir.name == self.current_config_dir.name: - await self.load_config() - - # GET config/wingmen - async def get_wingmen_config_files(self, config_name: str): - config_dir = self.config_manager.get_config_dir(config_name) - return self.config_manager.get_wingmen_configs(config_dir) - - # DELETE config/wingman - async def delete_wingman_config( - self, config_dir: ConfigDirInfo, wingman_file: WingmanConfigFileInfo - ): - self.config_manager.delete_wingman_config(config_dir, wingman_file) - await self.load_config(config_dir) # refresh - - # GET config/new-wingman/ - async def get_new_wingmen_template(self): - return self.config_manager.get_new_wingman_template() - - # POST config/new-wingman - async def add_new_wingman( - self, config_dir: ConfigDirInfo, wingman_config: WingmanConfig, avatar: str - ): - wingman_file = WingmanConfigFileInfo( - name=wingman_config.name, - file=f"{wingman_config.name}.yaml", - is_deleted=False, - avatar=avatar, - ) - - await self.save_wingman_config( - config_dir=config_dir, - wingman_file=wingman_file, - wingman_config=wingman_config, - auto_recover=False, - ) - - # POST config/save-wingman - async def save_wingman_config( - self, - config_dir: ConfigDirInfo, - wingman_file: WingmanConfigFileInfo, - wingman_config: WingmanConfig, - auto_recover: bool = False, - silent: bool = False, - ): - self.config_manager.save_wingman_config( - config_dir=config_dir, - wingman_file=wingman_file, - wingman_config=wingman_config, - ) - try: - if not silent: - await self.load_config(config_dir) - printr.toast("Wingman saved successfully.") - except Exception: - error_message = "Invalid Wingman configuration." - if auto_recover: - deleted = self.config_manager.delete_wingman_config( - config_dir, wingman_file - ) - if deleted: - self.config_manager.create_configs_from_templates() - - await self.load_config(config_dir) - - restored_message = ( - "Deleted broken config (and restored default if there is a template for it)." - if deleted - else "" - ) - printr.toast_error(f"{error_message} {restored_message}") - else: - printr.toast_error(f"{error_message}") - - # POST config/wingman/default - async def set_default_wingman( - self, - config_dir: ConfigDirInfo, - wingman_name: str, - ): - _dir, config = self.config_manager.load_config(config_dir) - wingman_config_files = await self.get_wingmen_config_files(config_dir.name) - - # Check if the wingman_name is already the default - already_default = any( - ( - config.wingmen[file.name].name == wingman_name - and config.wingmen[file.name].is_voice_activation_default - ) - for file in wingman_config_files - ) - - for wingman_config_file in wingman_config_files: - wingman_config = config.wingmen[wingman_config_file.name] - - if already_default: - # If wingman_name is already default, undefault it - wingman_config.is_voice_activation_default = False - else: - # Set the new default - wingman_config.is_voice_activation_default = ( - wingman_config.name == wingman_name - ) - - await self.save_wingman_config( - config_dir=config_dir, - wingman_file=wingman_config_file, - wingman_config=wingman_config, - silent=True, - ) - - await self.load_config(config_dir) - - # GET /audio-devices - def get_audio_devices(self): - audio_devices = sd.query_devices() - return audio_devices - - # GET /settings - def get_settings(self): - return self.config_manager.settings_config - - # POST /settings/audio-devices - def set_audio_devices( - self, output_device: Optional[int] = None, input_device: Optional[int] = None - ): - # set the devices - sd.default.device = input_device, output_device - self.audio_recorder.update_input_stream() - - # save settings - self.config_manager.settings_config.audio = AudioSettings( - input=input_device, - output=output_device, - ) - - if self.config_manager.save_settings_config(): - printr.print( - "Audio devices updated.", toast=ToastType.NORMAL, color=LogType.POSITIVE - ) - - # POST /settings/voice-activation - async def set_voice_activation(self, is_enabled: bool): - if is_enabled: - if ( - self.settings.voice_activation.stt_provider - == VoiceActivationSttProvider.AZURE - and not self.azure_speech_recognizer - ): - await self.__init_azure_voice_activation() - else: - self.azure_speech_recognizer = None - - self.start_voice_recognition( - mute=not is_enabled, adjust_for_ambient_noise=is_enabled - ) - - self.config_manager.settings_config.voice_activation.enabled = is_enabled - - if self.config_manager.save_settings_config(): - printr.print( - f"Voice activation {'enabled' if is_enabled else 'disabled'}.", - toast=ToastType.NORMAL, - color=LogType.POSITIVE, - ) - - # POST /settings/mute-key - def set_mute_key(self, key: str, keycodes: Optional[list[int]] = None): - self.config_manager.settings_config.voice_activation.mute_toggle_key = key - self.config_manager.settings_config.voice_activation.mute_toggle_key_codes = ( - keycodes - ) - - if self.config_manager.save_settings_config(): - printr.print( - "Mute key saved.", - toast=ToastType.NORMAL, - color=LogType.POSITIVE, - ) - - # POST /settings/wingman-pro - async def set_wingman_pro_settings( - self, - base_url: str, - region: WingmanProRegion, - stt_provider: VoiceActivationSttProvider, - azure: AzureSttConfig, - whispercpp: WhispercppSttConfig, - va_energy_threshold: float, - ): - self.config_manager.settings_config.wingman_pro.base_url = base_url - self.config_manager.settings_config.wingman_pro.region = region - - self.config_manager.settings_config.voice_activation.stt_provider = stt_provider - self.config_manager.settings_config.voice_activation.azure = azure - self.config_manager.settings_config.voice_activation.whispercpp = whispercpp - - old_va_threshold = ( - self.config_manager.settings_config.voice_activation.energy_threshold - ) - self.config_manager.settings_config.voice_activation.energy_threshold = ( - va_energy_threshold - ) - - if self.config_manager.save_settings_config(): - await self.load_config() - printr.print( - "Wingman Pro settings updated.", - toast=ToastType.NORMAL, - color=LogType.POSITIVE, - ) - + def on_va_treshold_changed(self, _va_energy_threshold: float): # restart VA with new settings - if self.is_listening and old_va_threshold != va_energy_threshold: + if self.is_listening: self.start_voice_recognition(mute=True) self.start_voice_recognition(mute=False, adjust_for_ambient_noise=True) - # POST /settings/wingman-pro/make-default - async def set_wingman_pro_as_default(self, patch_existing_wingmen: bool): - self.config_manager.default_config.features.conversation_provider = ( - "wingman_pro" - ) - self.config_manager.default_config.features.summarize_provider = "wingman_pro" - self.config_manager.default_config.features.tts_provider = "wingman_pro" - self.config_manager.default_config.features.stt_provider = "wingman_pro" - - self.config_manager.save_defaults_config() - - if patch_existing_wingmen: - config_dirs = self.get_config_dirs() - for config_dir in config_dirs.config_dirs: - wingman_config_files = await self.get_wingmen_config_files( - config_dir.name - ) - for wingman_config_file in wingman_config_files: - wingman_config = self.config_manager.load_wingman_config( - config_dir=config_dir, wingman_file=wingman_config_file - ) - if wingman_config: - wingman_config.features.conversation_provider = "wingman_pro" - wingman_config.features.summarize_provider = "wingman_pro" - wingman_config.features.tts_provider = "wingman_pro" - wingman_config.features.stt_provider = "wingman_pro" - - self.config_manager.save_wingman_config( - config_dir=config_dir, - wingman_file=wingman_config_file, - wingman_config=wingman_config, - ) - await self.load_config(self.current_config_dir) - - printr.print( - "Have fun using Wingman Pro!", - toast=ToastType.NORMAL, - color=LogType.POSITIVE, - ) - - # POST /voice-activation/mute def start_voice_recognition( self, mute: Optional[bool] = False, @@ -908,7 +431,7 @@ def start_voice_recognition( self.is_listening = not mute if self.is_listening: if ( - self.settings.voice_activation.stt_provider + self.settings_service.settings.voice_activation.stt_provider == VoiceActivationSttProvider.AZURE ): self.azure_speech_recognizer.start_continuous_recognition() @@ -916,11 +439,11 @@ def start_voice_recognition( if adjust_for_ambient_noise: self.audio_recorder.adjust_for_ambient_noise() self.audio_recorder.start_continuous_listening( - va_settings=self.settings.voice_activation + va_settings=self.settings_service.settings.voice_activation ) else: if ( - self.settings.voice_activation.stt_provider + self.settings_service.settings.voice_activation.stt_provider == VoiceActivationSttProvider.AZURE ): self.azure_speech_recognizer.stop_continuous_recognition() @@ -934,160 +457,48 @@ def toggle_voice_recognition(self): mute = self.is_listening self.start_voice_recognition(mute) + # GET /audio-devices + def get_audio_devices(self): + audio_devices = sd.query_devices() + return audio_devices + # GET /startup-errors def get_startup_errors(self): return self.startup_errors - # GET /voices/elevenlabs - def get_elevenlabs_voices(self, api_key: str): - elevenlabs = ElevenLabs(api_key=api_key, wingman_name="") - voices = elevenlabs.get_available_voices() - convert = lambda voice: VoiceInfo(id=voice.voiceID, name=voice.name) - result = [convert(voice) for voice in voices] - - return result - - # GET /voices/azure - def get_azure_voices(self, api_key: str, region: AzureRegion, locale: str = ""): - azure = OpenAiAzure() - voices = azure.get_available_voices( - api_key=api_key, region=region.value, locale=locale - ) - result = [self.__convert_azure_voice(voice) for voice in voices] - return result - - # GET /voices/azure/wingman-pro - def get_wingman_pro_azure_voices(self, locale: str = ""): - wingman_pro = WingmanPro( - wingman_name="", settings=self.config_manager.settings_config.wingman_pro - ) - voices = wingman_pro.get_available_voices(locale=locale) - if not voices: - return [] - result = [self.__convert_azure_voice(voice) for voice in voices] - return result - - # POST /play/openai - async def play_openai_tts( - self, text: str, api_key: str, voice: OpenAiTtsVoice, sound_config: SoundConfig - ): - openai = OpenAi(api_key=api_key) - await openai.play_audio( - text=text, - voice=voice, - sound_config=sound_config, - audio_player=self.audio_player, - wingman_name="system", - ) - - # POST /play/azure - async def play_azure_tts( - self, text: str, api_key: str, config: AzureTtsConfig, sound_config: SoundConfig - ): - azure = OpenAiAzure() - await azure.play_audio( - text=text, - api_key=api_key, - config=config, - sound_config=sound_config, - audio_player=self.audio_player, - wingman_name="system", - ) - - # POST /play/elevenlabs - async def play_elevenlabs_tts( - self, - text: str, - api_key: str, - config: ElevenlabsConfig, - sound_config: SoundConfig, - ): - elevenlabs = ElevenLabs(api_key=api_key, wingman_name="") - await elevenlabs.play_audio( - text=text, - config=config, - sound_config=sound_config, - audio_player=self.audio_player, - wingman_name="system", - stream=False, - ) - - # POST /play/edgetts - async def play_edge_tts( - self, text: str, config: EdgeTtsConfig, sound_config: SoundConfig - ): - edge = Edge() - await edge.play_audio( - text=text, - config=config, - sound_config=sound_config, - audio_player=self.audio_player, - wingman_name="system", - ) - - # POST /play/xvasynth - async def play_xvasynth_tts( - self, text: str, config: XVASynthTtsConfig, sound_config: SoundConfig - ): - xvasynth = XVASynth(wingman_name="") - await xvasynth.play_audio( - text=text, - config=config, - sound_config=sound_config, - audio_player=self.audio_player, - wingman_name="system", - ) - - # POST /play/wingman-pro/azure - async def play_wingman_pro_azure( - self, text: str, config: AzureTtsConfig, sound_config: SoundConfig - ): - wingman_pro = WingmanPro( - wingman_name="system", - settings=self.config_manager.settings_config.wingman_pro, - ) - await wingman_pro.generate_azure_speech( - text=text, - config=config, - sound_config=sound_config, - audio_player=self.audio_player, - wingman_name="system", - ) - - # POST /play/wingman-pro/azure - async def play_wingman_pro_openai( - self, text: str, voice: OpenAiTtsVoice, sound_config: SoundConfig - ): - wingman_pro = WingmanPro( - wingman_name="system", - settings=self.config_manager.settings_config.wingman_pro, - ) - await wingman_pro.generate_openai_speech( - text=text, - voice=voice, - sound_config=sound_config, - audio_player=self.audio_player, - wingman_name="system", - ) - # POST /stop-playback async def stop_playback(self): await self.audio_player.stop_playback() - def __convert_azure_voice(self, voice): - # retrieved from Wingman Pro as serialized dict - if isinstance(voice, dict): - return VoiceInfo( - id=voice.get("short_name"), - name=voice.get("local_name"), - gender=voice.get("gender"), - locale=voice.get("locale"), - ) - # coming directly from Azure API as a voice object + # POST /send-text-to-wingman + async def send_text_to_wingman(self, text: str, wingman_name: str): + wingman = self.tower.get_wingman_by_name(wingman_name) + + def run_async_process(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(wingman.process(transcript=text)) + finally: + loop.close() + + if wingman and text: + play_thread = threading.Thread(target=run_async_process) + play_thread.start() + + # POST /reset-conversation-history + def reset_conversation_history(self, wingman_name: Optional[str]): + if wingman_name: + wingman = self.tower.get_wingman_by_name(wingman_name) + if wingman: + wingman.reset_conversation_history() + self.printr.toast( + f"Conversation history cleared for {wingman_name}.", + ) else: - return VoiceInfo( - id=voice.short_name, - name=voice.local_name, - gender=voice.gender.name, - locale=voice.locale, + for wingman in self.tower.wingmen: + wingman.reset_conversation_history() + self.printr.toast( + "Conversation history cleared.", ) + return True diff --git a/wingmen/wingman.py b/wingmen/wingman.py index b430367a..39a820c7 100644 --- a/wingmen/wingman.py +++ b/wingmen/wingman.py @@ -365,7 +365,7 @@ async def _execute_command(self, command: CommandConfig) -> str: if len(command.actions or []) > 0 and not self.debug: await printr.print_async( - f"❖ Executing command: {command.name}", color=LogType.INFO + f"Executing command: {command.name}", color=LogType.INFO ) if not self.debug: # in debug mode we already printed the separate execution times @@ -374,7 +374,7 @@ async def _execute_command(self, command: CommandConfig) -> str: if len(command.actions or []) == 0: await printr.print_async( - f"❖ No actions found for command: {command.name}", color=LogType.WARNING + f"No actions found for command: {command.name}", color=LogType.WARNING ) if self.debug: @@ -404,18 +404,23 @@ def execute_action(self, command: CommandConfig): for action in command.actions: if action.keyboard: - if action.keyboard.hold: - keyboard.press( - action.keyboard.hotkey_codes or action.keyboard.hotkey - ) - time.sleep(action.keyboard.hold) - keyboard.release( - action.keyboard.hotkey_codes or action.keyboard.hotkey - ) + if action.keyboard.press == action.keyboard.release: + # compressed key events + hold = action.keyboard.hold or 0.1 + if(action.keyboard.hotkey_codes and len(action.keyboard.hotkey_codes) == 1): + keyboard.direct_event(action.keyboard.hotkey_codes[0], 0+(1 if action.keyboard.hotkey_extended else 0)) + time.sleep(hold) + keyboard.direct_event(action.keyboard.hotkey_codes[0], 2+(1 if action.keyboard.hotkey_extended else 0)) + else: + keyboard.press(action.keyboard.hotkey_codes or action.keyboard.hotkey) + time.sleep(hold) + keyboard.release(action.keyboard.hotkey_codes or action.keyboard.hotkey) else: - keyboard.send( - action.keyboard.hotkey_codes or action.keyboard.hotkey - ) + # single key events + if(action.keyboard.hotkey_codes and len(action.keyboard.hotkey_codes) == 1): + keyboard.direct_event(action.keyboard.hotkey_codes[0], (0 if action.keyboard.press else 2)+(1 if action.keyboard.hotkey_extended else 0)) + else: + keyboard.send(action.keyboard.hotkey_codes or action.keyboard.hotkey, action.keyboard.press, action.keyboard.release) if action.mouse: if action.mouse.move_to: