Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blockchain service #2517

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions packages/node-core/src/blockchain.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {BaseCustomDataSource, BaseDataSource, IProjectNetworkConfig} from '@subql/types-core';
import {DatasourceParams, Header, IBaseIndexerWorker, IBlock, ISubqueryProject} from './indexer';

// TODO probably need to split this in 2 to have a worker specific subset

export interface ICoreBlockchainService<
DS extends BaseDataSource = BaseDataSource,
SubQueryProject extends ISubqueryProject<IProjectNetworkConfig, DS> = ISubqueryProject<IProjectNetworkConfig, DS>
> {
/* The semver of the node */
packageVersion: string;

// Project service
onProjectChange(project: SubQueryProject): Promise<void> | void;
/* Not all networks have a block timestamp, e.g. Shiden */
getBlockTimestamp(height: number): Promise<Date | undefined>;
}

export interface IBlockchainService<
DS extends BaseDataSource = BaseDataSource,
CDS extends DS & BaseCustomDataSource = BaseCustomDataSource & DS,
SubQueryProject extends ISubqueryProject<IProjectNetworkConfig, DS> = ISubqueryProject<IProjectNetworkConfig, DS>,
SafeAPI = any,
LightBlock = any,
FullBlock = any,
Worker extends IBaseIndexerWorker = IBaseIndexerWorker
> extends ICoreBlockchainService<DS, SubQueryProject> {
blockHandlerKind: string;
// TODO SubqueryProject methods

// Block dispatcher service
fetchBlocks(blockNums: number[]): Promise<IBlock<LightBlock>[] | IBlock<FullBlock>[]>; // TODO this probably needs to change to get light block type correct
/* This is the worker equivalent of fetchBlocks, it provides a context to allow syncing anything between workers */
fetchBlockWorker(worker: Worker, blockNum: number, context: {workers: Worker[]}): Promise<void>;

// Project service
// onProjectChange(project: SubQueryProject): Promise<void> | void;
// /* Not all networks have a block timestamp, e.g. Shiden */
// getBlockTimestamp(height: number): Promise<Date | undefined>;

// Fetch service
/**
* The finalized header. If the chain doesn't have concrete finalization this could be a probablilistic finalization
* */
getFinalizedHeader(): Promise<Header>;
/**
* Gets the latest height of the chain, this should be unfinalized.
* Or if the chain has instant finalization this would be the same as the finalized height.
* */
getBestHeight(): Promise<number>;
/**
* The chain interval in milliseconds, if it is not consistent then provide a best estimate
* */
getChainInterval(): Promise<number>;

// Unfinalized blocks
getHeaderForHash(hash: string): Promise<Header>;
getHeaderForHeight(height: number): Promise<Header>;

// Dynamic Ds sevice
/**
* Applies and validates parameters to a template DS
* */
updateDynamicDs(params: DatasourceParams, template: DS | CDS): Promise<void>;

isCustomDs: (x: DS | CDS) => x is CDS;
isRuntimeDs: (x: DS | CDS) => x is DS;

// Indexer manager
/**
* Gets an API instance to a specific height so any state queries return data as represented at that height.
* */
getSafeApi(block: LightBlock | FullBlock): Promise<SafeAPI>;
}
1 change: 1 addition & 0 deletions packages/node-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ export * from './indexer';
export * from './subcommands';
export * from './yargs';
export * from './admin';
export * from './blockchain.service';
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export type ProcessBlockResponse = {
};

