Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/generation log colang 2.x #797

Draft
wants to merge 14 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion nemoguardrails/actions/v2_x/generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@
get_element_from_head,
get_event_from_element,
)
from nemoguardrails.context import llm_call_info_var
from nemoguardrails.embeddings.index import EmbeddingsIndex, IndexItem
from nemoguardrails.llm.filters import colang
from nemoguardrails.llm.params import llm_params
from nemoguardrails.llm.types import Task
from nemoguardrails.logging import verbose
from nemoguardrails.logging.explain import LLMCallInfo
from nemoguardrails.utils import console, new_uuid

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -279,6 +281,10 @@ async def generate_user_intent(
Task.GENERATE_USER_INTENT_FROM_USER_ACTION
)

# Initialize the LLMCallInfo object
llm_call_info_var.set(
LLMCallInfo(task=Task.GENERATE_USER_INTENT_FROM_USER_ACTION.value)
)
# We make this call with lowest temperature to have it as deterministic as possible.
with llm_params(llm, temperature=self.config.lowest_temperature):
result = await llm_call(llm, prompt, stop=stop)
Expand Down Expand Up @@ -348,6 +354,13 @@ async def generate_user_intent_and_bot_action(
stop = self.llm_task_manager.get_stop_tokens(
Task.GENERATE_USER_INTENT_AND_BOT_ACTION_FROM_USER_ACTION
)
#
# Initialize the LLMCallInfo object
llm_call_info_var.set(
LLMCallInfo(
task=Task.GENERATE_USER_INTENT_AND_BOT_ACTION_FROM_USER_ACTION.value
)
)

# We make this call with lowest temperature to have it as deterministic as possible.
with llm_params(llm, temperature=self.config.lowest_temperature):
Expand Down Expand Up @@ -400,6 +413,23 @@ async def check_if_flow_defined(self, state: "State", flow_id: str) -> bool:
"""Return True if a flow with the provided flow_id is defined."""
return flow_id in state.flow_configs

# @action(name="GetRailsFlowNameAction", is_system_action=True)
# async def get_input_or_output_rails_name(self, state: "State", flow_id: str) -> str:
# """Return the input or output name of the flow with the provided flow_id."""
# flow_config = state.flow_configs[flow_id]
# for element in flow_config.elements:
# if (
# not isinstance(element, dict)
# and hasattr(element, "op")
# and element.op == "send"
# ):
# flow_name = (
# element.spec.arguments["flow_id"]
# if hasattr(element, "spec")
# else None
# )
# return flow_name

@action(name="CheckForActiveEventMatchAction", is_system_action=True)
async def check_for_active_flow_finished_match(
self, state: "State", event_name: str, **arguments: Any
Expand Down Expand Up @@ -459,6 +489,10 @@ async def generate_flow_from_instructions(
},
)

# Initialize the LLMCallInfo object
llm_call_info_var.set(
LLMCallInfo(task=Task.GENERATE_FLOW_FROM_INSTRUCTIONS.value)
)
# We make this call with temperature 0 to have it as deterministic as possible.
with llm_params(llm, temperature=self.config.lowest_temperature):
result = await llm_call(llm, prompt)
Expand Down Expand Up @@ -523,6 +557,7 @@ async def generate_flow_from_name(

stop = self.llm_task_manager.get_stop_tokens(Task.GENERATE_FLOW_FROM_NAME)

llm_call_info_var.set(LLMCallInfo(task=Task.GENERATE_FLOW_FROM_NAME.value))
# We make this call with temperature 0 to have it as deterministic as possible.
with llm_params(llm, temperature=self.config.lowest_temperature):
result = await llm_call(llm, prompt, stop)
Expand Down Expand Up @@ -581,6 +616,7 @@ async def generate_flow_continuation(
},
)

llm_call_info_var.set(LLMCallInfo(task=Task.GENERATE_FLOW_CONTINUATION.value))
# We make this call with temperature 0 to have it as deterministic as possible.
with llm_params(llm, temperature=temperature):
result = await llm_call(llm, prompt)
Expand Down Expand Up @@ -702,6 +738,9 @@ async def generate_value(
Task.GENERATE_USER_INTENT_FROM_USER_ACTION
)

llm_call_info_var.set(
LLMCallInfo(task=Task.GENERATE_USER_INTENT_FROM_USER_ACTION.value)
)
with llm_params(llm, temperature=0.1):
result = await llm_call(llm, prompt, stop)

Expand Down Expand Up @@ -804,7 +843,9 @@ async def generate_flow(
stop = self.llm_task_manager.get_stop_tokens(
Task.GENERATE_FLOW_CONTINUATION_FROM_NLD
)

llm_call_info_var.set(
LLMCallInfo(task=Task.GENERATE_FLOW_CONTINUATION_FROM_NLD.value)
)
with llm_params(llm, temperature=self.config.lowest_temperature):
result = await llm_call(llm, prompt, stop)

Expand Down
2 changes: 1 addition & 1 deletion nemoguardrails/colang/v1_0/runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
compute_context,
compute_next_steps,
)
from nemoguardrails.logging.processing_log import processing_log_var
from nemoguardrails.context import processing_log_var
from nemoguardrails.utils import new_event_dict, new_uuid

log = logging.getLogger(__name__)
Expand Down
7 changes: 5 additions & 2 deletions nemoguardrails/colang/v2_x/runtime/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Deque, Dict, List, Optional, Tuple, Union

Expand Down Expand Up @@ -85,6 +85,9 @@ class Event:
# A list of matching scores from the event sequence triggered by an external event
matching_scores: List[float] = field(default_factory=list)

# Timestamp of event creation
created_at: datetime = datetime.now(timezone.utc)

def is_equal(self, other: Event) -> bool:
"""Compares two events in terms of their name and arguments."""
if isinstance(other, Event):
Expand All @@ -102,7 +105,7 @@ def __str__(self) -> str:
@classmethod
def from_umim_event(cls, event: dict) -> Event:
"""Creates an event from a flat dictionary."""
new_event = Event(event["type"], {})
new_event = cls(event["type"], {})
new_event.arguments = dict(
[(key, event[key]) for key in event if key not in ["type"]]
)
Expand Down
15 changes: 12 additions & 3 deletions nemoguardrails/colang/v2_x/runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
initialize_state,
run_to_completion,
)
from nemoguardrails.context import processing_log_var
from nemoguardrails.rails.llm.config import RailsConfig
from nemoguardrails.utils import new_event_dict, new_readable_uuid

