Skip to content

Commit

Permalink
Simplify worker block dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
stwiname committed Aug 5, 2024
1 parent 65fad83 commit 8d0791c
Show file tree
Hide file tree
Showing 14 changed files with 155 additions and 223 deletions.
29 changes: 21 additions & 8 deletions packages/node-core/src/blockchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,44 @@
// SPDX-License-Identifier: GPL-3.0

import {BaseCustomDataSource, BaseDataSource, IProjectNetworkConfig} from '@subql/types-core';
import {DatasourceParams, Header, IBlock, ISubqueryProject} from './indexer';
import {DatasourceParams, Header, IBaseIndexerWorker, IBlock, ISubqueryProject} from './indexer';

// TODO probably need to split this in 2 to have a worker specific subset

export interface ICoreBlockchainService<
DS extends BaseDataSource = BaseDataSource,
SubQueryProject extends ISubqueryProject<IProjectNetworkConfig, DS> = ISubqueryProject<IProjectNetworkConfig, DS>
> {
/* The semver of the node */
packageVersion: string;

// Project service
onProjectChange(project: SubQueryProject): Promise<void> | void;
/* Not all networks have a block timestamp, e.g. Shiden */
getBlockTimestamp(height: number): Promise<Date | undefined>;
}

export interface IBlockchainService<
DS extends BaseDataSource = BaseDataSource,
CDS extends DS & BaseCustomDataSource = BaseCustomDataSource & DS,
SubQueryProject extends ISubqueryProject<IProjectNetworkConfig, DS> = ISubqueryProject<IProjectNetworkConfig, DS>,
SafeAPI = any,
LightBlock = any,
FullBlock = any,
> {
/* The semver of the node */
packageVersion: string;

Worker extends IBaseIndexerWorker = IBaseIndexerWorker
> extends ICoreBlockchainService<DS, SubQueryProject> {
blockHandlerKind: string;
// TODO SubqueryProject methods

// Block dispatcher service
fetchBlocks(blockNums: number[]): Promise<IBlock<LightBlock>[] | IBlock<FullBlock>[]>; // TODO this probably needs to change to get light block type correct
/* This is the worker equivalent of fetchBlocks, it provides a context to allow syncing anything between workers */
fetchBlockWorker(worker: Worker, blockNum: number, context: {workers: Worker[]}): Promise<void>;

// Project service
onProjectChange(project: SubQueryProject): Promise<void> | void;
/* Not all networks have a block timestamp, e.g. Shiden */
getBlockTimestamp(height: number): Promise<Date | undefined>;
// onProjectChange(project: SubQueryProject): Promise<void> | void;
// /* Not all networks have a block timestamp, e.g. Shiden */
// getBlockTimestamp(height: number): Promise<Date | undefined>;

// Fetch service
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export type ProcessBlockResponse = {
};

export interface IBlockDispatcher<B> {
init(onDynamicDsCreated: (height: number) => void): Promise<void>;
// now within enqueueBlock should handle getLatestBufferHeight
enqueueBlocks(heights: (IBlock<B> | number)[], latestBufferHeight: number): void | Promise<void>;
queueSize: number;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
// SPDX-License-Identifier: GPL-3.0

import {EventEmitter2} from '@nestjs/event-emitter';
import {IBlockchainService} from '../../blockchain.service';
import {IProjectUpgradeService, NodeConfig} from '../../configure';
import {ConnectionPoolStateManager} from '../connectionPoolState.manager';
import {DynamicDsService} from '../dynamic-ds.service';
import {InMemoryCacheService} from '../inMemoryCache.service';
import {PoiSyncService} from '../poi';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
import {IProjectService, ISubqueryProject} from '../types';
import {UnfinalizedBlocksService} from '../unfinalizedBlocks.service';
import {WorkerBlockDispatcher} from './worker-block-dispatcher';

class TestWorkerBlockDispatcher extends WorkerBlockDispatcher<any, any, any> {
class TestWorkerBlockDispatcher extends WorkerBlockDispatcher<any, any, any, any> {
async fetchBlock(worker: any, height: number): Promise<void> {
return Promise.resolve();
}
Expand All @@ -19,7 +24,7 @@ class TestWorkerBlockDispatcher extends WorkerBlockDispatcher<any, any, any> {
}
}
describe('WorkerBlockDispatcher', () => {
let dispatcher: WorkerBlockDispatcher<any, any, any>;
let dispatcher: WorkerBlockDispatcher<any, any, any, any>;

// Mock workers
const mockWorkers = [
Expand All @@ -36,9 +41,15 @@ describe('WorkerBlockDispatcher', () => {
null as unknown as IProjectUpgradeService,
null as unknown as StoreService,
null as unknown as StoreCacheService,
null as unknown as InMemoryCacheService,
null as unknown as PoiSyncService,
null as unknown as DynamicDsService,
null as unknown as UnfinalizedBlocksService<any>,
null as unknown as ConnectionPoolStateManager<any>,
null as unknown as ISubqueryProject,
null as unknown as () => Promise<any>
null as unknown as IBlockchainService<any>,
'',
[]
);
(dispatcher as any).workers = mockWorkers;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,27 @@
// SPDX-License-Identifier: GPL-3.0

import assert from 'assert';
import {OnApplicationShutdown} from '@nestjs/common';
import {Inject, Injectable, OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {Interval} from '@nestjs/schedule';
import {BaseDataSource} from '@subql/types-core';
import {last} from 'lodash';
import {IApiConnectionSpecific} from '../../api.service';
import {IBlockchainService} from '../../blockchain.service';
import {NodeConfig} from '../../configure';
import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service';
import {IndexerEvent} from '../../events';
import {IBlock, PoiSyncService} from '../../indexer';
import {
ConnectionPoolStateManager,
createIndexerWorker,
DynamicDsService,
IBaseIndexerWorker,
IBlock,
InMemoryCacheService,
PoiSyncService,
TerminateableWorker,
UnfinalizedBlocksService,
} from '../../indexer';
import {getLogger} from '../../logger';
import {monitorWrite} from '../../process';
import {AutoQueue, isTaskFlushedError} from '../../utils';
Expand All @@ -22,15 +35,6 @@ import {BaseBlockDispatcher} from './base-block-dispatcher';

const logger = getLogger('WorkerBlockDispatcherService');

type Worker = {
processBlock: (height: number) => Promise<any>;
getStatus: () => Promise<any>;
getMemoryLeft: () => Promise<number>;
getBlocksLoaded: () => Promise<number>;
waitForWorkerBatchSize: (heapSizeInBytes: number) => Promise<void>;
terminate: () => Promise<number>;
};

function initAutoQueue<T>(
workers: number | undefined,
batchSize: number,
Expand All @@ -41,27 +45,39 @@ function initAutoQueue<T>(
return new AutoQueue(workers * batchSize * 2, 1, timeout, name);
}

export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
extends BaseBlockDispatcher<AutoQueue<void>, DS, B>
@Injectable()
export class WorkerBlockDispatcher<
DS extends BaseDataSource = BaseDataSource,
Worker extends IBaseIndexerWorker = IBaseIndexerWorker,
Block = any,
ApiConn extends IApiConnectionSpecific = IApiConnectionSpecific
>
extends BaseBlockDispatcher<AutoQueue<void>, DS, Block>
implements OnApplicationShutdown
{
protected workers: W[] = [];
protected workers: TerminateableWorker<Worker>[] = [];
private numWorkers: number;
private isShutdown = false;
private currentWorkerIndex = 0;

protected abstract fetchBlock(worker: W, height: number): Promise<void>;
private createWorker: () => Promise<TerminateableWorker<Worker>>;

constructor(
nodeConfig: NodeConfig,
eventEmitter: EventEmitter2,
projectService: IProjectService<DS>,
projectUpgradeService: IProjectUpgradeService,
@Inject('IProjectService') projectService: IProjectService<DS>,
@Inject('IProjectUpgradeService') projectUpgradeService: IProjectUpgradeService,
storeService: StoreService,
storeCacheService: StoreCacheService,
cacheService: InMemoryCacheService,
poiSyncService: PoiSyncService,
project: ISubqueryProject,
private createIndexerWorker: () => Promise<W>,
dynamicDsService: DynamicDsService<DS>,
unfinalizedBlocksService: UnfinalizedBlocksService<Block>,
connectionPoolState: ConnectionPoolStateManager<ApiConn>,
@Inject('ISubqueryProject') project: ISubqueryProject,
@Inject('IBlockchainService') private blockchainService: IBlockchainService<DS>,
workerPath: string,
workerFns: Parameters<typeof createIndexerWorker<Worker, ApiConn, Block, DS>>[1],
monitorService?: MonitorServiceInterface
) {
super(
Expand All @@ -76,13 +92,27 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
poiSyncService,
monitorService
);

this.createWorker = () =>
createIndexerWorker<Worker, ApiConn, Block, DS>(
workerPath,
workerFns,
storeService.getStore(),
cacheService.getCache(),
dynamicDsService,
unfinalizedBlocksService,
connectionPoolState,
project.root,
projectService.startHeight,
monitorService
);
// initAutoQueue will assert that workers is set. unfortunately we cant do anything before the super call
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.numWorkers = nodeConfig.workers!;
}

async init(onDynamicDsCreated: (height: number) => void): Promise<void> {
this.workers = await Promise.all(new Array(this.numWorkers).fill(0).map(() => this.createIndexerWorker()));
this.workers = await Promise.all(new Array(this.numWorkers).fill(0).map(() => this.createWorker()));
return super.init(onDynamicDsCreated);
}

Expand All @@ -96,7 +126,8 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
await Promise.all(this.workers.map((w) => w.terminate()));
}
}
async enqueueBlocks(heights: (IBlock<B> | number)[], latestBufferHeight?: number): Promise<void> {

async enqueueBlocks(heights: (IBlock<Block> | number)[], latestBufferHeight?: number): Promise<void> {
assert(
heights.every((h) => typeof h === 'number'),
'Worker block dispatcher only supports enqueuing numbers, not blocks.'
Expand Down Expand Up @@ -147,7 +178,7 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>

await worker.waitForWorkerBatchSize(this.minimumHeapLimit);

const pendingBlock = this.fetchBlock(worker, height);
const pendingBlock = this.blockchainService.fetchBlockWorker(worker, height, {workers: this.workers});

const processBlock = async () => {
try {
Expand Down
6 changes: 3 additions & 3 deletions packages/node-core/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {EventEmitter2} from '@nestjs/event-emitter';
import {BaseDataSource, IProjectNetworkConfig} from '@subql/types-core';
import {Sequelize} from '@subql/x-sequelize';
import {IApi} from '../api.service';
import {IBlockchainService} from '../blockchain.service';
import {IBlockchainService, ICoreBlockchainService} from '../blockchain.service';
import {IProjectUpgradeService, NodeConfig} from '../configure';
import {IndexerEvent} from '../events';
import {getLogger} from '../logger';
Expand All @@ -35,7 +35,7 @@ class NotInitError extends Error {
export class ProjectService<
DS extends BaseDataSource = BaseDataSource,
API extends IApi = IApi,
UnfinalizedBlocksService extends IUnfinalizedBlocksService<any> = IUnfinalizedBlocksService<any>,
UnfinalizedBlocksService extends IUnfinalizedBlocksService<any> = IUnfinalizedBlocksService<any>
> implements IProjectService<DS>
{
private _schema?: string;
Expand All @@ -55,7 +55,7 @@ export class ProjectService<
private readonly dynamicDsService: DynamicDsService<DS>,
private eventEmitter: EventEmitter2,
@Inject('IUnfinalizedBlocksService') private readonly unfinalizedBlockService: UnfinalizedBlocksService,
@Inject('IBlockchainService') private blockchainService: IBlockchainService<DS>
@Inject('IBlockchainService') private blockchainService: ICoreBlockchainService<DS>
) {
if (this.nodeConfig.unfinalizedBlocks && this.nodeConfig.allowSchemaMigration) {
throw new Error('Unfinalized Blocks and Schema Migration cannot be enabled at the same time');
Expand Down
4 changes: 3 additions & 1 deletion packages/node-core/src/indexer/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ export function createWorkerHost<
);
}

export type TerminateableWorker<T extends IBaseIndexerWorker> = T & {terminate: () => Promise<number>};

export async function createIndexerWorker<
T extends IBaseIndexerWorker,
ApiConnection extends IApiConnectionSpecific<any, any, any> /*ApiPromiseConnection*/ /*ApiPromiseConnection*/,
Expand All @@ -189,7 +191,7 @@ export async function createIndexerWorker<
startHeight: number,
monitorService?: MonitorServiceInterface,
workerData?: any
): Promise<T & {terminate: () => Promise<number>}> {
): Promise<TerminateableWorker<T>> {
const indexerWorker = Worker.create<
T & {initWorker: (startHeight: number) => Promise<void>},
DefaultWorkerFunctions<ApiConnection, DS>
Expand Down
37 changes: 32 additions & 5 deletions packages/node/src/blockchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { isCustomDs, isRuntimeDs } from '@subql/common-substrate';
import {
DatasourceParams,
Header,
IBaseIndexerWorker,
IBlock,
IBlockchainService,
mainThreadOnly,
Expand All @@ -30,6 +31,7 @@ import {
isFullBlock,
LightBlockContent,
} from './indexer/types';
import { IIndexerWorker } from './indexer/worker/worker';
import {
calcInterval,
getBlockByHeight,
Expand All @@ -55,20 +57,21 @@ export class BlockchainService
SubqueryProject,
ApiAt,
LightBlockContent,
BlockContent
BlockContent,
IIndexerWorker
>
{
constructor(
@Inject('APIService') private apiService: ApiService,
@Inject(isMainThread ? RuntimeService : 'Null')
private runtimeService: RuntimeService,
@Inject('RuntimeService') private runtimeService: RuntimeService,
) {}

isCustomDs = isCustomDs;
isRuntimeDs = isRuntimeDs;
blockHandlerKind = SubstrateHandlerKind.Block;
packageVersion = packageVersion;

@mainThreadOnly()
async fetchBlocks(
blockNums: number[],
): Promise<IBlock<BlockContent>[] | IBlock<LightBlockContent>[]> {
Expand All @@ -84,6 +87,29 @@ export class BlockchainService
);
}

async fetchBlockWorker(
worker: IIndexerWorker,
height: number,
context: { workers: IIndexerWorker[] },
): Promise<void> {
// get SpecVersion from main runtime service
const { blockSpecVersion, syncedDictionary } =
await this.runtimeService.getSpecVersion(height);

// if main runtime specVersion has been updated, then sync with all workers specVersion map, and lastFinalizedBlock
if (syncedDictionary) {
context.workers.map((w) =>
w.syncRuntimeService(
this.runtimeService.specVersionMap,
this.runtimeService.latestFinalizedHeight,
),
);
}

// const start = new Date();
await worker.fetchBlock(height, blockSpecVersion);
}

async onProjectChange(project: SubqueryProject): Promise<void> {
// Only network with chainTypes require to reload
await this.apiService.updateChainTypes();
Expand All @@ -98,8 +124,9 @@ export class BlockchainService
async getFinalizedHeader(): Promise<Header> {
const finalizedHash =
await this.apiService.unsafeApi.rpc.chain.getFinalizedHead();
const finalizedHeader =
await this.apiService.unsafeApi.rpc.chain.getHeader(finalizedHash);
const finalizedHeader = await this.apiService.unsafeApi.rpc.chain.getHeader(
finalizedHash,
);
return substrateHeaderToHeader(finalizedHeader);
}

Expand Down
7 changes: 0 additions & 7 deletions packages/node/src/indexer/blockDispatcher/index.ts

This file was deleted.

Loading

0 comments on commit 8d0791c

Please sign in to comment.