Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
belopash committed Sep 29, 2024
1 parent 91169a1 commit f3b112d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 79 deletions.
6 changes: 0 additions & 6 deletions processor/batch-processor/src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
import {HashAndHeight} from './database'
import {formatHead} from './util'

export class DatabaseNotSupportHotBlocksError extends Error {
constructor() {
super('database does not support hot blocks')
}
}

export class AlreadyIndexedBlockNotFoundError extends Error {
constructor(block: HashAndHeight) {
super(`already indexed block ${formatHead(block)} was not found on chain`)
Expand Down
34 changes: 13 additions & 21 deletions processor/batch-processor/src/run.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {AsyncQueue} from '@subsquid/util-internal'
import {FiniteRange, getSize} from '@subsquid/util-internal-range'
import assert from 'assert'
import assert, {AssertionError} from 'assert'
import expect from 'expect'
import {MockedObject, MockInstance, ModuleMocker} from 'jest-mock'
import {FinalTxInfo, HashAndHeight, HotDatabaseState, HotTxInfo} from './database'
import {DataSource} from './datasource'
import {DatabaseNotSupportHotBlocksError, FinalizedHeadBelowStateError} from './errors'
import {FinalizedHeadBelowStateError} from './errors'
import {BlockBase, Processor} from './run'

const mock = new ModuleMocker(global)
Expand Down Expand Up @@ -124,20 +124,13 @@ describe('processor', () => {
p = mock.mocked(new Processor(ds, db, batchHandler))

mock.spyOn(ds, 'getBlockStream')
mock.spyOn(db, 'transact')
mock.spyOn(p, 'run')

p.run().catch(console.error)
p.run().catch(() => {})

await waitForCallExact(ds.getBlockStream, 1)
})

it('throw on integrity error', async () => {
await ds.put(header('0x1', 2), [block('0x0', 0), block('0x1', 1), block('0x2', 2)])

await waitForResultExact(p.run, 1)

expect(p.run.mock.results[0].value).rejects.toThrow(new Error())
mock.spyOn(db, 'transact')
})

it('throw on consistency error', async () => {
Expand Down Expand Up @@ -226,12 +219,13 @@ describe('processor', () => {
p = mock.mocked(new Processor(ds, db, batchHandler))

mock.spyOn(ds, 'getBlockStream')
mock.spyOn(db, 'transact')
mock.spyOn(p, 'run')

p.run().catch(console.error)
p.run().catch(() => {})

await waitForCallExact(ds.getBlockStream, 1)

mock.spyOn(db, 'transact')
})

it('configure stream', async () => {
Expand Down Expand Up @@ -319,7 +313,7 @@ describe('processor', () => {

await waitForResultExact(p.run, 1)

expect(p.run.mock.results[0].value).rejects.toThrow(new DatabaseNotSupportHotBlocksError())
expect(p.run.mock.results[0].value).rejects.toThrow(AssertionError)
})
})

Expand All @@ -330,13 +324,14 @@ describe('processor', () => {
p = mock.mocked(new Processor(ds, db, batchHandler))

mock.spyOn(ds, 'getBlockStream')
mock.spyOn(db, 'transact')
mock.spyOn(db, 'transactHot2')
mock.spyOn(p, 'run')

p.run().catch(console.error)
p.run().catch(() => {})

await waitForCallExact(ds.getBlockStream, 1)

mock.spyOn(db, 'transact')
mock.spyOn(db, 'transactHot2')
})

it('configure stream', async () => {
Expand Down Expand Up @@ -484,10 +479,7 @@ describe('processor', () => {
{
baseHead: expect.objectContaining(header('0x1', 1)),
finalizedHead: expect.objectContaining(header('0x1', 1)),
newBlocks: [
expect.objectContaining(header('0x2a', 2)),
expect.objectContaining(header('0x3a', 3)),
],
newBlocks: [expect.objectContaining(header('0x2a', 2)), expect.objectContaining(header('0x3a', 3))],
},
expect.any(Function)
)
Expand Down
95 changes: 43 additions & 52 deletions processor/batch-processor/src/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {DataSource} from './datasource'
import {Metrics} from './metrics'
import {formatHead, getItemsCount} from './util'
import assert from 'assert'
import {AlreadyIndexedBlockNotFoundError, FinalizedHeadBelowStateError, DatabaseNotSupportHotBlocksError} from './errors'
import {AlreadyIndexedBlockNotFoundError, FinalizedHeadBelowStateError} from './errors'


const log = createLogger('sqd:batch-processor')
Expand Down Expand Up @@ -90,6 +90,20 @@ export class Processor<B extends BlockBase, S> {

async run(): Promise<void> {
let state = await this.getDatabaseState()

// remove all hot block to start from the finalized head
state = {...state, top: []}
if (this.db.supportsHotBlocks) {
await this.db.transactHot(
{
finalizedHead: state,
baseHead: state,
newBlocks: [],
},
async () => {}
)
}

if (state.height >= 0) {
log.info(`last processed final block was ${state.height}`)
}
Expand Down Expand Up @@ -173,52 +187,35 @@ export class Processor<B extends BlockBase, S> {
if (prevState.height === blocks[0].header.height && prevState.hash !== blocks[0].header.hash) {
throw new Error()
}

let prevHead: HashAndHeight = maybeLast(prevState.top) || prevState
let top: HashAndHeight[] = []

let baseHead: HashAndHeight = prevState
let lastBlock = last(blocks).header
let nextState: HotDatabaseState =
lastBlock.height < finalizedHead.height
? toDatabaseState(lastBlock)
: toDatabaseState(finalizedHead)

let baseHead = prevState as HashAndHeight
for (let block of prevState.top) {
if (block.height >= blocks[0].header.height) {
break
}
if (block.height > blocks[0].header.height) break
if (block.height === blocks[0].header.height && block.hash !== blocks[0].header.hash) break
baseHead = block
if (block.height <= nextState.height) continue
nextState.top.push(block)
}

if (block.height < finalizedHead.height) continue
if (block.height === finalizedHead.height) {
if (block.hash !== finalizedHead.hash) {
throw new Error()
}
continue
}

top.push(block)
for (let {header: block} of blocks) {
if (block.height <= finalizedHead.height) continue
nextState.top.push(block)
}

let nextHead: HashAndHeight
if (last(blocks).header.height >= finalizedHead.height) {
for (let {header: block} of blocks) {
if (block.height < finalizedHead.height) continue
if (block.height === finalizedHead.height) {
if (block.hash !== finalizedHead.hash) {
throw new Error()
}
continue
}

top.push(block)
}
let prevHead = maybeLast(prevState.top) || prevState
let nextHead = maybeLast(nextState.top) || nextState

nextHead = maybeLast(top) || finalizedHead
} else {
nextHead = last(blocks).header
}
if (baseHead.hash !== prevHead.hash) {
log.info(`navigating a fork between ${formatHead(prevHead)} to ${formatHead(nextHead)} with a common base ${formatHead(baseHead)}`)
}

let nextState: HotDatabaseState
if (top.length === 0 && baseHead.height === prevState.height && nextHead.height <= finalizedHead.height) {
if (nextHead.height === nextState.height && prevHead.height === prevState.height) {
await this.db.transact(
{
prevHead,
Expand All @@ -232,16 +229,8 @@ export class Processor<B extends BlockBase, S> {
})
}
)

nextState = {
height: nextHead.height,
hash: nextHead.hash,
top: [],
}
} else {
if (!this.db.supportsHotBlocks) {
throw new DatabaseNotSupportHotBlocksError()
}
assert(this.db.supportsHotBlocks, 'database does not support hot blocks')

let info: HotTxInfo = {
finalizedHead,
Expand Down Expand Up @@ -270,16 +259,10 @@ export class Processor<B extends BlockBase, S> {
return this.handler({
store,
blocks: [block],
isHead: nextState.height === ref.height,
isHead: nextHead.height === ref.height,
})
})
}

nextState = {
height: finalizedHead.height,
hash: finalizedHead.hash,
top,
}
}

return nextState
Expand Down Expand Up @@ -320,3 +303,11 @@ export class Processor<B extends BlockBase, S> {
}
}
}

function toDatabaseState(block: HashAndHeight): HotDatabaseState {
return {
height: block.height,
hash: block.hash,
top: [],
}
}

0 comments on commit f3b112d

Please sign in to comment.