Skip to content

Commit

Permalink
Merge pull request #172 from Maplemx/dev
Browse files Browse the repository at this point in the history
Update Realtime
  • Loading branch information
Maplemx authored Oct 16, 2024
2 parents 7a25c9d + 9019b1e commit 1feed29
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 121 deletions.
7 changes: 6 additions & 1 deletion Agently/plugins/agent_component/Decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ def wrapper(*args, **kwargs):
return wrapper

def on_event(self, event: str, *, is_await:bool=False):
if not event.startswith("response:") and not event.startswith("tool:"):
if (
not event.startswith("response:")
and not event.startswith("tool:")
and not event.startswith("realtime:")
and not event == "realtime"
):
event = "response:" + event
def decorator(func: callable):
self.agent.add_event_listener(event, func, is_await=is_await)
Expand Down
68 changes: 58 additions & 10 deletions Agently/plugins/agent_component/EventListener.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,63 @@ def __init__(self, agent: object):
self.async_tasks = []

def add(self, event:str, listener: callable, *, is_await:bool=False, is_agent_event:bool=False):
if is_agent_event:
if event not in (self.agent_listeners.get(trace_back=False) or {}):
self.agent_listeners.update(event, [])
self.agent_listeners.append(event, { "listener": listener, "is_await": is_await })
return self.agent
event = event.replace(".", "->")
if event.startswith("realtime:"):
event_data = event.replace(" ", "").split(":")
hooks = event_data[1].replace("->", ".").split("&")
hook_list = []
for hook in hooks:
hook_key_indexes = hook.split("?")
hook_key = hook_key_indexes[0]
hook_indexes = None
if len(hook_key_indexes) > 1:
hook_indexes = hook_key_indexes[1].split(",")
for index, item in enumerate(hook_indexes):
if item.startswith("(") and item.endswith(")"):
items_in_item = item[1:-1].split("|")
else:
items_in_item = [item]
for i, value in enumerate(items_in_item):
try:
items_in_item[i] = int(value)
except:
items_in_item[i] = None
hook_indexes[index] = items_in_item
hook_list.append((hook_key, hook_indexes))
async def realtime_hook_handler(data):
for hook in hook_list:
hook_key = hook[0]
hook_indexes = hook[1]
if (data["key"] == hook_key):
if hook_indexes and len(hook_indexes) <= len(data["indexes"]):
can_call = True
for position, hook_index in enumerate(hook_indexes):
if hook_index != None and data["indexes"][position] not in hook_index:
can_call = False
if can_call:
if asyncio.iscoroutinefunction(listener):
await listener(data)
else:
listener(data)
else:
if asyncio.iscoroutinefunction(listener):
await listener(data)
else:
listener(data)
if event not in (self.listeners.get(trace_back=False) or {}):
self.listeners.update(event, [])
self.listeners.append("realtime", { "listener": realtime_hook_handler, "is_await": is_await })
else:
if event not in (self.listeners.get(trace_back=False) or {}):
self.listeners.update(event, [])
self.listeners.append(event, { "listener": listener, "is_await": is_await })
return self.agent
if is_agent_event:
if event not in (self.agent_listeners.get(trace_back=False) or {}):
self.agent_listeners.update(event, [])
self.agent_listeners.append(event, { "listener": listener, "is_await": is_await })
return self.agent
else:
if event not in (self.listeners.get(trace_back=False) or {}):
self.listeners.update(event, [])
self.listeners.append(event, { "listener": listener, "is_await": is_await })
return self.agent

