Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable cache #2568

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import {SchedulerRegistry} from '@nestjs/schedule';
import {Sequelize} from '@subql/x-sequelize';
import {CacheMetadataModel, ISubqueryProject, StoreCacheService, StoreService} from '../indexer';
import {IStoreModelService} from '../indexer/storeCache/storeModel.service';
import {NodeConfig} from './NodeConfig';
import {IProjectUpgradeService, ProjectUpgradeService, upgradableSubqueryProject} from './ProjectUpgrade.service';

Expand Down Expand Up @@ -289,7 +290,7 @@ describe('Project Upgrades', () => {
describe('Upgradable subquery project', () => {
let upgradeService: ProjectUpgradeService<ISubqueryProject>;
let project: ISubqueryProject & IProjectUpgradeService<ISubqueryProject>;
let storeCache: StoreCacheService;
let storeCache: IStoreModelService;

beforeEach(async () => {
storeCache = new StoreCacheService({} as any, {} as any, {} as any, new SchedulerRegistry());
Expand Down
44 changes: 23 additions & 21 deletions packages/node-core/src/configure/ProjectUpgrade.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ import {ParentProject} from '@subql/types-core';
import {Sequelize, Transaction} from '@subql/x-sequelize';
import {findLast, last, parseInt} from 'lodash';
import {SchemaMigrationService} from '../db';
import {CacheMetadataModel, ISubqueryProject, StoreCacheService, StoreService} from '../indexer';
import {
CacheMetadataModel,
IMetadata,
IStoreModelService,
ISubqueryProject,
StoreCacheService,
StoreService,
} from '../indexer';
import {getLogger} from '../logger';
import {exitWithError, monitorWrite} from '../process';
import {getStartHeight, mainThreadOnly} from '../utils';
Expand Down Expand Up @@ -107,27 +114,20 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
#currentHeight: number;
#currentProject: P;

#storeCache?: StoreCacheService;
#storeCache?: IStoreModelService;
#initialized = false;

private config?: NodeConfig;
private onProjectUpgrade?: OnProjectUpgradeCallback<P>;
private migrationService?: SchemaMigrationService;

private constructor(
private _projects: BlockHeightMap<P>,
currentHeight: number,
private _isRewindable = true
) {
private constructor(private _projects: BlockHeightMap<P>, currentHeight: number, private _isRewindable = true) {
logger.info(
`Projects: ${JSON.stringify(
[..._projects.getAll().entries()].reduce(
(acc, curr) => {
acc[curr[0]] = curr[1].id;
return acc;
},
{} as Record<number, string>
),
[..._projects.getAll().entries()].reduce((acc, curr) => {
acc[curr[0]] = curr[1].id;
return acc;
}, {} as Record<number, string>),
undefined,
2
)}`
Expand Down Expand Up @@ -172,7 +172,9 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
} catch (e: any) {
if (e instanceof EntryNotFoundError) {
throw new Error(
`Unable to find project for height ${this.#currentHeight}. If the project start height is increased it will not jump to that block. Please either reindex or specify blocks to bypass.`,
`Unable to find project for height ${
this.#currentHeight
}. If the project start height is increased it will not jump to that block. Please either reindex or specify blocks to bypass.`,
{cause: e}
);
}
Expand Down Expand Up @@ -207,7 +209,7 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
return this.#currentHeight;
}

private get metadata(): CacheMetadataModel {
private get metadata(): IMetadata {
assert(this.#storeCache?.metadata, 'Project Upgrades service has not been initialized, unable to update metadata');
return this.#storeCache.metadata;
}
Expand Down Expand Up @@ -243,7 +245,7 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
nextId = iterator.next();
}
// this remove any deployments in metadata beyond target height
await this.removeIndexedDeployments(targetBlockHeight);
await this.removeIndexedDeployments(targetBlockHeight, transaction);
}

private async migrate(
Expand All @@ -256,7 +258,7 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
assert(this.migrationService, 'MigrationService is undefined');
if (this.config.allowSchemaMigration) {
await this.migrationService.run(project.schema, newProject.schema, transaction);
this.metadata.setIncrement('schemaMigrationCount');
await this.metadata.setIncrement('schemaMigrationCount', undefined, transaction);
}
}
}
Expand Down Expand Up @@ -450,11 +452,11 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject

deployments[blockHeight] = id;

this.metadata.set('deployments', JSON.stringify(deployments));
await this.metadata.set('deployments', JSON.stringify(deployments));
}

// Remove metadata deployments beyond this blockHeight
async removeIndexedDeployments(blockHeight: number): Promise<void> {
async removeIndexedDeployments(blockHeight: number, tx?: Transaction): Promise<void> {
const deployments = await this.getDeploymentsMetadata();

// remove all future block heights
Expand All @@ -466,6 +468,6 @@ export class ProjectUpgradeService<P extends ISubqueryProject = ISubqueryProject
}
});

this.metadata.set('deployments', JSON.stringify(deployments));
await this.metadata.set('deployments', JSON.stringify(deployments), tx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export class SchemaMigrationService {
const sortedAddedModels = alignModelOrder<GraphQLModelsType[]>(sortedSchemaModels, addedModels);
const sortedModifiedModels = alignModelOrder<ModifiedModels>(sortedSchemaModels, modifiedModels);

await this.flushCache(true);
await this.storeService.storeCache._flushCache(true);
const migrationAction = await Migration.create(
this.sequelize,
this.storeService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ import assert from 'assert';

import {EventEmitter2, OnEvent} from '@nestjs/event-emitter';
import {hexToU8a, u8aEq} from '@subql/utils';
import {Transaction} from '@subql/x-sequelize';
import {NodeConfig, IProjectUpgradeService} from '../../configure';
import {AdminEvent, IndexerEvent, PoiEvent, TargetBlockPayload} from '../../events';
import {getLogger} from '../../logger';
import {exitWithError, monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite} from '../../process';
import {monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite} from '../../process';
import {IQueue, mainThreadOnly} from '../../utils';
import {MonitorServiceInterface} from '../monitor.service';
import {PoiBlock, PoiSyncService} from '../poi';
import {SmartBatchService} from '../smartBatch.service';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
import {CachePoiModel} from '../storeCache/cachePoi';
import {IStoreModelService} from '../storeCache';
import {IPoi} from '../storeCache/poi';
import {IBlock, IProjectService, ISubqueryProject} from '../types';

const logger = getLogger('BaseBlockDispatcherService');
Expand Down Expand Up @@ -63,7 +64,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
private projectUpgradeService: IProjectUpgradeService,
protected queue: Q,
protected storeService: StoreService,
private storeCacheService: StoreCacheService,
private storeModelService: IStoreModelService,
private poiSyncService: PoiSyncService,
protected monitorService?: MonitorServiceInterface
) {
Expand All @@ -75,7 +76,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
async init(onDynamicDsCreated: (height: number) => void): Promise<void> {
this._onDynamicDsCreated = onDynamicDsCreated;
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.setProcessedBlockCount((await this.storeCacheService.metadata.find('processedBlockCount', 0))!);
this.setProcessedBlockCount((await this.storeModelService.metadata.find('processedBlockCount', 0))!);
}

get queueSize(): number {
Expand Down Expand Up @@ -198,10 +199,10 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
throw e;
}
} else {
this.updateStoreMetadata(height);
await this.updateStoreMetadata(height, undefined, this.storeService.transaction);

const operationHash = this.storeService.getOperationMerkleRoot();
this.createPOI(height, blockHash, operationHash);
await this.createPOI(height, blockHash, operationHash, this.storeService.transaction);

if (dynamicDsCreated) {
this.onDynamicDsCreated(height);
Expand All @@ -215,21 +216,38 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
this.setLatestProcessedHeight(height);
}

if (this.nodeConfig.storeCacheAsync) {
// Flush all completed block data and don't wait
await this.storeCacheService.flushAndWaitForCapacity(false)?.catch((e) => {
exitWithError(new Error(`Flushing cache failed`, {cause: e}), logger);
});
} else {
// Flush all data from cache and wait
await this.storeCacheService.flushCache(false);
}

if (!this.projectService.hasDataSourcesAfterHeight(height)) {
const msg = `All data sources have been processed up to block number ${height}. Exiting gracefully...`;
await this.storeCacheService.flushCache(false);
exitWithError(msg, logger, 0);
}
await this.storeModelService.applyPendingChanges(height, !this.projectService.hasDataSourcesAfterHeight(height));

// if (this.storeModelService instanceof StoreCacheService) {
// if (this.nodeConfig.storeCacheAsync) {
// // Flush all completed block data and don't wait
// await this.storeModelService.flushAndWaitForCapacity(false)?.catch((e) => {
// exitWithError(new Error(`Flushing cache failed`, { cause: e }), logger);
// });
// } else {
// // Flush all data from cache and wait
// await this.storeModelService.flushCache(false);
// }

// if (!this.projectService.hasDataSourcesAfterHeight(height)) {
// const msg = `All data sources have been processed up to block number ${height}. Exiting gracefully...`;
// await this.storeModelService.flushCache(false);
// exitWithError(msg, logger, 0);
// }
// } else if (this.storeModelService instanceof PlainStoreModelService) {
// const tx = this.storeService.transaction;
// if (!tx) {
// exitWithError(new Error('Transaction not found'), logger, 1);
// }
// await tx.commit();

// if (!this.projectService.hasDataSourcesAfterHeight(height)) {
// const msg = `All data sources have been processed up to block number ${height}. Exiting gracefully...`;
// exitWithError(msg, logger, 0);
// }
// } else {
// exitWithError(new Error('Unknown store model service'), logger, 1);
// }
}

@OnEvent(AdminEvent.rewindTarget)
Expand Down Expand Up @@ -258,7 +276,12 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
* @param operationHash
* @private
*/
private createPOI(height: number, blockHash: string, operationHash: Uint8Array): void {
private async createPOI(
height: number,
blockHash: string,
operationHash: Uint8Array,
tx?: Transaction
): Promise<void> {
if (!this.nodeConfig.proofOfIndex) {
return;
}
Expand All @@ -268,30 +291,33 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB

const poiBlock = PoiBlock.create(height, blockHash, operationHash, this.project.id);
// This is the first creation of POI
this.poi.bulkUpsert([poiBlock]);
this.storeCacheService.metadata.setBulk([{key: 'lastCreatedPoiHeight', value: height}]);
await this.poi.bulkUpsert([poiBlock], tx);
await this.storeModelService.metadata.setBulk([{key: 'lastCreatedPoiHeight', value: height}], tx);
this.eventEmitter.emit(PoiEvent.PoiTarget, {
height,
timestamp: Date.now(),
});
}

@mainThreadOnly()
private updateStoreMetadata(height: number, updateProcessed = true): void {
const meta = this.storeCacheService.metadata;
private async updateStoreMetadata(height: number, updateProcessed = true, tx?: Transaction): Promise<void> {
const meta = this.storeModelService.metadata;
// Update store metadata
meta.setBulk([
{key: 'lastProcessedHeight', value: height},
{key: 'lastProcessedTimestamp', value: Date.now()},
]);
await meta.setBulk(
[
{key: 'lastProcessedHeight', value: height},
{key: 'lastProcessedTimestamp', value: Date.now()},
],
tx
);
// Db Metadata increase BlockCount, in memory ref to block-dispatcher _processedBlockCount
if (updateProcessed) {
meta.setIncrement('processedBlockCount');
await meta.setIncrement('processedBlockCount', undefined, tx);
}
}

private get poi(): CachePoiModel {
const poi = this.storeCacheService.poi;
private get poi(): IPoi {
const poi = this.storeModelService.poi;
if (!poi) {
throw new Error('Poi service expected poi repo but it was not found');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {monitorWrite} from '../../process';
import {AutoQueue, isTaskFlushedError} from '../../utils';
import {MonitorServiceInterface} from '../monitor.service';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
import {IStoreModelService, StoreCacheService} from '../storeCache';
import {ISubqueryProject, IProjectService} from '../types';
import {isBlockUnavailableError} from '../worker/utils';
import {BaseBlockDispatcher} from './base-block-dispatcher';
Expand Down Expand Up @@ -58,7 +58,7 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
projectService: IProjectService<DS>,
projectUpgradeService: IProjectUpgradeService,
storeService: StoreService,
storeCacheService: StoreCacheService,
storeModelService: IStoreModelService,
poiSyncService: PoiSyncService,
project: ISubqueryProject,
private createIndexerWorker: () => Promise<W>,
Expand All @@ -72,7 +72,7 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
projectUpgradeService,
initAutoQueue(nodeConfig.workers, nodeConfig.batchSize, nodeConfig.timeout, 'Worker'),
storeService,
storeCacheService,
storeModelService,
poiSyncService,
monitorService
);
Expand Down
4 changes: 2 additions & 2 deletions packages/node-core/src/indexer/dynamic-ds.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ describe('DynamicDsService', () => {
const meta = mockMetadata([testParam1, testParam2, testParam3, testParam4]);
await service.init(meta);

await service.resetDynamicDatasource(2);
await service.resetDynamicDatasource(2, null as any);

await expect(meta.find('dynamicDatasources')).resolves.toEqual([testParam1, testParam2]);
await expect(service.getDynamicDatasources()).resolves.toEqual([testParam1, testParam2]);
Expand All @@ -79,7 +79,7 @@ describe('DynamicDsService', () => {
const meta = mockMetadata([testParam1, testParam2]);
await service.init(meta);

meta.set('dynamicDatasources', [testParam1, testParam2, testParam3, testParam4]);
await meta.set('dynamicDatasources', [testParam1, testParam2, testParam3, testParam4]);

await expect(service.getDynamicDatasources()).resolves.toEqual([testParam1, testParam2]);
await expect(service.getDynamicDatasources(true)).resolves.toEqual([
Expand Down
Loading
Loading