Skip to content

Commit

Permalink
connectors: renderDocumentTitleAndContent (#3151)
Browse files Browse the repository at this point in the history
* WIP refactor

* wip

* clean-up renderDocumentForTitleAndContent

* updatedAt and createdAt

* Adapt slack bot.ts

* comments

* nit

* nit

* nit

* s/lastUpdatedAt/updatedAt

* review pr

* rename

* fix title for non threaded messages in slack
  • Loading branch information
spolu authored Jan 11, 2024
1 parent 33573ae commit 5d47c6e
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 111 deletions.
51 changes: 28 additions & 23 deletions connectors/src/connectors/github/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
} from "@connectors/connectors/github/lib/github_api";
import {
deleteFromDataSource,
renderDocumentTitleAndContent,
renderMarkdownSection,
upsertToDatasource,
} from "@connectors/lib/data_sources";
Expand Down Expand Up @@ -108,11 +109,12 @@ async function renderIssue(

const issue = await getIssue(installationId, repoName, login, issueNumber);

const content = renderMarkdownSection(
`Issue #${issue.number} [${repoName}]: ${issue.title}\n`,
issue.body || "",
{ flavor: "gfm" }
);
const content = renderDocumentTitleAndContent({
title: `Issue #${issue.number} [${repoName}]: ${issue.title}`,
createdAt: issue.createdAt,
updatedAt: issue.updatedAt,
content: renderMarkdownSection(issue.body ?? "", { flavor: "gfm" }),
});

let resultPage = 1;
let lastCommentUpdateTime: Date | null = null;
Expand Down Expand Up @@ -147,11 +149,11 @@ async function renderIssue(

for (const comment of comments) {
if (comment.body) {
const c = renderMarkdownSection(
`>> ${renderGithubUser(comment.creator)}:\n`,
comment.body,
{ flavor: "gfm" }
);
const c = {
prefix: `>> ${renderGithubUser(comment.creator)}:\n`,
content: null,
sections: [renderMarkdownSection(comment.body, { flavor: "gfm" })],
};
content.sections.push(c);
}
if (
Expand Down Expand Up @@ -280,11 +282,10 @@ async function renderDiscussion(
discussionNumber
);

const content = renderMarkdownSection(
`Discussion #${discussion.number} [${repoName}]: ${discussion.title}\n`,
discussion.bodyText,
{ flavor: "gfm" }
);
const content = renderDocumentTitleAndContent({
title: `Discussion #${discussion.number} [${repoName}]: ${discussion.title}`,
content: renderMarkdownSection(discussion.bodyText, { flavor: "gfm" }),
});

let nextCursor: string | null = null;

Expand All @@ -311,9 +312,11 @@ async function renderDiscussion(
prefix += "[ACCEPTED ANSWER] ";
}
prefix += `${comment.author?.login || "Unknown author"}:\n`;
const c = renderMarkdownSection(prefix, comment.bodyText, {
flavor: "gfm",
});
const c = {
prefix,
content: null,
sections: [renderMarkdownSection(comment.bodyText, { flavor: "gfm" })],
};
content.sections.push(c);

let nextChildCursor: string | null = null;
Expand All @@ -333,11 +336,13 @@ async function renderDiscussion(
);

for (const childComment of childComments) {
const cc = renderMarkdownSection(
`>> ${childComment.author?.login || "Unknown author"}:\n`,
childComment.bodyText,
{ flavor: "gfm" }
);
const cc = {
prefix: `>> ${childComment.author?.login || "Unknown author"}:\n`,
content: null,
sections: [
renderMarkdownSection(comment.bodyText, { flavor: "gfm" }),
],
};
c.sections.push(cc);
}

Expand Down
15 changes: 9 additions & 6 deletions connectors/src/connectors/google_drive/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import PQueue from "p-queue";
import {
deleteFromDataSource,
MAX_DOCUMENT_TXT_LEN,
renderSectionForTitleAndContent,
renderDocumentTitleAndContent,
upsertToDatasource,
} from "@connectors/lib/data_sources";
import { HTTPError } from "@connectors/lib/error";
Expand Down Expand Up @@ -509,11 +509,14 @@ async function syncOneFile(
return false;
}

// Add the title of the file to the beginning of the document.
const content = renderSectionForTitleAndContent(
file.name,
documentContent || null
);
const content = renderDocumentTitleAndContent({
title: file.name,
updatedAt: file.updatedAtMs ? new Date(file.updatedAtMs) : undefined,
createdAt: file.createdAtMs ? new Date(file.createdAtMs) : undefined,
content: documentContent
? { prefix: null, content: documentContent, sections: [] }
: null,
});

if (documentContent === undefined) {
logger.error(
Expand Down
22 changes: 12 additions & 10 deletions connectors/src/connectors/notion/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import {
import {
deleteFromDataSource,
MAX_DOCUMENT_TXT_LEN,
renderSectionForTitleAndContent,
renderDocumentTitleAndContent,
upsertToDatasource,
} from "@connectors/lib/data_sources";
import { ExternalOauthTokenError } from "@connectors/lib/error";
Expand Down Expand Up @@ -1734,8 +1734,8 @@ export async function renderAndUpsertPageFromCache({
skipReason = "body_too_large";
}

const createdTime = new Date(pageCacheEntry.createdTime).getTime();
const updatedTime = new Date(pageCacheEntry.lastEditedTime).getTime();
const createdAt = new Date(pageCacheEntry.createdTime);
const updatedAt = new Date(pageCacheEntry.lastEditedTime);

if (!pageHasBody) {
localLogger.info(
Expand All @@ -1753,10 +1753,12 @@ export async function renderAndUpsertPageFromCache({
runTimestamp.toString()
);

const content = renderSectionForTitleAndContent(
title || null,
renderedPage
);
const content = renderDocumentTitleAndContent({
title: title ?? null,
createdAt: createdAt,
updatedAt: updatedAt,
content: { prefix: null, content: renderedPage, sections: [] },
});

localLogger.info(
"notionRenderAndUpsertPageFromCache: Upserting to Data Source."
Expand All @@ -1770,13 +1772,13 @@ export async function renderAndUpsertPageFromCache({
documentId,
documentContent: content,
documentUrl: pageCacheEntry.url,
timestampMs: updatedTime,
timestampMs: updatedAt.getTime(),
tags: getTagsForPage({
title,
author,
lastEditor,
createdTime,
updatedTime,
createdTime: createdAt.getTime(),
updatedTime: updatedAt.getTime(),
}),
parents,
retries: 3,
Expand Down
14 changes: 7 additions & 7 deletions connectors/src/connectors/slack/bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -756,13 +756,13 @@ async function makeContentFragment(
return new Err(new Error("Could not retrieve channel name"));
}

const content = await formatMessagesForUpsert(
channelId,
channel.channel.name,
allMessages,
connector.id,
slackClient
);
const content = await formatMessagesForUpsert({
channelName: channel.channel.name,
messages: allMessages,
isThread: true,
connectorId: connector.id,
slackClient,
});

let url: string | null = null;
if (allMessages[0]?.ts) {
Expand Down
76 changes: 47 additions & 29 deletions connectors/src/connectors/slack/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_c
import { cacheGet, cacheSet } from "@connectors/lib/cache";
import {
deleteFromDataSource,
renderDocumentTitleAndContent,
upsertToDatasource,
} from "@connectors/lib/data_sources";
import { WorkflowError } from "@connectors/lib/error";
Expand Down Expand Up @@ -422,13 +423,13 @@ export async function syncNonThreaded(
}
messages.reverse();

const content = await formatMessagesForUpsert(
channelId,
const content = await formatMessagesForUpsert({
channelName,
messages,
isThread: false,
connectorId,
client
);
slackClient: client,
});

const startDate = new Date(startTsMs);
const endDate = new Date(endTsMs);
Expand Down Expand Up @@ -598,13 +599,13 @@ export async function syncThread(
return;
}

const content = await formatMessagesForUpsert(
channelId,
const content = await formatMessagesForUpsert({
channelName,
allMessages,
messages: allMessages,
isThread: true,
connectorId,
slackClient
);
slackClient,
});

const firstMessage = allMessages[0];
let sourceUrl: string | undefined = undefined;
Expand Down Expand Up @@ -670,13 +671,19 @@ async function processMessageForMentions(
return message;
}

export async function formatMessagesForUpsert(
channelId: string,
channelName: string,
messages: MessageElement[],
connectorId: ModelId,
slackClient: WebClient
): Promise<CoreAPIDataSourceDocumentSection> {
export async function formatMessagesForUpsert({
channelName,
messages,
isThread,
connectorId,
slackClient,
}: {
channelName: string;
messages: MessageElement[];
isThread: boolean;
connectorId: ModelId;
slackClient: WebClient;
}): Promise<CoreAPIDataSourceDocumentSection> {
const data = await Promise.all(
messages.map(async (message) => {
const text = await processMessageForMentions(
Expand All @@ -694,6 +701,7 @@ export async function formatMessagesForUpsert(
const messageDateStr = formatDateForUpsert(messageDate);

return {
messageDate,
dateStr: messageDateStr,
userName,
text: text,
Expand All @@ -703,22 +711,32 @@ export async function formatMessagesForUpsert(
})
);

const first = data[0];
if (!first) {
const first = data.at(0);
const last = data.at(-1);
if (!last || !first) {
throw new Error("Cannot format empty list of messages");
}

return {
prefix: `Thread in #${channelName} [${first.dateStr}]: ${
first.text.replace(/\s+/g, " ").trim().substring(0, 128) + "..."
}\n`,
content: null,
sections: data.map((d) => ({
prefix: `>> @${d.userName} [${d.dateStr}]:\n`,
content: d.text + "\n",
sections: [],
})),
};
const title = isThread
? `Thread in #${channelName}: ${
first.text.replace(/\s+/g, " ").trim().substring(0, 128) + "..."
}`
: `Messages in #${channelName}`;

return renderDocumentTitleAndContent({
title,
createdAt: first.messageDate,
updatedAt: last.messageDate,
content: {
prefix: null,
content: null,
sections: data.map((d) => ({
prefix: `>> @${d.userName} [${d.dateStr}]:\n`,
content: d.text + "\n",
sections: [],
})),
},
});
}

export async function fetchUsers(connectorId: ModelId) {
Expand Down
Loading

0 comments on commit 5d47c6e

Please sign in to comment.