Skip to content

Commit

Permalink
update: rewrite Realtime to support output data with very complex str…
Browse files Browse the repository at this point in the history
…ucture
  • Loading branch information
Maplemx committed Oct 16, 2024
1 parent d4e230a commit 9019b1e
Showing 1 changed file with 142 additions and 108 deletions.
250 changes: 142 additions & 108 deletions Agently/plugins/agent_component/Realtime.py
Original file line number Diff line number Diff line change
@@ -1,129 +1,163 @@
import json5
from Agently.utils import Lexer
from .utils import ComponentABC
from Agently.utils import find_json, DataGenerator
from Agently.utils import find_json

class Realtime(ComponentABC):
def __init__(self, agent: object):
self.agent = agent
self._is_enable = False
self._is_init = False
self._output_data_frame = None
self._target_keys = []
self._buffer = ""
self._data_generator = DataGenerator()
self._streamed_keys = []
self._ongoing_key = None
self._ongoing_value = None
self._last_value = None
self.__is_enable = False
self.__is_init = False
self.__on_going_key_id = None
self.__cached_value = {}
self.__possible_keys = set()
self.__emitted = set()
self.__streaming_buffer = ""
self.__realtime_value = None

def use_realtime(self):
self._is_enable = True
self.__is_enable = True
return self.agent

def __get_output_data_frame(self, prompt_output:dict, prefix:str=""):
result = {}
for key, value in prompt_output.items():
if isinstance(key, dict):
prefix += f"{key}."
result.update({ key: self.__get_output_data_frame(value, prefix) })
else:
self._target_keys.append(prefix + key)
result.update({ key: None })
return result
def __scan_possible_keys(self, prompt_output_pointer, *, prefix:str=None):
prefix = prefix if prefix != None else ""
self.__possible_keys.add(prefix)
if isinstance(prompt_output_pointer, dict):
for key, value in prompt_output_pointer.items():
self.__scan_possible_keys(value, prefix=prefix+f".{ key }")
elif isinstance(prompt_output_pointer, list):
for item in prompt_output_pointer:
self.__scan_possible_keys(item, prefix=prefix+".[]")
else:
return

async def __scan_realtime_value(self, realtime_value:dict, prefix:str="", output_data_frame_pointer:dict={}):
for key, value in realtime_value.items():
if isinstance(value, dict):
if key in output_data_frame_pointer:
await self.__scan_realtime_value(value, prefix + f"{key}.", output_data_frame_pointer[key])
async def __emit_realtime(self, key, indexes, delta, value):
await self.agent.call_event_listeners(
"realtime",
{
"key": key[1:],
"indexes": indexes,
"delta": delta,
"value": value,
"complete_value": self.__realtime_value,
}
)

async def __scan_realtime_value(self, key: str, indexes:list, value:any):
indexes = indexes[:]
key_id = (key, json5.dumps(indexes))
if key_id in self.__emitted or key not in self.__possible_keys:
return
if isinstance(value, dict):
for item_key, item_value in value.items():
await self.__scan_realtime_value(key + f".{ item_key }", indexes, item_value)
self.__cached_value[key_id] = value
elif isinstance(value, list):
for item_index, item_value in enumerate(value):
temp = indexes[:]
temp.append(item_index)
await self.__scan_realtime_value(key + f".[]", temp, item_value)
self.__cached_value[key_id] = value
else:
if isinstance(value, str):
cached_value = self.__cached_value[key_id] if key_id in self.__cached_value else ""
cached_value = cached_value if cached_value != None else ""
delta = value.replace(cached_value, "")
if len(delta) > 0:
await self.__emit_realtime(key, indexes, delta, value)
self.__cached_value[key_id] = value
self.__on_going_key_id = key_id
else:
if self._ongoing_key == (prefix + key):
self._ongoing_value = value
if self._last_value and isinstance(self._ongoing_value, str):
delta = self._ongoing_value.replace(self._last_value, "")
else:
delta = value
if delta != "":
await self.agent.call_event_listeners(
"response:realtime",
{
"key": self._ongoing_key,
"value": self._ongoing_value,
"delta": delta,
"complete_value": realtime_value,
},
)
self._last_value = value
if self._ongoing_key != prefix + key:
if (prefix + key) not in self._streamed_keys and value not in (None, ""):
if self._ongoing_key != None:
await self.agent.call_event_listeners(f"key_stop:{ self._ongoing_key }", None)
self._ongoing_key = prefix + key
await self.agent.call_event_listeners(f"key_start:{ self._ongoing_key }", None)
self._streamed_keys.append(prefix + key)
self._ongoing_value = value
if self._last_value and isinstance(self._ongoing_value, str):
delta = self._ongoing_value.replace(self._last_value, "")
else:
delta = value
if delta != "":
await self.agent.call_event_listeners(
"response:realtime",
{
"key": self._ongoing_key,
"value": self._ongoing_value,
"delta": delta,
"complete_value": realtime_value,
},
)
self._last_value = value
if (prefix + key) in self._streamed_keys or value in (None, ""):
continue
self.__cached_value[key_id] = value
self.__on_going_key_id = key_id
def __judge_can_emit(self, current_key_id, on_going_key_id):
if (not on_going_key_id[0].startswith(current_key_id[0])
and on_going_key_id[0] + ".[]" != current_key_id[0]):
return True
current_indexes = json5.loads(current_key_id[1])
on_going_indexes = json5.loads(on_going_key_id[1])
for position, on_going_index in enumerate(on_going_indexes):
if (len(current_indexes) >= position + 1
and on_going_index > current_indexes[position]):
return True
return False

