Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
belopash committed Sep 29, 2024
1 parent 4e34e4a commit 91169a1
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 46 deletions.
20 changes: 20 additions & 0 deletions processor/batch-processor/src/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import {HashAndHeight} from './database'
import {formatHead} from './util'

export class DatabaseNotSupportHotBlocksError extends Error {
constructor() {
super('database does not support hot blocks')
}
}

export class AlreadyIndexedBlockNotFoundError extends Error {
constructor(block: HashAndHeight) {
super(`already indexed block ${formatHead(block)} was not found on chain`)
}
}

export class FinalizedHeadBelowStateError extends Error {
constructor(finalizedHead: HashAndHeight, state: HashAndHeight) {
super(`finalized head ${formatHead(finalizedHead)} can not be below state ${formatHead(state)}`)
}
}
35 changes: 12 additions & 23 deletions processor/batch-processor/src/run.test.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,17 @@
import {AsyncQueue} from '@subsquid/util-internal'
import {FinalTxInfo, HashAndHeight, HotDatabaseState, HotTxInfo} from './database'
import {BlockBase, Processor, FinalizedHeadBelowStateError, DatabaseNotSupportHotBlocksError} from './run'
import assert from 'assert'
import {DataSource} from './datasource'
import {FiniteRange, getSize} from '@subsquid/util-internal-range'
import assert from 'assert'
import expect from 'expect'
import {Mock, MockedObject, MockInstance, ModuleMocker} from 'jest-mock'
import {AsyncFunc} from 'mocha'
import {MockedObject, MockInstance, ModuleMocker} from 'jest-mock'
import {FinalTxInfo, HashAndHeight, HotDatabaseState, HotTxInfo} from './database'
import {DataSource} from './datasource'
import {DatabaseNotSupportHotBlocksError, FinalizedHeadBelowStateError} from './errors'
import {BlockBase, Processor} from './run'

const mock = new ModuleMocker(global)

class MockDataSource implements DataSource<BlockBase> {
private queue = new AsyncQueue<{finalizedHead: HashAndHeight; blocks: BlockBase[]}>(1)
private _supportsHotBlocks = true

// private readyFuture = createFuture<void>()
// private nextFuture: Future<void> | undefined

private _isReady: boolean = false

get isReady() {
return this._isReady
}
private queue: AsyncQueue<{finalizedHead: HashAndHeight; blocks: BlockBase[]}> | undefined

async getBlockHash(): Promise<never> {
throw new Error()
Expand All @@ -35,12 +25,10 @@ class MockDataSource implements DataSource<BlockBase> {
return getSize([{from: 0}], range)
}

async *getBlockStream(opts: {supportHotBlocks?: boolean} = {}) {
assert(!this.isReady)

this._supportsHotBlocks = opts.supportHotBlocks ?? true
async *getBlockStream() {
assert(this.queue == null)

this._isReady = true
this.queue = new AsyncQueue(1)

for await (let {finalizedHead, blocks} of this.queue.iterate()) {
yield {
Expand All @@ -51,12 +39,13 @@ class MockDataSource implements DataSource<BlockBase> {
}

async put(finalizedHead: HashAndHeight, blocks: BlockBase[]) {
assert(this.isReady)
assert(this.queue != null)
await this.queue.put({finalizedHead, blocks})
}

close() {
this.queue?.close()
this.queue = undefined
}
}

Expand Down
24 changes: 1 addition & 23 deletions processor/batch-processor/src/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {DataSource} from './datasource'
import {Metrics} from './metrics'
import {formatHead, getItemsCount} from './util'
import assert from 'assert'
import {AlreadyIndexedBlockNotFoundError, FinalizedHeadBelowStateError, DatabaseNotSupportHotBlocksError} from './errors'


const log = createLogger('sqd:batch-processor')
Expand Down Expand Up @@ -319,26 +320,3 @@ export class Processor<B extends BlockBase, S> {
}
}
}


export class DatabaseNotSupportHotBlocksError extends Error {
constructor() {
super('database does not support hot blocks')
}
}


export class AlreadyIndexedBlockNotFoundError extends Error {
constructor(block: HashAndHeight) {
super(`already indexed block ${formatHead(block)} was not found on chain`)
}
}


export class FinalizedHeadBelowStateError extends Error {
constructor(finalizedHead: HashAndHeight, state: HashAndHeight) {
super(
`finalized head ${formatHead(finalizedHead)} can not be below state ${formatHead(state)}`
)
}
}

0 comments on commit 91169a1

Please sign in to comment.