Skip to content

Commit

Permalink
WIP implement blockchain service
Browse files Browse the repository at this point in the history
  • Loading branch information
stwiname committed Aug 1, 2024
1 parent 7c108b6 commit 65fad83
Show file tree
Hide file tree
Showing 51 changed files with 807 additions and 1,093 deletions.
64 changes: 64 additions & 0 deletions packages/node-core/src/blockchain.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {BaseCustomDataSource, BaseDataSource, IProjectNetworkConfig} from '@subql/types-core';
import {DatasourceParams, Header, IBlock, ISubqueryProject} from './indexer';

// TODO probably need to split this in 2 to have a worker specific subset

export interface IBlockchainService<
DS extends BaseDataSource = BaseDataSource,
CDS extends DS & BaseCustomDataSource = BaseCustomDataSource & DS,
SubQueryProject extends ISubqueryProject<IProjectNetworkConfig, DS> = ISubqueryProject<IProjectNetworkConfig, DS>,
SafeAPI = any,
LightBlock = any,
FullBlock = any,
> {
/* The semver of the node */
packageVersion: string;

blockHandlerKind: string;
// TODO SubqueryProject methods

// Block dispatcher service
fetchBlocks(blockNums: number[]): Promise<IBlock<LightBlock>[] | IBlock<FullBlock>[]>; // TODO this probably needs to change to get light block type correct

// Project service
onProjectChange(project: SubQueryProject): Promise<void> | void;
/* Not all networks have a block timestamp, e.g. Shiden */
getBlockTimestamp(height: number): Promise<Date | undefined>;

// Fetch service
/**
* The finalized header. If the chain doesn't have concrete finalization this could be a probablilistic finalization
* */
getFinalizedHeader(): Promise<Header>;
/**
* Gets the latest height of the chain, this should be unfinalized.
* Or if the chain has instant finalization this would be the same as the finalized height.
* */
getBestHeight(): Promise<number>;
/**
* The chain interval in milliseconds, if it is not consistent then provide a best estimate
* */
getChainInterval(): Promise<number>;

// Unfinalized blocks
getHeaderForHash(hash: string): Promise<Header>;
getHeaderForHeight(height: number): Promise<Header>;

// Dynamic Ds sevice
/**
* Applies and validates parameters to a template DS
* */
updateDynamicDs(params: DatasourceParams, template: DS | CDS): Promise<void>;

isCustomDs: (x: DS | CDS) => x is CDS;
isRuntimeDs: (x: DS | CDS) => x is DS;

// Indexer manager
/**
* Gets an API instance to a specific height so any state queries return data as represented at that height.
* */
getSafeApi(block: LightBlock | FullBlock): Promise<SafeAPI>;
}
1 change: 1 addition & 0 deletions packages/node-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ export * from './indexer';
export * from './subcommands';
export * from './yargs';
export * from './admin';
export * from './blockchain.service';
26 changes: 17 additions & 9 deletions packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import {getHeapStatistics} from 'v8';
import {OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {Interval} from '@nestjs/schedule';
import {BaseDataSource} from '@subql/types-core';
import {IBlockchainService} from '../../blockchain.service';
import {NodeConfig} from '../../configure';
import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service';
import {IndexerEvent} from '../../events';
Expand All @@ -15,8 +17,8 @@ import {profilerWrap} from '../../profiler';
import {Queue, AutoQueue, delay, memoryLock, waitForBatchSize, isTaskFlushedError} from '../../utils';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
import {IProjectService, ISubqueryProject} from '../types';
import {BaseBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher';
import {IIndexerManager, IProjectService, ISubqueryProject} from '../types';
import {BaseBlockDispatcher} from './base-block-dispatcher';

const logger = getLogger('BlockDispatcherService');

Expand All @@ -25,7 +27,7 @@ type BatchBlockFetcher<B> = (heights: number[]) => Promise<IBlock<B>[]>;
/**
* @description Intended to behave the same as WorkerBlockDispatcherService but doesn't use worker threads or any parallel processing
*/
export abstract class BlockDispatcher<B, DS>
export class BlockDispatcher<B, DS extends BaseDataSource>
extends BaseBlockDispatcher<Queue<IBlock<B> | number>, DS, B>
implements OnApplicationShutdown
{
Expand All @@ -37,8 +39,6 @@ export abstract class BlockDispatcher<B, DS>
private fetching = false;
private isShutdown = false;

protected abstract indexBlock(block: IBlock<B>): Promise<ProcessBlockResponse>;

constructor(
nodeConfig: NodeConfig,
eventEmitter: EventEmitter2,
Expand All @@ -48,7 +48,8 @@ export abstract class BlockDispatcher<B, DS>
storeCacheService: StoreCacheService,
poiSyncService: PoiSyncService,
project: ISubqueryProject,
fetchBlocksBatches: BatchBlockFetcher<B>
blockchainService: IBlockchainService<DS>,
private indexerManager: IIndexerManager<B, DS>
) {
super(
nodeConfig,
Expand All @@ -64,9 +65,13 @@ export abstract class BlockDispatcher<B, DS>
this.processQueue = new AutoQueue(nodeConfig.batchSize * 3, 1, nodeConfig.timeout, 'Process');
this.fetchQueue = new AutoQueue(nodeConfig.batchSize * 3, nodeConfig.batchSize, nodeConfig.timeout, 'Fetch');
if (this.nodeConfig.profiler) {
this.fetchBlocksBatches = profilerWrap(fetchBlocksBatches, 'BlockDispatcher', 'fetchBlocksBatches');
this.fetchBlocksBatches = profilerWrap(
blockchainService.fetchBlocks.bind(blockchainService),
'BlockDispatcher',
'fetchBlocksBatches'
);
} else {
this.fetchBlocksBatches = fetchBlocksBatches;
this.fetchBlocksBatches = blockchainService.fetchBlocks.bind(blockchainService);
}
}

Expand Down Expand Up @@ -164,7 +169,10 @@ export abstract class BlockDispatcher<B, DS>
await this.preProcessBlock(blockHeight);
monitorWrite(`Processing from main thread`);
// Inject runtimeVersion here to enhance api.at preparation
const processBlockResponse = await this.indexBlock(block);
const processBlockResponse = await this.indexerManager.indexBlock(
block,
await this.projectService.getDataSources(block.getHeader().blockHeight)
);
await this.postProcessBlock(blockHeight, processBlockResponse);
//set block to null for garbage collection
(block as any) = null;
Expand Down
25 changes: 13 additions & 12 deletions packages/node-core/src/indexer/ds-processor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import fs from 'fs';
import path from 'path';
import {Inject} from '@nestjs/common';
import {Inject, Injectable} from '@nestjs/common';
import {
HandlerInputMap,
BaseCustomDataSource,
Expand All @@ -14,6 +14,7 @@ import {
IProjectNetworkConfig,
} from '@subql/types-core';
import {VMScript} from 'vm2';
import {IBlockchainService} from '../blockchain.service';
import {NodeConfig} from '../configure';
import {getLogger} from '../logger';
import {Sandbox, SandboxOption} from './sandbox';
Expand All @@ -27,7 +28,7 @@ function isSecondLayerHandlerProcessor_0_0_0<
F extends Record<string, unknown>,
E,
API,
DS extends BaseCustomDataSource = BaseCustomDataSource
DS extends BaseCustomDataSource = BaseCustomDataSource,
>(
processor:
| SecondLayerHandlerProcessor_0_0_0<IM, K, F, E, DS, API>
Expand All @@ -43,7 +44,7 @@ function isSecondLayerHandlerProcessor_1_0_0<
F extends Record<string, unknown>,
E,
API,
DS extends BaseCustomDataSource = BaseCustomDataSource
DS extends BaseCustomDataSource = BaseCustomDataSource,
>(
processor:
| SecondLayerHandlerProcessor_0_0_0<IM, K, F, E, DS, API>
Expand All @@ -58,7 +59,7 @@ export function asSecondLayerHandlerProcessor_1_0_0<
F extends Record<string, unknown>,
E,
API,
DS extends BaseCustomDataSource = BaseCustomDataSource
DS extends BaseCustomDataSource = BaseCustomDataSource,
>(
processor:
| SecondLayerHandlerProcessor_0_0_0<IM, K, F, E, DS, API>
Expand Down Expand Up @@ -99,7 +100,7 @@ class DsPluginSandbox<P> extends Sandbox {
export function getDsProcessor<
P,
DS extends BaseDataSource = BaseDataSource,
CDS extends DS & BaseCustomDataSource = DS & BaseCustomDataSource
CDS extends DS & BaseCustomDataSource = DS & BaseCustomDataSource,
>(
ds: CDS,
isCustomDs: (ds: any) => boolean,
Expand Down Expand Up @@ -130,17 +131,17 @@ export function getDsProcessor<
return processorCache[ds.processor.file] as unknown as P;
}

export abstract class BaseDsProcessorService<
@Injectable()
export class DsProcessorService<
DS extends BaseDataSource = BaseDataSource,
CDS extends DS & BaseCustomDataSource = DS & BaseCustomDataSource,
P extends DsProcessor<CDS> = DsProcessor<CDS>
P extends DsProcessor<CDS> = DsProcessor<CDS>,
> {
private processorCache: Record<string, P> = {};

protected abstract isCustomDs(ds: DS): ds is CDS;

constructor(
@Inject('ISubqueryProject') private readonly project: ISubqueryProject<IProjectNetworkConfig, DS>,
@Inject('IBlockchainService') private blockchainService: IBlockchainService<DS, CDS>,
private readonly nodeConfig: NodeConfig
) {}

Expand All @@ -166,11 +167,11 @@ export abstract class BaseDsProcessorService<
}

async validateProjectCustomDatasources(dataSources: DS[]): Promise<void> {
await this.validateCustomDs(dataSources.filter((ds) => this.isCustomDs(ds)) as unknown as CDS[]);
await this.validateCustomDs(dataSources.filter((ds) => this.blockchainService.isCustomDs(ds)) as unknown as CDS[]);
}

getDsProcessor(ds: CDS): P {
if (!this.isCustomDs(ds)) {
if (!this.blockchainService.isCustomDs(ds)) {
throw new Error(`data source is not a custom data source`);
}
if (!this.processorCache[ds.processor.file]) {
Expand All @@ -194,7 +195,7 @@ export abstract class BaseDsProcessorService<

// eslint-disable-next-line @typescript-eslint/require-await
async getAssets(ds: CDS): Promise<Record<string, string>> {
if (!this.isCustomDs(ds)) {
if (!this.blockchainService.isCustomDs(ds)) {
throw new Error(`data source is not a custom data source`);
}

Expand Down
10 changes: 7 additions & 3 deletions packages/node-core/src/indexer/dynamic-ds.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {BaseDataSource} from '@subql/types-core';
import {IBlockchainService} from '../blockchain.service';
import {DatasourceParams, DynamicDsService} from './dynamic-ds.service';
import {CacheMetadataModel} from './storeCache';
import {ISubqueryProject} from './types';

class TestDynamicDsService extends DynamicDsService<DatasourceParams, ISubqueryProject> {
protected async getDatasource(params: DatasourceParams): Promise<DatasourceParams> {
return Promise.resolve(params);
class TestDynamicDsService extends DynamicDsService<BaseDataSource, ISubqueryProject> {
constructor(project: ISubqueryProject) {
super(project, {
updateDynamicDs: () => Promise.resolve(undefined), // Return the same value
} as unknown as IBlockchainService);
}

getTemplate<T extends Omit<NonNullable<ISubqueryProject['templates']>[number], 'name'> & {startBlock?: number}>(
Expand Down
25 changes: 21 additions & 4 deletions packages/node-core/src/indexer/dynamic-ds.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {Inject, Injectable} from '@nestjs/common';
import {BaseDataSource} from '@subql/types-core';
import {cloneDeep} from 'lodash';
import {IBlockchainService} from '../blockchain.service';
import {getLogger} from '../logger';
import {exitWithError} from '../process';
import {CacheMetadataModel} from './storeCache/cacheMetadata';
Expand All @@ -23,16 +26,18 @@ export interface IDynamicDsService<DS> {
getDynamicDatasources(forceReload?: boolean): Promise<DS[]>;
}

export abstract class DynamicDsService<DS, P extends ISubqueryProject = ISubqueryProject>
@Injectable()
export class DynamicDsService<DS extends BaseDataSource = BaseDataSource, P extends ISubqueryProject = ISubqueryProject>
implements IDynamicDsService<DS>
{
private _metadata?: CacheMetadataModel;
private _datasources?: DS[];
private _datasourceParams?: DatasourceParams[];

protected abstract getDatasource(params: DatasourceParams): Promise<DS>;

constructor(protected readonly project: P) {}
constructor(
@Inject('ISubqueryProject') private readonly project: P,
@Inject('IBlockchainService') private readonly blockchainService: IBlockchainService<DS>
) {}

async init(metadata: CacheMetadataModel): Promise<void> {
this._metadata = metadata;
Expand Down Expand Up @@ -124,4 +129,16 @@ export abstract class DynamicDsService<DS, P extends ISubqueryProject = ISubquer
const {name, ...template} = cloneDeep(t);
return {...template, startBlock} as T;
}

private async getDatasource(params: DatasourceParams): Promise<DS> {
const dsObj = this.getTemplate<any /*TODO DS*/>(params.templateName, params.startBlock);

try {
await this.blockchainService.updateDynamicDs(params, dsObj);

return dsObj;
} catch (e) {
throw new Error(`Unable to create dynamic datasource.\n ${(e as any).message}`);
}
}
}
9 changes: 6 additions & 3 deletions packages/node-core/src/indexer/fetch.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import {SchedulerRegistry} from '@nestjs/schedule';
import {BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry, IProjectNetworkConfig} from '@subql/types-core';
import {range} from 'lodash';
import {
BaseUnfinalizedBlocksService,
UnfinalizedBlocksService,
BlockDispatcher,
delay,
Header,
IBlock,
IBlockDispatcher,
IProjectService,
NodeConfig,
IBlockchainService,
} from '../';
import {BlockHeightMap} from '../utils/blockHeightMap';
import {DictionaryService} from './dictionary/dictionary.service';
Expand Down Expand Up @@ -166,7 +167,8 @@ describe('Fetch Service', () => {
let dictionaryService: DictionaryService<any, any>;
let networkConfig: IProjectNetworkConfig;
let dataSources: BaseDataSource[];
let unfinalizedBlocksService: BaseUnfinalizedBlocksService<any>;
let unfinalizedBlocksService: UnfinalizedBlocksService<any>;
let blockchainService: IBlockchainService;

let spyOnEnqueueSequential: jest.SpyInstance<
void | Promise<void>,
Expand Down Expand Up @@ -216,7 +218,8 @@ describe('Fetch Service', () => {
metadata: {
set: jest.fn(),
},
} as any
} as any,
blockchainService
);

spyOnEnqueueSequential = jest.spyOn(fetchService as any, 'enqueueSequential') as any;
Expand Down
Loading

0 comments on commit 65fad83

Please sign in to comment.