Skip to content

Commit

Permalink
Flav/stable use event source (#6313)
Browse files Browse the repository at this point in the history
* Fix AgentMessage memoization

* Implement stable event source

* Fix agent message streaming

* ✨

* ✂️
  • Loading branch information
flvndvd authored and albandum committed Aug 28, 2024
1 parent 382580b commit 9a0d1b8
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 117 deletions.
12 changes: 7 additions & 5 deletions front/components/assistant/conversation/AgentMessage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,6 @@ export function AgentMessage({

const buildEventSourceURL = useCallback(
(lastEvent: string | null) => {
if (!shouldStream) {
return null;
}
const esURL = `/api/w/${owner.sId}/assistant/conversations/${conversationId}/messages/${message.sId}/events`;
let lastEventId = "";
if (lastEvent) {
Expand All @@ -151,7 +148,7 @@ export function AgentMessage({

return url;
},
[conversationId, message.sId, owner.sId, shouldStream]
[conversationId, message.sId, owner.sId]
);

const onEventCallback = useCallback((eventStr: string) => {
Expand Down Expand Up @@ -270,7 +267,12 @@ export function AgentMessage({
}
}, []);

useEventSource(buildEventSourceURL, onEventCallback);
useEventSource(
buildEventSourceURL,
onEventCallback,
`message-${message.sId}`,
{ isReadyToConsumeStream: shouldStream }
);

const agentMessageToRender = ((): AgentMessageType => {
switch (message.status) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import { Page } from "@dust-tt/sparkle";
import type {
AgentMention,
AgentMessageWithRankType,
LightAgentConfigurationType,
MentionType,
SubscriptionType,
UserMessageWithRankType,
UserType,
WorkspaceType,
} from "@dust-tt/types";
import type { UploadedContentFragment } from "@dust-tt/types";
import { Transition } from "@headlessui/react";
import { cloneDeep } from "lodash";
import { useRouter } from "next/router";
import {
Fragment,
Expand All @@ -35,7 +32,7 @@ import {
} from "@app/components/assistant/conversation/lib";
import { DropzoneContainer } from "@app/components/misc/DropzoneContainer";
import { SendNotificationsContext } from "@app/components/sparkle/Notification";
import type { FetchConversationMessagesResponse } from "@app/lib/api/assistant/messages";
import { updateMessagePagesWithOptimisticData } from "@app/lib/client/conversation/event_handlers";
import { getRandomGreetingForName } from "@app/lib/client/greetings";
import { useSubmitFunction } from "@app/lib/client/utils";
import { useConversationMessages } from "@app/lib/swr";
Expand Down Expand Up @@ -330,29 +327,3 @@ export function ConversationContainer({
</DropzoneContainer>
);
}

/**
* If no message pages exist, create a single page with the optimistic message.
* If message pages exist, add the optimistic message to the first page, since
* the message pages array is not yet reversed.
*/
export function updateMessagePagesWithOptimisticData(
currentMessagePages: FetchConversationMessagesResponse[] | undefined,
messageOrPlaceholder: AgentMessageWithRankType | UserMessageWithRankType
): FetchConversationMessagesResponse[] {
if (!currentMessagePages || currentMessagePages.length === 0) {
return [
{
messages: [messageOrPlaceholder],
hasMore: false,
lastValue: null,
},
];
}

// We need to deep clone here, since SWR relies on the reference.
const updatedMessages = cloneDeep(currentMessagePages);
updatedMessages.at(0)?.messages.push(messageOrPlaceholder);

return updatedMessages;
}
75 changes: 26 additions & 49 deletions front/components/assistant/conversation/ConversationViewer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import React from "react";
import { useInView } from "react-intersection-observer";

import { updateMessagePagesWithOptimisticData } from "@app/components/assistant/conversation/ConversationContainer";
import { CONVERSATION_PARENT_SCROLL_DIV_ID } from "@app/components/assistant/conversation/lib";
import MessageGroup from "@app/components/assistant/conversation/messages/MessageGroup";
import { useEventSource } from "@app/hooks/useEventSource";
import { useLastMessageGroupObserver } from "@app/hooks/useLastMessageGroupObserver";
import type { FetchConversationMessagesResponse } from "@app/lib/api/assistant/messages";
import {
getUpdatedMessagesFromEvent,
getUpdatedParticipantsFromEvent,
} from "@app/lib/client/conversation/event_handlers";
import {
useConversation,
useConversationMessages,
Expand All @@ -40,22 +43,6 @@ export type MessageWithContentFragmentsType =
contenFragments?: ContentFragmentType[];
});

function shouldProcessStreamEvent(
messages: FetchConversationMessagesResponse[] | undefined,
event:
| UserMessageNewEvent
| AgentMessageNewEvent
| AgentGenerationCancelledEvent
): boolean {
const isMessageAlreadyInPages = messages?.some((messages) => {
return messages.messages.some(
(message) => "sId" in message && message.sId === event.messageId
);
});

return !isMessageAlreadyInPages;
}

interface ConversationViewerProps {
conversationId: string;
hideReactions?: boolean;
Expand Down Expand Up @@ -280,36 +267,20 @@ const ConversationViewer = React.forwardRef<
switch (event.type) {
case "user_message_new":
case "agent_message_new":
if (shouldProcessStreamEvent(messages, event)) {
// Temporarily add agent message using event payload until revalidation.
void mutateMessages(async (currentMessagePages) => {
if (!currentMessagePages) {
return undefined;
}

const { rank } = event.message;

// We only support adding at the end of the first page.
const [firstPage] = currentMessagePages;
const firstPageLastMessage = firstPage.messages.at(-1);
if (firstPageLastMessage && firstPageLastMessage.rank < rank) {
return updateMessagePagesWithOptimisticData(
currentMessagePages,
event.message
);
}

return currentMessagePages;
});
void mutateConversationParticipants();
}
// Temporarily add agent message using event payload until revalidation.
void mutateMessages(async (currentMessagePages) => {
return getUpdatedMessagesFromEvent(currentMessagePages, event);
});

void mutateConversationParticipants(async (participants) => {
return getUpdatedParticipantsFromEvent(participants, event);
});
break;

case "agent_generation_cancelled":
if (shouldProcessStreamEvent(messages, event)) {
void mutateMessages();
}
void mutateMessages();
break;

case "conversation_title": {
void mutateConversation();
void mutateConversations(); // to refresh the list of convos in the sidebar
Expand All @@ -325,17 +296,23 @@ const ConversationViewer = React.forwardRef<
[
mutateConversation,
mutateConversations,
messages,
mutateMessages,
mutateConversationParticipants,
]
);

useEventSource(buildEventSourceURL, onEventCallback, {
// We only start consuming the stream when the conversation has been loaded and we have a first page of message.
isReadyToConsumeStream:
!isConversationLoading && !isLoadingInitialData && messages.length !== 0,
});
useEventSource(
buildEventSourceURL,
onEventCallback,
`conversation-${conversationId}`,
{
// We only start consuming the stream when the conversation has been loaded and we have a first page of message.
isReadyToConsumeStream:
!isConversationLoading &&
!isLoadingInitialData &&
messages.length !== 0,
}
);
const eventIds = useRef<string[]>([]);

const typedGroupedMessages = useMemo(
Expand Down
Loading

0 comments on commit 9a0d1b8

Please sign in to comment.