Skip to content

Commit

Permalink
enh: notion garbage collection improvement (#3522)
Browse files Browse the repository at this point in the history
Co-authored-by: Henry Fontanier <[email protected]>
  • Loading branch information
fontanierh and Henry Fontanier authored Jan 31, 2024
1 parent 869384c commit f63d57d
Showing 1 changed file with 93 additions and 134 deletions.
227 changes: 93 additions & 134 deletions connectors/src/connectors/notion/temporal/activities.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { CoreAPIDataSourceDocumentSection, ModelId } from "@dust-tt/types";
import { assertNever } from "@dust-tt/types";
import { isFullBlock, isFullPage, isNotionClientError } from "@notionhq/client";
import { Context } from "@temporalio/activity";
import { Op } from "sequelize";
Expand Down Expand Up @@ -671,12 +672,9 @@ export async function garbageCollectorMarkAsSeen({
return { newPageIds, newDatabaseIds };
}

// - for all pages that have a lastSeenTs < runTimestamp
// - query notion API and check if the page still exists
// - if the page does not exist, delete it from the database
// - for all databases that have a lastSeenTs < runTimestamp
// - query notion API and check if the database still exists
// - if the database does not exist, delete it from the database
// - for all pages/database that have a lastSeenTs < runTimestamp
// - query notion API and check if we can access the resource
// - if the resource is not accessible, delete it from the database (and from the data source if it's a page)
// - update the lastGarbageCollectionFinishTime
// - will give up after `GARBAGE_COLLECT_MAX_DURATION_MS` milliseconds (including retries if any)
export async function garbageCollect({
Expand Down Expand Up @@ -713,84 +711,115 @@ export async function garbageCollect({
}
const notionAccessToken = await getNotionAccessToken(connector.connectionId);

const pagesToDelete = await NotionPage.findAll({
const pagesToCheck = await NotionPage.findAll({
where: {
connectorId: connector.id,
// Only look at pages that have not been seen since the last garbage collection.
// Only look at pages that have not been seen during the garbage collect run.
lastSeenTs: {
[Op.lt]: new Date(runTimestamp),
},
},
// First handle pages that we have seen the longest time ago. If the garbage collection time
// outs that means we've cleaned-up the oldest page first and will have a chance to continue at
// next garbage collection.
order: [["lastSeenTs", "ASC"]],
});
const databasesToDelete = await NotionDatabase.findAll({
const databasesToCheck = await NotionDatabase.findAll({
where: {
connectorId: connector.id,
// Only look at pages that have not been seen since the last garbage collection.
// Only look at database that have not been seen during the garbage collect run.
lastSeenTs: {
[Op.lt]: new Date(runTimestamp),
},
},
// First handle pages that we have seen the longest time ago. If the garbage collection time
// outs that means we've cleaned-up the oldest page first and will have a chance to continue at
// next garbage collection.
order: [["lastSeenTs", "ASC"]],
});

localLogger.info(
{
pagesToDeleteCount: pagesToDelete.length,
databasesToDeleteCount: databasesToDelete.length,
pagesToDeleteCount: pagesToCheck.length,
databasesToDeleteCount: databasesToCheck.length,
},
"Found pages and databases to delete."
);

const resourcesToCheck: Array<
| { type: "page"; resource: NotionPage }
| { type: "database"; resource: NotionDatabase }
> = [
...pagesToCheck.map((page) => ({ type: "page" as const, resource: page })),
...databasesToCheck.map((db) => ({
type: "database" as const,
resource: db,
})),
];

// First handle resources that we have seen the longest time ago. If the garbage collection times out
// that means we've cleaned-up the oldest resources first and will have a chance to continue at
// next garbage collection.
resourcesToCheck.sort((a, b) => {
return a.resource.lastSeenTs.getTime() - b.resource.lastSeenTs.getTime();
});

const NOTION_UNHEALTHY_ERROR_CODES = [
"internal_server_error",
"notionhq_client_request_timeout",
"service_unavailable",
"notionhq_client_response_error",
];

// Handle Pages.

let deletedPagesCount = 0;
let deletedDatabasesCount = 0;

let skippedPagesCount = 0;
let skippedDatabasesCount = 0;

let stillAccessiblePagesCount = 0;
let stillAccessibleDatabasesCount = 0;

for (const [i, page] of pagesToDelete.entries()) {
for (const [i, x] of resourcesToCheck.entries()) {
const iterationLogger = localLogger.child({
pageId: page.notionPageId,
pagesToDeleteCount: pagesToDelete.length,
pageId: x.type === "page" ? x.resource.notionPageId : undefined,
databaseId:
x.type === "database" ? x.resource.notionDatabaseId : undefined,
pagesToCheckCount: pagesToCheck.length,
databasesToCheckCount: databasesToCheck.length,
index: i,
deletedPagesCount,
deletedDatabasesCount,
skippedPagesCount,
skippedDatabasesCount,
stillAccessiblePagesCount,
stillAccessibleDatabasesCount,
});

if (new Date().getTime() - startTs > GARBAGE_COLLECT_MAX_DURATION_MS) {
iterationLogger.warn("Garbage collection is taking too long, giving up.");
break;
}

if (page.skipReason) {
iterationLogger.info(
{ skipReason: page.skipReason },
"Page is marked as skipped, not deleting."
);
skippedPagesCount++;
if (x.resource.skipReason) {
if (x.type === "page") {
iterationLogger.info(
{ skipReason: x.resource.skipReason },
"Page is marked as skipped, not deleting."
);
skippedPagesCount++;
} else if (x.type === "database") {
iterationLogger.info(
{ skipReason: x.resource.skipReason },
"Database is marked as skipped, not deleting."
);
skippedDatabasesCount++;
} else {
assertNever(x);
}
continue;
}

let pageIsAccessible: boolean;
let resourceIsAccessible: boolean;
try {
pageIsAccessible = await isAccessibleAndUnarchived(
resourceIsAccessible = await isAccessibleAndUnarchived(
notionAccessToken,
page.notionPageId,
"page",
x.type === "page"
? x.resource.notionPageId
: x.resource.notionDatabaseId,
x.type,
iterationLogger
);
} catch (e) {
Expand All @@ -813,118 +842,48 @@ export async function garbageCollect({
error: potentialNotionError,
attempt: Context.current().info.attempt,
},
"Failed to check if page is accessible. Giving up and moving on"
"Failed to check if notion resource is accessible. Giving up and moving on"
);
pageIsAccessible = true;
resourceIsAccessible = true;
} else {
throw e;
}
}

if (pageIsAccessible) {
// Mark the page as seen.
await page.update({
lastSeenTs: new Date(runTimestamp),
});
iterationLogger.info("Page is still accessible, not deleting.");
stillAccessiblePagesCount++;
continue;
}

iterationLogger.info("Deleting page.");
await deleteFromDataSource(
{
dataSourceName: connector.dataSourceName,
workspaceId: connector.workspaceId,
workspaceAPIKey: connector.workspaceAPIKey,
},
`notion-${page.notionPageId}`
);
await page.destroy();

deletedPagesCount++;
}

// Handle Databases.

let deletedDatabasesCount = 0;
let skippedDatabasesCount = 0;
let stillAccessibleDatabasesCount = 0;

for (const [i, database] of databasesToDelete.entries()) {
const iterationLogger = localLogger.child({
databaseId: database.notionDatabaseId,
databasesToDeleteCount: databasesToDelete.length,
index: i,
deletedDatabasesCount,
skippedDatabasesCount,
stillAccessibleDatabasesCount,
});

if (new Date().getTime() - startTs > GARBAGE_COLLECT_MAX_DURATION_MS) {
iterationLogger.warn("Garbage collection is taking too long, giving up.");
break;
}

if (database.skipReason) {
iterationLogger.info(
{ skipReason: database.skipReason },
"Database is marked as skipped, not deleting."
);
skippedDatabasesCount++;
continue;
}

let databaseIsAccessible: boolean;
try {
databaseIsAccessible = await isAccessibleAndUnarchived(
notionAccessToken,
database.notionDatabaseId,
"database",
iterationLogger
);
} catch (e) {
// Sometimes a request will consistently fail with a 500 We don't want to delete the database
// in that case, so we just log the error and move on.
const potentialNotionError = e as {
body: unknown;
code: string;
status: number;
};
if (
(NOTION_UNHEALTHY_ERROR_CODES.includes(potentialNotionError.code) ||
(typeof potentialNotionError.status === "number" &&
potentialNotionError.status >= 500 &&
potentialNotionError.status < 600)) &&
Context.current().info.attempt >= 15
) {
iterationLogger.error(
{
error: potentialNotionError,
attempt: Context.current().info.attempt,
},
"Failed to check if database is accessible. Giving up and moving on"
);
databaseIsAccessible = true;
if (resourceIsAccessible) {
// Mark the resource as seen.
x.resource.lastSeenTs = new Date(runTimestamp);
await x.resource.save();
if (x.type === "page") {
iterationLogger.info("Page is still accessible, not deleting.");
stillAccessiblePagesCount++;
} else if (x.type === "database") {
iterationLogger.info("Database is still accessible, not deleting.");
stillAccessibleDatabasesCount++;
} else {
throw e;
assertNever(x);
}
}

if (databaseIsAccessible) {
// Mark the database as seen.
await database.update({
lastSeenTs: new Date(runTimestamp),
});
iterationLogger.info("Database is still accessible, not deleting.");
stillAccessibleDatabasesCount++;
continue;
}

iterationLogger.info("Deleting database.");
await database.destroy();
if (x.type === "page") {
iterationLogger.info("Deleting page.");
await deleteFromDataSource(
{
dataSourceName: connector.dataSourceName,
workspaceId: connector.workspaceId,
workspaceAPIKey: connector.workspaceAPIKey,
},
`notion-${x.resource.notionPageId}`
);
deletedPagesCount++;
} else {
iterationLogger.info("Deleting database.");
deletedDatabasesCount++;
}

deletedDatabasesCount++;
await x.resource.destroy();
}

await notionConnectorState.update({
Expand Down

0 comments on commit f63d57d

Please sign in to comment.