export interface IBlockDispatcher<B> {
init(onDynamicDsCreated: (height: number) => void): Promise<void>;
// now within enqueueBlock should handle getLatestBufferHeight
enqueueBlocks(heights: (IBlock<B> | number)[], latestBufferHeight: number): void | Promise<void>;
queueSize: number;
Expand Down
26 changes: 17 additions & 9 deletions packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import {getHeapStatistics} from 'v8';
import {OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {Interval} from '@nestjs/schedule';
import {BaseDataSource} from '@subql/types-core';
import {IBlockchainService} from '../../blockchain.service';
import {NodeConfig} from '../../configure';
import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service';
import {IndexerEvent} from '../../events';
Expand All @@ -15,8 +17,8 @@ import {profilerWrap} from '../../profiler';
import {Queue, AutoQueue, delay, memoryLock, waitForBatchSize, isTaskFlushedError} from '../../utils';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
import {IProjectService, ISubqueryProject} from '../types';
import {BaseBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher';
import {IIndexerManager, IProjectService, ISubqueryProject} from '../types';
import {BaseBlockDispatcher} from './base-block-dispatcher';

const logger = getLogger('BlockDispatcherService');

Expand All @@ -25,7 +27,7 @@ type BatchBlockFetcher<B> = (heights: number[]) => Promise<IBlock<B>[]>;
/**
* @description Intended to behave the same as WorkerBlockDispatcherService but doesn't use worker threads or any parallel processing
*/
export abstract class BlockDispatcher<B, DS>
export class BlockDispatcher<B, DS extends BaseDataSource>
extends BaseBlockDispatcher<Queue<IBlock<B> | number>, DS, B>
implements OnApplicationShutdown
{
Expand All @@ -37,8 +39,6 @@ export abstract class BlockDispatcher<B, DS>
private fetching = false;
private isShutdown = false;

protected abstract indexBlock(block: IBlock<B>): Promise<ProcessBlockResponse>;

constructor(
nodeConfig: NodeConfig,
eventEmitter: EventEmitter2,
Expand All @@ -48,7 +48,8 @@ export abstract class BlockDispatcher<B, DS>
storeCacheService: StoreCacheService,
poiSyncService: PoiSyncService,
project: ISubqueryProject,
fetchBlocksBatches: BatchBlockFetcher<B>
blockchainService: IBlockchainService<DS>,
private indexerManager: IIndexerManager<B, DS>
) {
super(
nodeConfig,
Expand All @@ -64,9 +65,13 @@ export abstract class BlockDispatcher<B, DS>
this.processQueue = new AutoQueue(nodeConfig.batchSize * 3, 1, nodeConfig.timeout, 'Process');
this.fetchQueue = new AutoQueue(nodeConfig.batchSize * 3, nodeConfig.batchSize, nodeConfig.timeout, 'Fetch');
if (this.nodeConfig.profiler) {
this.fetchBlocksBatches = profilerWrap(fetchBlocksBatches, 'BlockDispatcher', 'fetchBlocksBatches');
this.fetchBlocksBatches = profilerWrap(
blockchainService.fetchBlocks.bind(blockchainService),
'BlockDispatcher',
'fetchBlocksBatches'
);
} else {
this.fetchBlocksBatches = fetchBlocksBatches;
this.fetchBlocksBatches = blockchainService.fetchBlocks.bind(blockchainService);
}
}

Expand Down Expand Up @@ -164,7 +169,10 @@ export abstract class BlockDispatcher<B, DS>
await this.preProcessBlock(blockHeight);
monitorWrite(`Processing from main thread`);
// Inject runtimeVersion here to enhance api.at preparation
const processBlockResponse = await this.indexBlock(block);
const processBlockResponse = await this.indexerManager.indexBlock(
block,
await this.projectService.getDataSources(block.getHeader().blockHeight)
);
await this.postProcessBlock(blockHeight, processBlockResponse);
//set block to null for garbage collection
(block as any) = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
// SPDX-License-Identifier: GPL-3.0

import {EventEmitter2} from '@nestjs/event-emitter';
import {IBlockchainService} from '../../blockchain.service';
import {IProjectUpgradeService, NodeConfig} from '../../configure';
import {ConnectionPoolStateManager} from '../connectionPoolState.manager';
import {DynamicDsService} from '../dynamic-ds.service';
import {InMemoryCacheService} from '../inMemoryCache.service';
import {PoiSyncService} from '../poi';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
import {IProjectService, ISubqueryProject} from '../types';
import {UnfinalizedBlocksService} from '../unfinalizedBlocks.service';
import {WorkerBlockDispatcher} from './worker-block-dispatcher';

class TestWorkerBlockDispatcher extends WorkerBlockDispatcher<any, any, any> {
class TestWorkerBlockDispatcher extends WorkerBlockDispatcher<any, any, any, any> {
async fetchBlock(worker: any, height: number): Promise<void> {
return Promise.resolve();
}
Expand All @@ -19,7 +24,7 @@ class TestWorkerBlockDispatcher extends WorkerBlockDispatcher<any, any, any> {
}
}
describe('WorkerBlockDispatcher', () => {
let dispatcher: WorkerBlockDispatcher<any, any, any>;
let dispatcher: WorkerBlockDispatcher<any, any, any, any>;

// Mock workers
const mockWorkers = [
Expand All @@ -36,9 +41,15 @@ describe('WorkerBlockDispatcher', () => {
null as unknown as IProjectUpgradeService,
null as unknown as StoreService,
null as unknown as StoreCacheService,
null as unknown as InMemoryCacheService,
null as unknown as PoiSyncService,
null as unknown as DynamicDsService,
null as unknown as UnfinalizedBlocksService<any>,
null as unknown as ConnectionPoolStateManager<any>,
null as unknown as ISubqueryProject,
null as unknown as () => Promise<any>
null as unknown as IBlockchainService<any>,
'',
[]
);
(dispatcher as any).workers = mockWorkers;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,27 @@
// SPDX-License-Identifier: GPL-3.0

import assert from 'assert';
import {OnApplicationShutdown} from '@nestjs/common';
import {Inject, Injectable, OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {Interval} from '@nestjs/schedule';
import {BaseDataSource} from '@subql/types-core';
import {last} from 'lodash';
import {IApiConnectionSpecific} from '../../api.service';
import {IBlockchainService} from '../../blockchain.service';
import {NodeConfig} from '../../configure';
import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service';
import {IndexerEvent} from '../../events';
import {IBlock, PoiSyncService} from '../../indexer';
import {
ConnectionPoolStateManager,
createIndexerWorker,
DynamicDsService,
IBaseIndexerWorker,
IBlock,
InMemoryCacheService,
PoiSyncService,
TerminateableWorker,
UnfinalizedBlocksService,
} from '../../indexer';
import {getLogger} from '../../logger';
import {monitorWrite} from '../../process';
import {AutoQueue, isTaskFlushedError} from '../../utils';
Expand All @@ -22,15 +35,6 @@ import {BaseBlockDispatcher} from './base-block-dispatcher';

const logger = getLogger('WorkerBlockDispatcherService');

type Worker = {
processBlock: (height: number) => Promise<any>;
getStatus: () => Promise<any>;
getMemoryLeft: () => Promise<number>;
getBlocksLoaded: () => Promise<number>;
waitForWorkerBatchSize: (heapSizeInBytes: number) => Promise<void>;
terminate: () => Promise<number>;
};

function initAutoQueue<T>(
workers: number | undefined,
batchSize: number,
Expand All @@ -41,27 +45,39 @@ function initAutoQueue<T>(
return new AutoQueue(workers * batchSize * 2, 1, timeout, name);
}

export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
extends BaseBlockDispatcher<AutoQueue<void>, DS, B>
@Injectable()
export class WorkerBlockDispatcher<
DS extends BaseDataSource = BaseDataSource,
Worker extends IBaseIndexerWorker = IBaseIndexerWorker,
Block = any,
ApiConn extends IApiConnectionSpecific = IApiConnectionSpecific
>
extends BaseBlockDispatcher<AutoQueue<void>, DS, Block>
implements OnApplicationShutdown
{
protected workers: W[] = [];
protected workers: TerminateableWorker<Worker>[] = [];
private numWorkers: number;
private isShutdown = false;
private currentWorkerIndex = 0;

protected abstract fetchBlock(worker: W, height: number): Promise<void>;
private createWorker: () => Promise<TerminateableWorker<Worker>>;

constructor(
nodeConfig: NodeConfig,
eventEmitter: EventEmitter2,
projectService: IProjectService<DS>,
projectUpgradeService: IProjectUpgradeService,
@Inject('IProjectService') projectService: IProjectService<DS>,
@Inject('IProjectUpgradeService') projectUpgradeService: IProjectUpgradeService,
storeService: StoreService,
storeCacheService: StoreCacheService,
cacheService: InMemoryCacheService,
poiSyncService: PoiSyncService,
project: ISubqueryProject,
private createIndexerWorker: () => Promise<W>,
dynamicDsService: DynamicDsService<DS>,
unfinalizedBlocksService: UnfinalizedBlocksService<Block>,
connectionPoolState: ConnectionPoolStateManager<ApiConn>,
@Inject('ISubqueryProject') project: ISubqueryProject,
@Inject('IBlockchainService') private blockchainService: IBlockchainService<DS>,
workerPath: string,
workerFns: Parameters<typeof createIndexerWorker<Worker, ApiConn, Block, DS>>[1],
monitorService?: MonitorServiceInterface
) {
super(
Expand All @@ -76,13 +92,27 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
poiSyncService,
monitorService
);

this.createWorker = () =>
createIndexerWorker<Worker, ApiConn, Block, DS>(
workerPath,
workerFns,
storeService.getStore(),
cacheService.getCache(),
dynamicDsService,
unfinalizedBlocksService,
connectionPoolState,
project.root,
projectService.startHeight,
monitorService
);
// initAutoQueue will assert that workers is set. unfortunately we cant do anything before the super call
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.numWorkers = nodeConfig.workers!;
}

async init(onDynamicDsCreated: (height: number) => void): Promise<void> {
this.workers = await Promise.all(new Array(this.numWorkers).fill(0).map(() => this.createIndexerWorker()));
this.workers = await Promise.all(new Array(this.numWorkers).fill(0).map(() => this.createWorker()));
return super.init(onDynamicDsCreated);
}

Expand All @@ -96,7 +126,8 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
await Promise.all(this.workers.map((w) => w.terminate()));
}
}
async enqueueBlocks(heights: (IBlock<B> | number)[], latestBufferHeight?: number): Promise<void> {

async enqueueBlocks(heights: (IBlock<Block> | number)[], latestBufferHeight?: number): Promise<void> {
assert(
heights.every((h) => typeof h === 'number'),
'Worker block dispatcher only supports enqueuing numbers, not blocks.'
Expand Down Expand Up @@ -147,7 +178,7 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>

await worker.waitForWorkerBatchSize(this.minimumHeapLimit);

const pendingBlock = this.fetchBlock(worker, height);
const pendingBlock = this.blockchainService.fetchBlockWorker(worker, height, {workers: this.workers});

const processBlock = async () => {
try {
Expand Down
Loading
Loading