From b73dd3c177a4ce9831f44ae895d0053c77456c59 Mon Sep 17 00:00:00 2001 From: Maplemx Date: Thu, 24 Oct 2024 12:19:28 +0800 Subject: [PATCH 1/3] update: 1. use settings to contorl `use_realtime` status; 2. put data to generator; --- Agently/plugins/agent_component/Realtime.py | 26 ++++++++++----------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/Agently/plugins/agent_component/Realtime.py b/Agently/plugins/agent_component/Realtime.py index 0abedee..d32b4bf 100644 --- a/Agently/plugins/agent_component/Realtime.py +++ b/Agently/plugins/agent_component/Realtime.py @@ -6,7 +6,7 @@ class Realtime(ComponentABC): def __init__(self, agent: object): self.agent = agent - self.__is_enable = False + self.__get_enable = self.agent.settings.get_trace_back("use_realtime") self.__is_init = False self.__on_going_key_id = None self.__cached_value = {} @@ -16,7 +16,7 @@ def __init__(self, agent: object): self.__realtime_value = None def use_realtime(self): - self.__is_enable = True + self.agent.settings.set("use_realtime", True) return self.agent def __scan_possible_keys(self, prompt_output_pointer, *, prefix:str=None): @@ -32,16 +32,16 @@ def __scan_possible_keys(self, prompt_output_pointer, *, prefix:str=None): return async def __emit_realtime(self, key, indexes, delta, value): - await self.agent.call_event_listeners( - "realtime", - { - "key": key[1:], - "indexes": indexes, - "delta": delta, - "value": value, - "complete_value": self.__realtime_value, - } - ) + event = "realtime" + data = { + "key": key[1:], + "indexes": indexes, + "delta": delta, + "value": value, + "complete_value": self.__realtime_value, + } + self.agent.put_data_to_generator(event, data) + await self.agent.call_event_listeners(event, data) async def __scan_realtime_value(self, key: str, indexes:list, value:any): indexes = indexes[:] @@ -115,7 +115,7 @@ async def __emit_waiting(self, *, is_done:bool=False): async def _suffix(self, event: str, data: any): if ( - not self.__is_enable + not self.agent.settings.get("use_realtime") or "type" not in self.agent.request.response_cache or self.agent.request.response_cache["type"] != "JSON" ): From b1a648004b412827cb36c2a004e90728a5f9742b Mon Sep 17 00:00:00 2001 From: Maplemx Date: Thu, 24 Oct 2024 12:20:29 +0800 Subject: [PATCH 2/3] update: automatically use realtime when add "realtime" events --- Agently/plugins/agent_component/EventListener.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Agently/plugins/agent_component/EventListener.py b/Agently/plugins/agent_component/EventListener.py index a7960a5..9a9c560 100644 --- a/Agently/plugins/agent_component/EventListener.py +++ b/Agently/plugins/agent_component/EventListener.py @@ -13,7 +13,10 @@ def __init__(self, agent: object): def add(self, event:str, listener: callable, *, is_await:bool=False, is_agent_event:bool=False): event = event.replace(".", "->") + if event == "realtime": + self.agent.settings.set("use_realtime", True) if event.startswith("realtime:"): + self.agent.settings.set("use_realtime", True) event_data = event.replace(" ", "").split(":") hooks = event_data[1].replace("->", ".").split("&") hook_list = [] @@ -55,8 +58,8 @@ async def realtime_hook_handler(data): await listener(data) else: listener(data) - if event not in (self.listeners.get(trace_back=False) or {}): - self.listeners.update(event, []) + if "realtime" not in (self.listeners.get(trace_back=False) or {}): + self.listeners.update("realtime", []) self.listeners.append("realtime", { "listener": realtime_hook_handler, "is_await": is_await }) else: if is_agent_event: @@ -83,6 +86,7 @@ def on_finally(self, listener: callable, *, is_await:bool=False, is_agent_event: return self.agent def on_realtime(self, listener: callable, *, is_await:bool=False, is_agent_event:bool=False): + self.agent.settings.set("use_realtime", True) self.add("realtime", listener, is_await=is_await, is_agent_event=is_agent_event) return self.agent From 81b345bb56852681f368385a1dfeeac69e12f1a5 Mon Sep 17 00:00:00 2001 From: Maplemx Date: Thu, 24 Oct 2024 12:21:03 +0800 Subject: [PATCH 3/3] feat: ResponseGenerator --- .../agent_component/ResponseGenerator.py | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 Agently/plugins/agent_component/ResponseGenerator.py diff --git a/Agently/plugins/agent_component/ResponseGenerator.py b/Agently/plugins/agent_component/ResponseGenerator.py new file mode 100644 index 0000000..f3e59f2 --- /dev/null +++ b/Agently/plugins/agent_component/ResponseGenerator.py @@ -0,0 +1,77 @@ +import threading +import asyncio +import queue +from .utils import ComponentABC + +class ResponseGenerator(ComponentABC): + def __init__(self, agent): + self.agent = agent + self.data_queue = queue.Queue() + + def put_data_to_generator(self, event, data): + self.data_queue.put((event, data)) + + def get_complete_generator(self): + thread = threading.Thread(target=self.agent.start) + thread.daemon = True + thread.start() + while True: + try: + item = self.data_queue.get_nowait() + if item == (None, None): + break + yield item + except: + continue + thread.join() + + def get_realtime_generator(self): + self.agent.settings.set("use_realtime", True) + thread = threading.Thread(target=self.agent.start) + thread.daemon = True + thread.start() + while True: + try: + item = self.data_queue.get_nowait() + if item == (None, None): + break + if item[0] == "realtime": + yield item[1] + except: + continue + thread.join() + + def get_generator(self): + thread = threading.Thread(target=self.agent.start) + thread.daemon = True + thread.start() + while True: + try: + item = self.data_queue.get_nowait() + if item == (None, None): + break + if not item[0].endswith(("_origin")): + yield item + except: + continue + thread.join() + + def _suffix(self, event, data): + if event != "response:finally": + self.put_data_to_generator(event, data) + else: + self.put_data_to_generator(None, None) + + def export(self): + return { + "suffix": self._suffix, + "alias": { + "put_data_to_generator": { "func": self.put_data_to_generator }, + "get_generator": { "func": self.get_generator, "return_value": True }, + "get_realtime_generator": { "func": self.get_realtime_generator, "return_value": True }, + "get_complete_generator": { "func": self.get_complete_generator, "return_value": True }, + }, + } + +def export(): + return ("ResponseGenerator", ResponseGenerator) \ No newline at end of file