diff --git a/Agently/plugins/agent_component/Decorator.py b/Agently/plugins/agent_component/Decorator.py index d0d37d7..c490077 100644 --- a/Agently/plugins/agent_component/Decorator.py +++ b/Agently/plugins/agent_component/Decorator.py @@ -34,6 +34,8 @@ def on_event(self, event: str, *, is_await:bool=False): and not event.startswith("tool:") and not event.startswith("realtime:") and not event == "realtime" + and not event.startswith("instant:") + and not event == "instant" ): event = "response:" + event def decorator(func: callable): @@ -65,7 +67,6 @@ def decorator(func: callable): self.agent.register_tool(**tool_info_kwrags) return func return decorator - def export(self): return { diff --git a/Agently/plugins/agent_component/EventListener.py b/Agently/plugins/agent_component/EventListener.py index 9a9c560..d1a3885 100644 --- a/Agently/plugins/agent_component/EventListener.py +++ b/Agently/plugins/agent_component/EventListener.py @@ -1,5 +1,5 @@ -import types import asyncio +from itertools import combinations from .utils import ComponentABC from Agently.utils import RuntimeCtx, RuntimeCtxNamespace @@ -14,53 +14,55 @@ def __init__(self, agent: object): def add(self, event:str, listener: callable, *, is_await:bool=False, is_agent_event:bool=False): event = event.replace(".", "->") if event == "realtime": - self.agent.settings.set("use_realtime", True) + self.agent.settings.set("use_instant", True) + event = "instant" if event.startswith("realtime:"): - self.agent.settings.set("use_realtime", True) + self.agent.settings.set("use_instant", True) + event = "instant:" + event[9:] + if event == "instant" or event.startswith("instant:"): + self.agent.settings.set("use_instant", True) + if event.startswith("instant:"): event_data = event.replace(" ", "").split(":") - hooks = event_data[1].replace("->", ".").split("&") - hook_list = [] - for hook in hooks: - hook_key_indexes = hook.split("?") - hook_key = hook_key_indexes[0] - hook_indexes = None - if len(hook_key_indexes) > 1: - hook_indexes = hook_key_indexes[1].split(",") - for index, item in enumerate(hook_indexes): - if item.startswith("(") and item.endswith(")"): - items_in_item = item[1:-1].split("|") + keys = event_data[1].replace("->", ".").split("&") + key_indexes_list = [] + for key_str in keys: + if isinstance(key_str, str): + if "?" in key_str: + key, indexes_str = key_str.split("?") + index_list = indexes_str.split(",") + if index_list == [""]: + index_list = [] + else: + key = key_str + index_list = [] + indexes = [] + for index in index_list: + if index in ("_", "*"): + indexes.append(-1) else: - items_in_item = [item] - for i, value in enumerate(items_in_item): - try: - items_in_item[i] = int(value) - except: - items_in_item[i] = None - hook_indexes[index] = items_in_item - hook_list.append((hook_key, hook_indexes)) - async def realtime_hook_handler(data): - for hook in hook_list: - hook_key = hook[0] - hook_indexes = hook[1] - if (data["key"] == hook_key): - if hook_indexes and len(hook_indexes) <= len(data["indexes"]): - can_call = True - for position, hook_index in enumerate(hook_indexes): - if hook_index != None and data["indexes"][position] not in hook_index: - can_call = False - if can_call: - if asyncio.iscoroutinefunction(listener): - await listener(data) - else: - listener(data) + indexes.append(int(index)) + key_indexes_list.append((key, indexes)) + async def instant_hook_handler(data): + indexes = data["indexes"] + if (data["key"], indexes) in key_indexes_list or (data["key"], []) in key_indexes_list: + if asyncio.iscoroutinefunction(listener): + await listener(data) + else: + listener(data) + indexes_len = len(indexes) + for r in range(1, indexes_len + 1): + for indices in combinations(range(indexes_len), r): + possible_indexes = indexes[:] + for i in indices: + possible_indexes[i] = -1 + if (data["key"], possible_indexes) in key_indexes_list: + if asyncio.iscoroutinefunction(listener): + await listener(data) else: - if asyncio.iscoroutinefunction(listener): - await listener(data) - else: - listener(data) - if "realtime" not in (self.listeners.get(trace_back=False) or {}): - self.listeners.update("realtime", []) - self.listeners.append("realtime", { "listener": realtime_hook_handler, "is_await": is_await }) + listener(data) + if event not in (self.listeners.get(trace_back=False) or {}): + self.listeners.update(event, []) + self.listeners.append("instant", { "listener": instant_hook_handler, "is_await": is_await }) else: if is_agent_event: if event not in (self.agent_listeners.get(trace_back=False) or {}): @@ -85,9 +87,13 @@ def on_finally(self, listener: callable, *, is_await:bool=False, is_agent_event: self.add("response:finally", listener, is_await=is_await, is_agent_event=is_agent_event) return self.agent + def on_instant(self, listener: callable, *, is_await:bool=False, is_agent_event:bool=False): + self.add("instant", listener, is_await=is_await, is_agent_event=is_agent_event) + return self.agent + def on_realtime(self, listener: callable, *, is_await:bool=False, is_agent_event:bool=False): - self.agent.settings.set("use_realtime", True) - self.add("realtime", listener, is_await=is_await, is_agent_event=is_agent_event) + self.agent.settings.set("use_instant", True) + self.add("instant", listener, is_await=is_await, is_agent_event=is_agent_event) return self.agent async def call_event_listeners(self, event: str, data: any): @@ -121,7 +127,7 @@ def export(self): "on_delta": { "func": self.on_delta }, "on_done": { "func": self.on_done }, "on_finally": { "func": self.on_finally }, - "on_realtime": { "func": self.on_realtime }, + "on_instant": { "func": self.on_instant }, "call_event_listeners": { "func": self.call_event_listeners }, }, } diff --git a/Agently/plugins/agent_component/Realtime.py b/Agently/plugins/agent_component/Instant.py similarity index 72% rename from Agently/plugins/agent_component/Realtime.py rename to Agently/plugins/agent_component/Instant.py index d32b4bf..0f2d4af 100644 --- a/Agently/plugins/agent_component/Realtime.py +++ b/Agently/plugins/agent_component/Instant.py @@ -3,20 +3,20 @@ from .utils import ComponentABC from Agently.utils import find_json -class Realtime(ComponentABC): +class Instant(ComponentABC): def __init__(self, agent: object): self.agent = agent - self.__get_enable = self.agent.settings.get_trace_back("use_realtime") + self.__get_enable = self.agent.settings.get_trace_back("use_instant") self.__is_init = False self.__on_going_key_id = None self.__cached_value = {} self.__possible_keys = set() self.__emitted = set() self.__streaming_buffer = "" - self.__realtime_value = None + self.__instant_value = None - def use_realtime(self): - self.agent.settings.set("use_realtime", True) + def use_instant(self): + self.agent.settings.set("use_instant", True) return self.agent def __scan_possible_keys(self, prompt_output_pointer, *, prefix:str=None): @@ -31,32 +31,32 @@ def __scan_possible_keys(self, prompt_output_pointer, *, prefix:str=None): else: return - async def __emit_realtime(self, key, indexes, delta, value): - event = "realtime" + async def __emit_instant(self, key, indexes, delta, value): + event = "instant" data = { "key": key[1:], "indexes": indexes, "delta": delta, "value": value, - "complete_value": self.__realtime_value, + "complete_value": self.__instant_value, } self.agent.put_data_to_generator(event, data) await self.agent.call_event_listeners(event, data) - async def __scan_realtime_value(self, key: str, indexes:list, value:any): + async def __scan_instant_value(self, key: str, indexes:list, value:any): indexes = indexes[:] key_id = (key, json5.dumps(indexes)) if key_id in self.__emitted or key not in self.__possible_keys: return if isinstance(value, dict): for item_key, item_value in value.items(): - await self.__scan_realtime_value(key + f".{ item_key }", indexes, item_value) + await self.__scan_instant_value(key + f".{ item_key }", indexes, item_value) self.__cached_value[key_id] = value elif isinstance(value, list): for item_index, item_value in enumerate(value): temp = indexes[:] temp.append(item_index) - await self.__scan_realtime_value(key + f".[]", temp, item_value) + await self.__scan_instant_value(key + f".[]", temp, item_value) self.__cached_value[key_id] = value else: if isinstance(value, str): @@ -64,7 +64,7 @@ async def __scan_realtime_value(self, key: str, indexes:list, value:any): cached_value = cached_value if cached_value != None else "" delta = value.replace(cached_value, "") if len(delta) > 0: - await self.__emit_realtime(key, indexes, delta, value) + await self.__emit_instant(key + ".$delta", indexes, delta, value) self.__cached_value[key_id] = value self.__on_going_key_id = key_id else: @@ -95,19 +95,15 @@ async def __emit_waiting(self, *, is_done:bool=False): continue if self.__judge_can_emit(key_id, self.__on_going_key_id) or is_done: if isinstance(value, dict): - await self.__emit_realtime(key_id[0], indexes, value, value) + await self.__emit_instant(key_id[0], indexes, value, value) self.__emitted.add(key_id) key_ids_to_del.append(key_id) elif isinstance(value, list): - await self.__emit_realtime(key_id[0], indexes, value, value) - self.__emitted.add(key_id) - key_ids_to_del.append(key_id) - elif isinstance(value, str): - await self.__emit_realtime(key_id[0] + ".$complete", indexes, value, value) + await self.__emit_instant(key_id[0], indexes, value, value) self.__emitted.add(key_id) key_ids_to_del.append(key_id) else: - await self.__emit_realtime(key_id[0], indexes, value, value) + await self.__emit_instant(key_id[0], indexes, value, value) self.__emitted.add(key_id) key_ids_to_del.append(key_id) for key_id in key_ids_to_del: @@ -115,7 +111,7 @@ async def __emit_waiting(self, *, is_done:bool=False): async def _suffix(self, event: str, data: any): if ( - not self.agent.settings.get("use_realtime") + not self.agent.settings.get("use_instant") or "type" not in self.agent.request.response_cache or self.agent.request.response_cache["type"] != "JSON" ): @@ -127,13 +123,13 @@ async def _suffix(self, event: str, data: any): self.__is_init = True if event == "response:delta": self.__streaming_buffer += data - realtime_json_str = find_json(self.__streaming_buffer) - if realtime_json_str != None: + instant_json_str = find_json(self.__streaming_buffer) + if instant_json_str != None: lexer = Lexer() - lexer.append_string(realtime_json_str) + lexer.append_string(instant_json_str) try: - self.__realtime_value = json5.loads(lexer.complete_json()) - await self.__scan_realtime_value("", [], self.__realtime_value) + self.__instant_value = json5.loads(lexer.complete_json()) + await self.__scan_instant_value("", [], self.__instant_value) await self.__emit_waiting() except ValueError: return None @@ -141,13 +137,13 @@ async def _suffix(self, event: str, data: any): raise(e) if event == "response:done": self.__streaming_buffer += data - realtime_json_str = find_json(self.__streaming_buffer) - if realtime_json_str != None: + instant_json_str = find_json(self.__streaming_buffer) + if instant_json_str != None: lexer = Lexer() - lexer.append_string(realtime_json_str) + lexer.append_string(instant_json_str) try: - self.__realtime_value = json5.loads(lexer.complete_json()) - await self.__scan_realtime_value("", [], self.__realtime_value) + self.__instant_value = json5.loads(lexer.complete_json()) + await self.__scan_instant_value("", [], self.__instant_value) await self.__emit_waiting(is_done=True) except ValueError: return None @@ -157,8 +153,11 @@ async def _suffix(self, event: str, data: any): def export(self): return { "suffix": self._suffix, - "alias": { "use_realtime": { "func": self.use_realtime } } + "alias": { + "use_instant": { "func": self.use_instant }, + "use_realtime": { "func": self.use_instant }, + }, } def export(): - return ("Realtime", Realtime) \ No newline at end of file + return ("Instant", Instant) \ No newline at end of file diff --git a/Agently/plugins/agent_component/ResponseGenerator.py b/Agently/plugins/agent_component/ResponseGenerator.py index f3e59f2..4015ceb 100644 --- a/Agently/plugins/agent_component/ResponseGenerator.py +++ b/Agently/plugins/agent_component/ResponseGenerator.py @@ -1,5 +1,5 @@ +from itertools import combinations import threading -import asyncio import queue from .utils import ComponentABC @@ -13,7 +13,6 @@ def put_data_to_generator(self, event, data): def get_complete_generator(self): thread = threading.Thread(target=self.agent.start) - thread.daemon = True thread.start() while True: try: @@ -25,17 +24,67 @@ def get_complete_generator(self): continue thread.join() - def get_realtime_generator(self): - self.agent.settings.set("use_realtime", True) + def get_instant_keys_generator(self, keys): + if not isinstance(keys, list): + if isinstance(keys, str): + keys = keys.split("&") + else: + raise Exception("[Response Generator]", ".get_instant_keys_generator() require a list or string input.\nKey format: ?") + key_indexes_list = [] + for key_str in keys: + if isinstance(key_str, str): + if "?" in key_str: + key, indexes_str = key_str.split("?") + index_list = indexes_str.split(",") + if index_list == [""]: + index_list = [] + else: + key = key_str + index_list = [] + indexes = [] + for index in index_list: + if index in ("_", "*"): + indexes.append(-1) + else: + indexes.append(int(index)) + key_indexes_list.append((key, indexes)) + self.agent.settings.set("use_instant", True) thread = threading.Thread(target=self.agent.start) - thread.daemon = True thread.start() while True: try: item = self.data_queue.get_nowait() if item == (None, None): break - if item[0] == "realtime": + if item[0] == "instant": + indexes = item[1]["indexes"] + if (item[1]["key"], indexes) in key_indexes_list or (item[1]["key"], []) in key_indexes_list: + yield item[1] + continue + indexes_len = len(indexes) + for r in range(1, indexes_len + 1): + for indices in combinations(range(indexes_len), r): + possible_indexes = indexes[:] + for i in indices: + possible_indexes[i] = -1 + if (item[1]["key"], possible_indexes) in key_indexes_list: + yield item[1] + break + except: + continue + thread.join() + + + def get_instant_generator(self): + self.agent.settings.set("use_instant", True) + thread = threading.Thread(target=self.agent.start) + thread.start() + while True: + try: + item = self.data_queue.get_nowait() + if item == (None, None): + break + if item[0] == "instant": yield item[1] except: continue @@ -43,7 +92,6 @@ def get_realtime_generator(self): def get_generator(self): thread = threading.Thread(target=self.agent.start) - thread.daemon = True thread.start() while True: try: @@ -68,7 +116,9 @@ def export(self): "alias": { "put_data_to_generator": { "func": self.put_data_to_generator }, "get_generator": { "func": self.get_generator, "return_value": True }, - "get_realtime_generator": { "func": self.get_realtime_generator, "return_value": True }, + "get_instant_generator": { "func": self.get_instant_generator, "return_value": True }, + "get_realtime_generator": { "func": self.get_instant_generator, "return_value": True }, + "get_instant_keys_generator": { "func": self.get_instant_keys_generator, "return_value": True }, "get_complete_generator": { "func": self.get_complete_generator, "return_value": True }, }, }