Skip to content

Commit

Permalink
chore: move sql read/write type queries into correct classes
Browse files Browse the repository at this point in the history
  • Loading branch information
zone117x committed Jan 9, 2025
1 parent 01bb6fe commit d4b6fd3
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
6 changes: 3 additions & 3 deletions src/event-stream/event-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class EventStreamHandler {
const result = await this.db.ingestion.applyStackerDbChunk(sql, timestamp, chunk);
appliedSignerMessageResults.push(...result);
}
await this.db.updateLastIngestedRedisMsgId(sql, messageId);
await this.db.ingestion.updateLastIngestedRedisMsgId(sql, messageId);
});
this.logger.info(`Apply StackerDB chunks finished in ${time.getElapsedSeconds()}s`);

Expand All @@ -113,15 +113,15 @@ export class EventStreamHandler {
// TODO: wrap in sql transaction
const time = stopwatch();
await this.db.sqlWriteTransaction(async sql => {
const lastIngestedBlockHeight = await this.db.ingestion.getLastIngestedBlockHeight(sql);
const lastIngestedBlockHeight = await this.db.getLastIngestedBlockHeight(sql);
if (block.blockHeight <= lastIngestedBlockHeight) {
this.logger.info(`Skipping previously ingested block ${block.blockHeight}`);
return;
}
this.logger.info(`Apply block ${block.blockHeight}`);
await this.db.ingestion.applyBlock(sql, block);
await this.db.ingestion.updateChainTipBlockHeight(sql, block.blockHeight);
await this.db.updateLastIngestedRedisMsgId(sql, messageId);
await this.db.ingestion.updateLastIngestedRedisMsgId(sql, messageId);
});
this.logger.info(`Apply block ${block.blockHeight} finished in ${time.getElapsedSeconds()}s`);
}
Expand Down
5 changes: 2 additions & 3 deletions src/pg/ingestion/pg-write-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ export class PgWriteStore extends BasePgStoreModule {
await sql`UPDATE chain_tip SET block_height = ${blockHeight}`;
}

async getLastIngestedBlockHeight(sql: PgSqlClient): Promise<number> {
const result = await sql<{ block_height: number }[]>`SELECT block_height FROM chain_tip`;
return result[0].block_height;
async updateLastIngestedRedisMsgId(sql: PgSqlClient, msgId: string): Promise<void> {
await sql`UPDATE chain_tip SET last_redis_msg_id = ${msgId}`;
}

async applyStackerDbChunk(
Expand Down
5 changes: 3 additions & 2 deletions src/pg/pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ export class PgStore extends BasePgStore {
this.notifications = new NotificationPgStore(this, sql, this.ingestion.events);
}

async updateLastIngestedRedisMsgId(sql: PgSqlClient, msgId: string): Promise<void> {
await sql`UPDATE chain_tip SET last_redis_msg_id = ${msgId}`;
async getLastIngestedBlockHeight(sql: PgSqlClient): Promise<number> {
const result = await sql<{ block_height: number }[]>`SELECT block_height FROM chain_tip`;
return result[0].block_height;
}

public async getLastIngestedRedisMsgId(): Promise<string> {
Expand Down

0 comments on commit d4b6fd3

Please sign in to comment.