From 65fad834eff5e93bfc14904ceec288950477a5df Mon Sep 17 00:00:00 2001 From: Scott Twiname Date: Wed, 24 Jul 2024 13:55:13 +1200 Subject: [PATCH] WIP implement blockchain service --- packages/node-core/src/blockchain.service.ts | 64 +++++ packages/node-core/src/index.ts | 1 + .../blockDispatcher/block-dispatcher.ts | 26 +- .../src/indexer/ds-processor.service.ts | 25 +- .../src/indexer/dynamic-ds.service.spec.ts | 10 +- .../src/indexer/dynamic-ds.service.ts | 25 +- .../src/indexer/fetch.service.spec.ts | 9 +- .../node-core/src/indexer/fetch.service.ts | 27 +- .../node-core/src/indexer/indexer.manager.ts | 23 +- .../src/indexer/project.service.spec.ts | 11 +- .../node-core/src/indexer/project.service.ts | 56 ++-- .../indexer/unfinalizedBlocks.service.spec.ts | 33 ++- .../src/indexer/unfinalizedBlocks.service.ts | 41 ++- .../src/indexer/worker/worker.core.module.ts | 21 +- packages/node-core/src/subcommands/index.ts | 1 + .../src/subcommands/testing.core.module.ts | 48 ++++ packages/node-core/src/utils/project.ts | 4 +- packages/node/src/blockchain.service.ts | 161 +++++++++++ .../configure/SchemaMigration.service.test.ts | 17 +- packages/node/src/indexer/api.service.spec.ts | 21 +- packages/node/src/indexer/api.service.test.ts | 30 +- packages/node/src/indexer/api.service.ts | 71 ++--- .../block-dispatcher.service.ts | 104 ------- .../node/src/indexer/blockDispatcher/index.ts | 4 +- .../worker-block-dispatcher.service.ts | 22 +- .../substrateDictionary.service.spec.ts | 14 +- .../dictionary/substrateDictionary.service.ts | 10 +- .../v1/substrateDictionaryV1.spec.ts | 14 +- .../dictionary/v1/substrateDictionaryV1.ts | 9 +- .../src/indexer/ds-processor.service.spec.ts | 19 +- .../node/src/indexer/ds-processor.service.ts | 22 -- .../node/src/indexer/dynamic-ds.service.ts | 54 ---- packages/node/src/indexer/fetch.module.ts | 86 +++--- .../node/src/indexer/fetch.service.spec.ts | 6 +- .../node/src/indexer/fetch.service.test.ts | 9 +- packages/node/src/indexer/fetch.service.ts | 63 +---- .../node/src/indexer/indexer.manager.spec.ts | 258 ------------------ packages/node/src/indexer/indexer.manager.ts | 135 +++++---- .../node/src/indexer/project.service.spec.ts | 28 +- packages/node/src/indexer/project.service.ts | 82 ------ .../indexer/runtime/base-runtime.service.ts | 9 +- .../src/indexer/runtime/runtimeService.ts | 16 +- .../node/src/indexer/store.service.test.ts | 18 +- .../src/indexer/unfinalizedBlocks.service.ts | 50 ---- .../src/indexer/worker/worker-fetch.module.ts | 42 +-- .../node/src/indexer/worker/worker.service.ts | 10 +- packages/node/src/indexer/worker/worker.ts | 2 +- packages/node/src/init.ts | 5 +- .../node/src/subcommands/reindex.module.ts | 9 +- .../node/src/subcommands/testing.module.ts | 48 ++-- .../node/src/subcommands/testing.service.ts | 27 +- 51 files changed, 807 insertions(+), 1093 deletions(-) create mode 100644 packages/node-core/src/blockchain.service.ts create mode 100644 packages/node-core/src/subcommands/testing.core.module.ts create mode 100644 packages/node/src/blockchain.service.ts delete mode 100644 packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts delete mode 100644 packages/node/src/indexer/ds-processor.service.ts delete mode 100644 packages/node/src/indexer/dynamic-ds.service.ts delete mode 100644 packages/node/src/indexer/indexer.manager.spec.ts delete mode 100644 packages/node/src/indexer/project.service.ts delete mode 100644 packages/node/src/indexer/unfinalizedBlocks.service.ts diff --git a/packages/node-core/src/blockchain.service.ts b/packages/node-core/src/blockchain.service.ts new file mode 100644 index 0000000000..f899e7629e --- /dev/null +++ b/packages/node-core/src/blockchain.service.ts @@ -0,0 +1,64 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import {BaseCustomDataSource, BaseDataSource, IProjectNetworkConfig} from '@subql/types-core'; +import {DatasourceParams, Header, IBlock, ISubqueryProject} from './indexer'; + +// TODO probably need to split this in 2 to have a worker specific subset + +export interface IBlockchainService< + DS extends BaseDataSource = BaseDataSource, + CDS extends DS & BaseCustomDataSource = BaseCustomDataSource & DS, + SubQueryProject extends ISubqueryProject = ISubqueryProject, + SafeAPI = any, + LightBlock = any, + FullBlock = any, +> { + /* The semver of the node */ + packageVersion: string; + + blockHandlerKind: string; + // TODO SubqueryProject methods + + // Block dispatcher service + fetchBlocks(blockNums: number[]): Promise[] | IBlock[]>; // TODO this probably needs to change to get light block type correct + + // Project service + onProjectChange(project: SubQueryProject): Promise | void; + /* Not all networks have a block timestamp, e.g. Shiden */ + getBlockTimestamp(height: number): Promise; + + // Fetch service + /** + * The finalized header. If the chain doesn't have concrete finalization this could be a probablilistic finalization + * */ + getFinalizedHeader(): Promise
; + /** + * Gets the latest height of the chain, this should be unfinalized. + * Or if the chain has instant finalization this would be the same as the finalized height. + * */ + getBestHeight(): Promise; + /** + * The chain interval in milliseconds, if it is not consistent then provide a best estimate + * */ + getChainInterval(): Promise; + + // Unfinalized blocks + getHeaderForHash(hash: string): Promise
; + getHeaderForHeight(height: number): Promise
; + + // Dynamic Ds sevice + /** + * Applies and validates parameters to a template DS + * */ + updateDynamicDs(params: DatasourceParams, template: DS | CDS): Promise; + + isCustomDs: (x: DS | CDS) => x is CDS; + isRuntimeDs: (x: DS | CDS) => x is DS; + + // Indexer manager + /** + * Gets an API instance to a specific height so any state queries return data as represented at that height. + * */ + getSafeApi(block: LightBlock | FullBlock): Promise; +} diff --git a/packages/node-core/src/index.ts b/packages/node-core/src/index.ts index 47fe1649ba..804fc03930 100644 --- a/packages/node-core/src/index.ts +++ b/packages/node-core/src/index.ts @@ -17,3 +17,4 @@ export * from './indexer'; export * from './subcommands'; export * from './yargs'; export * from './admin'; +export * from './blockchain.service'; diff --git a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts index 83071a864e..93386825c1 100644 --- a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts @@ -5,6 +5,8 @@ import {getHeapStatistics} from 'v8'; import {OnApplicationShutdown} from '@nestjs/common'; import {EventEmitter2} from '@nestjs/event-emitter'; import {Interval} from '@nestjs/schedule'; +import {BaseDataSource} from '@subql/types-core'; +import {IBlockchainService} from '../../blockchain.service'; import {NodeConfig} from '../../configure'; import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service'; import {IndexerEvent} from '../../events'; @@ -15,8 +17,8 @@ import {profilerWrap} from '../../profiler'; import {Queue, AutoQueue, delay, memoryLock, waitForBatchSize, isTaskFlushedError} from '../../utils'; import {StoreService} from '../store.service'; import {StoreCacheService} from '../storeCache'; -import {IProjectService, ISubqueryProject} from '../types'; -import {BaseBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher'; +import {IIndexerManager, IProjectService, ISubqueryProject} from '../types'; +import {BaseBlockDispatcher} from './base-block-dispatcher'; const logger = getLogger('BlockDispatcherService'); @@ -25,7 +27,7 @@ type BatchBlockFetcher = (heights: number[]) => Promise[]>; /** * @description Intended to behave the same as WorkerBlockDispatcherService but doesn't use worker threads or any parallel processing */ -export abstract class BlockDispatcher +export class BlockDispatcher extends BaseBlockDispatcher | number>, DS, B> implements OnApplicationShutdown { @@ -37,8 +39,6 @@ export abstract class BlockDispatcher private fetching = false; private isShutdown = false; - protected abstract indexBlock(block: IBlock): Promise; - constructor( nodeConfig: NodeConfig, eventEmitter: EventEmitter2, @@ -48,7 +48,8 @@ export abstract class BlockDispatcher storeCacheService: StoreCacheService, poiSyncService: PoiSyncService, project: ISubqueryProject, - fetchBlocksBatches: BatchBlockFetcher + blockchainService: IBlockchainService, + private indexerManager: IIndexerManager ) { super( nodeConfig, @@ -64,9 +65,13 @@ export abstract class BlockDispatcher this.processQueue = new AutoQueue(nodeConfig.batchSize * 3, 1, nodeConfig.timeout, 'Process'); this.fetchQueue = new AutoQueue(nodeConfig.batchSize * 3, nodeConfig.batchSize, nodeConfig.timeout, 'Fetch'); if (this.nodeConfig.profiler) { - this.fetchBlocksBatches = profilerWrap(fetchBlocksBatches, 'BlockDispatcher', 'fetchBlocksBatches'); + this.fetchBlocksBatches = profilerWrap( + blockchainService.fetchBlocks.bind(blockchainService), + 'BlockDispatcher', + 'fetchBlocksBatches' + ); } else { - this.fetchBlocksBatches = fetchBlocksBatches; + this.fetchBlocksBatches = blockchainService.fetchBlocks.bind(blockchainService); } } @@ -164,7 +169,10 @@ export abstract class BlockDispatcher await this.preProcessBlock(blockHeight); monitorWrite(`Processing from main thread`); // Inject runtimeVersion here to enhance api.at preparation - const processBlockResponse = await this.indexBlock(block); + const processBlockResponse = await this.indexerManager.indexBlock( + block, + await this.projectService.getDataSources(block.getHeader().blockHeight) + ); await this.postProcessBlock(blockHeight, processBlockResponse); //set block to null for garbage collection (block as any) = null; diff --git a/packages/node-core/src/indexer/ds-processor.service.ts b/packages/node-core/src/indexer/ds-processor.service.ts index 99e7b29b29..fc322fc549 100644 --- a/packages/node-core/src/indexer/ds-processor.service.ts +++ b/packages/node-core/src/indexer/ds-processor.service.ts @@ -3,7 +3,7 @@ import fs from 'fs'; import path from 'path'; -import {Inject} from '@nestjs/common'; +import {Inject, Injectable} from '@nestjs/common'; import { HandlerInputMap, BaseCustomDataSource, @@ -14,6 +14,7 @@ import { IProjectNetworkConfig, } from '@subql/types-core'; import {VMScript} from 'vm2'; +import {IBlockchainService} from '../blockchain.service'; import {NodeConfig} from '../configure'; import {getLogger} from '../logger'; import {Sandbox, SandboxOption} from './sandbox'; @@ -27,7 +28,7 @@ function isSecondLayerHandlerProcessor_0_0_0< F extends Record, E, API, - DS extends BaseCustomDataSource = BaseCustomDataSource + DS extends BaseCustomDataSource = BaseCustomDataSource, >( processor: | SecondLayerHandlerProcessor_0_0_0 @@ -43,7 +44,7 @@ function isSecondLayerHandlerProcessor_1_0_0< F extends Record, E, API, - DS extends BaseCustomDataSource = BaseCustomDataSource + DS extends BaseCustomDataSource = BaseCustomDataSource, >( processor: | SecondLayerHandlerProcessor_0_0_0 @@ -58,7 +59,7 @@ export function asSecondLayerHandlerProcessor_1_0_0< F extends Record, E, API, - DS extends BaseCustomDataSource = BaseCustomDataSource + DS extends BaseCustomDataSource = BaseCustomDataSource, >( processor: | SecondLayerHandlerProcessor_0_0_0 @@ -99,7 +100,7 @@ class DsPluginSandbox