def on_delta(self, listener: callable, *, is_await:bool=False, is_agent_event:bool=False):
self.add("response:delta", listener, is_await=is_await, is_agent_event=is_agent_event)
Expand All @@ -36,10 +83,11 @@ def on_finally(self, listener: callable, *, is_await:bool=False, is_agent_event:
return self.agent

def on_realtime(self, listener: callable, *, is_await:bool=False, is_agent_event:bool=False):
self.add("response:realtime", listener, is_await=is_await, is_agent_event=is_agent_event)
self.add("realtime", listener, is_await=is_await, is_agent_event=is_agent_event)
return self.agent

async def call_event_listeners(self, event: str, data: any):
event = event.replace(".", "->")
listeners = self.listeners.get_trace_back() or {}
if event in listeners:
for listener_info in listeners[event]:
Expand Down
250 changes: 142 additions & 108 deletions Agently/plugins/agent_component/Realtime.py
Original file line number Diff line number Diff line change
@@ -1,129 +1,163 @@
import json5
from Agently.utils import Lexer
from .utils import ComponentABC
from Agently.utils import find_json, DataGenerator
from Agently.utils import find_json

class Realtime(ComponentABC):
def __init__(self, agent: object):
self.agent = agent
self._is_enable = False
self._is_init = False
self._output_data_frame = None
self._target_keys = []
self._buffer = ""
self._data_generator = DataGenerator()
self._streamed_keys = []
self._ongoing_key = None
self._ongoing_value = None
self._last_value = None
self.__is_enable = False
self.__is_init = False
self.__on_going_key_id = None
self.__cached_value = {}
self.__possible_keys = set()
self.__emitted = set()
self.__streaming_buffer = ""
self.__realtime_value = None

def use_realtime(self):
self._is_enable = True
self.__is_enable = True
return self.agent

def __get_output_data_frame(self, prompt_output:dict, prefix:str=""):
result = {}
for key, value in prompt_output.items():
if isinstance(key, dict):
prefix += f"{key}."
result.update({ key: self.__get_output_data_frame(value, prefix) })
else:
self._target_keys.append(prefix + key)
result.update({ key: None })
return result
def __scan_possible_keys(self, prompt_output_pointer, *, prefix:str=None):
prefix = prefix if prefix != None else ""
self.__possible_keys.add(prefix)
if isinstance(prompt_output_pointer, dict):
for key, value in prompt_output_pointer.items():
self.__scan_possible_keys(value, prefix=prefix+f".{ key }")
elif isinstance(prompt_output_pointer, list):
for item in prompt_output_pointer:
self.__scan_possible_keys(item, prefix=prefix+".[]")
else:
return

async def __scan_realtime_value(self, realtime_value:dict, prefix:str="", output_data_frame_pointer:dict={}):
for key, value in realtime_value.items():
if isinstance(value, dict):
if key in output_data_frame_pointer:
await self.__scan_realtime_value(value, prefix + f"{key}.", output_data_frame_pointer[key])
async def __emit_realtime(self, key, indexes, delta, value):
await self.agent.call_event_listeners(
"realtime",
{
"key": key[1:],
"indexes": indexes,
"delta": delta,
"value": value,
"complete_value": self.__realtime_value,
}
)

async def __scan_realtime_value(self, key: str, indexes:list, value:any):
indexes = indexes[:]
key_id = (key, json5.dumps(indexes))
if key_id in self.__emitted or key not in self.__possible_keys:
return
if isinstance(value, dict):
for item_key, item_value in value.items():
await self.__scan_realtime_value(key + f".{ item_key }", indexes, item_value)
self.__cached_value[key_id] = value
elif isinstance(value, list):
for item_index, item_value in enumerate(value):
temp = indexes[:]
temp.append(item_index)
await self.__scan_realtime_value(key + f".[]", temp, item_value)
self.__cached_value[key_id] = value
else:
if isinstance(value, str):
cached_value = self.__cached_value[key_id] if key_id in self.__cached_value else ""
cached_value = cached_value if cached_value != None else ""
delta = value.replace(cached_value, "")
if len(delta) > 0:
await self.__emit_realtime(key, indexes, delta, value)
self.__cached_value[key_id] = value
self.__on_going_key_id = key_id
else:
if self._ongoing_key == (prefix + key):
self._ongoing_value = value
if self._last_value and isinstance(self._ongoing_value, str):
delta = self._ongoing_value.replace(self._last_value, "")
else:
delta = value
if delta != "":
await self.agent.call_event_listeners(
"response:realtime",
{
"key": self._ongoing_key,
"value": self._ongoing_value,
"delta": delta,
"complete_value": realtime_value,
},
)
self._last_value = value
if self._ongoing_key != prefix + key:
if (prefix + key) not in self._streamed_keys and value not in (None, ""):
if self._ongoing_key != None:
await self.agent.call_event_listeners(f"key_stop:{ self._ongoing_key }", None)
self._ongoing_key = prefix + key
await self.agent.call_event_listeners(f"key_start:{ self._ongoing_key }", None)
self._streamed_keys.append(prefix + key)
self._ongoing_value = value
if self._last_value and isinstance(self._ongoing_value, str):
delta = self._ongoing_value.replace(self._last_value, "")
else:
delta = value
if delta != "":
await self.agent.call_event_listeners(
"response:realtime",
{
"key": self._ongoing_key,
"value": self._ongoing_value,
"delta": delta,
"complete_value": realtime_value,
},
)
self._last_value = value
if (prefix + key) in self._streamed_keys or value in (None, ""):
continue
self.__cached_value[key_id] = value
self.__on_going_key_id = key_id
def __judge_can_emit(self, current_key_id, on_going_key_id):
if (not on_going_key_id[0].startswith(current_key_id[0])
and on_going_key_id[0] + ".[]" != current_key_id[0]):
return True
current_indexes = json5.loads(current_key_id[1])
on_going_indexes = json5.loads(on_going_key_id[1])
for position, on_going_index in enumerate(on_going_indexes):
if (len(current_indexes) >= position + 1
and on_going_index > current_indexes[position]):
return True
return False

async def _suffix(self, event:str, data:any):
if not self._is_enable or "type" not in self.agent.request.response_cache or self.agent.request.response_cache["type"] != "JSON":
async def __emit_waiting(self, *, is_done:bool=False):
key_ids_to_del = []
for key_id, value in self.__cached_value.items():
indexes = json5.loads(key_id[1])
if not is_done and key_id[0] == "":
continue
if key_id in self.__emitted:
key_ids_to_del.append(key_id)
continue
if self.__on_going_key_id == None:
continue
if self.__judge_can_emit(key_id, self.__on_going_key_id) or is_done:
if isinstance(value, dict):
await self.__emit_realtime(key_id[0], indexes, value, value)
self.__emitted.add(key_id)
key_ids_to_del.append(key_id)
elif isinstance(value, list):
await self.__emit_realtime(key_id[0], indexes, value, value)
self.__emitted.add(key_id)
key_ids_to_del.append(key_id)
elif isinstance(value, str):
await self.__emit_realtime(key_id[0] + ".$complete", indexes, value, value)
self.__emitted.add(key_id)
key_ids_to_del.append(key_id)
else:
await self.__emit_realtime(key_id[0], indexes, value, value)
self.__emitted.add(key_id)
key_ids_to_del.append(key_id)
for key_id in key_ids_to_del:
del self.__cached_value[key_id]

async def _suffix(self, event: str, data: any):
if (
not self.__is_enable
or "type" not in self.agent.request.response_cache
or self.agent.request.response_cache["type"] != "JSON"
):
return None
else:
if not self._is_init:
prompt_output = self.agent.request.response_cache["prompt"]["output"]
if isinstance(prompt_output, dict):
self._output_data_frame = self.__get_output_data_frame(prompt_output)
self._is_init = True
if event == "response:delta":
self._buffer += data
realtime_json_str = find_json(self._buffer)
if realtime_json_str != None:
lexer = Lexer()
lexer.append_string(realtime_json_str)
realtime_value = json5.loads(lexer.complete_json())
if self._output_data_frame == None:
if self._last_value and isinstance(realtime_value, str):
delta = realtime_value.replace(self._last_value, "")
else:
delta = realtime_value
if delta != "":
await self.agent.call_event_listeners(
"response:realtime",
{
"key": None,
"value": realtime_value,
"delta": delta,
"complete_value": realtime_value,
},
)
self._last_value = realtime_value
else:
if isinstance(realtime_value, dict):
await self.__scan_realtime_value(realtime_value, "", self._output_data_frame)

if not self.__is_init:
prompt_output = self.agent.request.response_cache["prompt"]["output"]
if isinstance(prompt_output, dict):
self.__scan_possible_keys(prompt_output)
self.__is_init = True
if event == "response:delta":
self.__streaming_buffer += data
realtime_json_str = find_json(self.__streaming_buffer)
if realtime_json_str != None:
lexer = Lexer()
lexer.append_string(realtime_json_str)
try:
self.__realtime_value = json5.loads(lexer.complete_json())
await self.__scan_realtime_value("", [], self.__realtime_value)
await self.__emit_waiting()
except ValueError:
return None
except Exception as e:
raise(e)
if event == "response:done":
self.__streaming_buffer += data
realtime_json_str = find_json(self.__streaming_buffer)
if realtime_json_str != None:
lexer = Lexer()
lexer.append_string(realtime_json_str)
try:
self.__realtime_value = json5.loads(lexer.complete_json())
await self.__scan_realtime_value("", [], self.__realtime_value)
await self.__emit_waiting(is_done=True)
except ValueError:
return None
except Exception as e:
raise(e)

def export(self):
return {
"prefix": None,
"suffix": self._suffix,
"alias": {
"use_realtime": { "func": self.use_realtime }
},
"alias": { "use_realtime": { "func": self.use_realtime } }
}

def export():
Expand Down
4 changes: 2 additions & 2 deletions Agently/utils/DataOps.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def update(self, keys_with_dots: any, value: any=None):
self.data_ops.update(f"{ self.namespace_name }.{ keys_with_dots }", value)
return self.return_to

def get(self, keys_with_dots: (str, None) = None, default = None, *, no_copy: bool = False):
def get(self, keys_with_dots: str = None, default = None, *, no_copy: bool = False):
return self.data_ops.get(f"{ self.namespace_name }.{ keys_with_dots }" if keys_with_dots else self.namespace_name, default, no_copy = no_copy)

def remove(self, keys_with_dots: str):
Expand Down Expand Up @@ -180,7 +180,7 @@ def _deep_get(self, data):
else:
return copy.deepcopy(data)

def get(self, keys_with_dots: (str, None) = None, default: str=None, *, no_copy: bool = False):
def get(self, keys_with_dots: str = None, default: str=None, *, no_copy: bool = False):
if keys_with_dots:
keys = keys_with_dots.split('.')
pointer = self.target_data
Expand Down

0 comments on commit 1feed29

Please sign in to comment.