diff --git a/scripts/search.ts b/scripts/search.ts index 88335ba1938..ce0e894daf8 100644 --- a/scripts/search.ts +++ b/scripts/search.ts @@ -12,7 +12,6 @@ import { createElasticSearchIndex, deleteElasticSearchIndex, getAvailableElasticSearchIndexes, - restoreUndeletedEntries, syncElasticSearchIndex, } from '../server/lib/elastic-search/sync'; import logger from '../server/lib/logger'; @@ -112,23 +111,6 @@ program } }); -// Restore undeleted entries -program - .command('restore') - .description('Restore undeleted entries') - .argument('[indexes...]', 'Only sync specific indexes') - .action(async indexesInput => { - checkElasticSearchAvailable(); - - const indexes = parseIndexesFromInput(indexesInput); - for (const indexName of indexes) { - logger.info(`Restoring undeleted entries for index ${indexName}`); - await restoreUndeletedEntries(indexName as ElasticSearchIndexName, { log: true }); - } - - logger.info('Restore completed!'); - }); - // Sync command program .command('sync') diff --git a/server/lib/elastic-search/sync.ts b/server/lib/elastic-search/sync.ts index af85f36d035..dc7fadd2063 100644 --- a/server/lib/elastic-search/sync.ts +++ b/server/lib/elastic-search/sync.ts @@ -3,7 +3,6 @@ */ import config from 'config'; -import { chunk, partition, uniq } from 'lodash'; import { Op } from '../../models'; import logger from '../logger'; @@ -26,7 +25,7 @@ export async function createElasticSearchIndex(indexName: ElasticSearchIndexName }); } -async function removeDeletedEntries(indexName: ElasticSearchIndexName, fromDate: Date) { +async function removeDeletedEntries(indexName: ElasticSearchIndexName, fromDate: Date, { log = false } = {}) { const client = getElasticSearchClient({ throwIfUnavailable: true }); const adapter = ElasticSearchModelsAdapters[indexName]; const pageSize = 20000; // We're only fetching the id, so we can fetch more entries at once @@ -44,7 +43,10 @@ async function removeDeletedEntries(indexName: ElasticSearchIndexName, fromDate: if (deletedEntries.length === 0) { return; + } else if (log) { + logger.info(`Deleting ${deletedEntries.length} entries...`); } + await client.bulk({ index: formatIndexNameForElasticSearch(indexName), body: deletedEntries.flatMap(entry => [{ delete: { _id: entry.id } }]), @@ -53,130 +55,6 @@ async function removeDeletedEntries(indexName: ElasticSearchIndexName, fromDate: } while (deletedEntries.length === pageSize); } -/** - * Takes a list like [1,2,3,4,0,165,7,8,9] and simplifies it to [[0,4],[7,9],165] - * to reduce the cost of running SQL queries with large IN clauses. Also simplify smaller ranges: - * adding the number in `id NOT IN (a,b,c)` is cheaper than `id NOT BETWEEN a AND b`. We therefore only - * keep ranges of 5 or more numbers. - */ -const simplifyNumbersInRanges = ( - ids: number[], - minRangeSize = 5, -): { - ranges: number[][]; - lonelyNumbers: number[]; -} => { - const results: Array = []; - - // We need the list to be sorted - const sortedIds = uniq(ids).sort((a, b) => a - b); - - // Build the list like [[0,4],[7,9],165] - let firstRangeNumber = sortedIds[0]; - let consecutiveNumbers = 0; - for (let i = 1; i < sortedIds.length; i++) { - const id = sortedIds[i]; - if (id === firstRangeNumber + consecutiveNumbers + 1) { - consecutiveNumbers++; - } else { - if (consecutiveNumbers >= minRangeSize) { - results.push([firstRangeNumber, firstRangeNumber + consecutiveNumbers]); - } else { - for (let j = 0; j <= consecutiveNumbers; j++) { - results.push(firstRangeNumber + j); - } - } - firstRangeNumber = id; - consecutiveNumbers = 0; - } - } - - // Group by type - const [lonelyNumbers, ranges] = partition(results, x => typeof x === 'number'); - return { ranges, lonelyNumbers }; -}; - -export async function restoreUndeletedEntries(indexName: ElasticSearchIndexName, { log = false } = {}) { - const client = getElasticSearchClient({ throwIfUnavailable: true }); - const adapter = ElasticSearchModelsAdapters[indexName]; - - if (log) { - logger.info(`Fetching IDs of all undeleted entries in index ${indexName}...`); - } - - /* eslint-disable camelcase */ - let scrollSearch = await client.search({ - index: formatIndexNameForElasticSearch(indexName), - body: { _source: false }, - filter_path: ['hits.hits._id', '_scroll_id'], - size: 10_000, // Max value allowed by ES - scroll: '1m', // Keep the search context alive for 1 minute - }); - - let allIds = scrollSearch.hits.hits.map(hit => hit._id); - const scrollId = scrollSearch._scroll_id; - - // Continue scrolling through results - while (scrollSearch.hits.hits.length > 0) { - scrollSearch = await client.scroll({ scroll_id: scrollId, scroll: '1m' }); - allIds = allIds.concat(scrollSearch.hits.hits.map(hit => hit._id)); - logger.info(`Fetched ${allIds.length} IDs...`); - } - - // Clear the scroll when done - await client.clearScroll({ scroll_id: scrollId }); - /* eslint-enable camelcase */ - - // Search for entries that are not marked as deleted in the database - const { ranges, lonelyNumbers } = simplifyNumbersInRanges(allIds.map(Number)); - const undeletedEntries = (await adapter.getModel().findAll({ - attributes: ['id'], - raw: true, - where: { - [Op.and]: [ - { id: { [Op.notIn]: lonelyNumbers } }, - ...ranges.map(([start, end]) => ({ id: { [Op.notBetween]: [start, end] } })), - ], - }, - })) as unknown as Array<{ id: number }>; - - if (!undeletedEntries.length) { - if (log) { - logger.info('No undeleted entries found'); - } - return; - } else if (log) { - logger.info(`Restoring ${undeletedEntries.length} undeleted entries...`); - } - - // Restore undeleted entries - const undeletedIds = undeletedEntries.map(entry => entry.id); - const limit = 5_000; - let modelEntries = []; - let maxId = undefined; - let offset = 0; - - for (const ids of chunk(undeletedIds, limit)) { - modelEntries = await adapter.findEntriesToIndex({ offset, limit, ids }); - if (modelEntries.length === 0) { - return; - } else if (!maxId) { - maxId = modelEntries[0].id; - } - - // Send data to ElasticSearch - await client.bulk({ - index: formatIndexNameForElasticSearch(indexName), - body: modelEntries.flatMap(entry => [{ index: { _id: entry.id } }, adapter.mapModelInstanceToDocument(entry)]), - }); - - offset += limit; - if (log) { - logger.info(`... ${offset} entries synced`); - } - } -} - export async function syncElasticSearchIndex( indexName: ElasticSearchIndexName, options: { fromDate?: Date; log?: boolean } = {}, @@ -193,7 +71,6 @@ export async function syncElasticSearchIndex( // If there's a fromDate, it means we are doing a simple sync (not a full resync) and therefore need to look at deleted entries if (fromDate) { await removeDeletedEntries(indexName, fromDate); - await restoreUndeletedEntries(indexName); } // Sync new/edited entries