Expand Down Expand Up @@ -352,7 +353,8 @@ def _get_action_finished_event(result: dict, **kwargs) -> Dict[str, Any]:
)

async def _get_async_actions_finished_events(
self, main_flow_uid: str
self,
main_flow_uid: str,
) -> Tuple[List[dict], int]:
"""Helper to return the ActionFinished events for the local async actions that finished.

Expand Down Expand Up @@ -400,6 +402,7 @@ async def process_events(
state: Union[Optional[dict], State] = None,
blocking: bool = False,
instant_actions: Optional[List[str]] = None,
processing_log: Optional[List[Dict]] = None,
) -> Tuple[List[Dict[str, Any]], State]:
"""Process a sequence of events in a given state.

Expand Down Expand Up @@ -432,7 +435,11 @@ async def process_events(
output_events = []
input_events: List[Union[dict, InternalEvent]] = events.copy()
local_running_actions: List[asyncio.Task[dict]] = []
if processing_log is None:
processing_log = []

processing_log_var.set(processing_log)
processing_log.append({})
if state is None or state == {}:
state = State(
flow_states={}, flow_configs=self.flow_configs, rails_config=self.config
Expand All @@ -453,6 +460,7 @@ async def process_events(
# Start the main flow
input_event = InternalEvent(name="StartFlow", arguments={"flow_id": "main"})
input_events.insert(0, input_event)

main_flow_state = state.flow_id_states["main"][-1]

# Start all module level flows before main flow
Expand Down Expand Up @@ -512,9 +520,10 @@ async def process_events(

# Advance the state machine
new_event: Optional[Union[dict, Event]] = event

while new_event is not None:
try:
run_to_completion(state, new_event)
run_to_completion(state, new_event, processing_log)
new_event = None
except Exception as e:
log.warning("Colang runtime error!", exc_info=True)
Expand All @@ -535,7 +544,7 @@ async def process_events(
# )

for out_event in state.outgoing_events:
# We also record the out events in the recent history.
processing_log.append(InternalEvent.from_umim_event(out_event))
state.last_events.append(out_event)

# We need to check if we need to run a locally registered action
Expand Down
68 changes: 66 additions & 2 deletions nemoguardrails/colang/v2_x/runtime/statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,16 @@ def _context_log(flow_state: FlowState) -> str:
)


def run_to_completion(state: State, external_event: Union[dict, Event]) -> State:
def run_to_completion(
state: State, external_event: Union[dict, Event], processing_log=None
) -> State:
"""
Compute the next state of the flow-driven system.
"""
log.info("[bold violet]-> External Event[/]: %s", external_event)

should_log = processing_log is not None

# Convert to event type
converted_external_event: Event
if isinstance(external_event, dict):
Expand All @@ -291,6 +295,11 @@ def run_to_completion(state: State, external_event: Union[dict, Event]) -> State
while heads_are_merging:
while state.internal_events:
event = state.internal_events.popleft()

# Enrich the event with flow information
if should_log:
_log_event = _enrich_event_with_flow_info(event, state)
processing_log.append(_log_event)
log.info("Process internal event: %s", event)

# Find all active interaction loops
Expand Down Expand Up @@ -2362,8 +2371,18 @@ def _generate_action_event_from_actionable_element(
event = get_event_from_element(state, flow_state, element)
umim_event = _generate_umim_event(state, event)
if isinstance(event, ActionEvent):
event.action_uid = umim_event["action_uid"]
# Create Action with flow_uid
assert isinstance(element.spec, Spec)
action = Action(
name=element.spec.name, # By assuming element.spec.name is the action name
arguments=event.arguments,
flow_uid=flow_state.uid, # We link Action to FlowState
)

# Add to state
state.actions[action.uid] = action

event.action_uid = action.uid
# Assign action event to optional reference
if element.spec.ref and isinstance(element.spec.ref, dict):
ref_name = element.spec.ref["elements"][0]["elements"][0].lstrip("$")
Expand Down Expand Up @@ -2424,3 +2443,48 @@ def _is_child_activated_flow(state: State, flow_state: FlowState) -> bool:
and flow_state.parent_uid is not None
and flow_state.flow_id == state.flow_states[flow_state.parent_uid].flow_id
)


def _enrich_event_with_flow_info(event: Event, state: State) -> Event:
    """Enrich an event with flow/action information for logging purposes.

    Returns a new event object whose ``arguments`` are augmented with
    ``flow_id``, ``flow_instance_uid``, ``source_flow_instance_uid`` and
    ``child_flow_uids`` when they can be resolved from ``state.actions`` /
    ``state.flow_states``. The caller's ``event`` is never modified.

    Args:
        event: The event to enrich (left untouched; a copy is enriched).
        state: The current runtime state, used to resolve the action's
            owning flow.

    Returns:
        The enriched copy of the event.
    """
    event_copy = copy.deepcopy(event)

    # to avoid filter validity failure
    # TODO: Check if we can simplify this since we have the creation timestamp in the event itself now
    if "action" in event_copy.name.lower():
        # Use the deep copy here (the original code aliased `event`, which
        # made the arguments update at the end mutate the caller's event).
        new_event = event_copy
    else:
        # It will add 'event_created_at' to the event
        umim_event = new_event_dict(event_copy.name, **event_copy.arguments)
        new_event = InternalEvent.from_umim_event(umim_event)
        new_event.arguments["event_created_at"] = event_copy.created_at

    # For ActionEvent we need to add flow_id, flow_instance_uid,
    # source_flow_instance_uid and child_flow_uids.
    flow_id = None
    flow_uid = None
    parent_uid = None
    child_uids = None

    if isinstance(new_event, ActionEvent):
        action_uid = event.arguments.get("action_uid")
        if action_uid and action_uid in state.actions:
            action = state.actions[action_uid]
            flow_state_uid = action.flow_uid
            if flow_state_uid in state.flow_states:
                flow_state = state.flow_states[flow_state_uid]
                flow_id = flow_state.flow_id
                flow_uid = flow_state.uid
                parent_uid = flow_state.parent_uid
                child_uids = flow_state.child_flow_uids

    update_dict = {
        "flow_id": flow_id,
        "flow_instance_uid": flow_uid,
        "source_flow_instance_uid": parent_uid,
        "child_flow_uids": child_uids,
    }

    # Filter out None values and update new_event.arguments.
    new_event.arguments.update({k: v for k, v in update_dict.items() if v is not None})

    return new_event
2 changes: 2 additions & 0 deletions nemoguardrails/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@
# The raw LLM request that comes from the user.
# This is used in passthrough mode.
raw_llm_request = contextvars.ContextVar("raw_llm_request", default=None)

processing_log_var = contextvars.ContextVar("processing_log", default=None)
10 changes: 8 additions & 2 deletions nemoguardrails/logging/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@
from langchain.callbacks.manager import AsyncCallbackManagerForChainRun
from langchain.schema import AgentAction, AgentFinish, BaseMessage, LLMResult

from nemoguardrails.context import explain_info_var, llm_call_info_var, llm_stats_var
from nemoguardrails.context import (
explain_info_var,
llm_call_info_var,
llm_stats_var,
processing_log_var,
)
from nemoguardrails.logging.explain import LLMCallInfo
from nemoguardrails.logging.processing_log import processing_log_var
from nemoguardrails.logging.stats import LLMStats

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -180,7 +184,9 @@ async def on_llm_end(
llm_call_info.completion_tokens = token_usage.get("completion_tokens", 0)

# Finally, we append the LLM call log to the processing log
# TODO: add support for Colang 2.x
processing_log = processing_log_var.get()

if processing_log:
processing_log.append(
{"type": "llm_call_info", "timestamp": time(), "data": llm_call_info}
Expand Down
Loading