Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(elastic-search): drop removeDeletedEntries function #10587

Merged
merged 1 commit into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions scripts/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
createElasticSearchIndex,
deleteElasticSearchIndex,
getAvailableElasticSearchIndexes,
restoreUndeletedEntries,
syncElasticSearchIndex,
} from '../server/lib/elastic-search/sync';
import logger from '../server/lib/logger';
Expand Down Expand Up @@ -112,23 +111,6 @@ program
}
});

// Restore undeleted entries
program
.command('restore')
.description('Restore undeleted entries')
.argument('[indexes...]', 'Only sync specific indexes')
.action(async indexesInput => {
checkElasticSearchAvailable();

const indexes = parseIndexesFromInput(indexesInput);
for (const indexName of indexes) {
logger.info(`Restoring undeleted entries for index ${indexName}`);
await restoreUndeletedEntries(indexName as ElasticSearchIndexName, { log: true });
}

logger.info('Restore completed!');
});

// Sync command
program
.command('sync')
Expand Down
131 changes: 4 additions & 127 deletions server/lib/elastic-search/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
*/

import config from 'config';
import { chunk, partition, uniq } from 'lodash';

import { Op } from '../../models';
import logger from '../logger';
Expand All @@ -26,7 +25,7 @@ export async function createElasticSearchIndex(indexName: ElasticSearchIndexName
});
}

