langgraph[patch]: format messages in state #2199

Open · wants to merge 15 commits into base: main
126 changes: 124 additions & 2 deletions libs/langgraph/langgraph/graph/message.py
@@ -1,8 +1,21 @@
import uuid

GitHub Actions / benchmark

Benchmark results:

fanout_to_subgraph_10x:                  48.2 ms ± 0.7 ms
fanout_to_subgraph_10x_sync:             43.9 ms ± 1.1 ms
fanout_to_subgraph_10x_checkpoint:       76.9 ms ± 1.9 ms
fanout_to_subgraph_10x_checkpoint_sync:  84.9 ms ± 0.9 ms
fanout_to_subgraph_100x:                 469 ms ± 9 ms
fanout_to_subgraph_100x_sync:            432 ms ± 10 ms
fanout_to_subgraph_100x_checkpoint:      815 ms ± 60 ms
fanout_to_subgraph_100x_checkpoint_sync: 835 ms ± 16 ms
react_agent_10x:                         31.0 ms ± 2.9 ms
react_agent_10x_sync:                    22.4 ms ± 1.4 ms
react_agent_10x_checkpoint:              47.0 ms ± 3.2 ms
react_agent_10x_checkpoint_sync:         37.1 ms ± 3.0 ms
react_agent_100x:                        320 ms ± 6 ms
react_agent_100x_sync:                   261 ms ± 12 ms
react_agent_100x_checkpoint:             907 ms ± 8 ms
react_agent_100x_checkpoint_sync:        813 ms ± 7 ms
wide_state_25x300:                       18.4 ms ± 0.5 ms
wide_state_25x300_sync:                  10.9 ms ± 0.1 ms
wide_state_25x300_checkpoint:            273 ms ± 4 ms
wide_state_25x300_checkpoint_sync:       270 ms ± 13 ms
wide_state_15x600:                       21.3 ms ± 0.5 ms
wide_state_15x600_sync:                  12.6 ms ± 0.1 ms
wide_state_15x600_checkpoint:            473 ms ± 8 ms
wide_state_15x600_checkpoint_sync:       467 ms ± 14 ms
wide_state_9x1200:                       21.3 ms ± 0.4 ms
wide_state_9x1200_sync:                  12.6 ms ± 0.1 ms
wide_state_9x1200_checkpoint:            304 ms ± 2 ms
wide_state_9x1200_checkpoint_sync:       301 ms ± 13 ms

GitHub Actions / benchmark

Comparison against main:

+------------------------------------+---------+-----------------------+
| Benchmark                          | main    | changes               |
+====================================+=========+=======================+
| fanout_to_subgraph_10x_sync        | 46.3 ms | 43.9 ms: 1.05x faster |
| react_agent_100x                   | 328 ms  | 320 ms: 1.03x faster  |
| react_agent_100x_checkpoint        | 916 ms  | 907 ms: 1.01x faster  |
| react_agent_100x_checkpoint_sync   | 821 ms  | 813 ms: 1.01x faster  |
| wide_state_9x1200_checkpoint       | 307 ms  | 304 ms: 1.01x faster  |
| wide_state_15x600                  | 21.1 ms | 21.3 ms: 1.01x slower |
| wide_state_25x300                  | 18.2 ms | 18.4 ms: 1.01x slower |
| fanout_to_subgraph_100x_sync       | 427 ms  | 432 ms: 1.01x slower  |
| fanout_to_subgraph_100x_checkpoint | 793 ms  | 815 ms: 1.03x slower  |
| wide_state_25x300_checkpoint_sync  | 262 ms  | 270 ms: 1.03x slower  |
| react_agent_10x                    | 29.0 ms | 31.0 ms: 1.07x slower |
| Geometric mean                     | (ref)   | 1.00x slower          |
+------------------------------------+---------+-----------------------+

Benchmarks hidden because not significant (17): react_agent_10x_checkpoint,
react_agent_10x_checkpoint_sync, wide_state_9x1200_checkpoint_sync,
wide_state_15x600_checkpoint_sync, fanout_to_subgraph_100x,
wide_state_15x600_checkpoint, fanout_to_subgraph_10x,
wide_state_25x300_checkpoint, fanout_to_subgraph_10x_checkpoint_sync,
react_agent_100x_sync, wide_state_25x300_sync, wide_state_15x600_sync,
wide_state_9x1200_sync, react_agent_10x_sync,
fanout_to_subgraph_10x_checkpoint, wide_state_9x1200,
fanout_to_subgraph_100x_checkpoint_sync
from typing import Annotated, TypedDict, Union, cast
import warnings
from functools import partial
from typing import (
    Annotated,
    Any,
    Callable,
    Literal,
    Optional,
    Sequence,
    TypedDict,
    Union,
    cast,
)

from langchain_core.messages import (
    AnyMessage,
    BaseMessage,
    BaseMessageChunk,
    MessageLikeRepresentation,
    RemoveMessage,
@@ -15,7 +28,32 @@
Messages = Union[list[MessageLikeRepresentation], MessageLikeRepresentation]


def add_messages(left: Messages, right: Messages) -> Messages:
def _add_messages_wrapper(func: Callable) -> Callable[[Messages, Messages], Messages]:
    def _add_messages(
        left: Optional[Messages] = None, right: Optional[Messages] = None, **kwargs: Any
    ) -> Union[Messages, Callable[[Messages, Messages], Messages]]:
        if left is not None and right is not None:
            return func(left, right, **kwargs)
        elif left is not None or right is not None:
            msg = (
                f"Must specify non-null arguments for both 'left' and 'right'. Only "
                f"received: '{'left' if left else 'right'}'."
            )
            raise ValueError(msg)
        else:
            return partial(func, **kwargs)

    _add_messages.__doc__ = func.__doc__
    return cast(Callable[[Messages, Messages], Messages], _add_messages)
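The wrapper above is what makes `add_messages` dual-use: called with two message lists it behaves as a plain reducer, while called with only keyword options (e.g. `add_messages(format='langchain-openai')`) it returns a pre-configured reducer via `functools.partial`. Here is a minimal, langgraph-independent sketch of the same dispatch pattern (the `concat` reducer and its `upper` option are invented for illustration):

```python
from functools import partial
from typing import Any, Callable, Optional

def _dual_use(func: Callable) -> Callable:
    """Allow a binary reducer to be called directly or pre-configured."""

    def wrapper(left: Optional[Any] = None, right: Optional[Any] = None, **kwargs: Any):
        if left is not None and right is not None:
            # Normal reducer call: merge the two values.
            return func(left, right, **kwargs)
        elif left is not None or right is not None:
            raise ValueError("Must specify both 'left' and 'right'.")
        else:
            # Called with options only: return a configured reducer
            # for later use, e.g. inside an Annotated[...] state field.
            return partial(func, **kwargs)

    wrapper.__doc__ = func.__doc__
    return wrapper

@_dual_use
def concat(left: list, right: list, *, upper: bool = False) -> list:
    merged = left + right
    return [s.upper() for s in merged] if upper else merged

direct = concat(["a"], ["b"])    # used directly as a reducer
configured = concat(upper=True)  # returns a configured reducer
later = configured(["a"], ["b"])
```

A state annotation can then reference either `concat` or `concat(upper=True)` interchangeably, which mirrors how `add_messages(format='langchain-openai')` is used in the docstring example.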


@_add_messages_wrapper
def add_messages(
    left: Messages,
    right: Messages,
    *,
    format: Optional[Literal["langchain-openai"]] = None,
) -> Messages:
"""Merges two lists of messages, updating existing messages by ID.

By default, this ensures the state is "append-only", unless the
@@ -25,6 +63,14 @@
left: The base list of messages.
right: The list of messages (or single message) to merge
into the base list.
format: The format to return messages in. If None, messages are returned
    as-is. If 'langchain-openai', messages are returned as BaseMessage
    objects with their contents formatted to match the OpenAI message
    format: contents can be a string, 'text' blocks, or 'image_url' blocks,
    and tool responses are returned as their own ToolMessages.

**REQUIREMENT**: Must have ``langchain-core>=0.3.11`` installed to use this
feature.

Returns:
A new list of messages with the messages from `right` merged into `left`.
@@ -58,8 +104,59 @@
>>> graph = builder.compile()
>>> graph.invoke({})
{'messages': [AIMessage(content='Hello', id=...)]}

>>> from typing import Annotated
>>> from typing_extensions import TypedDict
>>> from langgraph.graph import StateGraph, add_messages
>>>
>>> class State(TypedDict):
...     messages: Annotated[list, add_messages(format='langchain-openai')]
...
>>> def chatbot_node(state: State) -> dict:
...     return {"messages": [
...         {
...             "role": "user",
...             "content": [
...                 {
...                     "type": "text",
...                     "text": "Here's an image:",
...                     "cache_control": {"type": "ephemeral"},
...                 },
...                 {
...                     "type": "image",
...                     "source": {
...                         "type": "base64",
...                         "media_type": "image/jpeg",
...                         "data": "1234",
...                     },
...                 },
...             ]
...         },
...     ]}
>>> builder = StateGraph(State)
>>> builder.add_node("chatbot", chatbot_node)
>>> builder.set_entry_point("chatbot")
>>> builder.set_finish_point("chatbot")
>>> graph = builder.compile()
>>> graph.invoke({"messages": []})
{
    'messages': [
        HumanMessage(
            content=[
                {"type": "text", "text": "Here's an image:"},
                {
                    "type": "image_url",
                    "image_url": {"url": "data:image/jpeg;base64,1234"},
                },
            ],
        ),
    ]
}
```

.. versionchanged:: 0.2.40

    Added support for the 'format="langchain-openai"' flag.
"""
    # coerce to list
    if not isinstance(left, list):
@@ -100,6 +197,15 @@

        merged.append(m)
    merged = [m for m in merged if m.id not in ids_to_remove]

    if format == "langchain-openai":
        merged = _format_messages(merged)
    elif format:
        msg = f"Unrecognized {format=}. Expected one of 'langchain-openai', None."
        raise ValueError(msg)

    return merged


@@ -156,3 +262,19 @@

class MessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]


def _format_messages(messages: Sequence[BaseMessage]) -> list[BaseMessage]:
    try:
        from langchain_core.messages import convert_to_openai_messages
    except ImportError:
        msg = (
            "Must have langchain-core>=0.3.11 installed to use automatic message "
            "formatting (format='langchain-openai'). Please update your "
            "langchain-core version or remove the 'format' flag. Returning "
            "un-formatted messages."
        )
        warnings.warn(msg)
        return list(messages)
    else:
        return convert_to_messages(convert_to_openai_messages(messages))
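`_format_messages` guards an import that only exists in newer `langchain-core` releases and degrades gracefully when it is missing. The same optional-dependency pattern in isolation (`shiny_extras` is a made-up module name, so the fallback path always runs in this sketch):

```python
import warnings

def format_items(items: list) -> list:
    """Format items with an optional helper, falling back to a no-op."""
    try:
        # Hypothetical optional dependency; only newer installs have it.
        from shiny_extras import convert
    except ImportError:
        # Warn instead of crashing, and return the input unchanged.
        warnings.warn(
            "shiny_extras not installed; returning items un-formatted."
        )
        return list(items)
    else:
        return [convert(i) for i in items]
```

Warning rather than raising keeps existing graphs running when the dependency is stale, at the cost of silently skipping the formatting.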
8 changes: 6 additions & 2 deletions libs/langgraph/langgraph/graph/state.py
@@ -762,8 +762,12 @@ def _is_field_binop(typ: Type[Any]) -> Optional[BinaryOperatorAggregate]:
     if len(meta) >= 1 and callable(meta[-1]):
         sig = signature(meta[0])
         params = list(sig.parameters.values())
-        if len(params) == 2 and all(
-            p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) for p in params
+        if (
+            sum(
+                p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
+                for p in params
+            )
+            == 2
         ):
             return BinaryOperatorAggregate(typ, meta[0])
         else:
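The state.py change replaces `len(params) == 2` with a count of positional parameters, so a reducer that takes extra keyword-only options (like the new `format` kwarg on `add_messages`) is still recognized as a binary operator. A self-contained illustration using `inspect.signature` (the `add_messages_like` function is invented for this sketch):

```python
from inspect import signature

# A reducer with a keyword-only option: two positional parameters,
# three parameters total.
def add_messages_like(left, right, *, format=None):
    return list(left) + list(right)

params = list(signature(add_messages_like).parameters.values())
n_positional = sum(
    p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) for p in params
)
# The old check (len(params) == 2) would reject this reducer; counting
# only positional parameters accepts it.
print(len(params), n_positional)  # len counts all three, only two are positional
```

Keyword-only parameters have kind `KEYWORD_ONLY`, so they are excluded from the sum and no longer disqualify the reducer.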
2 changes: 1 addition & 1 deletion libs/langgraph/langgraph/pregel/remote.py
@@ -209,7 +209,7 @@ def _create_state_snapshot(self, state: ThreadState) -> StateSnapshot:
             interrupts=tuple(interrupts),
             state=self._create_state_snapshot(task["state"])
             if task["state"]
-            else {"configurable": task["checkpoint"]}
+            else cast(RunnableConfig, {"configurable": task["checkpoint"]})
             if task["checkpoint"]
             else None,
             result=task.get("result"),
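The `cast(RunnableConfig, ...)` added in remote.py is purely for the type checker; `typing.cast` returns its argument unchanged at runtime. A small sketch with a stand-in TypedDict (the `Config` type here is hypothetical, not the real `RunnableConfig`):

```python
from typing import TypedDict, cast

class Config(TypedDict, total=False):
    configurable: dict

raw = {"configurable": {"thread_id": "1"}}
# cast() tells the type checker to treat raw as Config; at runtime it
# simply returns the same object, with no validation or conversion.
typed = cast(Config, raw)
assert typed is raw
```

This is why the change carries no behavioral risk: it only silences a type error on the dict literal.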