feat(store-sync): add support for live sync from indexer #3226

Merged · 23 commits · Oct 3, 2024
5 changes: 5 additions & 0 deletions .changeset/great-dragons-sit.md
@@ -0,0 +1,5 @@
---
"@latticexyz/store-sync": patch
---

Added support for streaming logs from the indexer.
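
For orientation, a hedged sketch of how a client might opt into the new indexer stream via the `indexerUrl` option threaded through this PR. The RPC URL, world address, and storage adapter below are placeholders, the import path assumes `createStoreSync` is re-exported from the package entry point (this diff only shows `packages/store-sync/src/createStoreSync.ts`), and the full option set is only partially visible here.

```ts
import { createPublicClient, http } from "viem";
// Assumed entry point; this diff only shows packages/store-sync/src/createStoreSync.ts.
import { createStoreSync } from "@latticexyz/store-sync";

// Placeholder RPC endpoint for the target chain.
const publicClient = createPublicClient({ transport: http("https://rpc.example.com") });

const sync = await createStoreSync({
  publicClient,
  address: "0x0000000000000000000000000000000000000000", // world address (placeholder)
  storageAdapter: async (block) => {
    // Each StorageAdapterBlock carries a blockNumber and the logs stored for it.
    console.log("stored", block.logs.length, "logs for block", block.blockNumber);
  },
  // Stream logs live from this indexer's SSE endpoint (the feature added in this PR);
  // pass `false` to skip the indexer entirely and sync from the RPC instead.
  indexerUrl: "https://indexer.example.com",
});
```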
64 changes: 49 additions & 15 deletions packages/store-sync/src/createStoreSync.ts
@@ -27,11 +27,13 @@ import {
combineLatest,
scan,
mergeMap,
throwError,
} from "rxjs";
import { debug as parentDebug } from "./debug";
import { SyncStep } from "./SyncStep";
import { bigIntMax, chunk, isDefined, waitForIdle } from "@latticexyz/common/utils";
import { getSnapshot } from "./getSnapshot";
import { fromEventSource } from "./fromEventSource";
import { fetchAndStoreLogs } from "./fetchAndStoreLogs";

const debug = parentDebug.extend("createStoreSync");
@@ -61,7 +63,7 @@ export async function createStoreSync({
maxBlockRange,
initialState,
initialBlockLogs,
indexerUrl,
indexerUrl: indexerUrlInput,
Member: small nit: I tend to name these "initial"

Suggested change:
-  indexerUrl: indexerUrlInput,
+  indexerUrl: initialIndexerUrl,

}: CreateStoreSyncOptions): Promise<SyncResult> {
const filters: SyncFilter[] =
initialFilters.length || tableIds.length
@@ -78,9 +80,17 @@
)
: undefined;

const initialBlockLogs$ = defer(async (): Promise<StorageAdapterBlock | undefined> => {
const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());
const indexerUrl =
indexerUrlInput !== false
? indexerUrlInput ??
(publicClient.chain && "indexerUrl" in publicClient.chain && typeof publicClient.chain.indexerUrl === "string"
? publicClient.chain.indexerUrl
: undefined)
: undefined;

const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());

const initialBlockLogs$ = defer(async (): Promise<StorageAdapterBlock | undefined> => {
onProgress?.({
step: SyncStep.SNAPSHOT,
percentage: 0,
@@ -95,15 +105,7 @@
filters,
initialState,
initialBlockLogs,
indexerUrl:
indexerUrl !== false
? indexerUrl ??
(publicClient.chain &&
"indexerUrl" in publicClient.chain &&
typeof publicClient.chain.indexerUrl === "string"
? publicClient.chain.indexerUrl
: undefined)
: undefined,
indexerUrl,
});

onProgress?.({
@@ -199,7 +201,31 @@ export async function createStoreSync({
let endBlock: bigint | null = null;
let lastBlockNumberProcessed: bigint | null = null;

const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
const storedIndexerLogs$ = indexerUrl
? startBlock$.pipe(
mergeMap((startBlock) => {
const url = new URL(
`api/logs-live?${new URLSearchParams({
input: JSON.stringify({ chainId, address, filters }),
block_num: startBlock.toString(),
include_tx_hash: "true",
})}`,
indexerUrl,
);
return fromEventSource<string>(url);
}),
map((messageEvent) => {
const parsedBlock = JSON.parse(messageEvent.data);
return { ...parsedBlock, blockNumber: BigInt(parsedBlock.blockNumber) } as StorageAdapterBlock;
}),
concatMap(async (block) => {
await storageAdapter(block);
return block;
}),
)
: throwError(() => new Error("No indexer URL provided"));
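
For context on the `map` step above: JSON has no native bigint type, so the streamed block arrives with its block number serialized as a string and is revived with `BigInt` before being handed to the storage adapter. A minimal sketch of that parsing step in isolation, assuming a message shape of `{ blockNumber, logs }` as implied by the `StorageAdapterBlock` cast in the diff:

```ts
// Sketch: parse one SSE message body into a StorageAdapterBlock-like value.
// The exact payload shape is an assumption based on the conversion shown above.
type IndexerBlockMessage = { blockNumber: string; logs: unknown[] };

function parseIndexerBlock(data: string): { blockNumber: bigint; logs: unknown[] } {
  const parsed = JSON.parse(data) as IndexerBlockMessage;
  // JSON.parse cannot produce bigints, so the block number is revived explicitly.
  return { ...parsed, blockNumber: BigInt(parsed.blockNumber) };
}
```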

const storedRpcLogs$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
tap((range) => {
startBlock = range.startBlock;
Expand All @@ -215,13 +241,21 @@ export async function createStoreSync({
? bigIntMax(range.startBlock, lastBlockNumberProcessed + 1n)
: range.startBlock,
toBlock: range.endBlock,
storageAdapter,
logFilter,
storageAdapter,
});

return from(storedBlocks);
}),
tap(({ blockNumber, logs }) => {
);

const storedBlock$ = storedIndexerLogs$.pipe(
catchError((e) => {
debug("failed to stream logs from indexer:", e.message);
debug("falling back to streaming logs from RPC");
return storedRpcLogs$;
}),
@holic (Member, Oct 2, 2024): have we tested the fallback behavior here? any way to make sure e2e tests cover this case like they do for the previous indexers?

Member Author: The fallback behavior already kicks in if no indexer is provided or the existing indexer doesn't support this API, so in that sense it's covered by the existing e2e tests.

tap(async ({ logs, blockNumber }) => {
debug("stored", logs.length, "logs for block", blockNumber);
lastBlockNumberProcessed = blockNumber;

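
For readers less familiar with the pattern, here is a minimal standalone sketch of the `catchError` fallback used in `storedBlock$` above: the primary stream errors (no indexer URL, or the SSE connection fails) and `catchError` swaps in a secondary stream so downstream subscribers keep receiving blocks. The block values are placeholders, not real `StorageAdapterBlock` data.

```ts
import { catchError, interval, map, throwError } from "rxjs";

// Primary stream fails immediately, mirroring the "No indexer URL provided" case.
const primary$ = throwError(() => new Error("No indexer URL provided"));

// Placeholder fallback stream standing in for the RPC-based block stream.
const fallback$ = interval(1000).pipe(map((i) => ({ blockNumber: BigInt(i), logs: [] })));

const blocks$ = primary$.pipe(
  catchError((error) => {
    console.warn("falling back to RPC stream:", error.message);
    return fallback$;
  }),
);

blocks$.subscribe((block) => console.log("stored block", block.blockNumber));
```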
10 changes: 10 additions & 0 deletions packages/store-sync/src/fromEventSource.ts
@@ -0,0 +1,10 @@
import { Observable } from "rxjs";

export function fromEventSource<T>(url: string | URL): Observable<MessageEvent<T>> {
return new Observable<MessageEvent>((subscriber) => {
const eventSource = new EventSource(url);
eventSource.onmessage = (ev): void => subscriber.next(ev);
eventSource.onerror = (): void => subscriber.error(new Error("Event source closed: " + url));
Member: I am pretty sure EventSource attempts to reconnect during an error (I think indefinitely) but unclear if it emits onerror during this reconnect flow. We might want to give it a bad URL and see if this emits a bunch of onerror and maybe allow a few reconnection attempts before giving up and closing the stream with subscriber.error?

Member Author: Just tried that, it does indeed try to reconnect, but it seems like once it reconnects it skips the blocks that happened while the connection was down and just continues with the current block. I think we can solve this on the indexer side by using the block number as event id and also add code to handle reconnections with last_event_id, but would leave that as an optimization for later.

Member: yeah exactly, we need to update the indexer SSE stream to include an id and use the Last-Event-ID header for reconnections

@holic (Member, Oct 2, 2024): ID would likely need to be block number + log index

return () => eventSource.close();
});
}
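
Following up on the reconnection discussion above, a hedged sketch of a variant that tolerates a few EventSource errors before failing the stream. The `maxErrors` threshold, error counter, and function name are hypothetical and not part of this PR; they only illustrate the "allow a few reconnection attempts before giving up" idea raised in the review.

```ts
import { Observable } from "rxjs";

// Sketch: like fromEventSource, but allow a handful of errors (EventSource
// retries the connection on its own) before closing the stream with an error.
export function fromEventSourceWithRetries<T>(
  url: string | URL,
  maxErrors = 3, // hypothetical parameter for illustration
): Observable<MessageEvent<T>> {
  return new Observable<MessageEvent<T>>((subscriber) => {
    const eventSource = new EventSource(url);
    let errors = 0;
    eventSource.onmessage = (ev): void => {
      errors = 0; // reset the counter after a successful message
      subscriber.next(ev);
    };
    eventSource.onerror = (): void => {
      errors += 1;
      if (errors >= maxErrors) {
        eventSource.close();
        subscriber.error(new Error("Event source closed after repeated errors: " + url));
      }
      // otherwise let EventSource retry on its own
    };
    return () => eventSource.close();
  });
}
```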
1 change: 1 addition & 0 deletions packages/store-sync/src/index.ts
@@ -7,3 +7,4 @@ export * from "./logToTable";
export * from "./tablesWithRecordsToLogs";
export * from "./tableToLog";
export * from "./recordToLog";
export * from "./logToRecord";