Skip to content

Commit

Permalink
fix: fall back to eth_getLogs in event sync (#1432)
Browse files Browse the repository at this point in the history
* chore: upgrade viem to v1.12.2

* fix: sync historical events with getContractEvents

* feat: detect filter support

* chore: update changeset
  • Loading branch information
horsefacts authored Oct 2, 2023
1 parent 32bef61 commit ef795c7
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 78 deletions.
6 changes: 6 additions & 0 deletions .changeset/great-parents-occur.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@farcaster/core": patch
"@farcaster/hubble": patch
---

upgrade viem to 1.12.2
5 changes: 5 additions & 0 deletions .changeset/khaki-dots-cheer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fall back to eth_getLogs in event sync
2 changes: 1 addition & 1 deletion apps/hubble/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,6 @@
"semver": "^7.5.2",
"tar": "^6.1.15",
"tiny-typed-emitter": "~2.1.0",
"viem": "^1.1.4"
"viem": "^1.12.2"
}
}
112 changes: 86 additions & 26 deletions apps/hubble/src/eth/l2EventsProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@ import { IdRegistryV2, KeyRegistry, StorageRegistry } from "./abis.js";
import { HubInterface } from "../hubble.js";
import { logger } from "../utils/logger.js";
import { optimismGoerli } from "viem/chains";
import { createPublicClient, fallback, http, Log, OnLogsParameter, PublicClient } from "viem";
import {
createPublicClient,
fallback,
http,
Log,
WatchContractEventOnLogsParameter,
PublicClient,
Hex,
FallbackTransport,
} from "viem";
import { WatchContractEvent } from "./watchContractEvent.js";
import { WatchBlockNumber } from "./watchBlockNumber.js";
import { ExtractAbiEvent } from "abitype";
import { Abi, ExtractAbiEvent } from "abitype";
import { onChainEventSorter } from "../storage/db/onChainEvent.js";
import { formatPercentage } from "../profile/profile.js";
import { addProgressBar } from "../utils/progressBars.js";
Expand All @@ -46,13 +55,14 @@ const RENT_EXPIRY_IN_SECONDS = 365 * 24 * 60 * 60; // One year
*/
export class L2EventsProvider {
private _hub: HubInterface;
private _publicClient: PublicClient;
private _publicClient: PublicClient<FallbackTransport>;

private _firstBlock: number;
private _chunkSize: number;
private _chainId: number;
private _rentExpiry: number;
private _resyncEvents: boolean;
private _useFilters: boolean;

private _onChainEventsByBlock: Map<number, Array<OnChainEvent>>;
private _retryDedupMap: Map<number, boolean>;
Expand Down Expand Up @@ -87,7 +97,7 @@ export class L2EventsProvider {

constructor(
hub: HubInterface,
publicClient: PublicClient,
publicClient: PublicClient<FallbackTransport>,
storageRegistryAddress: `0x${string}` | undefined,
keyRegistryAddress: `0x${string}` | undefined,
idRegistryAddress: `0x${string}` | undefined,
Expand All @@ -103,6 +113,7 @@ export class L2EventsProvider {
this._chunkSize = chunkSize;
this._chainId = chainId;
this._resyncEvents = resyncEvents;
this._useFilters = false;
this._rentExpiry = expiryOverride ?? RENT_EXPIRY_IN_SECONDS;

this._lastBlockNumber = 0;
Expand Down Expand Up @@ -189,10 +200,7 @@ export class L2EventsProvider {
/* Private Methods */
/* -------------------------------------------------------------------------- */

private async processStorageEvents(
// biome-ignore lint/suspicious/noExplicitAny: workaround viem bug
logs: OnLogsParameter<any, true, string>,
) {
private async processStorageEvents(logs: WatchContractEventOnLogsParameter<Abi, string, true>) {
for (const event of logs) {
const { blockNumber, blockHash, transactionHash, transactionIndex, logIndex } = event;

Expand All @@ -214,6 +222,7 @@ export class L2EventsProvider {
const rentEvent = event as Log<
bigint,
number,
boolean,
ExtractAbiEvent<typeof StorageRegistry.abi, "Rent">,
true,
typeof StorageRegistry.abi
Expand Down Expand Up @@ -251,10 +260,7 @@ export class L2EventsProvider {
}
}

private async processKeyRegistryEvents(
// biome-ignore lint/suspicious/noExplicitAny: workaround viem bug
logs: OnLogsParameter<any, true, string>,
) {
private async processKeyRegistryEvents(logs: WatchContractEventOnLogsParameter<Abi, string, true>) {
for (const event of logs) {
const { blockNumber, blockHash, transactionHash, transactionIndex, logIndex } = event;

Expand All @@ -275,6 +281,7 @@ export class L2EventsProvider {
const addEvent = event as Log<
bigint,
number,
boolean,
ExtractAbiEvent<typeof KeyRegistry.abi, "Add">,
true,
typeof KeyRegistry.abi
Expand All @@ -300,6 +307,7 @@ export class L2EventsProvider {
const removeEvent = event as Log<
bigint,
number,
boolean,
ExtractAbiEvent<typeof KeyRegistry.abi, "Remove">,
true,
typeof KeyRegistry.abi
Expand All @@ -322,6 +330,7 @@ export class L2EventsProvider {
const resetEvent = event as Log<
bigint,
number,
boolean,
ExtractAbiEvent<typeof KeyRegistry.abi, "AdminReset">,
true,
typeof KeyRegistry.abi
Expand All @@ -344,6 +353,7 @@ export class L2EventsProvider {
const migratedEvent = event as Log<
bigint,
number,
boolean,
ExtractAbiEvent<typeof KeyRegistry.abi, "Migrated">,
true,
typeof KeyRegistry.abi
Expand All @@ -357,7 +367,9 @@ export class L2EventsProvider {
transactionIndex,
logIndex,
undefined,
SignerMigratedEventBody.create({ migratedAt: Number(migratedEvent.args.keysMigratedAt) }),
SignerMigratedEventBody.create({
migratedAt: Number(migratedEvent.args.keysMigratedAt),
}),
);
}
} catch (e) {
Expand All @@ -367,10 +379,7 @@ export class L2EventsProvider {
}
}

private async processIdRegistryEvents(
// biome-ignore lint/suspicious/noExplicitAny: workaround viem bug
logs: OnLogsParameter<any, true, string>,
) {
private async processIdRegistryEvents(logs: WatchContractEventOnLogsParameter<Abi, string, true>) {
for (const event of logs) {
const { blockNumber, blockHash, transactionHash, transactionIndex, logIndex } = event;

Expand All @@ -391,6 +400,7 @@ export class L2EventsProvider {
const registerEvent = event as Log<
bigint,
number,
boolean,
ExtractAbiEvent<typeof IdRegistryV2.abi, "Register">,
true,
typeof IdRegistryV2.abi
Expand All @@ -416,6 +426,7 @@ export class L2EventsProvider {
const transferEvent = event as Log<
bigint,
number,
boolean,
ExtractAbiEvent<typeof IdRegistryV2.abi, "Transfer">,
true,
typeof IdRegistryV2.abi
Expand All @@ -441,6 +452,7 @@ export class L2EventsProvider {
const transferEvent = event as Log<
bigint,
number,
boolean,
ExtractAbiEvent<typeof IdRegistryV2.abi, "ChangeRecoveryAddress">,
true,
typeof IdRegistryV2.abi
Expand Down Expand Up @@ -507,8 +519,12 @@ export class L2EventsProvider {
if (lastSyncedBlock < toBlock) {
log.info({ fromBlock: lastSyncedBlock, toBlock }, "syncing events from missed blocks");

// Check if filters are supported, reduce batch size if necessary
await this.detectFilterSupport();
const chunkSize = this._useFilters ? this._chunkSize : 250;

// Sync old Rent events
await this.syncHistoricalEvents(lastSyncedBlock, toBlock, this._chunkSize);
await this.syncHistoricalEvents(lastSyncedBlock, toBlock, chunkSize);
}

this._isHistoricalSyncDone = true;
Expand Down Expand Up @@ -643,34 +659,29 @@ export class L2EventsProvider {
progressBar?.update(Math.max(nextFromBlock - fromBlock - 1, 0));
statsd().increment("l2events.blocks", Math.min(toBlock, nextToBlock - nextFromBlock));

const idFilter = await this._publicClient.createContractEventFilter({
const idLogsPromise = this.getContractEvents({
address: this.idRegistryAddress,
abi: IdRegistryV2.abi,
fromBlock: BigInt(nextFromBlock),
toBlock: BigInt(nextToBlock),
strict: true,
});
const idLogsPromise = this._publicClient.getFilterLogs({ filter: idFilter });

const storageFilter = await this._publicClient.createContractEventFilter({
const storageLogsPromise = this.getContractEvents({
address: this.storageRegistryAddress,
abi: StorageRegistry.abi,
fromBlock: BigInt(nextFromBlock),
toBlock: BigInt(nextToBlock),
strict: true,
});
const storageLogsPromise = this._publicClient.getFilterLogs({
filter: storageFilter,
});

const keyFilter = await this._publicClient.createContractEventFilter({
const keyLogsPromise = this.getContractEvents({
address: this.keyRegistryAddress,
abi: KeyRegistry.abi,
fromBlock: BigInt(nextFromBlock),
toBlock: BigInt(nextToBlock),
strict: true,
});
const keyLogsPromise = this._publicClient.getFilterLogs({ filter: keyFilter });

await this.processIdRegistryEvents(await idLogsPromise);
await this.processStorageEvents(await storageLogsPromise);
Expand All @@ -681,6 +692,55 @@ export class L2EventsProvider {
progressBar?.stop();
}

/** Wrapper around Viem client getFilterLogs/getContractEvents. Uses filters
* when supported, otherwise falls back to getContractEvents.
*/
private async getContractEvents(params: {
address: Hex;
abi: Abi;
fromBlock: bigint;
toBlock: bigint;
strict: boolean;
}) {
if (this._useFilters) {
const filter = await this._publicClient.createContractEventFilter(params);
return this._publicClient.getFilterLogs({
filter,
});
} else {
return this._publicClient.getContractEvents(params);
}
}

/** Detect whether the configured RPC provider supports filters */
private async detectFilterSupport() {
// Set up a client with fewer retries and shorter timeout
const urls: string[] = [];
this._publicClient.transport["transports"].forEach((transport) => {
if (transport?.value) {
urls.push(transport.value["url"]);
}
});
const transports = urls.map((url) => http(url, { retryCount: 1, timeout: 1000 }));
const testClient = createPublicClient({
chain: optimismGoerli,
transport: fallback(transports),
});

// Handling: intentionally catch to test for filter support
try {
await testClient.createEventFilter({
fromBlock: BigInt(1),
toBlock: BigInt(1),
});
this._useFilters = true;
log.info("RPC provider supports filters. Using eth_getFilterLogs");
} catch (err) {
this._useFilters = false;
log.info("RPC provider does not support filters. Falling back to eth_getLogs");
}
}

/** Handle a new block. Processes all events in the cache that have now been confirmed */
private async handleNewBlock(blockNumber: number) {
// Don't let multiple blocks be handled at once
Expand Down
8 changes: 6 additions & 2 deletions apps/hubble/src/test/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { DeployContractParameters, createTestClient, createWalletClient, custom } from "viem";
import { Chain, localhost } from "viem/chains";
import { createPublicClient, http } from "viem";
import { createPublicClient, http, fallback } from "viem";
import { Abi } from "abitype";
import { accounts, localHttpUrl } from "./constants.js";
import { IdRegistry, NameRegistry, StorageRegistry } from "../eth/abis.js";
Expand Down Expand Up @@ -73,7 +73,11 @@ export const httpClient = createPublicClient({
transport: http(localHttpUrl),
});

export const publicClient = httpClient;
export const publicClient = createPublicClient({
chain: anvilChain,
pollingInterval: 1_000,
transport: fallback([http(localHttpUrl)]),
});

export const testClient = createTestClient({
chain: anvilChain,
Expand Down
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"ffi-napi": "^4.0.3",
"neverthrow": "^6.0.0",
"ref-napi": "^3.0.3",
"viem": "^1.1.4"
"viem": "^1.12.2"
},
"scripts": {
"build": "tsup --config tsup.config.ts",
Expand Down
Loading

0 comments on commit ef795c7

Please sign in to comment.