extends Sandbox { export function getDsProcessor< P, DS extends BaseDataSource = BaseDataSource, - CDS extends DS & BaseCustomDataSource = DS & BaseCustomDataSource + CDS extends DS & BaseCustomDataSource = DS & BaseCustomDataSource, >( ds: CDS, isCustomDs: (ds: any) => boolean, @@ -130,17 +131,17 @@ export function getDsProcessor< return processorCache[ds.processor.file] as unknown as P; } -export abstract class BaseDsProcessorService< +@Injectable() +export class DsProcessorService< DS extends BaseDataSource = BaseDataSource, CDS extends DS & BaseCustomDataSource = DS & BaseCustomDataSource, - P extends DsProcessor = DsProcessor + P extends DsProcessor = DsProcessor, > { private processorCache: Record = {}; - protected abstract isCustomDs(ds: DS): ds is CDS; - constructor( @Inject('ISubqueryProject') private readonly project: ISubqueryProject, + @Inject('IBlockchainService') private blockchainService: IBlockchainService, private readonly nodeConfig: NodeConfig ) {} @@ -166,11 +167,11 @@ export abstract class BaseDsProcessorService< } async validateProjectCustomDatasources(dataSources: DS[]): Promise { - await this.validateCustomDs(dataSources.filter((ds) => this.isCustomDs(ds)) as unknown as CDS[]); + await this.validateCustomDs(dataSources.filter((ds) => this.blockchainService.isCustomDs(ds)) as unknown as CDS[]); } getDsProcessor(ds: CDS): P { - if (!this.isCustomDs(ds)) { + if (!this.blockchainService.isCustomDs(ds)) { throw new Error(`data source is not a custom data source`); } if (!this.processorCache[ds.processor.file]) { @@ -194,7 +195,7 @@ export abstract class BaseDsProcessorService< // eslint-disable-next-line @typescript-eslint/require-await async getAssets(ds: CDS): Promise> { - if (!this.isCustomDs(ds)) { + if (!this.blockchainService.isCustomDs(ds)) { throw new Error(`data source is not a custom data source`); } diff --git a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts index 3d242cda57..0f4d0bc3d7 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts @@ -1,13 +1,17 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 +import {BaseDataSource} from '@subql/types-core'; +import {IBlockchainService} from '../blockchain.service'; import {DatasourceParams, DynamicDsService} from './dynamic-ds.service'; import {CacheMetadataModel} from './storeCache'; import {ISubqueryProject} from './types'; -class TestDynamicDsService extends DynamicDsService { - protected async getDatasource(params: DatasourceParams): Promise { - return Promise.resolve(params); +class TestDynamicDsService extends DynamicDsService { + constructor(project: ISubqueryProject) { + super(project, { + updateDynamicDs: () => Promise.resolve(undefined), // Return the same value + } as unknown as IBlockchainService); } getTemplate[number], 'name'> & {startBlock?: number}>( diff --git a/packages/node-core/src/indexer/dynamic-ds.service.ts b/packages/node-core/src/indexer/dynamic-ds.service.ts index 8203dc96f9..371b6007a2 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.ts @@ -1,7 +1,10 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 +import {Inject, Injectable} from '@nestjs/common'; +import {BaseDataSource} from '@subql/types-core'; import {cloneDeep} from 'lodash'; +import {IBlockchainService} from '../blockchain.service'; import {getLogger} from '../logger'; import {exitWithError} from '../process'; import {CacheMetadataModel} from './storeCache/cacheMetadata'; @@ -23,16 +26,18 @@ export interface IDynamicDsService { getDynamicDatasources(forceReload?: boolean): Promise; } -export abstract class DynamicDsService +@Injectable() +export class DynamicDsService implements IDynamicDsService { private _metadata?: CacheMetadataModel; private _datasources?: DS[]; private _datasourceParams?: DatasourceParams[]; - protected abstract getDatasource(params: DatasourceParams): Promise; - - constructor(protected readonly project: P) {} + constructor( + @Inject('ISubqueryProject') private readonly project: P, + @Inject('IBlockchainService') private readonly blockchainService: IBlockchainService + ) {} async init(metadata: CacheMetadataModel): Promise { this._metadata = metadata; @@ -124,4 +129,16 @@ export abstract class DynamicDsService { + const dsObj = this.getTemplate(params.templateName, params.startBlock); + + try { + await this.blockchainService.updateDynamicDs(params, dsObj); + + return dsObj; + } catch (e) { + throw new Error(`Unable to create dynamic datasource.\n ${(e as any).message}`); + } + } } diff --git a/packages/node-core/src/indexer/fetch.service.spec.ts b/packages/node-core/src/indexer/fetch.service.spec.ts index c290757739..f24625755e 100644 --- a/packages/node-core/src/indexer/fetch.service.spec.ts +++ b/packages/node-core/src/indexer/fetch.service.spec.ts @@ -6,7 +6,7 @@ import {SchedulerRegistry} from '@nestjs/schedule'; import {BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry, IProjectNetworkConfig} from '@subql/types-core'; import {range} from 'lodash'; import { - BaseUnfinalizedBlocksService, + UnfinalizedBlocksService, BlockDispatcher, delay, Header, @@ -14,6 +14,7 @@ import { IBlockDispatcher, IProjectService, NodeConfig, + IBlockchainService, } from '../'; import {BlockHeightMap} from '../utils/blockHeightMap'; import {DictionaryService} from './dictionary/dictionary.service'; @@ -166,7 +167,8 @@ describe('Fetch Service', () => { let dictionaryService: DictionaryService; let networkConfig: IProjectNetworkConfig; let dataSources: BaseDataSource[]; - let unfinalizedBlocksService: BaseUnfinalizedBlocksService; + let unfinalizedBlocksService: UnfinalizedBlocksService; + let blockchainService: IBlockchainService; let spyOnEnqueueSequential: jest.SpyInstance< void | Promise, @@ -216,7 +218,8 @@ describe('Fetch Service', () => { metadata: { set: jest.fn(), }, - } as any + } as any, + blockchainService ); spyOnEnqueueSequential = jest.spyOn(fetchService as any, 'enqueueSequential') as any; diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index 17d5f10539..4cdc5a2c1d 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -8,16 +8,17 @@ import {EventEmitter2} from '@nestjs/event-emitter'; import {SchedulerRegistry} from '@nestjs/schedule'; import {BaseDataSource, IProjectNetworkConfig} from '@subql/types-core'; import {range, without} from 'lodash'; +import {IBlockchainService} from '../blockchain.service'; import {NodeConfig} from '../configure'; import {IndexerEvent} from '../events'; import {getLogger} from '../logger'; -import {cleanedBatchBlocks, delay, transformBypassBlocks, waitForBatchSize} from '../utils'; +import {cleanedBatchBlocks, delay, getModulos, transformBypassBlocks, waitForBatchSize} from '../utils'; import {IBlockDispatcher} from './blockDispatcher'; import {mergeNumAndBlocksToNums} from './dictionary'; import {DictionaryService} from './dictionary/dictionary.service'; import {getBlockHeight, mergeNumAndBlocks} from './dictionary/utils'; import {StoreCacheService} from './storeCache'; -import {Header, IBlock, IProjectService} from './types'; +import {IBlock, IProjectService} from './types'; import {IUnfinalizedBlocksServiceUtil} from './unfinalizedBlocks.service'; const logger = getLogger('FetchService'); @@ -30,15 +31,6 @@ export abstract class BaseFetchService; - protected abstract getBestHeight(): Promise; - - // The rough interval at which new blocks are produced - protected abstract getChainInterval(): Promise; - // This return modulo numbers with given dataSources (number in the filters) - protected abstract getModulos(dataSources: DS[]): number[]; - protected abstract initBlockDispatcher(): Promise; // Gets called just before the loop is started @@ -54,7 +46,8 @@ export abstract class BaseFetchService ) {} private get latestBestHeight(): number { @@ -67,6 +60,10 @@ export abstract class BaseFetchService { try { - const currentFinalizedHeader = await this.getFinalizedHeader(); + const currentFinalizedHeader = await this.blockchainSevice.getFinalizedHeader(); // Rpc could return finalized height below last finalized height due to unmatched nodes, and this could lead indexing stall // See how this could happen in https://gist.github.com/jiqiang90/ea640b07d298bca7cbeed4aee50776de if ( @@ -170,7 +167,7 @@ export abstract class BaseFetchService { try { - const currentBestHeight = await this.getBestHeight(); + const currentBestHeight = await this.blockchainSevice.getBestHeight(); if (this._latestBestHeight !== currentBestHeight) { this._latestBestHeight = currentBestHeight; this.eventEmitter.emit(IndexerEvent.BlockBest, { diff --git a/packages/node-core/src/indexer/indexer.manager.ts b/packages/node-core/src/indexer/indexer.manager.ts index 92ee79566e..16717a56a0 100644 --- a/packages/node-core/src/indexer/indexer.manager.ts +++ b/packages/node-core/src/indexer/indexer.manager.ts @@ -2,18 +2,19 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; -import {BaseCustomDataSource, BaseDataSource} from '@subql/types-core'; +import {BaseCustomDataSource, BaseDataSource, IProjectNetworkConfig} from '@subql/types-core'; import {IApi} from '../api.service'; +import {IBlockchainService} from '../blockchain.service'; import {NodeConfig} from '../configure'; import {getLogger} from '../logger'; import {exitWithError, monitorWrite} from '../process'; import {profilerWrap} from '../profiler'; import {handledStringify} from './../utils'; import {ProcessBlockResponse} from './blockDispatcher'; -import {asSecondLayerHandlerProcessor_1_0_0, BaseDsProcessorService} from './ds-processor.service'; +import {asSecondLayerHandlerProcessor_1_0_0, DsProcessorService} from './ds-processor.service'; import {DynamicDsService} from './dynamic-ds.service'; import {IndexerSandbox} from './sandbox'; -import {IBlock, IIndexerManager} from './types'; +import {IBlock, IIndexerManager, ISubqueryProject} from './types'; import {IUnfinalizedBlocksService} from './unfinalizedBlocks.service'; const logger = getLogger('indexer'); @@ -46,10 +47,7 @@ export abstract class BaseIndexerManager< HandlerInputMap extends HandlerInputTypeMap, > implements IIndexerManager { - abstract indexBlock(block: IBlock, datasources: DS[], ...args: any[]): Promise; - - protected abstract isRuntimeDs(ds: DS): ds is DS; - protected abstract isCustomDs(ds: DS): ds is CDS; + abstract indexBlock(block: IBlock, datasources: DS[]): Promise; protected abstract indexBlockData( block: B, @@ -64,11 +62,12 @@ export abstract class BaseIndexerManager< protected readonly apiService: API, protected readonly nodeConfig: NodeConfig, protected sandboxService: {getDsProcessor: (ds: DS, api: SA, unsafeApi: A) => IndexerSandbox}, - private dsProcessorService: BaseDsProcessorService, + private dsProcessorService: DsProcessorService, private dynamicDsService: DynamicDsService, private unfinalizedBlocksService: IUnfinalizedBlocksService, private filterMap: FilterMap, - private processorMap: ProcessorMap + private processorMap: ProcessorMap, + protected blockchainService: IBlockchainService, SA, B, B> ) { logger.info('indexer manager start'); } @@ -146,7 +145,7 @@ export abstract class BaseIndexerManager< // perform filter for custom ds filteredDs = filteredDs.filter((ds) => { - if (this.isCustomDs(ds)) { + if (this.blockchainService.isCustomDs(ds)) { return this.dsProcessorService.getDsProcessor(ds).dsFilterProcessor(ds, this.apiService.unsafeApi); } else { return true; @@ -180,7 +179,7 @@ export abstract class BaseIndexerManager< let vm: IndexerSandbox; assert(this.filterMap[kind], `Unsupported handler kind: ${kind.toString()}`); - if (this.isRuntimeDs(ds)) { + if (this.blockchainService.isRuntimeDs(ds)) { const handlers = ds.mapping.handlers.filter( (h) => h.kind === kind && this.filterMap[kind](data as any, h.filter, ds) ); @@ -200,7 +199,7 @@ export abstract class BaseIndexerManager< )(handler.handler, [parsedData]) : await vm.securedExec(handler.handler, [parsedData]); } - } else if (this.isCustomDs(ds)) { + } else if (this.blockchainService.isCustomDs(ds)) { const handlers = this.filterCustomDsHandlers(ds, data, this.processorMap[kind], (data, baseFilter) => { if (!baseFilter.length) return true; diff --git a/packages/node-core/src/indexer/project.service.spec.ts b/packages/node-core/src/indexer/project.service.spec.ts index 4b224c2e75..b529b9e868 100644 --- a/packages/node-core/src/indexer/project.service.spec.ts +++ b/packages/node-core/src/indexer/project.service.spec.ts @@ -2,12 +2,12 @@ // SPDX-License-Identifier: GPL-3.0 import {NodeConfig} from '../configure'; -import {BaseDsProcessorService} from './ds-processor.service'; +import {DsProcessorService} from './ds-processor.service'; import {DynamicDsService} from './dynamic-ds.service'; -import {BaseProjectService} from './project.service'; +import {ProjectService} from './project.service'; import {ISubqueryProject} from './types'; -class TestProjectService extends BaseProjectService { +class TestProjectService extends ProjectService { packageVersion = '1.0.0'; async getBlockTimestamp(height: number): Promise { @@ -24,7 +24,7 @@ describe('BaseProjectService', () => { beforeEach(() => { service = new TestProjectService( - null as unknown as BaseDsProcessorService, + null as unknown as DsProcessorService, null as unknown as any, null as unknown as any, null as unknown as any, @@ -35,7 +35,8 @@ describe('BaseProjectService', () => { {unsafe: false} as unknown as NodeConfig, {getDynamicDatasources: jest.fn()} as unknown as DynamicDsService, null as unknown as any, - null as unknown as any + null as unknown as any, + null as any //Blockchain Service ); }); diff --git a/packages/node-core/src/indexer/project.service.ts b/packages/node-core/src/indexer/project.service.ts index 36853fcdeb..e605ab5dc8 100644 --- a/packages/node-core/src/indexer/project.service.ts +++ b/packages/node-core/src/indexer/project.service.ts @@ -3,17 +3,19 @@ import assert from 'assert'; import {isMainThread} from 'worker_threads'; +import {Inject} from '@nestjs/common'; import {EventEmitter2} from '@nestjs/event-emitter'; import {BaseDataSource, IProjectNetworkConfig} from '@subql/types-core'; import {Sequelize} from '@subql/x-sequelize'; import {IApi} from '../api.service'; +import {IBlockchainService} from '../blockchain.service'; import {IProjectUpgradeService, NodeConfig} from '../configure'; import {IndexerEvent} from '../events'; import {getLogger} from '../logger'; import {exitWithError, monitorWrite} from '../process'; import {getExistingProjectSchema, getStartHeight, hasValue, initDbSchema, mainThreadOnly, reindex} from '../utils'; import {BlockHeightMap} from '../utils/blockHeightMap'; -import {BaseDsProcessorService} from './ds-processor.service'; +import {DsProcessorService} from './ds-processor.service'; import {DynamicDsService} from './dynamic-ds.service'; import {MetadataKeys} from './entities'; import {PoiSyncService} from './poi'; @@ -30,9 +32,9 @@ class NotInitError extends Error { } } -export abstract class BaseProjectService< - API extends IApi, - DS extends BaseDataSource, +export class ProjectService< + DS extends BaseDataSource = BaseDataSource, + API extends IApi = IApi, UnfinalizedBlocksService extends IUnfinalizedBlocksService = IUnfinalizedBlocksService, > implements IProjectService { @@ -40,33 +42,29 @@ export abstract class BaseProjectService< private _startHeight?: number; private _blockOffset?: number; - protected abstract packageVersion: string; - protected abstract getBlockTimestamp(height: number): Promise; - protected abstract onProjectChange(project: ISubqueryProject): void | Promise; - constructor( - private readonly dsProcessorService: BaseDsProcessorService, - protected readonly apiService: API, - private readonly poiService: PoiService, - private readonly poiSyncService: PoiSyncService, - protected readonly sequelize: Sequelize, - protected readonly project: ISubqueryProject, - protected readonly projectUpgradeService: IProjectUpgradeService, - protected readonly storeService: StoreService, - protected readonly nodeConfig: NodeConfig, - protected readonly dynamicDsService: DynamicDsService, + private readonly dsProcessorService: DsProcessorService, + @Inject('APIService') protected readonly apiService: API, + @Inject(isMainThread ? PoiService : 'Null') private readonly poiService: PoiService, + @Inject(isMainThread ? PoiSyncService : 'Null') private readonly poiSyncService: PoiSyncService, + @Inject(isMainThread ? Sequelize : 'Null') private readonly sequelize: Sequelize, + @Inject('ISubqueryProject') private readonly project: ISubqueryProject, + @Inject('IProjectUpgradeService') private readonly projectUpgradeService: IProjectUpgradeService, + @Inject(isMainThread ? StoreService : 'Null') private readonly storeService: StoreService, + private readonly nodeConfig: NodeConfig, + private readonly dynamicDsService: DynamicDsService, private eventEmitter: EventEmitter2, - protected readonly unfinalizedBlockService: UnfinalizedBlocksService + @Inject('IUnfinalizedBlocksService') private readonly unfinalizedBlockService: UnfinalizedBlocksService, + @Inject('IBlockchainService') private blockchainService: IBlockchainService ) { + if (this.nodeConfig.unfinalizedBlocks && this.nodeConfig.allowSchemaMigration) { + throw new Error('Unfinalized Blocks and Schema Migration cannot be enabled at the same time'); + } if (this.nodeConfig.unsafe) { logger.warn( 'UNSAFE MODE IS ENABLED. This is not recommended for most projects and will not be supported by our hosted service' ); } - - if (this.nodeConfig.unfinalizedBlocks && this.nodeConfig.allowSchemaMigration) { - throw new Error('Unfinalized Blocks and Schema Migration cannot be enabled at the same time'); - } } protected get schema(): string { @@ -95,7 +93,7 @@ export abstract class BaseProjectService< this.ensureTimezone(); for await (const [, project] of this.projectUpgradeService.projects) { - await project.applyCronTimestamps(this.getBlockTimestamp.bind(this)); + await project.applyCronTimestamps(this.blockchainService.getBlockTimestamp.bind(this)); } // Do extra work on main thread to setup stuff @@ -152,7 +150,7 @@ export abstract class BaseProjectService< this.projectUpgradeService.initWorker(startHeight, this.handleProjectChange.bind(this)); // Called to allow handling the first project - await this.onProjectChange(this.project); + await this.blockchainService.onProjectChange(this.project); } // Used to load assets into DS-processor, has to be done in any thread @@ -249,8 +247,8 @@ export abstract class BaseProjectService< metadata.set('processedBlockCount', 0); } - if (existing.indexerNodeVersion !== this.packageVersion) { - metadata.set('indexerNodeVersion', this.packageVersion); + if (existing.indexerNodeVersion !== this.blockchainService.packageVersion) { + metadata.set('indexerNodeVersion', this.blockchainService.packageVersion); } if (!existing.schemaMigrationCount) { metadata.set('schemaMigrationCount', 0); @@ -403,7 +401,7 @@ export abstract class BaseProjectService< ); // Called to allow handling the first project - await this.onProjectChange(this.project); + await this.blockchainService.onProjectChange(this.project); if (isMainThread) { const lastProcessedHeight = await this.getLastProcessedHeight(); @@ -443,7 +441,7 @@ export abstract class BaseProjectService< // Reload the dynamic ds with new project await this.dynamicDsService.getDynamicDatasources(true); - await this.onProjectChange(this.project); + await this.blockchainService.onProjectChange(this.project); } async reindex(targetBlockHeight: number): Promise { diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts index 68907abd00..97603639b3 100644 --- a/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts +++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts @@ -4,44 +4,43 @@ // import { Header } from '@polkadot/types/interfaces'; import {EventEmitter2} from '@nestjs/event-emitter'; import {SchedulerRegistry} from '@nestjs/schedule'; +import {IBlockchainService} from '../blockchain.service'; import {Header, IBlock} from '../indexer'; import {StoreCacheService, CacheMetadataModel} from './storeCache'; import { METADATA_LAST_FINALIZED_PROCESSED_KEY, METADATA_UNFINALIZED_BLOCKS_KEY, - BaseUnfinalizedBlocksService, + UnfinalizedBlocksService, } from './unfinalizedBlocks.service'; /* Notes: * Block hashes all have the format '0xabc' + block number * If they are forked they will have an `f` at the end */ -class UnfinalizedBlocksService extends BaseUnfinalizedBlocksService> { - protected async getFinalizedHead(): Promise

{ +const BlockchainService = { + async getFinalizedHeader(): Promise
{ return Promise.resolve({ blockHeight: 91, blockHash: `0xabc91f`, parentHash: `0xabc90f`, }); - } - - protected async getHeaderForHash(hash: string): Promise
{ + }, + async getHeaderForHash(hash: string): Promise
{ const num = Number(hash.toString().replace('0xabc', '').replace('f', '')); return Promise.resolve({ blockHeight: num, blockHash: hash, parentHash: `0xabc${num - 1}f`, }); - } - - protected async getHeaderForHeight(height: number): Promise
{ + }, + async getHeaderForHeight(height: number): Promise
{ return Promise.resolve({ blockHeight: height, blockHash: `0xabc${height}f`, parentHash: `0xabc${height - 1}f`, }); - } -} + }, +} as IBlockchainService; function getMockMetadata(): any { const data: Record = {}; @@ -78,7 +77,11 @@ describe('UnfinalizedBlocksService', () => { let unfinalizedBlocksService: UnfinalizedBlocksService; beforeEach(async () => { - unfinalizedBlocksService = new UnfinalizedBlocksService({unfinalizedBlocks: true} as any, mockStoreCache()); + unfinalizedBlocksService = new UnfinalizedBlocksService( + {unfinalizedBlocks: true} as any, + mockStoreCache(), + BlockchainService + ); await unfinalizedBlocksService.init(() => Promise.resolve()); }); @@ -260,7 +263,11 @@ describe('UnfinalizedBlocksService', () => { ]) ); storeCache.metadata.set(METADATA_LAST_FINALIZED_PROCESSED_KEY, 90); - const unfinalizedBlocksService2 = new UnfinalizedBlocksService({unfinalizedBlocks: false} as any, storeCache); + const unfinalizedBlocksService2 = new UnfinalizedBlocksService( + {unfinalizedBlocks: false} as any, + storeCache, + BlockchainService + ); const reindex = jest.fn().mockReturnValue(Promise.resolve()); diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts index e6dca92ccf..0a35c40352 100644 --- a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts +++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts @@ -2,7 +2,9 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; +import {Inject, Injectable} from '@nestjs/common'; import {isEqual, last} from 'lodash'; +import {IBlockchainService} from '../blockchain.service'; import {NodeConfig} from '../configure'; import {Header, IBlock} from '../indexer/types'; import {getLogger} from '../logger'; @@ -36,34 +38,22 @@ export interface IUnfinalizedBlocksServiceUtil { registerFinalizedBlock(header: Header): void; } -export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlocksService { +@Injectable() +export class UnfinalizedBlocksService implements IUnfinalizedBlocksService { private _unfinalizedBlocks?: UnfinalizedBlocks; private _finalizedHeader?: Header; protected lastCheckedBlockHeight?: number; - // protected abstract blockToHeader(block: B): Header; - protected abstract getFinalizedHead(): Promise
; - protected abstract getHeaderForHash(hash: string): Promise
; - protected abstract getHeaderForHeight(height: number): Promise
; - @mainThreadOnly() - protected blockToHeader(block: IBlock): Header { + private blockToHeader(block: IBlock): Header { return block.getHeader(); } - private set unfinalizedBlocks(unfinalizedBlocks: UnfinalizedBlocks) { - this._unfinalizedBlocks = unfinalizedBlocks; - } - protected get unfinalizedBlocks(): UnfinalizedBlocks { assert(this._unfinalizedBlocks !== undefined, new Error('Unfinalized blocks service has not been initialized')); return this._unfinalizedBlocks; } - private set finalizedHeader(finalizedHeader: Header) { - this._finalizedHeader = finalizedHeader; - } - protected get finalizedHeader(): Header { assert(this._finalizedHeader !== undefined, new Error('Unfinalized blocks service has not been initialized')); return this._finalizedHeader; @@ -71,15 +61,16 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo constructor( protected readonly nodeConfig: NodeConfig, - protected readonly storeCache: StoreCacheService + protected readonly storeCache: StoreCacheService, + @Inject('IBlockchainService') private blockchainService: IBlockchainService ) {} async init(reindex: (targetHeight: number) => Promise): Promise { logger.info(`Unfinalized blocks is ${this.nodeConfig.unfinalizedBlocks ? 'enabled' : 'disabled'}`); - this.unfinalizedBlocks = await this.getMetadataUnfinalizedBlocks(); + this._unfinalizedBlocks = await this.getMetadataUnfinalizedBlocks(); this.lastCheckedBlockHeight = await this.getLastFinalizedVerifiedHeight(); - this.finalizedHeader = await this.getFinalizedHead(); + this._finalizedHeader = await this.blockchainService.getFinalizedHeader(); if (this.unfinalizedBlocks.length) { logger.info('Processing unfinalized blocks'); @@ -130,7 +121,7 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo if (this.finalizedHeader && this.finalizedBlockNumber >= header.blockHeight) { return; } - this.finalizedHeader = header; + this._finalizedHeader = header; } private registerUnfinalizedBlock(header: Header): void { @@ -160,7 +151,7 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo // remove any records less and equal than input finalized blockHeight private removeFinalized(blockHeight: number): void { - this.unfinalizedBlocks = this.unfinalizedBlocks.filter(({blockHeight: height}) => height > blockHeight); + this._unfinalizedBlocks = this.unfinalizedBlocks.filter(({blockHeight: height}) => height > blockHeight); } // find closest record from block heights @@ -197,14 +188,14 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo * If we're off by a large number of blocks we can optimise by getting the block hash directly */ if (header.blockHeight - lastVerifiableBlock.blockHeight > UNFINALIZED_THRESHOLD) { - header = await this.getHeaderForHeight(lastVerifiableBlock.blockHeight); + header = await this.blockchainService.getHeaderForHeight(lastVerifiableBlock.blockHeight); } else { while (lastVerifiableBlock.blockHeight !== header.blockHeight) { assert( header.parentHash, 'When iterate back parent hashes to find matching height, we expect parentHash to be exist' ); - header = await this.getHeaderForHash(header.parentHash); + header = await this.blockchainService.getHeaderForHash(header.parentHash); } } @@ -234,7 +225,7 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo // Get the new parent assert(checkingHeader.parentHash, 'Expect checking header parentHash to be exist'); - checkingHeader = await this.getHeaderForHash(checkingHeader.parentHash); + checkingHeader = await this.blockchainService.getHeaderForHash(checkingHeader.parentHash); } return this.lastCheckedBlockHeight; @@ -259,7 +250,7 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo // Work backwards to find a block on chain that matches POI for (const indexedBlock of indexedBlocks) { - const chainHeader = await this.getHeaderForHeight(indexedBlock.id); + const chainHeader = await this.blockchainService.getHeaderForHeight(indexedBlock.id); // Need to convert to PoiBlock to encode block hash to Uint8Array properly const testPoiBlock = PoiBlock.create( @@ -292,7 +283,7 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo resetUnfinalizedBlocks(): void { this.storeCache.metadata.set(METADATA_UNFINALIZED_BLOCKS_KEY, '[]'); - this.unfinalizedBlocks = []; + this._unfinalizedBlocks = []; } resetLastFinalizedVerifiedHeight(): void { diff --git a/packages/node-core/src/indexer/worker/worker.core.module.ts b/packages/node-core/src/indexer/worker/worker.core.module.ts index 0aada5be69..b6d6790a12 100644 --- a/packages/node-core/src/indexer/worker/worker.core.module.ts +++ b/packages/node-core/src/indexer/worker/worker.core.module.ts @@ -4,12 +4,16 @@ import {Module} from '@nestjs/common'; import {ConnectionPoolService} from '../connectionPool.service'; import {ConnectionPoolStateManager} from '../connectionPoolState.manager'; +import {DynamicDsService} from '../dynamic-ds.service'; import {InMemoryCacheService} from '../inMemoryCache.service'; import {MonitorService} from '../monitor.service'; import {SandboxService} from '../sandbox.service'; +import {UnfinalizedBlocksService} from '../unfinalizedBlocks.service'; import {WorkerInMemoryCacheService} from './worker.cache.service'; import {WorkerConnectionPoolStateManager} from './worker.connectionPoolState.manager'; +import {WorkerDynamicDsService} from './worker.dynamic-ds.service'; import {WorkerMonitorService} from './worker.monitor.service'; +import {WorkerUnfinalizedBlocksService} from './worker.unfinalizedBlocks.service'; @Module({ providers: [ @@ -27,7 +31,22 @@ import {WorkerMonitorService} from './worker.monitor.service'; provide: InMemoryCacheService, useFactory: () => new WorkerInMemoryCacheService((global as any).host), }, + { + provide: 'IUnfinalizedBlocksService', + useFactory: () => new WorkerUnfinalizedBlocksService((global as any).host), + }, + { + provide: DynamicDsService, + useFactory: () => new WorkerDynamicDsService((global as any).host), + }, + ], + exports: [ + ConnectionPoolService, + SandboxService, + MonitorService, + InMemoryCacheService, + 'IUnfinalizedBlocksService', + DynamicDsService, ], - exports: [ConnectionPoolService, SandboxService, MonitorService, InMemoryCacheService], }) export class WorkerCoreModule {} diff --git a/packages/node-core/src/subcommands/index.ts b/packages/node-core/src/subcommands/index.ts index 9badc6fa70..62e8687bc4 100644 --- a/packages/node-core/src/subcommands/index.ts +++ b/packages/node-core/src/subcommands/index.ts @@ -6,3 +6,4 @@ export * from './forceClean.module'; export * from './foreceClean.init'; export * from './reindex.init'; export * from './reindex.service'; +export * from './testing.core.module'; diff --git a/packages/node-core/src/subcommands/testing.core.module.ts b/packages/node-core/src/subcommands/testing.core.module.ts new file mode 100644 index 0000000000..673d595b1f --- /dev/null +++ b/packages/node-core/src/subcommands/testing.core.module.ts @@ -0,0 +1,48 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import {Module} from '@nestjs/common'; + +import {EventEmitter2} from '@nestjs/event-emitter'; +import {SchedulerRegistry} from '@nestjs/schedule'; +import { + ConnectionPoolService, + ConnectionPoolStateManager, + InMemoryCacheService, + PoiService, + PoiSyncService, + StoreCacheService, + StoreService, + SandboxService, + DsProcessorService, + UnfinalizedBlocksService, + DynamicDsService, +} from '@subql/node-core'; + +@Module({ + providers: [ + SchedulerRegistry, + InMemoryCacheService, + StoreService, + StoreCacheService, + EventEmitter2, + PoiService, + PoiSyncService, + SandboxService, + DsProcessorService, + DynamicDsService, + UnfinalizedBlocksService, + ConnectionPoolStateManager, + ConnectionPoolService, + ], + exports: [ + DsProcessorService, + PoiService, + PoiSyncService, + StoreService, + DynamicDsService, + UnfinalizedBlocksService, + SandboxService, + ], +}) +export class TestingCoreModule {} diff --git a/packages/node-core/src/utils/project.ts b/packages/node-core/src/utils/project.ts index a3c64a9ff0..54dd88d066 100644 --- a/packages/node-core/src/utils/project.ts +++ b/packages/node-core/src/utils/project.ts @@ -11,14 +11,12 @@ import { BaseDataSource, BaseTemplateDataSource, Reader, - TemplateBase, } from '@subql/types-core'; -import {getAllEntitiesRelations} from '@subql/utils'; import {QueryTypes, Sequelize} from '@subql/x-sequelize'; import {stringToArray, getSchedule} from 'cron-converter'; import tar from 'tar'; import {NodeConfig} from '../configure/NodeConfig'; -import {ISubqueryProject, StoreService} from '../indexer'; +import {StoreService} from '../indexer'; import {getLogger} from '../logger'; import {exitWithError} from '../process'; diff --git a/packages/node/src/blockchain.service.ts b/packages/node/src/blockchain.service.ts new file mode 100644 index 0000000000..8d1788e29d --- /dev/null +++ b/packages/node/src/blockchain.service.ts @@ -0,0 +1,161 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import { isMainThread } from 'worker_threads'; +import { Inject, Injectable } from '@nestjs/common'; +import { isCustomDs, isRuntimeDs } from '@subql/common-substrate'; +import { + DatasourceParams, + Header, + IBlock, + IBlockchainService, + mainThreadOnly, +} from '@subql/node-core'; +import { + SubstrateCustomDatasource, + SubstrateCustomHandler, + SubstrateDatasource, + SubstrateHandlerKind, + SubstrateMapping, +} from '@subql/types'; +import { + SubqueryProject, + SubstrateProjectDs, +} from './configure/SubqueryProject'; +import { ApiService } from './indexer/api.service'; +import { RuntimeService } from './indexer/runtime/runtimeService'; +import { + ApiAt, + BlockContent, + isFullBlock, + LightBlockContent, +} from './indexer/types'; +import { + calcInterval, + getBlockByHeight, + getTimestamp, + substrateHeaderToHeader, +} from './utils/substrate'; + +const BLOCK_TIME_VARIANCE = 5000; //ms +const INTERVAL_PERCENT = 0.9; + +// eslint-disable-next-line @typescript-eslint/no-var-requires +const { version: packageVersion } = require('../package.json'); + +@Injectable() +export class BlockchainService + implements + IBlockchainService< + SubstrateDatasource, + SubstrateCustomDatasource< + string, + SubstrateMapping + >, + SubqueryProject, + ApiAt, + LightBlockContent, + BlockContent + > +{ + constructor( + @Inject('APIService') private apiService: ApiService, + @Inject(isMainThread ? RuntimeService : 'Null') + private runtimeService: RuntimeService, + ) {} + + isCustomDs = isCustomDs; + isRuntimeDs = isRuntimeDs; + blockHandlerKind = SubstrateHandlerKind.Block; + packageVersion = packageVersion; + + async fetchBlocks( + blockNums: number[], + ): Promise[] | IBlock[]> { + const specChanged = await this.runtimeService.specChanged( + blockNums[blockNums.length - 1], + ); + + // If specVersion not changed, a known overallSpecVer will be pass in + // Otherwise use api to fetch runtimes + return this.apiService.fetchBlocks( + blockNums, + specChanged ? undefined : this.runtimeService.parentSpecVersion, + ); + } + + async onProjectChange(project: SubqueryProject): Promise { + // Only network with chainTypes require to reload + await this.apiService.updateChainTypes(); + this.apiService.updateBlockFetching(); + } + + async getBlockTimestamp(height: number): Promise { + const block = await getBlockByHeight(this.apiService.api, height); + return getTimestamp(block); + } + + async getFinalizedHeader(): Promise
{ + const finalizedHash = + await this.apiService.unsafeApi.rpc.chain.getFinalizedHead(); + const finalizedHeader = + await this.apiService.unsafeApi.rpc.chain.getHeader(finalizedHash); + return substrateHeaderToHeader(finalizedHeader); + } + + async getBestHeight(): Promise { + const bestHeader = await this.apiService.unsafeApi.rpc.chain.getHeader(); + return bestHeader.number.toNumber(); + } + // eslint-disable-next-line @typescript-eslint/require-await + async getChainInterval(): Promise { + const chainInterval = calcInterval(this.apiService.unsafeApi) + .muln(INTERVAL_PERCENT) + .toNumber(); + + return Math.min(BLOCK_TIME_VARIANCE, chainInterval); + } + + // TODO can this decorator be in unfinalizedBlocks Service? + @mainThreadOnly() + async getHeaderForHash(hash: string): Promise
{ + return substrateHeaderToHeader( + await this.apiService.unsafeApi.rpc.chain.getHeader(hash), + ); + } + + // TODO can this decorator be in unfinalizedBlocks Service? + @mainThreadOnly() + async getHeaderForHeight(height: number): Promise
{ + const hash = await this.apiService.unsafeApi.rpc.chain.getBlockHash(height); + return this.getHeaderForHash(hash.toHex()); + } + + // eslint-disable-next-line @typescript-eslint/require-await + async updateDynamicDs( + params: DatasourceParams, + dsObj: SubstrateProjectDs, + ): Promise { + if (isCustomDs(dsObj)) { + dsObj.processor.options = { + ...dsObj.processor.options, + ...params.args, + }; + // TODO needs dsProcessorService + // await this.dsProcessorService.validateCustomDs([dsObj]); + } else if (isRuntimeDs(dsObj)) { + // XXX add any modifications to the ds here + } + } + + async getSafeApi(block: LightBlockContent | BlockContent): Promise { + const runtimeVersion = !isFullBlock(block) + ? undefined + : await this.runtimeService.getRuntimeVersion(block.block); + + return this.apiService.getPatchedApi( + block.block.block.header, + runtimeVersion, + ); + } +} diff --git a/packages/node/src/configure/SchemaMigration.service.test.ts b/packages/node/src/configure/SchemaMigration.service.test.ts index e8b486e0bd..4fc01ff016 100644 --- a/packages/node/src/configure/SchemaMigration.service.test.ts +++ b/packages/node/src/configure/SchemaMigration.service.test.ts @@ -3,11 +3,9 @@ import { promisify } from 'util'; import { INestApplication } from '@nestjs/common'; -import { DbOption, StoreCacheService } from '@subql/node-core'; +import { DbOption, StoreCacheService, ProjectService } from '@subql/node-core'; import { QueryTypes, Sequelize } from '@subql/x-sequelize'; import rimraf from 'rimraf'; -import { ApiService } from '../indexer/api.service'; -import { ProjectService } from '../indexer/project.service'; import { prepareApp } from '../utils/test.utils'; const option: DbOption = { @@ -52,11 +50,7 @@ describe('SchemaMigration integration tests', () => { schemaName = 'test-migrations-6'; app = await prepareApp(schemaName, cid); - projectService = app.get('IProjectService'); - const apiService = app.get(ApiService); - - await apiService.init(); await projectService.init(1); const dbResults = await sequelize.query( @@ -79,9 +73,6 @@ describe('SchemaMigration integration tests', () => { app = await prepareApp(schemaName, cid); projectService = app.get('IProjectService'); - - const apiService = app.get(ApiService); - await apiService.init(); await projectService.init(1); tempDir = (projectService as any).project.root; @@ -100,9 +91,6 @@ describe('SchemaMigration integration tests', () => { const projectUpgradeService = app.get('IProjectUpgradeService'); const storeCache = app.get(StoreCacheService); const cacheSpy = jest.spyOn(storeCache, 'updateModels'); - const apiService = app.get(ApiService); - - await apiService.init(); await projectService.init(1); tempDir = (projectService as any).project.root; @@ -138,9 +126,6 @@ describe('SchemaMigration integration tests', () => { projectService = app.get('IProjectService'); const projectUpgradeService = app.get('IProjectUpgradeService'); const storeCache = app.get(StoreCacheService); - const apiService = app.get(ApiService); - - await apiService.init(); await projectService.init(1); tempDir = (projectService as any).project.root; diff --git a/packages/node/src/indexer/api.service.spec.ts b/packages/node/src/indexer/api.service.spec.ts index 2f80b7c6c7..2937ac36f0 100644 --- a/packages/node/src/indexer/api.service.spec.ts +++ b/packages/node/src/indexer/api.service.spec.ts @@ -98,7 +98,7 @@ describe('ApiService', () => { it('read custom types from project manifest', async () => { const createSpy = jest.spyOn(ApiPromise, 'create'); - apiService = new ApiService( + apiService = await ApiService.create( project, new ConnectionPoolService( nodeConfig, @@ -107,7 +107,6 @@ describe('ApiService', () => { new EventEmitter2(), nodeConfig, ); - await apiService.init(); const { version } = require('../../package.json'); expect(WsProvider).toHaveBeenCalledWith( Object.keys(testNetwork.endpoint)[0], @@ -133,16 +132,16 @@ describe('ApiService', () => { subquery: 'example', }); - apiService = new ApiService( - project, - new ConnectionPoolService( + await expect( + ApiService.create( + project, + new ConnectionPoolService( + nodeConfig, + new ConnectionPoolStateManager(), + ), + new EventEmitter2(), nodeConfig, - new ConnectionPoolStateManager(), ), - new EventEmitter2(), - nodeConfig, - ); - - await expect(apiService.init()).rejects.toThrow(); + ).rejects.toThrow(); }); }); diff --git a/packages/node/src/indexer/api.service.test.ts b/packages/node/src/indexer/api.service.test.ts index 405756958c..4bee48a505 100644 --- a/packages/node/src/indexer/api.service.test.ts +++ b/packages/node/src/indexer/api.service.test.ts @@ -72,16 +72,23 @@ describe('ApiService', () => { useFactory: () => ({}), }, EventEmitter2, - ApiService, + { + provide: ApiService, + useFactory: ApiService.create, + inject: [ + 'ISubqueryProject', + ConnectionPoolService, + EventEmitter2, + NodeConfig, + ], + }, ], imports: [EventEmitterModule.forRoot()], }).compile(); app = module.createNestApplication(); await app.init(); - const apiService = app.get(ApiService); - await apiService.init(); - return apiService; + return app.get(ApiService); }; it('can instantiate api', async () => { @@ -481,16 +488,23 @@ describe('Load chain type hasher', () => { useFactory: () => ({}), }, EventEmitter2, - ApiService, + { + provide: ApiService, + useFactory: ApiService.create, + inject: [ + 'ISubqueryProject', + ConnectionPoolService, + EventEmitter2, + NodeConfig, + ], + }, ], imports: [EventEmitterModule.forRoot()], }).compile(); app = module.createNestApplication(); await app.init(); - const apiService = app.get(ApiService); - await apiService.init(); - return apiService; + return app.get(ApiService); }; it('should use new hasher function, types hasher string should be replaced with function', async () => { diff --git a/packages/node/src/indexer/api.service.ts b/packages/node/src/indexer/api.service.ts index a0d29ba653..9a28acb709 100644 --- a/packages/node/src/indexer/api.service.ts +++ b/packages/node/src/indexer/api.service.ts @@ -122,7 +122,7 @@ export class ApiService private nodeConfig: SubstrateNodeConfig; - constructor( + private constructor( @Inject('ISubqueryProject') private project: SubqueryProject, connectionPoolService: ConnectionPoolService, eventEmitter: EventEmitter2, @@ -134,34 +134,7 @@ export class ApiService this.updateBlockFetching(); } - private get fetchBlocksFunction(): FetchFunc { - assert(this._fetchBlocksFunction, 'fetchBlocksFunction not initialized'); - return this._fetchBlocksFunction; - } - - private get currentBlockHash(): string { - assert(this._currentBlockHash, 'currentBlockHash not initialized'); - return this._currentBlockHash; - } - - private set currentBlockHash(value: string) { - this._currentBlockHash = value; - } - - private get currentBlockNumber(): number { - assert(this._currentBlockNumber, 'currentBlockNumber not initialized'); - return this._currentBlockNumber; - } - - private set currentBlockNumber(value: number) { - this._currentBlockNumber = value; - } - - async onApplicationShutdown(): Promise { - await this.connectionPoolService.onApplicationShutdown(); - } - - async init(): Promise { + private async init(): Promise { overrideConsoleWarn(); let chainTypes: RegisteredTypes | undefined; let network: SubstrateNetworkConfig; @@ -221,6 +194,42 @@ export class ApiService await this.connectionPoolService.updateChainTypes(chainTypes); } + static async create( + @Inject('ISubqueryProject') project: SubqueryProject, + connectionPoolService: ConnectionPoolService, + eventEmitter: EventEmitter2, + nodeConfig: NodeConfig, + ): Promise { + const apiService = new ApiService( + project, + connectionPoolService, + eventEmitter, + nodeConfig, + ); + + await apiService.init(); + return apiService; + } + + private get fetchBlocksFunction(): FetchFunc { + assert(this._fetchBlocksFunction, 'fetchBlocksFunction not initialized'); + return this._fetchBlocksFunction; + } + + private get currentBlockHash(): string { + assert(this._currentBlockHash, 'currentBlockHash not initialized'); + return this._currentBlockHash; + } + + private get currentBlockNumber(): number { + assert(this._currentBlockNumber, 'currentBlockNumber not initialized'); + return this._currentBlockNumber; + } + + async onApplicationShutdown(): Promise { + await this.connectionPoolService.onApplicationShutdown(); + } + updateBlockFetching(): void { const onlyEventHandlers = isOnlyEventHandlers(this.project); const skipTransactions = @@ -269,8 +278,8 @@ export class ApiService header: Header, runtimeVersion?: RuntimeVersion, ): Promise { - this.currentBlockHash = header.hash.toString(); - this.currentBlockNumber = header.number.toNumber(); + this._currentBlockHash = header.hash.toString(); + this._currentBlockNumber = header.number.toNumber(); const api = this.api; const apiAt = (await api.at( diff --git a/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts b/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts deleted file mode 100644 index d5e74ad5dd..0000000000 --- a/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors -// SPDX-License-Identifier: GPL-3.0 - -import assert from 'assert'; -import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { - NodeConfig, - StoreCacheService, - StoreService, - IProjectService, - BlockDispatcher, - ProcessBlockResponse, - IProjectUpgradeService, - PoiSyncService, - IBlock, -} from '@subql/node-core'; -import { SubstrateDatasource } from '@subql/types'; -import { SubqueryProject } from '../../configure/SubqueryProject'; -import { ApiService } from '../api.service'; -import { IndexerManager } from '../indexer.manager'; -import { RuntimeService } from '../runtime/runtimeService'; -import { BlockContent, isFullBlock, LightBlockContent } from '../types'; - -/** - * @description Intended to behave the same as WorkerBlockDispatcherService but doesn't use worker threads or any parallel processing - */ -@Injectable() -export class BlockDispatcherService - extends BlockDispatcher - implements OnApplicationShutdown -{ - private _runtimeService?: RuntimeService; - - constructor( - private apiService: ApiService, - nodeConfig: NodeConfig, - private indexerManager: IndexerManager, - eventEmitter: EventEmitter2, - @Inject('IProjectService') - projectService: IProjectService, - @Inject('IProjectUpgradeService') - projectUpgradeService: IProjectUpgradeService, - storeService: StoreService, - storeCacheService: StoreCacheService, - poiSyncService: PoiSyncService, - @Inject('ISubqueryProject') project: SubqueryProject, - ) { - super( - nodeConfig, - eventEmitter, - projectService, - projectUpgradeService, - storeService, - storeCacheService, - poiSyncService, - project, - async ( - blockNums: number[], - ): Promise[] | IBlock[]> => { - const specChanged = await this.runtimeService.specChanged( - blockNums[blockNums.length - 1], - ); - - // If specVersion not changed, a known overallSpecVer will be pass in - // Otherwise use api to fetch runtimes - return this.apiService.fetchBlocks( - blockNums, - specChanged ? undefined : this.runtimeService.parentSpecVersion, - ); - }, - ); - } - - private get runtimeService(): RuntimeService { - assert(this._runtimeService, 'Runtime service not initialized'); - return this._runtimeService; - } - private set runtimeService(value: RuntimeService) { - this._runtimeService = value; - } - - async init( - onDynamicDsCreated: (height: number) => void, - runtimeService?: RuntimeService, - ): Promise { - await super.init(onDynamicDsCreated); - if (runtimeService) this.runtimeService = runtimeService; - } - - protected async indexBlock( - block: IBlock | IBlock, - ): Promise { - const runtimeVersion = !isFullBlock(block.block) - ? undefined - : await this.runtimeService.getRuntimeVersion(block.block.block); - - return this.indexerManager.indexBlock( - block, - await this.projectService.getDataSources(block.getHeader().blockHeight), - runtimeVersion, - ); - } -} diff --git a/packages/node/src/indexer/blockDispatcher/index.ts b/packages/node/src/indexer/blockDispatcher/index.ts index 8b646d2a9c..0081a692eb 100644 --- a/packages/node/src/indexer/blockDispatcher/index.ts +++ b/packages/node/src/indexer/blockDispatcher/index.ts @@ -1,7 +1,7 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -import { BlockDispatcherService } from './block-dispatcher.service'; +// import { BlockDispatcherService } from './block-dispatcher.service'; import { WorkerBlockDispatcherService } from './worker-block-dispatcher.service'; -export { BlockDispatcherService, WorkerBlockDispatcherService }; +export { WorkerBlockDispatcherService }; diff --git a/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts b/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts index 200f84754c..98ece59a80 100644 --- a/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts +++ b/packages/node/src/indexer/blockDispatcher/worker-block-dispatcher.service.ts @@ -17,14 +17,14 @@ import { InMemoryCacheService, createIndexerWorker as createIndexerWorkerCore, MonitorServiceInterface, + UnfinalizedBlocksService, + DynamicDsService, } from '@subql/node-core'; -import { SubstrateBlock, SubstrateDatasource } from '@subql/types'; +import { SubstrateDatasource } from '@subql/types'; import { SubqueryProject } from '../../configure/SubqueryProject'; import { ApiPromiseConnection } from '../apiPromise.connection'; -import { DynamicDsService } from '../dynamic-ds.service'; import { RuntimeService } from '../runtime/runtimeService'; -import { BlockContent } from '../types'; -import { UnfinalizedBlocksService } from '../unfinalizedBlocks.service'; +import { BlockContent, LightBlockContent } from '../types'; import { IIndexerWorker } from '../worker/worker'; type IndexerWorker = IIndexerWorker & { @@ -36,7 +36,7 @@ export class WorkerBlockDispatcherService extends WorkerBlockDispatcher< SubstrateDatasource, IndexerWorker, - SubstrateBlock + BlockContent | LightBlockContent > implements OnApplicationShutdown { @@ -54,8 +54,10 @@ export class WorkerBlockDispatcherService storeCacheService: StoreCacheService, poiSyncService: PoiSyncService, @Inject('ISubqueryProject') project: SubqueryProject, - dynamicDsService: DynamicDsService, - unfinalizedBlocksService: UnfinalizedBlocksService, + dynamicDsService: DynamicDsService, + unfinalizedBlocksService: UnfinalizedBlocksService< + BlockContent | LightBlockContent + >, connectionPoolState: ConnectionPoolStateManager, monitorService?: MonitorServiceInterface, ) { @@ -95,17 +97,13 @@ export class WorkerBlockDispatcherService return this._runtimeService; } - private set runtimeService(runtimeService: RuntimeService) { - this._runtimeService = runtimeService; - } - async init( onDynamicDsCreated: (height: number) => void, runtimeService?: RuntimeService, ): Promise { await super.init(onDynamicDsCreated); // Sync workers runtime from main - if (runtimeService) this.runtimeService = runtimeService; + if (runtimeService) this._runtimeService = runtimeService; this.syncWorkerRuntimes(); } diff --git a/packages/node/src/indexer/dictionary/substrateDictionary.service.spec.ts b/packages/node/src/indexer/dictionary/substrateDictionary.service.spec.ts index f0c29297a1..bbb3500c46 100644 --- a/packages/node/src/indexer/dictionary/substrateDictionary.service.spec.ts +++ b/packages/node/src/indexer/dictionary/substrateDictionary.service.spec.ts @@ -2,12 +2,16 @@ // SPDX-License-Identifier: GPL-3.0 import { EventEmitter2 } from '@nestjs/event-emitter'; +import { isCustomDs } from '@subql/common-substrate'; -import { NodeConfig } from '@subql/node-core'; +import { + NodeConfig, + DsProcessorService, + IBlockchainService, +} from '@subql/node-core'; import axios from 'axios'; import { GraphQLSchema } from 'graphql'; import { SubqueryProject } from '../../configure/SubqueryProject'; -import { DsProcessorService } from '../ds-processor.service'; import { SubstrateDictionaryService } from './substrateDictionary.service'; import { SubstrateDictionaryV1 } from './v1'; import { SubstrateDictionaryV2 } from './v2'; @@ -50,7 +54,11 @@ describe('Substrate Dictionary service', function () { ['wss://polkadot.api.onfinality.io/public-ws'], '0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3', ); - const dsProcessor = new DsProcessorService(project, nodeConfig); + const dsProcessor = new DsProcessorService( + project, + { isCustomDs } as IBlockchainService, + nodeConfig, + ); dictionaryService = new SubstrateDictionaryService( project, diff --git a/packages/node/src/indexer/dictionary/substrateDictionary.service.ts b/packages/node/src/indexer/dictionary/substrateDictionary.service.ts index 8c09f6f037..383b0b3db1 100644 --- a/packages/node/src/indexer/dictionary/substrateDictionary.service.ts +++ b/packages/node/src/indexer/dictionary/substrateDictionary.service.ts @@ -5,12 +5,14 @@ import assert from 'assert'; import { Inject, Injectable } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { NETWORK_FAMILY } from '@subql/common'; -import { isCustomDs } from '@subql/common-substrate'; -import { NodeConfig, DictionaryService, getLogger } from '@subql/node-core'; +import { + NodeConfig, + DictionaryService, + getLogger, + DsProcessorService, +} from '@subql/node-core'; import { SubstrateBlock, SubstrateDatasource } from '@subql/types'; -import { DsProcessor } from '@subql/types-core'; import { SubqueryProject } from '../../configure/SubqueryProject'; -import { DsProcessorService } from '../ds-processor.service'; import { SpecVersion } from './types'; import { SubstrateDictionaryV1 } from './v1'; import { SubstrateDictionaryV2 } from './v2'; diff --git a/packages/node/src/indexer/dictionary/v1/substrateDictionaryV1.spec.ts b/packages/node/src/indexer/dictionary/v1/substrateDictionaryV1.spec.ts index b648e85730..c5055547d4 100644 --- a/packages/node/src/indexer/dictionary/v1/substrateDictionaryV1.spec.ts +++ b/packages/node/src/indexer/dictionary/v1/substrateDictionaryV1.spec.ts @@ -2,7 +2,12 @@ // SPDX-License-Identifier: GPL-3.0 import { EventEmitter2 } from '@nestjs/event-emitter'; -import { NodeConfig } from '@subql/node-core'; +import { isCustomDs } from '@subql/common-substrate'; +import { + NodeConfig, + DsProcessorService, + IBlockchainService, +} from '@subql/node-core'; import { SubstrateBlockHandler, SubstrateCallHandler, @@ -13,7 +18,6 @@ import { } from '@subql/types'; import { GraphQLSchema } from 'graphql'; import { SubqueryProject } from '../../../configure/SubqueryProject'; -import { DsProcessorService } from '../../ds-processor.service'; import { SubstrateDictionaryService } from '../substrateDictionary.service'; import { buildDictionaryV1QueryEntries } from './substrateDictionaryV1'; @@ -44,7 +48,11 @@ describe('Substrate DictionaryService', () => { project, nodeConfig, new EventEmitter2(), - new DsProcessorService(project, nodeConfig), + new DsProcessorService( + project, + { isCustomDs } as IBlockchainService, + nodeConfig, + ), ); // prepare dictionary service diff --git a/packages/node/src/indexer/dictionary/v1/substrateDictionaryV1.ts b/packages/node/src/indexer/dictionary/v1/substrateDictionaryV1.ts index 15b0b94eb0..596e2f7400 100644 --- a/packages/node/src/indexer/dictionary/v1/substrateDictionaryV1.ts +++ b/packages/node/src/indexer/dictionary/v1/substrateDictionaryV1.ts @@ -14,7 +14,13 @@ import { SubstrateHandlerKind, SubstrateRuntimeHandlerFilter, } from '@subql/common-substrate'; -import { NodeConfig, DictionaryV1, timeout, getLogger } from '@subql/node-core'; +import { + NodeConfig, + DictionaryV1, + timeout, + getLogger, + DsProcessorService, +} from '@subql/node-core'; import { SubstrateBlockFilter, SubstrateDatasource } from '@subql/types'; import { DictionaryQueryCondition, @@ -24,7 +30,6 @@ import { buildQuery, GqlNode, GqlQuery } from '@subql/utils'; import { sortBy, uniqBy } from 'lodash'; import { SubqueryProject } from '../../../configure/SubqueryProject'; import { isBaseHandler, isCustomHandler } from '../../../utils/project'; -import { DsProcessorService } from '../../ds-processor.service'; import { SpecVersion, SpecVersionDictionary } from '../types'; type GetDsProcessor = DsProcessorService['getDsProcessor']; diff --git a/packages/node/src/indexer/ds-processor.service.spec.ts b/packages/node/src/indexer/ds-processor.service.spec.ts index 66edc8e82a..e6c0a40fde 100644 --- a/packages/node/src/indexer/ds-processor.service.spec.ts +++ b/packages/node/src/indexer/ds-processor.service.spec.ts @@ -3,11 +3,10 @@ import path from 'path'; import { isCustomDs } from '@subql/common-substrate'; -import { NodeConfig } from '@subql/node-core'; -import { SubstrateCustomDatasource } from '@subql/types'; +import { NodeConfig, DsProcessorService } from '@subql/node-core'; +import { SubstrateCustomDatasource, SubstrateDatasource } from '@subql/types'; import { GraphQLSchema } from 'graphql'; import { SubqueryProject } from '../configure/SubqueryProject'; -import { DsProcessorService } from './ds-processor.service'; function getTestProject( extraDataSources: SubstrateCustomDatasource[], @@ -40,12 +39,16 @@ const nodeConfig = new NodeConfig({ }); describe('DsProcessorService', () => { - let service: DsProcessorService; + let service: DsProcessorService; let project: SubqueryProject; beforeEach(() => { project = getTestProject([]); - service = new DsProcessorService(project, nodeConfig); + service = new DsProcessorService( + project, + { isCustomDs } as any, + nodeConfig, + ); }); it('can validate custom ds', async () => { @@ -66,7 +69,11 @@ describe('DsProcessorService', () => { }; project = getTestProject([badDs]); - service = new DsProcessorService(project, nodeConfig); + service = new DsProcessorService( + project, + { isCustomDs } as any, + nodeConfig, + ); await expect( service.validateProjectCustomDatasources(project.dataSources), diff --git a/packages/node/src/indexer/ds-processor.service.ts b/packages/node/src/indexer/ds-processor.service.ts deleted file mode 100644 index cead77c1bd..0000000000 --- a/packages/node/src/indexer/ds-processor.service.ts +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors -// SPDX-License-Identifier: GPL-3.0 - -import { Injectable } from '@nestjs/common'; -import { isCustomDs } from '@subql/common-substrate'; -import { BaseDsProcessorService } from '@subql/node-core'; -import { - SubstrateCustomDatasource, - SubstrateCustomHandler, - SubstrateDatasource, - SubstrateDatasourceProcessor, - SubstrateMapping, -} from '@subql/types'; - -@Injectable() -export class DsProcessorService extends BaseDsProcessorService< - SubstrateDatasource, - SubstrateCustomDatasource>, - SubstrateDatasourceProcessor> -> { - protected isCustomDs = isCustomDs; -} diff --git a/packages/node/src/indexer/dynamic-ds.service.ts b/packages/node/src/indexer/dynamic-ds.service.ts deleted file mode 100644 index 0db1c87b27..0000000000 --- a/packages/node/src/indexer/dynamic-ds.service.ts +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors -// SPDX-License-Identifier: GPL-3.0 - -import { Inject, Injectable } from '@nestjs/common'; -import { isCustomDs, isRuntimeDs } from '@subql/common-substrate'; -import { - DatasourceParams, - DynamicDsService as BaseDynamicDsService, -} from '@subql/node-core'; -import { - SubqueryProject, - SubstrateProjectDs, -} from '../configure/SubqueryProject'; -import { DsProcessorService } from './ds-processor.service'; - -@Injectable() -export class DynamicDsService extends BaseDynamicDsService< - SubstrateProjectDs, - SubqueryProject -> { - constructor( - private readonly dsProcessorService: DsProcessorService, - @Inject('ISubqueryProject') project: SubqueryProject, - ) { - super(project); - } - - protected async getDatasource( - params: DatasourceParams, - ): Promise { - const dsObj = this.getTemplate( - params.templateName, - params.startBlock, - ); - - try { - if (isCustomDs(dsObj)) { - dsObj.processor.options = { - ...dsObj.processor.options, - ...params.args, - }; - await this.dsProcessorService.validateCustomDs([dsObj]); - } else if (isRuntimeDs(dsObj)) { - // XXX add any modifications to the ds here - } - - return dsObj; - } catch (e) { - throw new Error( - `Unable to create dynamic datasource.\n ${(e as any).message}`, - ); - } - } -} diff --git a/packages/node/src/indexer/fetch.module.ts b/packages/node/src/indexer/fetch.module.ts index 218a01919f..1eac85f17b 100644 --- a/packages/node/src/indexer/fetch.module.ts +++ b/packages/node/src/indexer/fetch.module.ts @@ -13,48 +13,79 @@ import { InMemoryCacheService, MonitorService, CoreModule, + UnfinalizedBlocksService, + BlockDispatcher, + ConnectionPoolService, + DsProcessorService, + ProjectService, + DynamicDsService, } from '@subql/node-core'; +import { SubstrateDatasource } from '@subql/types'; +import { BlockchainService } from '../blockchain.service'; import { SubqueryProject } from '../configure/SubqueryProject'; import { ApiService } from './api.service'; import { ApiPromiseConnection } from './apiPromise.connection'; -import { - BlockDispatcherService, - WorkerBlockDispatcherService, -} from './blockDispatcher'; +import { WorkerBlockDispatcherService } from './blockDispatcher'; import { SubstrateDictionaryService } from './dictionary/substrateDictionary.service'; -import { DsProcessorService } from './ds-processor.service'; -import { DynamicDsService } from './dynamic-ds.service'; import { FetchService } from './fetch.service'; import { IndexerManager } from './indexer.manager'; -import { ProjectService } from './project.service'; import { RuntimeService } from './runtime/runtimeService'; -import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; @Module({ imports: [CoreModule], providers: [ - ApiService, + { + provide: 'APIService', + useFactory: ApiService.create, + inject: [ + 'ISubqueryProject', + ConnectionPoolService, + EventEmitter2, + NodeConfig, + ], + }, + { + provide: RuntimeService, // TODO DOING this because of circular reference with dictionary service + useFactory: (apiService: ApiService) => new RuntimeService(apiService), + inject: ['APIService'], + }, + { + provide: 'IBlockchainService', + useClass: BlockchainService, + }, + /* START: Move to node core */ + DsProcessorService, + DynamicDsService, + { + provide: 'IUnfinalizedBlocksService', + useClass: UnfinalizedBlocksService, + }, + { + useClass: ProjectService, + provide: 'IProjectService', + }, + /* END: Move to node core */ IndexerManager, { provide: 'IBlockDispatcher', useFactory: ( nodeConfig: NodeConfig, eventEmitter: EventEmitter2, - projectService: ProjectService, + projectService: ProjectService, projectUpgradeService: IProjectUpgradeService, - apiService: ApiService, - indexerManager: IndexerManager, cacheService: InMemoryCacheService, storeService: StoreService, storeCacheService: StoreCacheService, poiSyncService: PoiSyncService, project: SubqueryProject, - dynamicDsService: DynamicDsService, + dynamicDsService: DynamicDsService, unfinalizedBlocks: UnfinalizedBlocksService, connectionPoolState: ConnectionPoolStateManager, + blockchainService: BlockchainService, + indexerManager: IndexerManager, monitorService?: MonitorService, - ) => - nodeConfig.workers + ) => { + return nodeConfig.workers ? new WorkerBlockDispatcherService( nodeConfig, eventEmitter, @@ -70,10 +101,8 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; connectionPoolState, monitorService, ) - : new BlockDispatcherService( - apiService, + : new BlockDispatcher( nodeConfig, - indexerManager, eventEmitter, projectService, projectUpgradeService, @@ -81,35 +110,30 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; storeCacheService, poiSyncService, project, - ), + blockchainService, + indexerManager, + ); + }, inject: [ NodeConfig, EventEmitter2, 'IProjectService', 'IProjectUpgradeService', - ApiService, - IndexerManager, InMemoryCacheService, StoreService, StoreCacheService, PoiSyncService, 'ISubqueryProject', DynamicDsService, - UnfinalizedBlocksService, + 'IUnfinalizedBlocksService', ConnectionPoolStateManager, + 'IBlockchainService', + IndexerManager, MonitorService, ], }, - FetchService, SubstrateDictionaryService, - DsProcessorService, - DynamicDsService, - { - useClass: ProjectService, - provide: 'IProjectService', - }, - UnfinalizedBlocksService, - RuntimeService, + FetchService, ], }) export class FetchModule {} diff --git a/packages/node/src/indexer/fetch.service.spec.ts b/packages/node/src/indexer/fetch.service.spec.ts index 2b6efcc81b..37798f0bc3 100644 --- a/packages/node/src/indexer/fetch.service.spec.ts +++ b/packages/node/src/indexer/fetch.service.spec.ts @@ -1,6 +1,7 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 +import { ProjectService } from '@subql/node-core'; import { SubstrateBlockHandler, SubstrateCallHandler, @@ -12,9 +13,8 @@ import { } from '@subql/types'; import { DictionaryQueryEntry } from '@subql/types-core'; import { FetchService } from './fetch.service'; -import { ProjectService } from './project.service'; -const projectService: ProjectService = { +const projectService: ProjectService = { getAllDataSources: () => { return [ makeDs([{ ...blockHandler, filter: { modulo: 5 } }]), @@ -60,7 +60,6 @@ describe('FetchSevice', () => { beforeEach(() => { fetchService = new FetchService( - null as any, // ApiService null as any, // NodeConfig projectService, // ProjectService {} as any, // Project @@ -71,6 +70,7 @@ describe('FetchSevice', () => { null as any, // SchedulerRegistry null as any, // RuntimeService null as any, // StoreCacheService + null as any, // BlockchainService ) as any; }); diff --git a/packages/node/src/indexer/fetch.service.test.ts b/packages/node/src/indexer/fetch.service.test.ts index e1efc082f5..ff1f4b81e6 100644 --- a/packages/node/src/indexer/fetch.service.test.ts +++ b/packages/node/src/indexer/fetch.service.test.ts @@ -2,12 +2,13 @@ // SPDX-License-Identifier: GPL-3.0 import { ApiPromise, HttpProvider } from '@polkadot/api'; -import { ApiService } from './api.service'; import { FetchService } from './fetch.service'; import { createCachedProvider } from './x-provider/cachedProvider'; const POLKADOT_ENDPOINT = 'https://rpc.polkadot.io'; +// TODO move these tests to Blockchain service + describe('FetchService', () => { let fetchService: FetchService; let api: ApiPromise; @@ -18,12 +19,7 @@ describe('FetchService', () => { provider: createCachedProvider(new HttpProvider(POLKADOT_ENDPOINT)), }); - const apiService = { - unsafeApi: api, - } as any as ApiService; - fetchService = new FetchService( - apiService, // ApiService null as any, // NodeConfig null as any, // ProjectService {} as any, // Project @@ -38,6 +34,7 @@ describe('FetchService', () => { null as any, // SchedulerRegistry null as any, // RuntimeService null as any, // StoreCacheService + null as any, // BlockchainService ) as any; }, 10000); diff --git a/packages/node/src/indexer/fetch.service.ts b/packages/node/src/indexer/fetch.service.ts index 28ad4bfcce..e5f69ceaf1 100644 --- a/packages/node/src/indexer/fetch.service.ts +++ b/packages/node/src/indexer/fetch.service.ts @@ -4,28 +4,19 @@ import { Inject, Injectable } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { SchedulerRegistry } from '@nestjs/schedule'; -import { ApiPromise } from '@polkadot/api'; - -import { isCustomDs, SubstrateHandlerKind } from '@subql/common-substrate'; import { NodeConfig, BaseFetchService, - getModulos, - Header, StoreCacheService, + UnfinalizedBlocksService, + ProjectService, } from '@subql/node-core'; import { SubstrateDatasource, SubstrateBlock } from '@subql/types'; +import { BlockchainService } from '../blockchain.service'; import { SubqueryProject } from '../configure/SubqueryProject'; -import { calcInterval, substrateHeaderToHeader } from '../utils/substrate'; -import { ApiService } from './api.service'; import { ISubstrateBlockDispatcher } from './blockDispatcher/substrate-block-dispatcher'; import { SubstrateDictionaryService } from './dictionary/substrateDictionary.service'; -import { ProjectService } from './project.service'; import { RuntimeService } from './runtime/runtimeService'; -import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; - -const BLOCK_TIME_VARIANCE = 5000; //ms -const INTERVAL_PERCENT = 0.9; @Injectable() export class FetchService extends BaseFetchService< @@ -34,18 +25,20 @@ export class FetchService extends BaseFetchService< SubstrateBlock > { constructor( - private apiService: ApiService, nodeConfig: NodeConfig, - @Inject('IProjectService') projectService: ProjectService, + @Inject('IProjectService') + projectService: ProjectService, @Inject('ISubqueryProject') project: SubqueryProject, @Inject('IBlockDispatcher') blockDispatcher: ISubstrateBlockDispatcher, dictionaryService: SubstrateDictionaryService, - unfinalizedBlocksService: UnfinalizedBlocksService, + @Inject('IUnfinalizedBlocksService') + unfinalizedBlocksService: UnfinalizedBlocksService, eventEmitter: EventEmitter2, schedulerRegistry: SchedulerRegistry, private runtimeService: RuntimeService, storeCacheService: StoreCacheService, + @Inject('IBlockchainService') blockchainService: BlockchainService, ) { super( nodeConfig, @@ -57,37 +50,10 @@ export class FetchService extends BaseFetchService< schedulerRegistry, unfinalizedBlocksService, storeCacheService, + blockchainService, ); } - get api(): ApiPromise { - return this.apiService.unsafeApi; - } - - protected async getFinalizedHeader(): Promise
{ - const finalizedHash = await this.api.rpc.chain.getFinalizedHead(); - const finalizedHeader = await this.api.rpc.chain.getHeader(finalizedHash); - return substrateHeaderToHeader(finalizedHeader); - } - - protected async getBestHeight(): Promise { - const bestHeader = await this.api.rpc.chain.getHeader(); - return bestHeader.number.toNumber(); - } - - // eslint-disable-next-line @typescript-eslint/require-await - protected async getChainInterval(): Promise { - const chainInterval = calcInterval(this.api) - .muln(INTERVAL_PERCENT) - .toNumber(); - - return Math.min(BLOCK_TIME_VARIANCE, chainInterval); - } - - protected getModulos(dataSources: SubstrateDatasource[]): number[] { - return getModulos(dataSources, isCustomDs, SubstrateHandlerKind.Block); - } - protected async initBlockDispatcher(): Promise { await this.blockDispatcher.init( this.resetForNewDs.bind(this), @@ -100,12 +66,9 @@ export class FetchService extends BaseFetchService< }: { startHeight: number; }): Promise { - this.runtimeService.init(this.getLatestFinalizedHeight.bind(this)); - - await this.runtimeService.syncDictionarySpecVersions(startHeight); - - // setup parentSpecVersion - await this.runtimeService.specChanged(startHeight); - await this.runtimeService.prefetchMeta(startHeight); + await this.runtimeService.init( + startHeight, + this.getLatestFinalizedHeight(), + ); } } diff --git a/packages/node/src/indexer/indexer.manager.spec.ts b/packages/node/src/indexer/indexer.manager.spec.ts deleted file mode 100644 index 52dc504ccb..0000000000 --- a/packages/node/src/indexer/indexer.manager.spec.ts +++ /dev/null @@ -1,258 +0,0 @@ -// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors -// SPDX-License-Identifier: GPL-3.0 - -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { SchedulerRegistry } from '@nestjs/schedule'; -import { - SubstrateDatasourceKind, - SubstrateHandlerKind, -} from '@subql/common-substrate'; -import { - StoreService, - PoiService, - PoiSyncService, - NodeConfig, - ConnectionPoolService, - StoreCacheService, - IProjectUpgradeService, - InMemoryCacheService, - SandboxService, -} from '@subql/node-core'; -import { Sequelize } from '@subql/x-sequelize'; -import { GraphQLSchema } from 'graphql'; -import { SubqueryProject } from '../configure/SubqueryProject'; -import { ApiService } from './api.service'; -import { ApiPromiseConnection } from './apiPromise.connection'; -import { DsProcessorService } from './ds-processor.service'; -import { DynamicDsService } from './dynamic-ds.service'; -import { IndexerManager } from './indexer.manager'; -import { ProjectService } from './project.service'; -import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; - -jest.mock('@subql/x-sequelize', () => { - const mSequelize = { - authenticate: jest.fn(), - define: () => ({ - findOne: jest.fn(), - create: (input: any) => input, - }), - query: () => [{ nextval: 1 }], - showAllSchemas: () => ['subquery_1'], - model: () => ({ upsert: jest.fn() }), - sync: jest.fn(), - transaction: () => ({ - commit: jest.fn(), - rollback: jest.fn(), - afterCommit: jest.fn(), - }), - // createSchema: jest.fn(), - }; - const actualSequelize = jest.requireActual('@subql/x-sequelize'); - return { - ...actualSequelize, - Sequelize: jest.fn(() => mSequelize), - }; -}); - -jest.setTimeout(200000); - -const nodeConfig = new NodeConfig({ - subquery: 'asdf', - subqueryName: 'asdf', - networkEndpoint: { 'wss://polkadot.api.onfinality.io/public-ws': {} }, -}); - -function testSubqueryProject_1(): SubqueryProject { - return { - id: 'test', - root: './', - network: { - chainId: '0x', - endpoint: ['wss://polkadot.api.onfinality.io/public-ws'], - }, - dataSources: [ - { - kind: SubstrateDatasourceKind.Runtime, - startBlock: 1, - mapping: { - file: '', - handlers: [ - { handler: 'testSandbox', kind: SubstrateHandlerKind.Event }, - ], - }, - }, - { - kind: SubstrateDatasourceKind.Runtime, - startBlock: 1, - mapping: { - file: '', - handlers: [ - { handler: 'testSandbox', kind: SubstrateHandlerKind.Event }, - ], - }, - }, - ], - schema: new GraphQLSchema({}), - templates: [], - } as unknown as SubqueryProject; -} - -function testSubqueryProject_2(): SubqueryProject { - return { - id: 'test', - root: './', - network: { - endpoint: ['wss://polkadot.api.onfinality.io/public-ws'], - dictionary: `https://api.subquery.network/sq/subquery/dictionary-polkadot`, - chainId: - '0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3', - }, - dataSources: [ - { - kind: SubstrateDatasourceKind.Runtime, - startBlock: 1, - mapping: { - file: '', - handlers: [ - { handler: 'testSandbox', kind: SubstrateHandlerKind.Event }, - ], - }, - }, - ], - schema: new GraphQLSchema({}), - templates: [], - } as unknown as SubqueryProject; -} - -// eslint-disable-next-line jest/no-export -export function mockProjectUpgradeService( - project: SubqueryProject, -): IProjectUpgradeService { - const startBlock = Math.min( - ...project.dataSources.map((ds) => ds.startBlock || 1), - ); - - let currentHeight = startBlock; - return { - isRewindable: true, - init: jest.fn(), - initWorker: jest.fn(), - updateIndexedDeployments: jest.fn(), - currentHeight: currentHeight, - // eslint-disable-next-line @typescript-eslint/require-await - setCurrentHeight: async (height: number) => { - currentHeight = height; - }, - currentProject: project, - projects: new Map([[startBlock, project]]), - getProject: () => project, - rewind: () => Promise.resolve(), - }; -} - -function createIndexerManager( - project: SubqueryProject, - connectionPoolService: ConnectionPoolService, - nodeConfig: NodeConfig, -): IndexerManager { - const sequelize = new Sequelize(); - const eventEmitter = new EventEmitter2(); - const apiService = new ApiService( - project, - connectionPoolService, - eventEmitter, - nodeConfig, - ); - const dsProcessorService = new DsProcessorService(project, nodeConfig); - const dynamicDsService = new DynamicDsService(dsProcessorService, project); - - const storeCache = new StoreCacheService( - sequelize, - nodeConfig, - eventEmitter, - new SchedulerRegistry(), - ); - const storeService = new StoreService( - sequelize, - nodeConfig, - storeCache, - project, - ); - const cacheService = new InMemoryCacheService(); - const poiService = new PoiService(nodeConfig, storeCache); - - const poiSyncService = new PoiSyncService(nodeConfig, eventEmitter, project); - const unfinalizedBlocksService = new UnfinalizedBlocksService( - apiService, - nodeConfig, - storeCache, - ); - const sandboxService = new SandboxService( - storeService, - cacheService, - nodeConfig, - project, - ); - - const projectUpgradeService = mockProjectUpgradeService(project); - const projectService = new ProjectService( - dsProcessorService, - apiService, - poiService, - poiSyncService, - sequelize, - project, - projectUpgradeService, - storeService, - nodeConfig, - dynamicDsService, - eventEmitter, - unfinalizedBlocksService, - ); - - return new IndexerManager( - apiService, - nodeConfig, - sandboxService, - dsProcessorService, - dynamicDsService, - unfinalizedBlocksService, - ); -} - -/* - * These tests aren't run because of setup requirements with such a large number of dependencies - */ -describe('IndexerManager', () => { - let indexerManager: IndexerManager; - - afterEach(() => { - (indexerManager as any)?.fetchService.onApplicationShutdown(); - }); - - it.skip('should be able to start the manager (v0.0.1)', async () => { - // indexerManager = createIndexerManager( - // testSubqueryProject_1(), - // new ConnectionPoolService( - // nodeConfig, - // new ConnectionPoolStateManager(), - // ), - // nodeConfig, - // ); - // await expect(indexerManager.start()).resolves.toBe(undefined); - // expect(Object.keys((indexerManager as any).vms).length).toBe(1); - }); - - it.skip('should be able to start the manager (v0.2.0)', async () => { - // indexerManager = createIndexerManager( - // testSubqueryProject_2(), - // new ConnectionPoolService( - // nodeConfig, - // new ConnectionPoolStateManager(), - // ), - // nodeConfig, - // ); - // await expect(indexerManager.start()).resolves.toBe(undefined); - // expect(Object.keys((indexerManager as any).vms).length).toBe(1); - }); -}); diff --git a/packages/node/src/indexer/indexer.manager.ts b/packages/node/src/indexer/indexer.manager.ts index 1401c339ad..f131bac932 100644 --- a/packages/node/src/indexer/indexer.manager.ts +++ b/packages/node/src/indexer/indexer.manager.ts @@ -1,15 +1,12 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -import { Injectable } from '@nestjs/common'; +import { Inject, Injectable } from '@nestjs/common'; import { ApiPromise } from '@polkadot/api'; -import { RuntimeVersion } from '@polkadot/types/interfaces'; import { isBlockHandlerProcessor, isCallHandlerProcessor, isEventHandlerProcessor, - isCustomDs, - isRuntimeDs, SubstrateCustomDataSource, SubstrateHandlerKind, SubstrateRuntimeHandlerInputMap, @@ -21,23 +18,28 @@ import { IndexerSandbox, ProcessBlockResponse, BaseIndexerManager, + DsProcessorService, IBlock, + UnfinalizedBlocksService, + IBlockchainService, + DynamicDsService, } from '@subql/node-core'; import { LightSubstrateEvent, SubstrateBlock, SubstrateBlockFilter, + SubstrateCustomDatasource, SubstrateDatasource, SubstrateEvent, SubstrateExtrinsic, } from '@subql/types'; -import { SubstrateProjectDs } from '../configure/SubqueryProject'; +import { + SubqueryProject, + SubstrateProjectDs, +} from '../configure/SubqueryProject'; import * as SubstrateUtil from '../utils/substrate'; import { ApiService as SubstrateApiService } from './api.service'; -import { DsProcessorService } from './ds-processor.service'; -import { DynamicDsService } from './dynamic-ds.service'; import { ApiAt, BlockContent, isFullBlock, LightBlockContent } from './types'; -import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; @Injectable() export class IndexerManager extends BaseIndexerManager< @@ -51,16 +53,28 @@ export class IndexerManager extends BaseIndexerManager< typeof ProcessorTypeMap, SubstrateRuntimeHandlerInputMap > { - protected isRuntimeDs = isRuntimeDs; - protected isCustomDs = isCustomDs; - constructor( - apiService: SubstrateApiService, + @Inject('APIService') apiService: SubstrateApiService, nodeConfig: NodeConfig, sandboxService: SandboxService, - dsProcessorService: DsProcessorService, - dynamicDsService: DynamicDsService, - unfinalizedBlocksService: UnfinalizedBlocksService, + dsProcessorService: DsProcessorService< + SubstrateDatasource, + SubstrateCustomDatasource + >, + dynamicDsService: DynamicDsService, + @Inject('IUnfinalizedBlocksService') + unfinalizedBlocksService: UnfinalizedBlocksService< + BlockContent | LightBlockContent + >, + @Inject('IBlockchainService') + blockchainService: IBlockchainService< + SubstrateDatasource, + SubstrateCustomDatasource, + SubqueryProject, + ApiAt, + LightBlockContent, + BlockContent + >, ) { super( apiService, @@ -71,6 +85,7 @@ export class IndexerManager extends BaseIndexerManager< unfinalizedBlocksService, FilterTypeMap, ProcessorTypeMap, + blockchainService, ); } @@ -78,21 +93,9 @@ export class IndexerManager extends BaseIndexerManager< async indexBlock( block: IBlock, dataSources: SubstrateDatasource[], - runtimeVersion?: RuntimeVersion, ): Promise { return super.internalIndexBlock(block, dataSources, () => - this.getApi(block.block, runtimeVersion), - ); - } - - // eslint-disable-next-line @typescript-eslint/require-await - private async getApi( - block: LightBlockContent | BlockContent, - runtimeVersion?: RuntimeVersion, - ): Promise { - return this.apiService.getPatchedApi( - block.block.block.header, - runtimeVersion, + this.blockchainService.getSafeApi(block.block), ); } @@ -103,16 +106,28 @@ export class IndexerManager extends BaseIndexerManager< ): Promise { if (isFullBlock(blockContent)) { const { block, events, extrinsics } = blockContent; - await this.indexBlockContent(block, dataSources, getVM); + await this.indexContent(SubstrateHandlerKind.Block)( + block, + dataSources, + getVM, + ); // Run initialization events const initEvents = events.filter((evt) => evt.phase.isInitialization); for (const event of initEvents) { - await this.indexEvent(event, dataSources, getVM); + await this.indexContent(SubstrateHandlerKind.Event)( + event, + dataSources, + getVM, + ); } for (const extrinsic of extrinsics) { - await this.indexExtrinsic(extrinsic, dataSources, getVM); + await this.indexContent(SubstrateHandlerKind.Call)( + extrinsic, + dataSources, + getVM, + ); // Process extrinsic events const extrinsicEvents = events @@ -120,50 +135,50 @@ export class IndexerManager extends BaseIndexerManager< .sort((a, b) => a.idx - b.idx); for (const event of extrinsicEvents) { - await this.indexEvent(event, dataSources, getVM); + await this.indexContent(SubstrateHandlerKind.Event)( + event, + dataSources, + getVM, + ); } } // Run finalization events const finalizeEvents = events.filter((evt) => evt.phase.isFinalization); for (const event of finalizeEvents) { - await this.indexEvent(event, dataSources, getVM); + await this.indexContent(SubstrateHandlerKind.Event)( + event, + dataSources, + getVM, + ); } } else { for (const event of blockContent.events) { - await this.indexEvent(event, dataSources, getVM); + await this.indexContent(SubstrateHandlerKind.Event)( + event, + dataSources, + getVM, + ); } } } - private async indexBlockContent( - block: SubstrateBlock, - dataSources: SubstrateProjectDs[], - getVM: (d: SubstrateProjectDs) => Promise, - ): Promise { - for (const ds of dataSources) { - await this.indexData(SubstrateHandlerKind.Block, block, ds, getVM); - } - } - - private async indexExtrinsic( - extrinsic: SubstrateExtrinsic, - dataSources: SubstrateProjectDs[], - getVM: (d: SubstrateProjectDs) => Promise, - ): Promise { - for (const ds of dataSources) { - await this.indexData(SubstrateHandlerKind.Call, extrinsic, ds, getVM); - } - } - - private async indexEvent( - event: SubstrateEvent | LightSubstrateEvent, + private indexContent( + kind: SubstrateHandlerKind, + ): ( + content: + | SubstrateBlock + | SubstrateExtrinsic + | SubstrateEvent + | LightSubstrateEvent, dataSources: SubstrateProjectDs[], getVM: (d: SubstrateProjectDs) => Promise, - ): Promise { - for (const ds of dataSources) { - await this.indexData(SubstrateHandlerKind.Event, event, ds, getVM); - } + ) => Promise { + return async (content, dataSources, getVM) => { + for (const ds of dataSources) { + await this.indexData(kind, content, ds, getVM); + } + }; } protected async prepareFilteredData( diff --git a/packages/node/src/indexer/project.service.spec.ts b/packages/node/src/indexer/project.service.spec.ts index 16960e499c..97c814056e 100644 --- a/packages/node/src/indexer/project.service.spec.ts +++ b/packages/node/src/indexer/project.service.spec.ts @@ -4,20 +4,20 @@ import { EventEmitter2, EventEmitterModule } from '@nestjs/event-emitter'; import { Test } from '@nestjs/testing'; import { - BaseProjectService, + ProjectService, ConnectionPoolService, ConnectionPoolStateManager, NodeConfig, ProjectUpgradeService, upgradableSubqueryProject, + DsProcessorService, + DynamicDsService } from '@subql/node-core'; import { SubstrateDatasourceKind, SubstrateHandlerKind } from '@subql/types'; import { GraphQLSchema } from 'graphql'; +import { BlockchainService } from '../blockchain.service'; import { SubqueryProject } from '../configure/SubqueryProject'; import { ApiService } from './api.service'; -import { DsProcessorService } from './ds-processor.service'; -import { DynamicDsService } from './dynamic-ds.service'; -import { ProjectService } from './project.service'; function testSubqueryProject(): SubqueryProject { return { @@ -52,7 +52,7 @@ function testSubqueryProject(): SubqueryProject { } // @ts-ignore -class TestProjectService extends BaseProjectService { +class TestProjectService extends ProjectService { packageVersion = '1.0.0'; async init(startHeight?: number): Promise { @@ -140,7 +140,7 @@ describe('ProjectService', () => { }, { provide: ProjectService, - useFactory: (apiService: ApiService, project: SubqueryProject) => + useFactory: (apiService: ApiService, project: SubqueryProject, blockchainService: BlockchainService) => new TestProjectService( { validateProjectCustomDatasources: jest.fn(), @@ -166,23 +166,31 @@ describe('ProjectService', () => { } as unknown as DynamicDsService, null as unknown as any, null as unknown as any, + blockchainService ), - inject: [ApiService, 'ISubqueryProject'], + inject: ['APIService', 'ISubqueryProject', BlockchainService,], }, EventEmitter2, - ApiService, + { + provide: 'APIService', + useFactory: ApiService.create, + inject: ['ISubqueryProject', ConnectionPoolService, EventEmitter2, NodeConfig] + }, { provide: ProjectUpgradeService, useValue: projectUpgrade, }, + { + provide: 'IBlockchainService', + useClass: BlockchainService, + } ], imports: [EventEmitterModule.forRoot()], }).compile(); const app = module.createNestApplication(); await app.init(); - apiService = app.get(ApiService); - await apiService.init(); + apiService = app.get('APIService'); projectUpgradeService = app.get( ProjectUpgradeService, ) as ProjectUpgradeService; diff --git a/packages/node/src/indexer/project.service.ts b/packages/node/src/indexer/project.service.ts deleted file mode 100644 index eb9c4ebe03..0000000000 --- a/packages/node/src/indexer/project.service.ts +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors -// SPDX-License-Identifier: GPL-3.0 - -import { isMainThread } from 'worker_threads'; -import { Inject, Injectable } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { - PoiService, - PoiSyncService, - BaseProjectService, - StoreService, - NodeConfig, - IProjectUpgradeService, - profiler, -} from '@subql/node-core'; -import { SubstrateDatasource } from '@subql/types'; -import { Sequelize } from '@subql/x-sequelize'; -import { SubqueryProject } from '../configure/SubqueryProject'; -import { getBlockByHeight, getTimestamp } from '../utils/substrate'; -import { ApiService } from './api.service'; -import { DsProcessorService } from './ds-processor.service'; -import { DynamicDsService } from './dynamic-ds.service'; -import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; - -// eslint-disable-next-line @typescript-eslint/no-var-requires -const { version: packageVersion } = require('../../package.json'); - -@Injectable() -export class ProjectService extends BaseProjectService< - ApiService, - SubstrateDatasource -> { - protected packageVersion = packageVersion; - - constructor( - dsProcessorService: DsProcessorService, - apiService: ApiService, - @Inject(isMainThread ? PoiService : 'Null') poiService: PoiService, - @Inject(isMainThread ? PoiSyncService : 'Null') - poiSyncService: PoiSyncService, - @Inject(isMainThread ? Sequelize : 'Null') sequelize: Sequelize, - @Inject('ISubqueryProject') project: SubqueryProject, - @Inject('IProjectUpgradeService') - projectUpgradeService: IProjectUpgradeService, - @Inject(isMainThread ? StoreService : 'Null') storeService: StoreService, - nodeConfig: NodeConfig, - dynamicDsService: DynamicDsService, - eventEmitter: EventEmitter2, - unfinalizedBlockService: UnfinalizedBlocksService, - ) { - super( - dsProcessorService, - apiService, - poiService, - poiSyncService, - sequelize, - project, - projectUpgradeService, - storeService, - nodeConfig, - dynamicDsService, - eventEmitter, - unfinalizedBlockService, - ); - } - - @profiler() - async init(startHeight?: number): Promise { - return super.init(startHeight); - } - - protected async getBlockTimestamp(height: number): Promise { - const block = await getBlockByHeight(this.apiService.api, height); - return getTimestamp(block); - } - - protected async onProjectChange(project: SubqueryProject): Promise { - // Only network with chainTypes require to reload - await this.apiService.updateChainTypes(); - this.apiService.updateBlockFetching(); - } -} diff --git a/packages/node/src/indexer/runtime/base-runtime.service.ts b/packages/node/src/indexer/runtime/base-runtime.service.ts index 49dbdf7d04..03764400b9 100644 --- a/packages/node/src/indexer/runtime/base-runtime.service.ts +++ b/packages/node/src/indexer/runtime/base-runtime.service.ts @@ -1,7 +1,7 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -import { Injectable } from '@nestjs/common'; +import { Inject, Injectable } from '@nestjs/common'; import { ApiPromise } from '@polkadot/api'; import { RuntimeVersion } from '@polkadot/types/interfaces'; import { profiler } from '@subql/node-core'; @@ -10,7 +10,6 @@ import * as SubstrateUtil from '../../utils/substrate'; import { ApiService } from '../api.service'; import { SpecVersion } from '../dictionary'; export const SPEC_VERSION_BLOCK_GAP = 100; -type GetLatestFinalizedHeight = () => number; @Injectable() export abstract class BaseRuntimeService { @@ -19,7 +18,7 @@ export abstract class BaseRuntimeService { private currentRuntimeVersion?: RuntimeVersion; latestFinalizedHeight?: number; - constructor(protected apiService: ApiService) {} + constructor(@Inject('APIService') protected apiService: ApiService) {} async specChanged(height: number, specVersion: number): Promise { if (this.parentSpecVersion !== specVersion) { @@ -37,10 +36,6 @@ export abstract class BaseRuntimeService { blockHeight: number, ): Promise<{ blockSpecVersion: number; syncedDictionary: boolean }>; - init(getLatestFinalizedHeight: GetLatestFinalizedHeight): void { - this.latestFinalizedHeight = getLatestFinalizedHeight(); - } - get api(): ApiPromise { return this.apiService.api; } diff --git a/packages/node/src/indexer/runtime/runtimeService.ts b/packages/node/src/indexer/runtime/runtimeService.ts index 231153d811..28e32e0d17 100644 --- a/packages/node/src/indexer/runtime/runtimeService.ts +++ b/packages/node/src/indexer/runtime/runtimeService.ts @@ -1,7 +1,7 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -import { Injectable } from '@nestjs/common'; +import { Inject, Injectable } from '@nestjs/common'; import { getLogger, profiler } from '@subql/node-core'; import { ApiService } from '../api.service'; import { SubstrateDictionaryService } from '../dictionary'; @@ -15,12 +15,24 @@ const logger = getLogger('RuntimeService'); @Injectable() export class RuntimeService extends BaseRuntimeService { constructor( - protected apiService: ApiService, + @Inject('APIService') protected apiService: ApiService, protected dictionaryService?: SubstrateDictionaryService, ) { super(apiService); } + async init( + startHeight: number, + latestFinalizedHeight: number, + ): Promise { + this.latestFinalizedHeight = latestFinalizedHeight; + + await this.syncDictionarySpecVersions(startHeight); + + await this.specChanged(startHeight); + await this.prefetchMeta(startHeight); + } + // get latest specVersions from dictionary async syncDictionarySpecVersions(height: number): Promise { try { diff --git a/packages/node/src/indexer/store.service.test.ts b/packages/node/src/indexer/store.service.test.ts index 890480454e..bc6702a2e1 100644 --- a/packages/node/src/indexer/store.service.test.ts +++ b/packages/node/src/indexer/store.service.test.ts @@ -9,12 +9,11 @@ import { DbOption, getFunctions, getTriggers, + ProjectService, } from '@subql/node-core'; import { QueryTypes, Sequelize } from '@subql/x-sequelize'; import rimraf from 'rimraf'; import { prepareApp } from '../utils/test.utils'; -import { ApiService } from './api.service'; -import { ProjectService } from './project.service'; const option: DbOption = { host: process.env.DB_HOST ?? '127.0.0.1', @@ -57,9 +56,6 @@ describe('Store service integration test', () => { app = await prepareApp(schemaName, cid, false); projectService = app.get('IProjectService'); - const apiService = app.get(ApiService); - - await apiService.init(); await projectService.init(1); tempDir = (projectService as any).project.root; @@ -156,9 +152,6 @@ AND table_name = 'positions'; app = await prepareApp(schemaName, cid, true); projectService = app.get('IProjectService'); - const apiService = app.get(ApiService); - - await apiService.init(); await projectService.init(1); tempDir = (projectService as any).project.root; @@ -226,9 +219,6 @@ ORDER BY app = await prepareApp(schemaName, cid, true); projectService = app.get('IProjectService'); - const apiService = app.get(ApiService); - - await apiService.init(); await projectService.init(1); tempDir = (projectService as any).project.root; @@ -270,9 +260,6 @@ WHERE app = await prepareApp(schemaName, cid, true); projectService = app.get('IProjectService'); - const apiService = app.get(ApiService); - - await apiService.init(); await projectService.init(1); tempDir = (projectService as any).project.root; @@ -329,9 +316,6 @@ ORDER BY t.typname, e.enumsortorder;`, app = await prepareApp(schemaName, cid, false, false); projectService = app.get('IProjectService'); - const apiService = app.get(ApiService); - - await apiService.init(); await projectService.init(1); tempDir = (projectService as any).project.root; diff --git a/packages/node/src/indexer/unfinalizedBlocks.service.ts b/packages/node/src/indexer/unfinalizedBlocks.service.ts deleted file mode 100644 index 196564212a..0000000000 --- a/packages/node/src/indexer/unfinalizedBlocks.service.ts +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors -// SPDX-License-Identifier: GPL-3.0 - -import { Injectable } from '@nestjs/common'; -import { - BaseUnfinalizedBlocksService, - Header, - mainThreadOnly, - NodeConfig, - StoreCacheService, -} from '@subql/node-core'; -import { substrateHeaderToHeader } from '../utils/substrate'; -import { ApiService } from './api.service'; -import { BlockContent, LightBlockContent } from './types'; - -@Injectable() -export class UnfinalizedBlocksService extends BaseUnfinalizedBlocksService< - BlockContent | LightBlockContent -> { - constructor( - private readonly apiService: ApiService, - nodeConfig: NodeConfig, - storeCache: StoreCacheService, - ) { - super(nodeConfig, storeCache); - } - - @mainThreadOnly() - protected async getFinalizedHead(): Promise
{ - return substrateHeaderToHeader( - await this.apiService.api.rpc.chain.getHeader( - await this.apiService.api.rpc.chain.getFinalizedHead(), - ), - ); - } - - // TODO: add cache here - @mainThreadOnly() - protected async getHeaderForHash(hash: string): Promise
{ - return substrateHeaderToHeader( - await this.apiService.api.rpc.chain.getHeader(hash), - ); - } - - @mainThreadOnly() - protected async getHeaderForHeight(height: number): Promise
{ - const hash = await this.apiService.api.rpc.chain.getBlockHash(height); - return this.getHeaderForHash(hash.toHex()); - } -} diff --git a/packages/node/src/indexer/worker/worker-fetch.module.ts b/packages/node/src/indexer/worker/worker-fetch.module.ts index eb150e16f2..39ce924fd0 100644 --- a/packages/node/src/indexer/worker/worker-fetch.module.ts +++ b/packages/node/src/indexer/worker/worker-fetch.module.ts @@ -5,20 +5,15 @@ import { Module } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { ConnectionPoolService, - WorkerDynamicDsService, NodeConfig, - WorkerUnfinalizedBlocksService, WorkerCoreModule, + ProjectService, + DsProcessorService, } from '@subql/node-core'; -import { SubqueryProject } from '../../configure/SubqueryProject'; +import { BlockchainService } from '../../blockchain.service'; import { ApiService } from '../api.service'; -import { ApiPromiseConnection } from '../apiPromise.connection'; -import { DsProcessorService } from '../ds-processor.service'; -import { DynamicDsService } from '../dynamic-ds.service'; import { IndexerManager } from '../indexer.manager'; -import { ProjectService } from '../project.service'; import { WorkerRuntimeService } from '../runtime/workerRuntimeService'; -import { UnfinalizedBlocksService } from '../unfinalizedBlocks.service'; import { WorkerService } from './worker.service'; /** @@ -28,24 +23,11 @@ import { WorkerService } from './worker.service'; @Module({ imports: [WorkerCoreModule], providers: [ + DsProcessorService, IndexerManager, { - provide: ApiService, - useFactory: async ( - project: SubqueryProject, - connectionPoolService: ConnectionPoolService, - eventEmitter: EventEmitter2, - nodeConfig: NodeConfig, - ) => { - const apiService = new ApiService( - project, - connectionPoolService, - eventEmitter, - nodeConfig, - ); - await apiService.init(); - return apiService; - }, + provide: 'APIService', + useFactory: ApiService.create, inject: [ 'ISubqueryProject', ConnectionPoolService, @@ -53,22 +35,16 @@ import { WorkerService } from './worker.service'; NodeConfig, ], }, - DsProcessorService, - { - provide: DynamicDsService, - useFactory: () => new WorkerDynamicDsService((global as any).host), - }, { provide: 'IProjectService', useClass: ProjectService, }, + WorkerRuntimeService, { - provide: UnfinalizedBlocksService, - useFactory: () => - new WorkerUnfinalizedBlocksService((global as any).host), + provide: 'IBlockchainService', + useClass: BlockchainService, }, WorkerService, - WorkerRuntimeService, ], exports: [], }) diff --git a/packages/node/src/indexer/worker/worker.service.ts b/packages/node/src/indexer/worker/worker.service.ts index 8ecb6828bb..00fe2318ff 100644 --- a/packages/node/src/indexer/worker/worker.service.ts +++ b/packages/node/src/indexer/worker/worker.service.ts @@ -15,7 +15,7 @@ import { ApiService } from '../api.service'; import { SpecVersion } from '../dictionary'; import { IndexerManager } from '../indexer.manager'; import { WorkerRuntimeService } from '../runtime/workerRuntimeService'; -import { BlockContent, isFullBlock, LightBlockContent } from '../types'; +import { BlockContent, LightBlockContent } from '../types'; export type FetchBlockResponse = { specVersion?: number; parentHash: string }; @@ -27,7 +27,7 @@ export class WorkerService extends BaseWorkerService< { specVersion: number } > { constructor( - private apiService: ApiService, + @Inject('APIService') private apiService: ApiService, private indexerManager: IndexerManager, private workerRuntimeService: WorkerRuntimeService, @Inject('IProjectService') @@ -67,11 +67,7 @@ export class WorkerService extends BaseWorkerService< block: IBlock, dataSources: SubstrateDatasource[], ): Promise { - const runtimeVersion = !isFullBlock(block.block) - ? undefined - : await this.workerRuntimeService.getRuntimeVersion(block.block.block); - - return this.indexerManager.indexBlock(block, dataSources, runtimeVersion); + return this.indexerManager.indexBlock(block, dataSources); } getSpecFromMap(height: number): number | undefined { diff --git a/packages/node/src/indexer/worker/worker.ts b/packages/node/src/indexer/worker/worker.ts index 07cb8534e2..3627fdb956 100644 --- a/packages/node/src/indexer/worker/worker.ts +++ b/packages/node/src/indexer/worker/worker.ts @@ -25,9 +25,9 @@ import { initWorkerServices, getWorkerService, IBaseIndexerWorker, + ProjectService, } from '@subql/node-core'; import { SpecVersion } from '../dictionary'; -import { ProjectService } from '../project.service'; import { WorkerModule } from './worker.module'; import { WorkerService } from './worker.service'; diff --git a/packages/node/src/init.ts b/packages/node/src/init.ts index 83aa1f71cf..a9289cc9ea 100644 --- a/packages/node/src/init.ts +++ b/packages/node/src/init.ts @@ -8,11 +8,10 @@ import { getLogger, getValidPort, NestLogger, + ProjectService, } from '@subql/node-core'; import { AppModule } from './app.module'; -import { ApiService } from './indexer/api.service'; import { FetchService } from './indexer/fetch.service'; -import { ProjectService } from './indexer/project.service'; import { yargsOptions } from './yargs'; const pjson = require('../package.json'); @@ -34,10 +33,8 @@ export async function bootstrap(): Promise { const projectService: ProjectService = app.get('IProjectService'); const fetchService = app.get(FetchService); - const apiService = app.get(ApiService); // Initialise async services, we do this here rather than in factories, so we can capture one off events - await apiService.init(); await projectService.init(); await fetchService.init(projectService.startHeight); diff --git a/packages/node/src/subcommands/reindex.module.ts b/packages/node/src/subcommands/reindex.module.ts index 7d69d44eb7..b50309f58e 100644 --- a/packages/node/src/subcommands/reindex.module.ts +++ b/packages/node/src/subcommands/reindex.module.ts @@ -11,12 +11,11 @@ import { ReindexService, StoreService, PoiService, + DsProcessorService, + UnfinalizedBlocksService, + DynamicDsService, } from '@subql/node-core'; import { ConfigureModule } from '../configure/configure.module'; -import { ApiService } from '../indexer/api.service'; -import { DsProcessorService } from '../indexer/ds-processor.service'; -import { DynamicDsService } from '../indexer/dynamic-ds.service'; -import { UnfinalizedBlocksService } from '../indexer/unfinalizedBlocks.service'; @Module({ providers: [ @@ -36,7 +35,7 @@ import { UnfinalizedBlocksService } from '../indexer/unfinalizedBlocks.service'; DsProcessorService, { // Used to work with DI for unfinalizedBlocksService but not used with reindex - provide: ApiService, + provide: 'APIService', useFactory: () => undefined, }, SchedulerRegistry, diff --git a/packages/node/src/subcommands/testing.module.ts b/packages/node/src/subcommands/testing.module.ts index 5e7eab2045..4b63f1ffe9 100644 --- a/packages/node/src/subcommands/testing.module.ts +++ b/packages/node/src/subcommands/testing.module.ts @@ -3,52 +3,41 @@ import { Module } from '@nestjs/common'; import { EventEmitter2, EventEmitterModule } from '@nestjs/event-emitter'; -import { ScheduleModule, SchedulerRegistry } from '@nestjs/schedule'; +import { ScheduleModule } from '@nestjs/schedule'; import { ConnectionPoolService, - ConnectionPoolStateManager, DbModule, - InMemoryCacheService, - PoiService, - PoiSyncService, - StoreCacheService, - StoreService, TestRunner, - SandboxService, + NodeConfig, + ProjectService, + TestingCoreModule, } from '@subql/node-core'; +import { BlockchainService } from '../blockchain.service'; import { ConfigureModule } from '../configure/configure.module'; import { ApiService } from '../indexer/api.service'; -import { DsProcessorService } from '../indexer/ds-processor.service'; -import { DynamicDsService } from '../indexer/dynamic-ds.service'; import { IndexerManager } from '../indexer/indexer.manager'; -import { ProjectService } from '../indexer/project.service'; -import { UnfinalizedBlocksService } from '../indexer/unfinalizedBlocks.service'; @Module({ providers: [ - InMemoryCacheService, - StoreService, - StoreCacheService, - EventEmitter2, - PoiService, - PoiSyncService, - SandboxService, - DsProcessorService, - DynamicDsService, - UnfinalizedBlocksService, - ConnectionPoolStateManager, - ConnectionPoolService, { provide: 'IProjectService', useClass: ProjectService, }, - ApiService, - SchedulerRegistry, - TestRunner, { - provide: 'IApi', - useExisting: ApiService, + provide: 'APIService', + useFactory: ApiService.create, + inject: [ + 'ISubqueryProject', + ConnectionPoolService, + EventEmitter2, + NodeConfig, + ], + }, + { + provide: 'IBlockchainService', + useClass: BlockchainService, }, + TestRunner, { provide: 'IIndexerManager', useClass: IndexerManager, @@ -65,6 +54,7 @@ export class TestingFeatureModule {} ConfigureModule.register(), EventEmitterModule.forRoot(), ScheduleModule.forRoot(), + TestingCoreModule, TestingFeatureModule, ], controllers: [], diff --git a/packages/node/src/subcommands/testing.service.ts b/packages/node/src/subcommands/testing.service.ts index 1f9677eab4..e4c3aa4fad 100644 --- a/packages/node/src/subcommands/testing.service.ts +++ b/packages/node/src/subcommands/testing.service.ts @@ -9,13 +9,10 @@ import { TestingService as BaseTestingService, NestLogger, TestRunner, - IBlock, + ProjectService, } from '@subql/node-core'; import { SubstrateDatasource } from '@subql/types'; import { SubqueryProject } from '../configure/SubqueryProject'; -import { ApiService } from '../indexer/api.service'; -import { IndexerManager } from '../indexer/indexer.manager'; -import { ProjectService } from '../indexer/project.service'; import { ApiAt, BlockContent, LightBlockContent } from '../indexer/types'; import { TestingModule } from './testing.module'; @@ -49,30 +46,8 @@ export class TestingService extends BaseTestingService< await testContext.init(); const projectService: ProjectService = testContext.get('IProjectService'); - const apiService = testContext.get(ApiService); - - // Initialise async services, we do this here rather than in factories, so we can capture one off events - await apiService.init(); await projectService.init(); return [testContext.close.bind(testContext), testContext.get(TestRunner)]; } - - async indexBlock( - block: IBlock, - handler: string, - indexerManager: IndexerManager, - apiService: ApiService, - ): Promise { - const runtimeVersion = - await apiService.unsafeApi.rpc.state.getRuntimeVersion( - block.getHeader().blockHash, - ); - - await indexerManager.indexBlock( - block, - this.getDsWithHandler(handler), - runtimeVersion, - ); - } }