diff --git a/nemoguardrails/actions/v2_x/generation.py b/nemoguardrails/actions/v2_x/generation.py index 68114922d..b005be1b5 100644 --- a/nemoguardrails/actions/v2_x/generation.py +++ b/nemoguardrails/actions/v2_x/generation.py @@ -48,11 +48,13 @@ get_element_from_head, get_event_from_element, ) +from nemoguardrails.context import llm_call_info_var from nemoguardrails.embeddings.index import EmbeddingsIndex, IndexItem from nemoguardrails.llm.filters import colang from nemoguardrails.llm.params import llm_params from nemoguardrails.llm.types import Task from nemoguardrails.logging import verbose +from nemoguardrails.logging.explain import LLMCallInfo from nemoguardrails.utils import console, new_uuid log = logging.getLogger(__name__) @@ -279,6 +281,10 @@ async def generate_user_intent( Task.GENERATE_USER_INTENT_FROM_USER_ACTION ) + # Initialize the LLMCallInfo object + llm_call_info_var.set( + LLMCallInfo(task=Task.GENERATE_USER_INTENT_FROM_USER_ACTION.value) + ) # We make this call with lowest temperature to have it as deterministic as possible. with llm_params(llm, temperature=self.config.lowest_temperature): result = await llm_call(llm, prompt, stop=stop) @@ -348,6 +354,13 @@ async def generate_user_intent_and_bot_action( stop = self.llm_task_manager.get_stop_tokens( Task.GENERATE_USER_INTENT_AND_BOT_ACTION_FROM_USER_ACTION ) + # + # Initialize the LLMCallInfo object + llm_call_info_var.set( + LLMCallInfo( + task=Task.GENERATE_USER_INTENT_AND_BOT_ACTION_FROM_USER_ACTION.value + ) + ) # We make this call with lowest temperature to have it as deterministic as possible. 
with llm_params(llm, temperature=self.config.lowest_temperature): @@ -400,6 +413,23 @@ async def check_if_flow_defined(self, state: "State", flow_id: str) -> bool: """Return True if a flow with the provided flow_id is defined.""" return flow_id in state.flow_configs + # @action(name="GetRailsFlowNameAction", is_system_action=True) + # async def get_input_or_output_rails_name(self, state: "State", flow_id: str) -> str: + # """Return the input or output name of the flow with the provided flow_id.""" + # flow_config = state.flow_configs[flow_id] + # for element in flow_config.elements: + # if ( + # not isinstance(element, dict) + # and hasattr(element, "op") + # and element.op == "send" + # ): + # flow_name = ( + # element.spec.arguments["flow_id"] + # if hasattr(element, "spec") + # else None + # ) + # return flow_name + @action(name="CheckForActiveEventMatchAction", is_system_action=True) async def check_for_active_flow_finished_match( self, state: "State", event_name: str, **arguments: Any @@ -459,6 +489,10 @@ async def generate_flow_from_instructions( }, ) + # Initialize the LLMCallInfo object + llm_call_info_var.set( + LLMCallInfo(task=Task.GENERATE_FLOW_FROM_INSTRUCTIONS.value) + ) # We make this call with temperature 0 to have it as deterministic as possible. with llm_params(llm, temperature=self.config.lowest_temperature): result = await llm_call(llm, prompt) @@ -523,6 +557,7 @@ async def generate_flow_from_name( stop = self.llm_task_manager.get_stop_tokens(Task.GENERATE_FLOW_FROM_NAME) + llm_call_info_var.set(LLMCallInfo(task=Task.GENERATE_FLOW_FROM_NAME.value)) # We make this call with temperature 0 to have it as deterministic as possible. 
with llm_params(llm, temperature=self.config.lowest_temperature): result = await llm_call(llm, prompt, stop) @@ -581,6 +616,7 @@ async def generate_flow_continuation( }, ) + llm_call_info_var.set(LLMCallInfo(task=Task.GENERATE_FLOW_CONTINUATION.value)) # We make this call with temperature 0 to have it as deterministic as possible. with llm_params(llm, temperature=temperature): result = await llm_call(llm, prompt) @@ -702,6 +738,9 @@ async def generate_value( Task.GENERATE_USER_INTENT_FROM_USER_ACTION ) + llm_call_info_var.set( + LLMCallInfo(task=Task.GENERATE_USER_INTENT_FROM_USER_ACTION.value) + ) with llm_params(llm, temperature=0.1): result = await llm_call(llm, prompt, stop) @@ -804,7 +843,9 @@ async def generate_flow( stop = self.llm_task_manager.get_stop_tokens( Task.GENERATE_FLOW_CONTINUATION_FROM_NLD ) - + llm_call_info_var.set( + LLMCallInfo(task=Task.GENERATE_FLOW_CONTINUATION_FROM_NLD.value) + ) with llm_params(llm, temperature=self.config.lowest_temperature): result = await llm_call(llm, prompt, stop) diff --git a/nemoguardrails/colang/v1_0/runtime/runtime.py b/nemoguardrails/colang/v1_0/runtime/runtime.py index 56fa00efc..ee01e9e74 100644 --- a/nemoguardrails/colang/v1_0/runtime/runtime.py +++ b/nemoguardrails/colang/v1_0/runtime/runtime.py @@ -32,7 +32,7 @@ compute_context, compute_next_steps, ) -from nemoguardrails.logging.processing_log import processing_log_var +from nemoguardrails.context import processing_log_var from nemoguardrails.utils import new_event_dict, new_uuid log = logging.getLogger(__name__) diff --git a/nemoguardrails/colang/v2_x/runtime/flows.py b/nemoguardrails/colang/v2_x/runtime/flows.py index 053f43e65..ef63c9faf 100644 --- a/nemoguardrails/colang/v2_x/runtime/flows.py +++ b/nemoguardrails/colang/v2_x/runtime/flows.py @@ -21,7 +21,7 @@ import time from collections import deque from dataclasses import dataclass, field -from datetime import datetime +from datetime import datetime, timezone from enum import Enum from typing import 
Any, Callable, Deque, Dict, List, Optional, Tuple, Union @@ -85,6 +85,9 @@ class Event: # A list of matching scores from the event sequence triggered by an external event matching_scores: List[float] = field(default_factory=list) + # Timestamp of event creation + created_at: datetime = datetime.now(timezone.utc) + def is_equal(self, other: Event) -> bool: """Compares two events in terms of their name and arguments.""" if isinstance(other, Event): @@ -102,7 +105,7 @@ def __str__(self) -> str: @classmethod def from_umim_event(cls, event: dict) -> Event: """Creates an event from a flat dictionary.""" - new_event = Event(event["type"], {}) + new_event = cls(event["type"], {}) new_event.arguments = dict( [(key, event[key]) for key in event if key not in ["type"]] ) diff --git a/nemoguardrails/colang/v2_x/runtime/runtime.py b/nemoguardrails/colang/v2_x/runtime/runtime.py index 20044b8a6..ca1fc2d2f 100644 --- a/nemoguardrails/colang/v2_x/runtime/runtime.py +++ b/nemoguardrails/colang/v2_x/runtime/runtime.py @@ -42,6 +42,7 @@ initialize_state, run_to_completion, ) +from nemoguardrails.context import processing_log_var from nemoguardrails.rails.llm.config import RailsConfig from nemoguardrails.utils import new_event_dict, new_readable_uuid @@ -352,7 +353,8 @@ def _get_action_finished_event(result: dict, **kwargs) -> Dict[str, Any]: ) async def _get_async_actions_finished_events( - self, main_flow_uid: str + self, + main_flow_uid: str, ) -> Tuple[List[dict], int]: """Helper to return the ActionFinished events for the local async actions that finished. @@ -400,6 +402,7 @@ async def process_events( state: Union[Optional[dict], State] = None, blocking: bool = False, instant_actions: Optional[List[str]] = None, + processing_log: Optional[List[Dict]] = None, ) -> Tuple[List[Dict[str, Any]], State]: """Process a sequence of events in a given state. 
@@ -432,7 +435,11 @@ async def process_events( output_events = [] input_events: List[Union[dict, InternalEvent]] = events.copy() local_running_actions: List[asyncio.Task[dict]] = [] + if processing_log is None: + processing_log = [] + processing_log_var.set(processing_log) + processing_log.append({}) if state is None or state == {}: state = State( flow_states={}, flow_configs=self.flow_configs, rails_config=self.config @@ -453,6 +460,7 @@ async def process_events( # Start the main flow input_event = InternalEvent(name="StartFlow", arguments={"flow_id": "main"}) input_events.insert(0, input_event) + main_flow_state = state.flow_id_states["main"][-1] # Start all module level flows before main flow @@ -512,9 +520,10 @@ async def process_events( # Advance the state machine new_event: Optional[Union[dict, Event]] = event + while new_event is not None: try: - run_to_completion(state, new_event) + run_to_completion(state, new_event, processing_log) new_event = None except Exception as e: log.warning("Colang runtime error!", exc_info=True) @@ -535,7 +544,7 @@ async def process_events( # ) for out_event in state.outgoing_events: - # We also record the out events in the recent history. + processing_log.append(InternalEvent.from_umim_event(out_event)) state.last_events.append(out_event) # We need to check if we need to run a locally registered action diff --git a/nemoguardrails/colang/v2_x/runtime/statemachine.py b/nemoguardrails/colang/v2_x/runtime/statemachine.py index b24b40aa2..c4e59c771 100644 --- a/nemoguardrails/colang/v2_x/runtime/statemachine.py +++ b/nemoguardrails/colang/v2_x/runtime/statemachine.py @@ -259,12 +259,16 @@ def _context_log(flow_state: FlowState) -> str: ) -def run_to_completion(state: State, external_event: Union[dict, Event]) -> State: +def run_to_completion( + state: State, external_event: Union[dict, Event], processing_log=None +) -> State: """ Compute the next state of the flow-driven system. 
""" log.info("[bold violet]-> External Event[/]: %s", external_event) + should_log = processing_log is not None + # Convert to event type converted_external_event: Event if isinstance(external_event, dict): @@ -291,6 +295,11 @@ def run_to_completion(state: State, external_event: Union[dict, Event]) -> State while heads_are_merging: while state.internal_events: event = state.internal_events.popleft() + + # Enrich the event with flow information + if should_log: + _log_event = _enrich_event_with_flow_info(event, state) + processing_log.append(_log_event) log.info("Process internal event: %s", event) # Find all active interaction loops @@ -2362,8 +2371,18 @@ def _generate_action_event_from_actionable_element( event = get_event_from_element(state, flow_state, element) umim_event = _generate_umim_event(state, event) if isinstance(event, ActionEvent): - event.action_uid = umim_event["action_uid"] + # Create Action with flow_uid assert isinstance(element.spec, Spec) + action = Action( + name=element.spec.name, # By assuming element.spec.name is the action name + arguments=event.arguments, + flow_uid=flow_state.uid, # We link Action to FlowState + ) + + # Add to state + state.actions[action.uid] = action + + event.action_uid = action.uid # Assign action event to optional reference if element.spec.ref and isinstance(element.spec.ref, dict): ref_name = element.spec.ref["elements"][0]["elements"][0].lstrip("$") @@ -2424,3 +2443,48 @@ def _is_child_activated_flow(state: State, flow_state: FlowState) -> bool: and flow_state.parent_uid is not None and flow_state.flow_id == state.flow_states[flow_state.parent_uid].flow_id ) + + +def _enrich_event_with_flow_info(event: Event, state: State) -> Event: + """Enriches the event with flow and action information for logging purposes.""" + event_copy = copy.deepcopy(event) + + # to avoid filter validity failure + # TODO: Check if we can simplify this since we have the creation timestamp in the event itself now + if "action" in 
event_copy.name.lower(): + new_event = event + else: + # It will add 'event_created_at' to the event + umim_event = new_event_dict(event_copy.name, **event_copy.arguments) + new_event = InternalEvent.from_umim_event(umim_event) + new_event.arguments["event_created_at"] = event_copy.created_at + + # for ActionEvent we need to add flow_id, flow_instance_uid, source_flow_instance_uid, child_flow_uids + flow_id = None + flow_uid = None + parent_uid = None + child_uids = None + + if isinstance(new_event, ActionEvent): + action_uid = event.arguments.get("action_uid") + if action_uid and action_uid in state.actions: + action = state.actions[action_uid] + flow_state_uid = action.flow_uid + if flow_state_uid in state.flow_states: + flow_state = state.flow_states[flow_state_uid] + flow_id = flow_state.flow_id + flow_uid = flow_state.uid + parent_uid = flow_state.parent_uid # Already correct + child_uids = flow_state.child_flow_uids + + update_dict = { + "flow_id": flow_id, + "flow_instance_uid": flow_uid, + "source_flow_instance_uid": parent_uid, + "child_flow_uids": child_uids, + } + + # filter out None values and update new_event.arguments + new_event.arguments.update({k: v for k, v in update_dict.items() if v is not None}) + + return new_event diff --git a/nemoguardrails/context.py b/nemoguardrails/context.py index 9f1d2fb6a..cf6e61460 100644 --- a/nemoguardrails/context.py +++ b/nemoguardrails/context.py @@ -32,3 +32,5 @@ # The raw LLM request that comes from the user. # This is used in passthrough mode. 
raw_llm_request = contextvars.ContextVar("raw_llm_request", default=None) + +processing_log_var = contextvars.ContextVar("processing_log", default=None) diff --git a/nemoguardrails/logging/callbacks.py b/nemoguardrails/logging/callbacks.py index 13e009580..1a9dcea04 100644 --- a/nemoguardrails/logging/callbacks.py +++ b/nemoguardrails/logging/callbacks.py @@ -23,9 +23,13 @@ from langchain.callbacks.manager import AsyncCallbackManagerForChainRun from langchain.schema import AgentAction, AgentFinish, BaseMessage, LLMResult -from nemoguardrails.context import explain_info_var, llm_call_info_var, llm_stats_var +from nemoguardrails.context import ( + explain_info_var, + llm_call_info_var, + llm_stats_var, + processing_log_var, +) from nemoguardrails.logging.explain import LLMCallInfo -from nemoguardrails.logging.processing_log import processing_log_var from nemoguardrails.logging.stats import LLMStats log = logging.getLogger(__name__) @@ -180,7 +184,9 @@ async def on_llm_end( llm_call_info.completion_tokens = token_usage.get("completion_tokens", 0) # Finally, we append the LLM call log to the processing log + # TODO: add support for Colang 2.x processing_log = processing_log_var.get() + if processing_log: processing_log.append( {"type": "llm_call_info", "timestamp": time(), "data": llm_call_info} diff --git a/nemoguardrails/logging/processing_log_v2.py b/nemoguardrails/logging/processing_log_v2.py new file mode 100644 index 000000000..de743c058 --- /dev/null +++ b/nemoguardrails/logging/processing_log_v2.py @@ -0,0 +1,1015 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
import logging
from datetime import datetime

log = logging.getLogger(__name__)

# NOTE(review): the three containers below are not read by any function in
# this block -- `compute_generation_log_v2` builds its own local containers.
# They are kept only so that the module keeps exposing the same names.
activated_rails: "List[ActivatedRail]" = []
flows: "Dict[str, Any]" = {}  # flow_instance_uid -> flow data
actions: "Dict[str, ExecutedAction]" = {}  # action_uid -> ExecutedAction

# System flows that are not reported as activated rails.
ignored_flows = [
    "main",
    "wait",
    "repeating timer",
    "llm response polling",
    "llm_response_polling",
    "logging marked bot intent flows",
    "polling llm request response",
    "marking user intent flows",
    "marking bot intent flows",
    "logging marked user intent flows",
    "tracking bot talking state",
    "await_flow_by_name",
    # Candidates that are currently NOT ignored:
    # "bot started saying something", "bot said something", "bot say", "_bot_say"
]

# Flows reported with rail type "generation". Currently empty; candidates:
# "llm generate interaction continuation flow", "llm continuation",
# "generating user intent for unhandled user utterance".
generation_flows = []

_INPUT_RAIL_FLOWS = ("input rails", "self check input", "run input rails")
_OUTPUT_RAIL_FLOWS = ("output rails", "self check output", "run output rails")

# Statistics that are accumulated with `+=` and therefore need a numeric start.
_STAT_DEFAULTS = (
    ("dialog_rails_duration", 0.0),
    ("generation_rails_duration", 0.0),
    ("llm_calls_count", 0),
    ("llm_calls_duration", 0.0),
    ("llm_calls_total_prompt_tokens", 0),
    ("llm_calls_total_completion_tokens", 0),
    ("llm_calls_total_tokens", 0),
)


def get_rail_type(flow_id):
    """Classify a flow id as a rail type.

    Returns:
        "input", "output" or "generation" for the corresponding known flows,
        ``None`` for flows listed in ``ignored_flows``, and "dialog" for
        everything else.
    """
    if flow_id in _INPUT_RAIL_FLOWS:
        return "input"
    if flow_id in _OUTPUT_RAIL_FLOWS:
        return "output"
    if flow_id in generation_flows:
        return "generation"
    if flow_id in ignored_flows:
        return None
    return "dialog"


def parse_timestamp(timestamp_str):
    """Convert an event creation time to POSIX seconds.

    Args:
        timestamp_str: An ISO-8601 string or a ``datetime`` instance. Falsy
            values (``None``, ``""``) mean "no timestamp".

    Returns:
        The timestamp as a float, or ``None`` when there is no value or the
        value cannot be parsed (best effort: a warning is logged, nothing is
        raised).
    """
    if not timestamp_str:
        return None

    # Events may carry the creation time as a datetime object rather than as
    # a string; there is nothing to parse in that case.
    if isinstance(timestamp_str, datetime):
        return timestamp_str.timestamp()

    try:
        return parser.isoparse(timestamp_str).timestamp()
    except (ValueError, TypeError, OverflowError) as e:
        log.warning("Failed to parse timestamp %r: %s", timestamp_str, e)
        return None


def _elapsed(started_at, finished_at):
    """Return ``finished_at - started_at``, or None if either one is missing."""
    if started_at is None or finished_at is None:
        return None
    return finished_at - started_at


def _latest(current, candidate):
    """Return the later of two timestamps, ignoring a missing candidate."""
    if candidate is not None and (current is None or candidate > current):
        return candidate
    return current


def _attach_action_to_rail(open_flows, start_flow_uid, executed_action) -> bool:
    """Append the action to the nearest flow (walking up parents) that is a rail.

    Returns True when a rail was found, False otherwise.
    """
    seen = set()
    flow_uid = start_flow_uid
    # `seen` protects against cycles in the parent links.
    while flow_uid and flow_uid not in seen and flow_uid in open_flows:
        seen.add(flow_uid)
        flow_data = open_flows[flow_uid]
        rail = flow_data["flow"]
        if rail is not None:
            rail.executed_actions.append(executed_action)
            return True
        flow_uid = flow_data.get("parent_flow_instance_uid")
    return False


def _first_timestamp(processing_log):
    """Return the timestamp of the first log entry that has one, else None."""
    for entry in processing_log:
        if isinstance(entry, dict):
            timestamp = entry.get("timestamp")
        else:
            arguments = getattr(entry, "arguments", None) or {}
            timestamp = parse_timestamp(arguments.get("event_created_at"))
        if timestamp is not None:
            return timestamp
    return None


def compute_generation_log_v2(processing_log: List[dict]) -> "GenerationLog":
    """Compute the generation log (activated rails and stats) for Colang 2.x.

    The processing log is a mixed list: ``InternalEvent`` / ``ActionEvent``
    objects and plain dicts (``{"type": "llm_call_info", ...}`` entries and
    possibly empty dicts). Events whose name contains "TimerBotAction" are
    skipped.

    Args:
        processing_log: The entries recorded while processing the events.

    Returns:
        A ``GenerationLog`` holding the activated rails, their executed
        actions and LLM calls, and the aggregated statistics.
    """
    rails = []
    open_flows = {}  # flow_instance_uid -> flow data of flows not yet finished
    executed = {}  # action_uid -> ExecutedAction
    action_order = []  # action uids in the order the actions were started
    finished_action_uids = set()

    # NOTE(review): `fill_event_created_at` is defined outside the part of the
    # module visible here; it is called first, exactly as before.
    fill_event_created_at(processing_log)

    last_timestamp = None
    input_rails_started_at = None
    input_rails_finished_at = None
    output_rails_started_at = None
    output_rails_finished_at = None

    for event in processing_log:
        if not isinstance(event, dict) and "TimerBotAction" in event.name:
            continue

        if isinstance(event, InternalEvent):
            event_arguments = event.arguments
            flow_instance_uid = event_arguments.get("flow_instance_uid")
            timestamp = parse_timestamp(event_arguments.get("event_created_at"))
            last_timestamp = _latest(last_timestamp, timestamp)

            if event.name == "FlowStarted":
                flow_id = event_arguments.get("flow_id")
                rail = None
                rail_type = get_rail_type(flow_id)
                if rail_type:
                    rail = ActivatedRail(
                        type=rail_type,
                        name=flow_id,
                        started_at=timestamp,
                    )
                    rails.append(rail)

                    # Only the first input/output rail defines the start time.
                    if rail_type == "input" and input_rails_started_at is None:
                        input_rails_started_at = timestamp
                    if rail_type == "output" and output_rails_started_at is None:
                        output_rails_started_at = timestamp

                open_flows[flow_instance_uid] = {
                    "flow_id": flow_id,
                    "parent_flow_instance_uid": event_arguments.get(
                        "parent_flow_instance_uid"
                    ),
                    "started_at": timestamp,
                    "flow": rail,
                }

            elif event.name == "FlowFinished" and flow_instance_uid in open_flows:
                rail = open_flows.pop(flow_instance_uid)["flow"]
                if rail is not None:
                    rail.finished_at = timestamp
                    duration = _elapsed(rail.started_at, rail.finished_at)
                    if duration is not None:
                        rail.duration = duration

                    # The last finished input/output rail defines the end time.
                    if rail.type == "input":
                        input_rails_finished_at = timestamp
                    if rail.type == "output":
                        output_rails_finished_at = timestamp

        elif isinstance(event, ActionEvent):
            event_arguments = event.arguments
            action_uid = event.action_uid
            timestamp = parse_timestamp(event_arguments.get("event_created_at"))
            last_timestamp = _latest(last_timestamp, timestamp)

            if event.name.startswith("Start"):
                executed_action = ExecutedAction(
                    action_name=event.name[len("Start") :],
                    action_params=event_arguments,
                    started_at=timestamp,
                )
                executed[action_uid] = executed_action
                action_order.append(action_uid)

                # Prefer the explicit parent link. Enriched action events carry
                # the owning flow as `flow_instance_uid`, so fall back to it
                # instead of dropping the action when no parent link is set.
                start_flow_uid = event_arguments.get(
                    "parent_flow_instance_uid"
                ) or event_arguments.get("flow_instance_uid")
                _attach_action_to_rail(open_flows, start_flow_uid, executed_action)

            elif event.name.endswith("ActionFinished") and action_uid in executed:
                executed_action = executed[action_uid]
                executed_action.finished_at = timestamp
                duration = _elapsed(
                    executed_action.started_at, executed_action.finished_at
                )
                if duration is not None:
                    executed_action.duration = duration
                executed_action.return_value = event_arguments.get("return_value")
                # Remember completion explicitly: the finish timestamp may be
                # missing, so `finished_at` alone cannot tell "finished" apart
                # from "still running".
                finished_action_uids.add(action_uid)

        else:
            # Everything else is only interesting if it is an LLM call record.
            llm_call_info = None
            if isinstance(event, dict):
                if event.get("type") == "llm_call_info":
                    llm_call_info = event["data"]
            elif getattr(event, "type", None) == "llm_call_info":
                llm_call_info = event.data

            if llm_call_info:
                # Attach the call to the most recently started action that has
                # not finished yet; calls without such an action are dropped.
                for action_uid in reversed(action_order):
                    candidate = executed.get(action_uid)
                    if (
                        candidate
                        and action_uid not in finished_action_uids
                        and not candidate.finished_at
                    ):
                        candidate.llm_calls.append(llm_call_info)
                        break

    generation_log = GenerationLog(activated_rails=rails)
    stats = generation_log.stats
    # From here on, work on the rails held by the generation log so that every
    # adjustment below is guaranteed to land in the returned object.
    logged_rails = generation_log.activated_rails

    if logged_rails:
        last_rail = logged_rails[-1]
        if last_rail.finished_at is None:
            last_rail.finished_at = last_timestamp
            duration = _elapsed(last_rail.started_at, last_rail.finished_at)
            if duration is not None:
                last_rail.duration = duration

        # NOTE(review): the "stop" marking is applied to the last rail whenever
        # it is an input/output rail -- confirm whether it should only apply
        # when that rail never finished.
        if last_rail.type in ["input", "output"]:
            last_rail.stop = True
            last_rail.decisions.append("stop")

    # If we have input rails, we also record the general stats.
    if input_rails_started_at:
        if input_rails_finished_at is None:
            input_rails_finished_at = last_timestamp
        stats.input_rails_duration = input_rails_finished_at - input_rails_started_at

    # Dialog/generation rails without an end time end where the next rail starts.
    for rail, next_rail in zip(logged_rails, logged_rails[1:]):
        if rail.type in ["dialog", "generation"] and rail.finished_at is None:
            rail.finished_at = next_rail.started_at
            duration = _elapsed(rail.started_at, rail.finished_at)
            if duration is not None:
                rail.duration = duration

    # If we have output rails, we also record the general stats.
    if output_rails_started_at:
        if output_rails_finished_at is None:
            output_rails_finished_at = last_timestamp
        stats.output_rails_duration = (
            output_rails_finished_at - output_rails_started_at
        )

    # The accumulators are only initialised when there is at least one rail;
    # without rails the stats keep their unset values.
    if logged_rails:
        for stat_name, default in _STAT_DEFAULTS:
            if getattr(stats, stat_name) is None:
                setattr(stats, stat_name, default)

    for rail in logged_rails:
        # TODO: figure out a cleaner way to do this.
        # A `generate user intent` rail with a single action that made a single
        # LLM call with the task `general` is counted as a generation rail.
        if rail.name == "generate user intent" and len(rail.executed_actions) == 1:
            only_action = rail.executed_actions[0]
            if (
                len(only_action.llm_calls) == 1
                and only_action.llm_calls[0].task == "general"
            ):
                rail.type = "generation"

        if rail.type == "dialog" and rail.duration:
            stats.dialog_rails_duration += rail.duration
        if rail.type == "generation" and rail.duration:
            stats.generation_rails_duration += rail.duration

        for executed_action in rail.executed_actions:
            for llm_call in executed_action.llm_calls:
                stats.llm_calls_count += 1
                stats.llm_calls_duration += llm_call.duration or 0.0
                stats.llm_calls_total_prompt_tokens += llm_call.prompt_tokens or 0
                stats.llm_calls_total_completion_tokens += (
                    llm_call.completion_tokens or 0
                )
                stats.llm_calls_total_tokens += llm_call.total_tokens or 0

    # Total duration: from the first entry that has a timestamp to the latest one.
    if last_timestamp is not None and processing_log:
        first_event_timestamp = _first_timestamp(processing_log)
        if first_event_timestamp is not None:
            stats.total_duration = last_timestamp - first_event_timestamp

    return generation_log
flow_data +# +# rail_type = get_rail_type(flow_id) +# if rail_type: +# activated_rail = ActivatedRail( +# type=rail_type, +# name=flow_id, +# started_at=timestamp, +# ) +# flow_data["flow"] = activated_rail +# activated_rails.append(activated_rail) +# +# elif event_name == "FlowFinished": +# if flow_instance_uid in flows: +# flow_data = flows[flow_instance_uid] +# flow = flow_data["flow"] +# if flow is not None: +# flow.finished_at = timestamp +# if flow.started_at and flow.finished_at: +# flow.duration = flow.finished_at - flow.started_at +# +# del flows[flow_instance_uid] +# +# elif isinstance(event, ActionEvent): +# event_name = event.name +# event_arguments = event.arguments +# action_uid = event.action_uid +# +# timestamp = parse_timestamp(event_arguments.get("event_created_at")) +# +# if event_name.startswith("Start"): +# action_name = event_name[len("Start") :] +# action_params = event_arguments +# executed_action = ExecutedAction( +# action_name=action_name, +# action_params=action_params, +# started_at=timestamp, +# ) +# actions[action_uid] = executed_action +# action_order.append(action_uid) +# +# # Find the parent flow +# parent_flow_instance_uid = event_arguments.get( +# "parent_flow_instance_uid" +# ) +# flow_found = False +# current_flow_instance_uid = parent_flow_instance_uid +# visited_flow_uids = set() +# +# while ( +# current_flow_instance_uid +# and current_flow_instance_uid not in visited_flow_uids +# ): +# visited_flow_uids.add(current_flow_instance_uid) +# if current_flow_instance_uid in flows: +# flow_data = flows[current_flow_instance_uid] +# flow = flow_data["flow"] +# if flow is not None: +# flow.executed_actions.append(executed_action) +# flow_found = True +# break +# else: +# next_parent = flow_data.get("parent_flow_instance_uid") +# if next_parent == current_flow_instance_uid: +# # Avoid infinite loop +# break +# current_flow_instance_uid = next_parent +# else: +# current_flow_instance_uid = None # Cannot find the parent flow +# +# if 
not flow_found: +# # Could not find an ActivatedRail in the parent chain +# pass # Or handle accordingly +# +# elif event_name.endswith("ActionFinished"): +# if action_uid in actions: +# executed_action = actions[action_uid] +# executed_action.finished_at = timestamp +# if executed_action.started_at and executed_action.finished_at: +# executed_action.duration = ( +# executed_action.finished_at - executed_action.started_at +# ) +# executed_action.return_value = event_arguments.get("return_value") +# # Do not remove the action_uid from action_order here +# +# else: +# # Handle llm_call_info events +# llm_call_info = None +# if hasattr(event, "type") and event.type == "llm_call_info": +# llm_call_info = event.data +# elif isinstance(event, dict) and event.get("type") == "llm_call_info": +# llm_call_info = event["data"] +# +# if llm_call_info: +# # Associate llm_call_info with the most recent action that has not finished +# for action_uid in reversed(action_order): +# executed_action = actions.get(action_uid) +# if executed_action and not executed_action.finished_at: +# print( +# f"Associating llm_call_info {llm_call_info.task} with action: {executed_action}" +# ) +# executed_action.llm_calls.append(llm_call_info) +# break +# else: +# # No action found to associate with; handle accordingly +# print( +# f"No action found to associate with llm_call_info: {llm_call_info}" +# ) +# print(f"existing actions: {executed_action}") +# +# pass +# +# # Now assign the activated rails to the GenerationLog object +# generation_log = GenerationLog(activated_rails=activated_rails) +# +# return generation_log + + +# def compute_generation_log_v2(processing_log: List[dict]) -> GenerationLog: +# # Initialize variables +# activated_rails: List[ActivatedRail] = [] +# flows: Dict[str, Any] = {} # flow_instance_uid -> flow data +# actions: Dict[str, ExecutedAction] = {} +# current_action_uid_stack: List[str] = [] +# +# # Now, process the events +# for event in processing_log: +# # Process 
InternalEvent +# if isinstance(event, InternalEvent): +# event_name = event.name +# event_arguments = event.arguments +# flow_instance_uid = event_arguments.get("flow_instance_uid") +# flow_id = event_arguments.get("flow_id") +# parent_flow_instance_uid = event_arguments.get("parent_flow_instance_uid") +# +# timestamp = parse_timestamp(event_arguments.get("event_created_at")) +# +# if event_name == "FlowStarted": +# flow_data = { +# "flow_id": flow_id, +# "parent_flow_instance_uid": parent_flow_instance_uid, +# "started_at": timestamp, +# "flow": None, +# } +# flows[flow_instance_uid] = flow_data +# +# rail_type = get_rail_type(flow_id) +# if rail_type: +# activated_rail = ActivatedRail( +# type=rail_type, +# name=flow_id, +# started_at=timestamp, +# ) +# flow_data["flow"] = activated_rail +# activated_rails.append(activated_rail) +# +# elif event_name == "FlowFinished": +# if flow_instance_uid in flows: +# flow_data = flows[flow_instance_uid] +# flow = flow_data["flow"] +# if flow is not None: +# flow.finished_at = timestamp +# if flow.started_at and flow.finished_at: +# flow.duration = flow.finished_at - flow.started_at +# +# del flows[flow_instance_uid] +# +# elif isinstance(event, ActionEvent): +# event_name = event.name +# event_arguments = event.arguments +# action_uid = event.action_uid +# +# timestamp = parse_timestamp(event_arguments.get("event_created_at")) +# +# if event_name.startswith("Start"): +# action_name = event_name[len("Start") :] +# action_params = event_arguments +# executed_action = ExecutedAction( +# action_name=action_name, +# action_params=action_params, +# started_at=timestamp, +# ) +# actions[action_uid] = executed_action +# current_action_uid_stack.append(action_uid) +# +# # Find the parent flow +# parent_flow_instance_uid = event_arguments.get( +# "parent_flow_instance_uid" +# ) +# flow_found = False +# current_flow_instance_uid = parent_flow_instance_uid +# visited_flow_uids = set() +# +# while ( +# current_flow_instance_uid +# and 
current_flow_instance_uid not in visited_flow_uids +# ): +# visited_flow_uids.add(current_flow_instance_uid) +# if current_flow_instance_uid in flows: +# flow_data = flows[current_flow_instance_uid] +# flow = flow_data["flow"] +# if flow is not None: +# flow.executed_actions.append(executed_action) +# flow_found = True +# break +# else: +# next_parent = flow_data.get("parent_flow_instance_uid") +# if next_parent == current_flow_instance_uid: +# # Avoid infinite loop +# break +# current_flow_instance_uid = next_parent +# else: +# current_flow_instance_uid = None # Cannot find the parent flow +# +# if not flow_found: +# # Could not find an ActivatedRail in the parent chain +# pass # Or handle accordingly +# +# elif event_name.endswith("ActionFinished"): +# if action_uid in actions: +# executed_action = actions[action_uid] +# executed_action.finished_at = timestamp +# if executed_action.started_at and executed_action.finished_at: +# executed_action.duration = ( +# executed_action.finished_at - executed_action.started_at +# ) +# executed_action.return_value = event_arguments.get("return_value") +# if ( +# current_action_uid_stack +# and current_action_uid_stack[-1] == action_uid +# ): +# current_action_uid_stack.pop() +# +# elif isinstance(event, dict) and event.get("type") == "llm_call_info": +# # Associate llm_call_info with the current action +# if current_action_uid_stack: +# current_action_uid = current_action_uid_stack[-1] +# if current_action_uid in actions: +# executed_action = actions[current_action_uid] +# llm_call_info = event["data"] +# executed_action.llm_calls.append(llm_call_info) +# else: +# # No current action to associate with, can log or handle accordingly +# pass +# +# # Now assign the activated rails to the GenerationLog object +# generation_log = GenerationLog(activated_rails=activated_rails) +# +# return generation_log + + +# def compute_generation_log_v2(processing_log: List[dict]) -> GenerationLog: +# # Initialize variables +# activated_rails: 
List[ActivatedRail] = [] +# flows: Dict[str, Any] = {} # flow_instance_uid -> flow data +# actions: Dict[str, ExecutedAction] = {} +# current_action_uid_stack: List[str] = [] +# +# # Define ignored_flows and generation_flows +# ignored_flows = [ +# "main", +# "wait", +# "repeating timer", +# "polling llm request response", +# "bot started saying something", +# "tracking bot talking state", +# "bot said something", +# "bot say", +# "_bot_say", +# "await_flow_by_name", +# # Add any other flows to ignore +# ] +# +# generation_flows = [ +# "llm generate interaction continuation flow", +# "generating user intent for unhandled user utterance", +# "llm continuation", +# # Add any other generation flows +# ] +# +# def get_rail_type(flow_id): +# if flow_id in ["input rails", "self check input", "run input rails"]: +# return "input" +# elif flow_id in ["output rails", "self check output", "run output rails"]: +# return "output" +# elif flow_id in generation_flows: +# return "generation" +# elif flow_id not in ignored_flows: +# return "dialog" +# else: +# return None # Ignored flows +# +# def parse_timestamp(timestamp_str): +# if timestamp_str: +# try: +# dt = parser.isoparse(timestamp_str) +# return dt.timestamp() +# except Exception: +# pass +# return None +# +# # Now, process the events +# for event in processing_log: +# # Process InternalEvent +# if isinstance(event, InternalEvent): +# event_name = event.name +# event_arguments = event.arguments +# flow_instance_uid = event_arguments.get("flow_instance_uid") +# flow_id = event_arguments.get("flow_id") +# parent_flow_instance_uid = event_arguments.get("parent_flow_instance_uid") +# +# timestamp = parse_timestamp(event_arguments.get("event_created_at")) +# +# if event_name == "FlowStarted": +# flow_data = { +# "flow_id": flow_id, +# "parent_flow_instance_uid": parent_flow_instance_uid, +# "started_at": timestamp, +# "flow": None, +# } +# flows[flow_instance_uid] = flow_data +# +# rail_type = get_rail_type(flow_id) +# if 
rail_type: +# activated_rail = ActivatedRail( +# type=rail_type, +# name=flow_id, +# started_at=timestamp, +# ) +# flow_data["flow"] = activated_rail +# activated_rails.append(activated_rail) +# +# elif event_name == "FlowFinished": +# if flow_instance_uid in flows: +# flow_data = flows[flow_instance_uid] +# flow = flow_data["flow"] +# if flow is not None: +# flow.finished_at = timestamp +# if flow.started_at and flow.finished_at: +# flow.duration = flow.finished_at - flow.started_at +# +# del flows[flow_instance_uid] +# +# elif isinstance(event, ActionEvent): +# event_name = event.name +# event_arguments = event.arguments +# action_uid = event.action_uid +# +# timestamp = parse_timestamp(event_arguments.get("event_created_at")) +# +# if event_name.startswith("Start"): +# action_name = event_name[len("Start") :] +# action_params = event_arguments +# executed_action = ExecutedAction( +# action_name=action_name, +# action_params=action_params, +# started_at=timestamp, +# ) +# actions[action_uid] = executed_action +# current_action_uid_stack.append(action_uid) +# +# # Find the parent flow +# parent_flow_instance_uid = event_arguments.get( +# "parent_flow_instance_uid" +# ) +# flow_found = False +# current_flow_instance_uid = parent_flow_instance_uid +# visited_flow_uids = set() +# +# while ( +# current_flow_instance_uid +# and current_flow_instance_uid not in visited_flow_uids +# ): +# visited_flow_uids.add(current_flow_instance_uid) +# if current_flow_instance_uid in flows: +# flow_data = flows[current_flow_instance_uid] +# flow = flow_data["flow"] +# if flow is not None: +# flow.executed_actions.append(executed_action) +# flow_found = True +# break +# else: +# next_parent = flow_data.get("parent_flow_instance_uid") +# if next_parent == current_flow_instance_uid: +# # Avoid infinite loop +# break +# current_flow_instance_uid = next_parent +# else: +# break # Cannot find the parent flow +# +# if not flow_found: +# # Could not find an ActivatedRail in the parent chain 
+# pass # Or handle accordingly +# +# elif event_name.endswith("ActionFinished"): +# if action_uid in actions: +# executed_action = actions[action_uid] +# executed_action.finished_at = timestamp +# if executed_action.started_at and executed_action.finished_at: +# executed_action.duration = ( +# executed_action.finished_at - executed_action.started_at +# ) +# executed_action.return_value = event_arguments.get("return_value") +# if ( +# current_action_uid_stack +# and current_action_uid_stack[-1] == action_uid +# ): +# current_action_uid_stack.pop() +# +# elif isinstance(event, dict) and event.get("type") == "llm_call_info": +# # Associate llm_call_info with the current action +# if current_action_uid_stack: +# current_action_uid = current_action_uid_stack[-1] +# if current_action_uid in actions: +# executed_action = actions[current_action_uid] +# llm_call_info = event["data"] +# executed_action.llm_calls.append(llm_call_info) +# else: +# # No current action to associate with, can log or handle accordingly +# pass +# +# # Now assign the activated rails to the GenerationLog object +# generation_log = GenerationLog(activated_rails=activated_rails) +# +# return generation_log + + +# def compute_generation_log_v2(processing_log: List[dict]) -> GenerationLog: +# for event in processing_log: +# # Process InternalEvent +# if isinstance(event, InternalEvent): +# event_name = event.name +# event_arguments = event.arguments +# flow_instance_uid = event_arguments.get("flow_instance_uid") +# flow_id = event_arguments.get("flow_id") +# parent_flow_instance_uid = event_arguments.get("parent_flow_instance_uid") +# +# # print("ACTIVATED RAIL") +# timestamp = parse_timestamp(event_arguments.get("event_created_at")) +# # if not timestamp: +# # print("No timestamp") +# # print(event) +# # print("***" * 100) +# +# # print(f"timestamp: {timestamp}") +# +# if event_name == "FlowStarted": +# flow_data = { +# "flow_id": flow_id, +# "parent_flow_instance_uid": parent_flow_instance_uid, +# 
"started_at": timestamp, +# "flow": None, +# } +# flows[flow_instance_uid] = flow_data +# +# rail_type = get_rail_type(flow_id) +# if rail_type: +# activated_rail = ActivatedRail( +# type=rail_type, +# name=flow_id, +# started_at=timestamp, +# ) +# # print(f"timestamp: {timestamp}") +# # print(f"activated rail timestamp: {activated_rail.started_at}") +# # print("===" * 10) +# flow_data["flow"] = activated_rail +# activated_rails.append(activated_rail) +# +# elif event_name == "FlowFinished": +# if flow_instance_uid in flows: +# flow_data = flows[flow_instance_uid] +# flow = flow_data["flow"] +# if flow is not None: +# flow.finished_at = timestamp +# if flow.started_at and flow.finished_at: +# flow.duration = flow.finished_at - flow.started_at +# +# del flows[flow_instance_uid] +# +# elif isinstance(event, ActionEvent): +# event_name = event.name +# event_arguments = event.arguments +# action_uid = event.action_uid +# +# timestamp = parse_timestamp(event_arguments.get("event_created_at")) +# +# if event_name.startswith("Start"): +# action_name = event_name[len("Start") :] +# action_params = event_arguments +# executed_action = ExecutedAction( +# action_name=action_name, +# action_params=action_params, +# started_at=timestamp, +# ) +# actions[action_uid] = executed_action +# +# # Find the parent flow +# parent_flow_instance_uid = event_arguments.get( +# "parent_flow_instance_uid" +# ) +# flow_found = False +# current_flow_instance_uid = parent_flow_instance_uid +# visited_flow_uids = set() +# +# while ( +# current_flow_instance_uid +# and current_flow_instance_uid not in visited_flow_uids +# ): +# visited_flow_uids.add(current_flow_instance_uid) +# if current_flow_instance_uid in flows: +# flow_data = flows[current_flow_instance_uid] +# flow = flow_data["flow"] +# if flow is not None: +# flow.executed_actions.append(executed_action) +# flow_found = True +# break +# else: +# next_parent = flow_data.get("parent_flow_instance_uid") +# if next_parent == 
def fill_event_created_at(events):
    """Backfill missing 'event_created_at' values on flow lifecycle events.

    'FlowStarted' and 'FlowFinished' internal events that have no (or an
    empty) 'event_created_at' argument receive the value carried by the
    'UnhandledEvent' that names the same event and the same
    'flow_instance_uid'. The events are updated in place; nothing is returned.

    Args:
        events (list): A list of event objects. Only `InternalEvent`
            instances are inspected; every other item is ignored.
    """
    flow_event_names = ("FlowStarted", "FlowFinished")

    # Pass 1: collect, per flow event name, the creation time reported by each
    # UnhandledEvent, keyed by the flow instance it refers to.
    timestamps_by_name = {name: {} for name in flow_event_names}
    for item in events:
        if not isinstance(item, InternalEvent) or item.name != "UnhandledEvent":
            continue

        args = item.arguments
        wrapped_name = args.get("event")
        if wrapped_name not in flow_event_names:
            continue

        uid = args.get("flow_instance_uid")
        created_at = args.get("event_created_at")
        if uid and created_at:
            timestamps_by_name[wrapped_name][uid] = created_at

    # Pass 2: copy the collected time onto flow events that lack one.
    for item in events:
        if not isinstance(item, InternalEvent) or item.name not in flow_event_names:
            continue

        args = item.arguments
        if args.get("event_created_at"):
            # Already timestamped; leave untouched.
            continue

        uid = args.get("flow_instance_uid")
        if not uid:
            continue

        known_times = timestamps_by_name[item.name]
        if uid in known_times:
            item.arguments["event_created_at"] = known_times[uid]
nemoguardrails.patch_asyncio import check_sync_call_from_async_loop @@ -696,7 +697,11 @@ async def generate_async( # In generation mode, the processing is always blocking, i.e., it waits for # all local actions (sync and async). new_events, output_state = await runtime.process_events( - events, state=state, instant_actions=instant_actions, blocking=True + events, + state=state, + instant_actions=instant_actions, + blocking=True, + processing_log=processing_log, ) # We also encode the output state as a JSON output_state = {"state": state_to_json(output_state), "version": "2.x"} @@ -861,21 +866,41 @@ async def generate_async( raise ValueError( "The `output_vars` option is not supported for Colang 2.0 configurations." ) + _log = compute_generation_log_v2(processing_log) + if options.log.activated_rails or options.log.llm_calls: + res.log = GenerationLog() - if ( - options.log.activated_rails - or options.log.llm_calls - or options.log.internal_events - or options.log.colang_history - ): - raise ValueError( - "The `log` option is not supported for Colang 2.0 configurations." - ) + # We always include the stats + res.log.stats = _log.stats + # + if options.log.activated_rails: + res.log.activated_rails = _log.activated_rails + + if options.log.llm_calls: + res.log.llm_calls = [] + for activated_rail in _log.activated_rails: + for executed_action in activated_rail.executed_actions: + res.log.llm_calls.extend(executed_action.llm_calls) + + # Include internal events if requested + if options.log.internal_events: + if res.log is None: + res.log = GenerationLog() + res.log.internal_events = new_events + # TODO: figure out what is the issue here + if options.log.colang_history: + if res.log is None: + res.log = GenerationLog() + res.log.colang_history = get_colang_history(events=events) if options.llm_output: - raise ValueError( - "The `llm_output` option is not supported for Colang 2.0 configurations." - ) + # Currently, we include the output from the generation LLM calls. 
+ for activated_rail in _log.activated_rails: + # TODO: fix or figure out generation flows in processing_log_v2 + if activated_rail.type in ["generation", "dialog"]: + for executed_action in activated_rail.executed_actions: + for llm_call in executed_action.llm_calls: + res.llm_output = llm_call.raw_response # Include the state if state is not None: @@ -1019,12 +1044,17 @@ async def process_events_async( llm_stats = LLMStats() llm_stats_var.set(llm_stats) + processing_log = [] + # Compute the new events. # We need to protect 'process_events' to be called only once at a time # TODO (cschueller): Why is this? async with process_events_semaphore: output_events, output_state = await self.runtime.process_events( - events, state, blocking + events=events, + state=state, + blocking=blocking, + processing_log=processing_log, ) took = time.time() - t0 diff --git a/tests/v2_x/test_generation_log_option.py b/tests/v2_x/test_generation_log_option.py new file mode 100644 index 000000000..68e62c410 --- /dev/null +++ b/tests/v2_x/test_generation_log_option.py @@ -0,0 +1,113 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
def test_triggered_rails_info_2():
    """Check that the `log` generation options populate the response log.

    Builds a rails app from the module-level `colang_content` and
    `yaml_content`, sends a single user message with activated rails,
    LLM calls and internal events requested, and verifies the returned log.
    """
    rails_config = RailsConfig.from_content(colang_content, yaml_content)
    chat = TestChat(rails_config, llm_completions=["user expressed greeting"])

    log_options = {
        "activated_rails": True,
        "llm_calls": True,
        "internal_events": True,
    }
    res: GenerationResponse = chat.app.generate(
        "Hello!", options={"log": log_options}
    )

    # NOTE(review): presumably the refusal comes from the dummy output rail
    # matching "dummy" in the greeting reply -- confirm against the flows.
    assert res.response == "I'm sorry, I can't respond to that."

    assert res.log, "GenerationLog is not present although it is set in options"
    assert (
        res.log.activated_rails
    ), "Activated Rails are not present although it is set in options"

    # Exact counts pinned by the test for this configuration.
    assert len(res.log.activated_rails) == 36
    assert len(res.log.llm_calls) == 0
    assert len(res.log.internal_events) > 0

    # At least one rail of each kind must have been recorded.
    rail_types = [rail.type for rail in res.log.activated_rails]
    assert "input" in rail_types, "No input rails found"
    assert "output" in rail_types, "No output rails found"