Skip to content

Commit

Permalink
Merge pull request #180 from Maplemx/dev
Browse files Browse the repository at this point in the history
v3.4.0.4
  • Loading branch information
Maplemx authored Oct 30, 2024
2 parents e81dd37 + 19f4cef commit e3025fd
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 88 deletions.
3 changes: 2 additions & 1 deletion Agently/plugins/agent_component/Decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def on_event(self, event: str, *, is_await:bool=False):
and not event.startswith("tool:")
and not event.startswith("realtime:")
and not event == "realtime"
and not event.startswith("instant:")
and not event == "instant"
):
event = "response:" + event
def decorator(func: callable):
Expand Down Expand Up @@ -65,7 +67,6 @@ def decorator(func: callable):
self.agent.register_tool(**tool_info_kwrags)
return func
return decorator


def export(self):
return {
Expand Down
100 changes: 53 additions & 47 deletions Agently/plugins/agent_component/EventListener.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import types
import asyncio
from itertools import combinations
from .utils import ComponentABC
from Agently.utils import RuntimeCtx, RuntimeCtxNamespace

Expand All @@ -14,53 +14,55 @@ def __init__(self, agent: object):
def add(self, event:str, listener: callable, *, is_await:bool=False, is_agent_event:bool=False):
event = event.replace(".", "->")
if event == "realtime":
self.agent.settings.set("use_realtime", True)
self.agent.settings.set("use_instant", True)
event = "instant"
if event.startswith("realtime:"):
self.agent.settings.set("use_realtime", True)
self.agent.settings.set("use_instant", True)
event = "instant:" + event[9:]
if event == "instant" or event.startswith("instant:"):
self.agent.settings.set("use_instant", True)
if event.startswith("instant:"):
event_data = event.replace(" ", "").split(":")
hooks = event_data[1].replace("->", ".").split("&")
hook_list = []
for hook in hooks:
hook_key_indexes = hook.split("?")
hook_key = hook_key_indexes[0]
hook_indexes = None
if len(hook_key_indexes) > 1:
hook_indexes = hook_key_indexes[1].split(",")
for index, item in enumerate(hook_indexes):
if item.startswith("(") and item.endswith(")"):
items_in_item = item[1:-1].split("|")
keys = event_data[1].replace("->", ".").split("&")
key_indexes_list = []
for key_str in keys:
if isinstance(key_str, str):
if "?" in key_str:
key, indexes_str = key_str.split("?")
index_list = indexes_str.split(",")
if index_list == [""]:
index_list = []
else:
key = key_str
index_list = []
indexes = []
for index in index_list:
if index in ("_", "*"):
indexes.append(-1)
else:
items_in_item = [item]
for i, value in enumerate(items_in_item):
try:
items_in_item[i] = int(value)
except:
items_in_item[i] = None
hook_indexes[index] = items_in_item
hook_list.append((hook_key, hook_indexes))
async def realtime_hook_handler(data):
for hook in hook_list:
hook_key = hook[0]
hook_indexes = hook[1]
if (data["key"] == hook_key):
if hook_indexes and len(hook_indexes) <= len(data["indexes"]):
can_call = True
for position, hook_index in enumerate(hook_indexes):
if hook_index != None and data["indexes"][position] not in hook_index:
can_call = False
if can_call:
if asyncio.iscoroutinefunction(listener):
await listener(data)
else:
listener(data)
indexes.append(int(index))
key_indexes_list.append((key, indexes))
async def instant_hook_handler(data):
indexes = data["indexes"]
if (data["key"], indexes) in key_indexes_list or (data["key"], []) in key_indexes_list:
if asyncio.iscoroutinefunction(listener):
await listener(data)
else:
listener(data)
indexes_len = len(indexes)
for r in range(1, indexes_len + 1):
for indices in combinations(range(indexes_len), r):
possible_indexes = indexes[:]
for i in indices:
possible_indexes[i] = -1
if (data["key"], possible_indexes) in key_indexes_list:
if asyncio.iscoroutinefunction(listener):
await listener(data)
else:
if asyncio.iscoroutinefunction(listener):
await listener(data)
else:
listener(data)
if "realtime" not in (self.listeners.get(trace_back=False) or {}):
self.listeners.update("realtime", [])
self.listeners.append("realtime", { "listener": realtime_hook_handler, "is_await": is_await })
listener(data)
if event not in (self.listeners.get(trace_back=False) or {}):
self.listeners.update(event, [])
self.listeners.append("instant", { "listener": instant_hook_handler, "is_await": is_await })
else:
if is_agent_event:
if event not in (self.agent_listeners.get(trace_back=False) or {}):
Expand All @@ -85,9 +87,13 @@ def on_finally(self, listener: callable, *, is_await:bool=False, is_agent_event:
self.add("response:finally", listener, is_await=is_await, is_agent_event=is_agent_event)
return self.agent

def on_instant(self, listener: callable, *, is_await:bool=False, is_agent_event:bool=False):
self.add("instant", listener, is_await=is_await, is_agent_event=is_agent_event)
return self.agent

def on_realtime(self, listener: callable, *, is_await:bool=False, is_agent_event:bool=False):
self.agent.settings.set("use_realtime", True)
self.add("realtime", listener, is_await=is_await, is_agent_event=is_agent_event)
self.agent.settings.set("use_instant", True)
self.add("instant", listener, is_await=is_await, is_agent_event=is_agent_event)
return self.agent

async def call_event_listeners(self, event: str, data: any):
Expand Down Expand Up @@ -121,7 +127,7 @@ def export(self):
"on_delta": { "func": self.on_delta },
"on_done": { "func": self.on_done },
"on_finally": { "func": self.on_finally },
"on_realtime": { "func": self.on_realtime },
"on_instant": { "func": self.on_instant },
"call_event_listeners": { "func": self.call_event_listeners },
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@
from .utils import ComponentABC
from Agently.utils import find_json

class Realtime(ComponentABC):
class Instant(ComponentABC):
def __init__(self, agent: object):
self.agent = agent
self.__get_enable = self.agent.settings.get_trace_back("use_realtime")
self.__get_enable = self.agent.settings.get_trace_back("use_instant")
self.__is_init = False
self.__on_going_key_id = None
self.__cached_value = {}
self.__possible_keys = set()
self.__emitted = set()
self.__streaming_buffer = ""
self.__realtime_value = None
self.__instant_value = None

def use_realtime(self):
self.agent.settings.set("use_realtime", True)
def use_instant(self):
self.agent.settings.set("use_instant", True)
return self.agent

def __scan_possible_keys(self, prompt_output_pointer, *, prefix:str=None):
Expand All @@ -31,40 +31,40 @@ def __scan_possible_keys(self, prompt_output_pointer, *, prefix:str=None):
else:
return

async def __emit_realtime(self, key, indexes, delta, value):
event = "realtime"
async def __emit_instant(self, key, indexes, delta, value):
event = "instant"
data = {
"key": key[1:],
"indexes": indexes,
"delta": delta,
"value": value,
"complete_value": self.__realtime_value,
"complete_value": self.__instant_value,
}
self.agent.put_data_to_generator(event, data)
await self.agent.call_event_listeners(event, data)

async def __scan_realtime_value(self, key: str, indexes:list, value:any):
async def __scan_instant_value(self, key: str, indexes:list, value:any):
indexes = indexes[:]
key_id = (key, json5.dumps(indexes))
if key_id in self.__emitted or key not in self.__possible_keys:
return
if isinstance(value, dict):
for item_key, item_value in value.items():
await self.__scan_realtime_value(key + f".{ item_key }", indexes, item_value)
await self.__scan_instant_value(key + f".{ item_key }", indexes, item_value)
self.__cached_value[key_id] = value
elif isinstance(value, list):
for item_index, item_value in enumerate(value):
temp = indexes[:]
temp.append(item_index)
await self.__scan_realtime_value(key + f".[]", temp, item_value)
await self.__scan_instant_value(key + f".[]", temp, item_value)
self.__cached_value[key_id] = value
else:
if isinstance(value, str):
cached_value = self.__cached_value[key_id] if key_id in self.__cached_value else ""
cached_value = cached_value if cached_value != None else ""
delta = value.replace(cached_value, "")
if len(delta) > 0:
await self.__emit_realtime(key, indexes, delta, value)
await self.__emit_instant(key + ".$delta", indexes, delta, value)
self.__cached_value[key_id] = value
self.__on_going_key_id = key_id
else:
Expand Down Expand Up @@ -95,27 +95,23 @@ async def __emit_waiting(self, *, is_done:bool=False):
continue
if self.__judge_can_emit(key_id, self.__on_going_key_id) or is_done:
if isinstance(value, dict):
await self.__emit_realtime(key_id[0], indexes, value, value)
await self.__emit_instant(key_id[0], indexes, value, value)
self.__emitted.add(key_id)
key_ids_to_del.append(key_id)
elif isinstance(value, list):
await self.__emit_realtime(key_id[0], indexes, value, value)
self.__emitted.add(key_id)
key_ids_to_del.append(key_id)
elif isinstance(value, str):
await self.__emit_realtime(key_id[0] + ".$complete", indexes, value, value)
await self.__emit_instant(key_id[0], indexes, value, value)
self.__emitted.add(key_id)
key_ids_to_del.append(key_id)
else:
await self.__emit_realtime(key_id[0], indexes, value, value)
await self.__emit_instant(key_id[0], indexes, value, value)
self.__emitted.add(key_id)
key_ids_to_del.append(key_id)
for key_id in key_ids_to_del:
del self.__cached_value[key_id]

async def _suffix(self, event: str, data: any):
if (
not self.agent.settings.get("use_realtime")
not self.agent.settings.get("use_instant")
or "type" not in self.agent.request.response_cache
or self.agent.request.response_cache["type"] != "JSON"
):
Expand All @@ -127,27 +123,27 @@ async def _suffix(self, event: str, data: any):
self.__is_init = True
if event == "response:delta":
self.__streaming_buffer += data
realtime_json_str = find_json(self.__streaming_buffer)
if realtime_json_str != None:
instant_json_str = find_json(self.__streaming_buffer)
if instant_json_str != None:
lexer = Lexer()
lexer.append_string(realtime_json_str)
lexer.append_string(instant_json_str)
try:
self.__realtime_value = json5.loads(lexer.complete_json())
await self.__scan_realtime_value("", [], self.__realtime_value)
self.__instant_value = json5.loads(lexer.complete_json())
await self.__scan_instant_value("", [], self.__instant_value)
await self.__emit_waiting()
except ValueError:
return None
except Exception as e:
raise(e)
if event == "response:done":
self.__streaming_buffer += data
realtime_json_str = find_json(self.__streaming_buffer)
if realtime_json_str != None:
instant_json_str = find_json(self.__streaming_buffer)
if instant_json_str != None:
lexer = Lexer()
lexer.append_string(realtime_json_str)
lexer.append_string(instant_json_str)
try:
self.__realtime_value = json5.loads(lexer.complete_json())
await self.__scan_realtime_value("", [], self.__realtime_value)
self.__instant_value = json5.loads(lexer.complete_json())
await self.__scan_instant_value("", [], self.__instant_value)
await self.__emit_waiting(is_done=True)
except ValueError:
return None
Expand All @@ -157,8 +153,11 @@ async def _suffix(self, event: str, data: any):
def export(self):
return {
"suffix": self._suffix,
"alias": { "use_realtime": { "func": self.use_realtime } }
"alias": {
"use_instant": { "func": self.use_instant },
"use_realtime": { "func": self.use_instant },
},
}

def export():
return ("Realtime", Realtime)
return ("Instant", Instant)
Loading

0 comments on commit e3025fd

Please sign in to comment.