diff --git a/front/components/assistant/conversation/AgentMessage.tsx b/front/components/assistant/conversation/AgentMessage.tsx index c41999b0d1eee..244f3ce4c039e 100644 --- a/front/components/assistant/conversation/AgentMessage.tsx +++ b/front/components/assistant/conversation/AgentMessage.tsx @@ -136,9 +136,6 @@ export function AgentMessage({ const buildEventSourceURL = useCallback( (lastEvent: string | null) => { - if (!shouldStream) { - return null; - } const esURL = `/api/w/${owner.sId}/assistant/conversations/${conversationId}/messages/${message.sId}/events`; let lastEventId = ""; if (lastEvent) { @@ -151,7 +148,7 @@ export function AgentMessage({ return url; }, - [conversationId, message.sId, owner.sId, shouldStream] + [conversationId, message.sId, owner.sId] ); const onEventCallback = useCallback((eventStr: string) => { @@ -270,7 +267,12 @@ export function AgentMessage({ } }, []); - useEventSource(buildEventSourceURL, onEventCallback); + useEventSource( + buildEventSourceURL, + onEventCallback, + `message-${message.sId}`, + { isReadyToConsumeStream: shouldStream } + ); const agentMessageToRender = ((): AgentMessageType => { switch (message.status) { diff --git a/front/components/assistant/conversation/ConversationContainer.tsx b/front/components/assistant/conversation/ConversationContainer.tsx index 4c0af8a6d1a5d..6b4dd02b53fe8 100644 --- a/front/components/assistant/conversation/ConversationContainer.tsx +++ b/front/components/assistant/conversation/ConversationContainer.tsx @@ -1,17 +1,14 @@ import { Page } from "@dust-tt/sparkle"; import type { AgentMention, - AgentMessageWithRankType, LightAgentConfigurationType, MentionType, SubscriptionType, - UserMessageWithRankType, UserType, WorkspaceType, } from "@dust-tt/types"; import type { UploadedContentFragment } from "@dust-tt/types"; import { Transition } from "@headlessui/react"; -import { cloneDeep } from "lodash"; import { useRouter } from "next/router"; import { Fragment, @@ -35,7 +32,7 @@ import { } from "@app/components/assistant/conversation/lib"; import { DropzoneContainer } from "@app/components/misc/DropzoneContainer"; import { SendNotificationsContext } from "@app/components/sparkle/Notification"; -import type { FetchConversationMessagesResponse } from "@app/lib/api/assistant/messages"; +import { updateMessagePagesWithOptimisticData } from "@app/lib/client/conversation/event_handlers"; import { getRandomGreetingForName } from "@app/lib/client/greetings"; import { useSubmitFunction } from "@app/lib/client/utils"; import { useConversationMessages } from "@app/lib/swr"; @@ -330,29 +327,3 @@ export function ConversationContainer({ ); } - -/** - * If no message pages exist, create a single page with the optimistic message. - * If message pages exist, add the optimistic message to the first page, since - * the message pages array is not yet reversed. - */ -export function updateMessagePagesWithOptimisticData( - currentMessagePages: FetchConversationMessagesResponse[] | undefined, - messageOrPlaceholder: AgentMessageWithRankType | UserMessageWithRankType -): FetchConversationMessagesResponse[] { - if (!currentMessagePages || currentMessagePages.length === 0) { - return [ - { - messages: [messageOrPlaceholder], - hasMore: false, - lastValue: null, - }, - ]; - } - - // We need to deep clone here, since SWR relies on the reference. - const updatedMessages = cloneDeep(currentMessagePages); - updatedMessages.at(0)?.messages.push(messageOrPlaceholder); - - return updatedMessages; -} diff --git a/front/components/assistant/conversation/ConversationViewer.tsx b/front/components/assistant/conversation/ConversationViewer.tsx index a7b7802406cff..f6f35dd95d36e 100644 --- a/front/components/assistant/conversation/ConversationViewer.tsx +++ b/front/components/assistant/conversation/ConversationViewer.tsx @@ -17,12 +17,15 @@ import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import React from "react"; import { useInView } from "react-intersection-observer"; -import { updateMessagePagesWithOptimisticData } from "@app/components/assistant/conversation/ConversationContainer"; import { CONVERSATION_PARENT_SCROLL_DIV_ID } from "@app/components/assistant/conversation/lib"; import MessageGroup from "@app/components/assistant/conversation/messages/MessageGroup"; import { useEventSource } from "@app/hooks/useEventSource"; import { useLastMessageGroupObserver } from "@app/hooks/useLastMessageGroupObserver"; import type { FetchConversationMessagesResponse } from "@app/lib/api/assistant/messages"; +import { + getUpdatedMessagesFromEvent, + getUpdatedParticipantsFromEvent, +} from "@app/lib/client/conversation/event_handlers"; import { useConversation, useConversationMessages, @@ -40,22 +43,6 @@ export type MessageWithContentFragmentsType = contenFragments?: ContentFragmentType[]; }); -function shouldProcessStreamEvent( - messages: FetchConversationMessagesResponse[] | undefined, - event: - | UserMessageNewEvent - | AgentMessageNewEvent - | AgentGenerationCancelledEvent -): boolean { - const isMessageAlreadyInPages = messages?.some((messages) => { - return messages.messages.some( - (message) => "sId" in message && message.sId === event.messageId - ); - }); - - return !isMessageAlreadyInPages; -} - interface ConversationViewerProps { conversationId: string; hideReactions?: boolean; @@ -280,36 +267,20 @@ const ConversationViewer = React.forwardRef< switch (event.type) { case "user_message_new": case "agent_message_new": - if (shouldProcessStreamEvent(messages, event)) { - // Temporarily add agent message using event payload until revalidation. - void mutateMessages(async (currentMessagePages) => { - if (!currentMessagePages) { - return undefined; - } - - const { rank } = event.message; - - // We only support adding at the end of the first page. - const [firstPage] = currentMessagePages; - const firstPageLastMessage = firstPage.messages.at(-1); - if (firstPageLastMessage && firstPageLastMessage.rank < rank) { - return updateMessagePagesWithOptimisticData( - currentMessagePages, - event.message - ); - } - - return currentMessagePages; - }); - void mutateConversationParticipants(); - } + // Temporarily add agent message using event payload until revalidation. + void mutateMessages(async (currentMessagePages) => { + return getUpdatedMessagesFromEvent(currentMessagePages, event); + }); + + void mutateConversationParticipants(async (participants) => { + return getUpdatedParticipantsFromEvent(participants, event); + }); break; case "agent_generation_cancelled": - if (shouldProcessStreamEvent(messages, event)) { - void mutateMessages(); - } + void mutateMessages(); break; + case "conversation_title": { void mutateConversation(); void mutateConversations(); // to refresh the list of convos in the sidebar @@ -325,17 +296,23 @@ const ConversationViewer = React.forwardRef< [ mutateConversation, mutateConversations, - messages, mutateMessages, mutateConversationParticipants, ] ); - useEventSource(buildEventSourceURL, onEventCallback, { - // We only start consuming the stream when the conversation has been loaded and we have a first page of message. - isReadyToConsumeStream: - !isConversationLoading && !isLoadingInitialData && messages.length !== 0, - }); + useEventSource( + buildEventSourceURL, + onEventCallback, + `conversation-${conversationId}`, + { + // We only start consuming the stream when the conversation has been loaded and we have a first page of message. + isReadyToConsumeStream: + !isConversationLoading && + !isLoadingInitialData && + messages.length !== 0, + } + ); const eventIds = useRef([]); const typedGroupedMessages = useMemo( diff --git a/front/hooks/useEventSource.ts b/front/hooks/useEventSource.ts index 8ea52fa023002..c4f2f0097da16 100644 --- a/front/hooks/useEventSource.ts +++ b/front/hooks/useEventSource.ts @@ -1,67 +1,173 @@ -import { useEffect, useRef, useState } from "react"; +import { useCallback, useEffect, useRef, useState } from "react"; + +const RECONNECT_DELAY = 5000; // 5 seconds. + +/** + * Stable EventSource Manager + * + * This singleton object manages EventSource instances across the entire application. + * It provides methods to create, retrieve, and remove EventSource connections, + * ensuring that each unique connection is properly managed and can be accessed + * or closed as needed. + * + * Key features: + * - Maintains a single source of truth for all EventSource instances. + * - Prevents duplicate EventSource creations for the same unique identifier. + * - Allows for centralized management of EventSource lifecycle. + * - Improves performance by reusing existing connections when possible. + * + * Usage: + * - Create a new EventSource: stableEventSourceManager.create(url, uniqueId) + * - Retrieve an existing EventSource: stableEventSourceManager.get(uniqueId) + * - Close and remove an EventSource: stableEventSourceManager.remove(uniqueId) + * + * This manager is designed to be used in conjunction with the useEventSource hook + * to provide stable and efficient EventSource handling across React component lifecycles. + */ +const stableEventSourceManager = { + // Map to store active EventSource instances, keyed by unique identifiers + sources: new Map(), + + /** + * Creates a new EventSource instance and stores it in the sources map. + * @param url The URL to connect the EventSource to + * @param uniqueId A unique identifier for this EventSource instance + * @returns The newly created EventSource instance + */ + create(url: string, uniqueId: string) { + const newSource = new EventSource(url); + this.sources.set(uniqueId, newSource); + return newSource; + }, + + /** + * Retrieves an existing EventSource instance by its unique identifier. + * @param uniqueId The unique identifier of the EventSource to retrieve + * @returns The EventSource instance if found, undefined otherwise + */ + get(uniqueId: string) { + return this.sources.get(uniqueId); + }, + + /** + * Closes and removes an EventSource instance from the sources map. + * @param uniqueId The unique identifier of the EventSource to remove + */ + remove(uniqueId: string) { + const source = this.sources.get(uniqueId); + if (source) { + source.close(); + this.sources.delete(uniqueId); + } + }, +}; export function useEventSource( - buildURL: (lastMessage: string | null) => string | null, + buildURL: (lastEvent: string | null) => string | null, onEventCallback: (event: string) => void, - { isReadyToConsumeStream }: { isReadyToConsumeStream: boolean } = { - isReadyToConsumeStream: true, - } + uniqueId: string, + { isReadyToConsumeStream = true }: { isReadyToConsumeStream?: boolean } = {} ) { - // State used to re-connect to the events stream; this is a hack to re-trigger - // the useEffect that set-up the EventSource to the streaming endpoint. - const [reconnectCounter, setReconnectCounter] = useState(0); - const lastEvent = useRef(null); - const errorCount = useRef(0); const [isError, setIsError] = useState(null); + const lastEvent = useRef(null); + const reconnectAttempts = useRef(0); - useEffect(() => { - if (!isReadyToConsumeStream) { - return; - } + // We use a counter to trigger reconnects when the counter changes. + const [reconnectCounter, setReconnectCounter] = useState(0); + // Store the reconnect timeout reference to clear it when needed. + const reconnectTimeoutRef = useRef(null); + + // Use the stable event source manager to ensure consistent EventSource management + // across renders and component lifecycles. + const sourceManager = stableEventSourceManager; + + const connect = useCallback(() => { const url = buildURL(lastEvent.current); if (!url) { - return; + // If the url is empty, it means streaming is done. + // Close any previous connections for this uniqueId and remove it from the manager. + sourceManager.remove(uniqueId); + + return null; } - let reconnectTimeout: NodeJS.Timeout | null = null; - const es = new EventSource(url); + let source = sourceManager.get(uniqueId); + // If the source is closed or doesn't exist, create a new one. + if (!source || source.readyState === EventSource.CLOSED) { + source = sourceManager.create(url, uniqueId); + } - es.onopen = () => { - errorCount.current = 0; + source.onopen = () => { + // If connected, reset the reconnect attempts and clear the reconnect timeout. + reconnectAttempts.current = 0; + if (reconnectTimeoutRef.current) { + clearTimeout(reconnectTimeoutRef.current); + } }; - es.onmessage = (event: MessageEvent) => { + source.onmessage = (event: MessageEvent) => { if (event.data === "done") { + source.close(); + + // Reconnect to the stream right away. setReconnectCounter((c) => c + 1); - es.close(); return; } + onEventCallback(event.data); lastEvent.current = event.data; }; - es.onerror = (event) => { - console.error("useEventSource.onerror()", event); - errorCount.current += 1; - if (errorCount.current >= 3) { - console.log("too many errors, not reconnecting.."); + source.onerror = (event: Event) => { + console.error("EventSource error", event); + source.close(); + + reconnectAttempts.current++; + + if (reconnectAttempts.current >= 10) { + console.log( + "Too many errors, not reconnecting. Please refresh the page." + ); setIsError(new Error("Too many errors, closing connection.")); - es.close(); + return; } - reconnectTimeout = setTimeout(() => { + + console.error( + `Connection error. Attempting to reconnect in ${RECONNECT_DELAY}ms` + ); + + // Set timeout to reconnect after a delay. + reconnectTimeoutRef.current = setTimeout(() => { setReconnectCounter((c) => c + 1); - }, 1000); + }, RECONNECT_DELAY); }; + return source; + }, [buildURL, onEventCallback, uniqueId, sourceManager]); + + useEffect(() => { + if (!isReadyToConsumeStream) { + return; + } + + connect(); + return () => { - if (reconnectTimeout) { - clearTimeout(reconnectTimeout); + sourceManager.remove(uniqueId); + + if (reconnectTimeoutRef.current) { + clearTimeout(reconnectTimeoutRef.current); } - es.close(); }; - }, [buildURL, onEventCallback, reconnectCounter, isReadyToConsumeStream]); + }, [ + isReadyToConsumeStream, + connect, + reconnectCounter, + sourceManager, + uniqueId, + ]); return { isError }; } diff --git a/front/lib/client/conversation/event_handlers.ts b/front/lib/client/conversation/event_handlers.ts new file mode 100644 index 0000000000000..1dafb85cfbdc0 --- /dev/null +++ b/front/lib/client/conversation/event_handlers.ts @@ -0,0 +1,116 @@ +import type { + AgentMessageNewEvent, + AgentMessageWithRankType, + UserMessageNewEvent, + UserMessageWithRankType, +} from "@dust-tt/types"; +import { isAgentMessageType, isUserMessageType } from "@dust-tt/types"; +import { cloneDeep } from "lodash"; + +import type { FetchConversationMessagesResponse } from "@app/lib/api/assistant/messages"; +import type { FetchConversationParticipantsResponse } from "@app/pages/api/w/[wId]/assistant/conversations/[cId]/participants"; + +/** + * If no message pages exist, create a single page with the optimistic message. + * If message pages exist, add the optimistic message to the first page, since + * the message pages array is not yet reversed. + */ +export function updateMessagePagesWithOptimisticData( + currentMessagePages: FetchConversationMessagesResponse[] | undefined, + messageOrPlaceholder: AgentMessageWithRankType | UserMessageWithRankType +): FetchConversationMessagesResponse[] { + if (!currentMessagePages || currentMessagePages.length === 0) { + return [ + { + messages: [messageOrPlaceholder], + hasMore: false, + lastValue: null, + }, + ]; + } + + // We need to deep clone here, since SWR relies on the reference. + const updatedMessages = cloneDeep(currentMessagePages); + updatedMessages.at(0)?.messages.push(messageOrPlaceholder); + + return updatedMessages; +} + +// Function to update the message pages with the new message from the event. +export function getUpdatedMessagesFromEvent( + currentMessagePages: FetchConversationMessagesResponse[] | undefined, + event: AgentMessageNewEvent | UserMessageNewEvent +) { + if (!currentMessagePages) { + return undefined; + } + + // Check if the message already exists in the cache. + const isMessageAlreadyInCache = currentMessagePages.some((page) => + page.messages.some((message) => message.sId === event.message.sId) + ); + + // If the message is already in the cache, ignore the event. + if (isMessageAlreadyInCache) { + return currentMessagePages; + } + + const { rank } = event.message; + + // We only support adding at the end of the first page. + const [firstPage] = currentMessagePages; + const firstPageLastMessage = firstPage.messages.at(-1); + if (firstPageLastMessage && firstPageLastMessage.rank < rank) { + return updateMessagePagesWithOptimisticData( + currentMessagePages, + event.message + ); + } + + return currentMessagePages; +} + +// Function to update the participants with the new message from the event. +export function getUpdatedParticipantsFromEvent( + participants: FetchConversationParticipantsResponse | undefined, + event: AgentMessageNewEvent | UserMessageNewEvent +) { + if (!participants) { + return undefined; + } + + const { message } = event; + if (isUserMessageType(message)) { + const { user } = message; + const isAlreadyParticipant = participants.participants.users.some( + (u) => u.username === message.user?.username + ); + + if (!user || isAlreadyParticipant) { + return participants; + } else { + participants.participants.users.push({ + username: user.username, + fullName: user.fullName, + pictureUrl: user.image, + }); + } + } else if (isAgentMessageType(message)) { + const { configuration } = message; + const isAlreadyParticipant = participants.participants.agents.some( + (a) => a.configurationId === configuration.sId + ); + + if (isAlreadyParticipant) { + return participants; + } else { + participants.participants.agents.push({ + configurationId: configuration.sId, + name: configuration.name, + pictureUrl: configuration.pictureUrl, + }); + } + } + + return participants; +}