async def _suffix(self, event:str, data:any):
if not self._is_enable or "type" not in self.agent.request.response_cache or self.agent.request.response_cache["type"] != "JSON":
async def __emit_waiting(self, *, is_done:bool=False):
key_ids_to_del = []
for key_id, value in self.__cached_value.items():
indexes = json5.loads(key_id[1])
if not is_done and key_id[0] == "":
continue
if key_id in self.__emitted:
key_ids_to_del.append(key_id)
continue
if self.__on_going_key_id == None:
continue
if self.__judge_can_emit(key_id, self.__on_going_key_id) or is_done:
if isinstance(value, dict):
await self.__emit_realtime(key_id[0], indexes, value, value)
self.__emitted.add(key_id)
key_ids_to_del.append(key_id)
elif isinstance(value, list):
await self.__emit_realtime(key_id[0], indexes, value, value)
self.__emitted.add(key_id)
key_ids_to_del.append(key_id)
elif isinstance(value, str):
await self.__emit_realtime(key_id[0] + ".$complete", indexes, value, value)
self.__emitted.add(key_id)
key_ids_to_del.append(key_id)
else:
await self.__emit_realtime(key_id[0], indexes, value, value)
self.__emitted.add(key_id)
key_ids_to_del.append(key_id)
for key_id in key_ids_to_del:
del self.__cached_value[key_id]

async def _suffix(self, event: str, data: any):
if (
not self.__is_enable
or "type" not in self.agent.request.response_cache
or self.agent.request.response_cache["type"] != "JSON"
):
return None
else:
if not self._is_init:
prompt_output = self.agent.request.response_cache["prompt"]["output"]
if isinstance(prompt_output, dict):
self._output_data_frame = self.__get_output_data_frame(prompt_output)
self._is_init = True
if event == "response:delta":
self._buffer += data
realtime_json_str = find_json(self._buffer)
if realtime_json_str != None:
lexer = Lexer()
lexer.append_string(realtime_json_str)
realtime_value = json5.loads(lexer.complete_json())
if self._output_data_frame == None:
if self._last_value and isinstance(realtime_value, str):
delta = realtime_value.replace(self._last_value, "")
else:
delta = realtime_value
if delta != "":
await self.agent.call_event_listeners(
"response:realtime",
{
"key": None,
"value": realtime_value,
"delta": delta,
"complete_value": realtime_value,
},
)
self._last_value = realtime_value
else:
if isinstance(realtime_value, dict):
await self.__scan_realtime_value(realtime_value, "", self._output_data_frame)

if not self.__is_init:
prompt_output = self.agent.request.response_cache["prompt"]["output"]
if isinstance(prompt_output, dict):
self.__scan_possible_keys(prompt_output)
self.__is_init = True
if event == "response:delta":
self.__streaming_buffer += data
realtime_json_str = find_json(self.__streaming_buffer)
if realtime_json_str != None:
lexer = Lexer()
lexer.append_string(realtime_json_str)
try:
self.__realtime_value = json5.loads(lexer.complete_json())
await self.__scan_realtime_value("", [], self.__realtime_value)
await self.__emit_waiting()
except ValueError:
return None
except Exception as e:
raise(e)
if event == "response:done":
self.__streaming_buffer += data
realtime_json_str = find_json(self.__streaming_buffer)
if realtime_json_str != None:
lexer = Lexer()
lexer.append_string(realtime_json_str)
try:
self.__realtime_value = json5.loads(lexer.complete_json())
await self.__scan_realtime_value("", [], self.__realtime_value)
await self.__emit_waiting(is_done=True)
except ValueError:
return None
except Exception as e:
raise(e)

def export(self):
return {
"prefix": None,
"suffix": self._suffix,
"alias": {
"use_realtime": { "func": self.use_realtime }
},
"alias": { "use_realtime": { "func": self.use_realtime } }
}

def export():
Expand Down

0 comments on commit 9019b1e

Please sign in to comment.