async function removeDeletedEntries(indexName: ElasticSearchIndexName, fromDate: Date) {
async function removeDeletedEntries(indexName: ElasticSearchIndexName, fromDate: Date, { log = false } = {}) {
const client = getElasticSearchClient({ throwIfUnavailable: true });
const adapter = ElasticSearchModelsAdapters[indexName];
const pageSize = 20000; // We're only fetching the id, so we can fetch more entries at once
Expand All @@ -44,7 +43,10 @@ async function removeDeletedEntries(indexName: ElasticSearchIndexName, fromDate:

if (deletedEntries.length === 0) {
return;
} else if (log) {
logger.info(`Deleting ${deletedEntries.length} entries...`);
}

await client.bulk({
index: formatIndexNameForElasticSearch(indexName),
body: deletedEntries.flatMap(entry => [{ delete: { _id: entry.id } }]),
Expand All @@ -53,130 +55,6 @@ async function removeDeletedEntries(indexName: ElasticSearchIndexName, fromDate:
} while (deletedEntries.length === pageSize);
}

/**
 * Compresses a list of ids into contiguous ranges plus leftover single numbers, e.g.
 * [1,2,3,4,0,165,7,8,9] (default minRangeSize of 5) becomes
 * { ranges: [[0, 4]], lonelyNumbers: [7, 8, 9, 165] }.
 *
 * Purpose: reduce the cost of SQL queries with large IN clauses — short runs stay as plain
 * numbers for `id NOT IN (...)` (cheaper than a BETWEEN for few values), while runs of at
 * least `minRangeSize` consecutive ids become `id NOT BETWEEN start AND end` ranges.
 *
 * Fixes over the previous version:
 * - the final run of ids was silently dropped (never flushed after the loop), so callers
 *   excluding these ids from a query would miss them entirely;
 * - a run of exactly `minRangeSize` numbers is now emitted as a range, matching the
 *   documented "ranges of 5 or more numbers" (a run of k numbers spans k-1 steps);
 * - an empty input now returns empty results instead of a bogus `undefined` run start.
 */
const simplifyNumbersInRanges = (
  ids: number[],
  minRangeSize = 5,
): {
  ranges: number[][];
  lonelyNumbers: number[];
} => {
  const ranges: number[][] = [];
  const lonelyNumbers: number[] = [];

  // Dedupe and sort ascending so consecutive ids become adjacent
  const sortedIds = [...new Set(ids)].sort((a, b) => a - b);
  if (sortedIds.length === 0) {
    return { ranges, lonelyNumbers };
  }

  // Emit one run [start, start + length - 1] as either a range or individual numbers
  const flushRun = (start: number, length: number): void => {
    if (length >= minRangeSize) {
      ranges.push([start, start + length - 1]);
    } else {
      for (let j = 0; j < length; j++) {
        lonelyNumbers.push(start + j);
      }
    }
  };

  let runStart = sortedIds[0];
  let runLength = 1;
  for (let i = 1; i < sortedIds.length; i++) {
    if (sortedIds[i] === runStart + runLength) {
      runLength++; // Still consecutive: extend the current run
    } else {
      flushRun(runStart, runLength);
      runStart = sortedIds[i];
      runLength = 1;
    }
  }

  // The last run must be flushed too — the previous implementation forgot this
  flushRun(runStart, runLength);

  return { ranges, lonelyNumbers };
};

/**
 * Re-indexes ("restores") database rows that are present (not soft-deleted) in the database
 * but missing from the ElasticSearch index.
 *
 * Strategy: scroll through every document id currently in the index, then query the DB for
 * ids NOT in that set (via NOT IN / NOT BETWEEN built from `simplifyNumbersInRanges`), and
 * bulk-index the matching rows in chunks of 5,000.
 *
 * @param indexName - The index to restore entries for.
 * @param options - `log: true` reports progress through the logger.
 */
export async function restoreUndeletedEntries(indexName: ElasticSearchIndexName, { log = false } = {}) {
  const client = getElasticSearchClient({ throwIfUnavailable: true });
  const adapter = ElasticSearchModelsAdapters[indexName];

  if (log) {
    logger.info(`Fetching IDs of all undeleted entries in index ${indexName}...`);
  }

  /* eslint-disable camelcase */
  // Initial scroll search: fetch ids only (`_source: false`), 10k per page
  let scrollSearch = await client.search({
    index: formatIndexNameForElasticSearch(indexName),
    body: { _source: false },
    filter_path: ['hits.hits._id', '_scroll_id'],
    size: 10_000, // Max value allowed by ES
    scroll: '1m', // Keep the search context alive for 1 minute
  });

  let allIds = scrollSearch.hits.hits.map(hit => hit._id);
  // NOTE(review): the scroll id is captured once from the first response and reused for every
  // subsequent page — assumes ES keeps returning the same _scroll_id; confirm against the client docs.
  const scrollId = scrollSearch._scroll_id;

  // Continue scrolling through results
  // NOTE(review): this logs unconditionally, unlike the rest of the function which gates on `log`.
  while (scrollSearch.hits.hits.length > 0) {
    scrollSearch = await client.scroll({ scroll_id: scrollId, scroll: '1m' });
    allIds = allIds.concat(scrollSearch.hits.hits.map(hit => hit._id));
    logger.info(`Fetched ${allIds.length} IDs...`);
  }

  // Clear the scroll when done
  await client.clearScroll({ scroll_id: scrollId });
  /* eslint-enable camelcase */

  // Search for entries that are not marked as deleted in the database
  // (i.e. DB rows whose id is NOT among the ids already present in the index)
  const { ranges, lonelyNumbers } = simplifyNumbersInRanges(allIds.map(Number));
  const undeletedEntries = (await adapter.getModel().findAll({
    attributes: ['id'],
    raw: true,
    where: {
      [Op.and]: [
        { id: { [Op.notIn]: lonelyNumbers } },
        ...ranges.map(([start, end]) => ({ id: { [Op.notBetween]: [start, end] } })),
      ],
    },
  })) as unknown as Array<{ id: number }>;

  if (!undeletedEntries.length) {
    if (log) {
      logger.info('No undeleted entries found');
    }
    return;
  } else if (log) {
    logger.info(`Restoring ${undeletedEntries.length} undeleted entries...`);
  }

  // Restore undeleted entries
  const undeletedIds = undeletedEntries.map(entry => entry.id);
  const limit = 5_000;
  let modelEntries = [];
  // NOTE(review): `maxId` is assigned below but never read afterwards — looks like dead code.
  let maxId = undefined;
  let offset = 0;

  for (const ids of chunk(undeletedIds, limit)) {
    // NOTE(review): `offset` increments across iterations while `ids` is already limited to one
    // chunk — if findEntriesToIndex applies offset WITHIN the given ids, later chunks would be
    // skipped entirely; verify the adapter's contract.
    modelEntries = await adapter.findEntriesToIndex({ offset, limit, ids });
    if (modelEntries.length === 0) {
      // NOTE(review): an empty page aborts ALL remaining chunks, not just this one — confirm intended.
      return;
    } else if (!maxId) {
      maxId = modelEntries[0].id;
    }

    // Send data to ElasticSearch: one bulk request per chunk, pairing an index action with its document
    await client.bulk({
      index: formatIndexNameForElasticSearch(indexName),
      body: modelEntries.flatMap(entry => [{ index: { _id: entry.id } }, adapter.mapModelInstanceToDocument(entry)]),
    });

    offset += limit;
    if (log) {
      logger.info(`... ${offset} entries synced`);
    }
  }
}

export async function syncElasticSearchIndex(
indexName: ElasticSearchIndexName,
options: { fromDate?: Date; log?: boolean } = {},
Expand All @@ -193,7 +71,6 @@ export async function syncElasticSearchIndex(
// If there's a fromDate, it means we are doing a simple sync (not a full resync) and therefore need to look at deleted entries
if (fromDate) {
await removeDeletedEntries(indexName, fromDate);
await restoreUndeletedEntries(indexName);
}

// Sync new/edited entries
Expand Down
Loading