Commit

cleanup
Signed-off-by: Miroslav Bajtoš <[email protected]>
bajtos committed Sep 16, 2024
1 parent a6cb5e2 commit 778f3e9
Showing 4 changed files with 41 additions and 52 deletions.
71 changes: 30 additions & 41 deletions docs/design.md
@@ -36,8 +36,9 @@ Label-based CID discovery and full Piece Index sampling.

Let's implement a lightweight IPNI ingester that will process the advertisements
from Filecoin SPs and extract the list of `(ProviderID, PieceCID, PayloadCID)`
entries. Store these entries in a Postgres database. Provide a REST API endpoint
accepting `(ProviderID, PieceCID)` and returning a single `PayloadCID`.
entries. Store these entries in a persisted datastore (Redis). Provide a REST
API endpoint accepting `(ProviderID, PieceCID)` and returning a single
`PayloadCID`.
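
To make the shape of that lookup concrete, here is a minimal sketch that uses an in-memory `Map` as a stand-in for the datastore. The class name `InMemoryPieceIndex`, the key layout, and the "keep the first PayloadCID we see" rule are all assumptions made for illustration. The design above does not specify them.

```javascript
// Hypothetical in-memory stand-in for the persisted datastore.
// The key layout and the "first value wins" rule are assumptions.
class InMemoryPieceIndex {
  #entries = new Map()

  // JSON-encode the pair so that no delimiter can collide with CID contents.
  static #key (providerId, pieceCid) {
    return JSON.stringify([providerId, pieceCid])
  }

  // Record one (ProviderID, PieceCID, PayloadCID) entry extracted from an advertisement.
  add (providerId, pieceCid, payloadCid) {
    const key = InMemoryPieceIndex.#key(providerId, pieceCid)
    if (!this.#entries.has(key)) this.#entries.set(key, payloadCid)
  }

  // What the REST endpoint would return: a single PayloadCID, or null if unknown.
  lookup (providerId, pieceCid) {
    return this.#entries.get(InMemoryPieceIndex.#key(providerId, pieceCid)) ?? null
  }
}
```

A REST handler would then only need to call `lookup(providerId, pieceCid)` and map `null` to a "not found" response.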

### Terminology

@@ -188,21 +189,21 @@ head is published before we finish processing the chain.

#### Proposed algorithm

**Persisted state (per provider)**
**Per-provider state**

Use the following per-provider state persisted in the database:
Use the following per-provider state:

- `providerId` - Primary key.

- Provider info obtained from IPNI
- Provider info obtained from IPNI - stored in memory only:

- `providerAddress` - Provider's address where we can fetch advertisements
from.

- `lastAdvertisementCID` - The CID of the most recent head seen by
cid.contact. This is where we need to start the next walk from.

- Provider walker state:
- Provider walker state - persisted in our datastore (Redis):

- `head` - The CID of the head advertisement we started the current walk from.
We update this value whenever we start a new walk.
@@ -218,15 +219,16 @@ Use the following per-provider state persisted in the database:
> walking the "old" chain, new advertisements (new heads) will be announced
> to IPNI.
>
> - `next_head` is the latest head announced to IPNI
> - `lastAdvertisementCID` is the latest head announced to IPNI
> - `head` is the advertisement where the current walk-in-progress started
>
> I suppose we don't need to keep track of `next_head`. When the current
> walk finishes, we will wait up to one minute until we make another request
> to cid.contact to find what are the latest heads for each SPs.
> I suppose we don't need to keep track of `lastAdvertisementCID`. When the
> current walk finishes, we could wait up to one minute until we make
> another request to cid.contact to find what are the latest heads for each
> SPs.
>
> In the current proposal, when the current walk finishes, we can
> immediately continue with walking from the `next_head`.
> immediately continue with walking from the `lastAdvertisementCID`.
We must always walk the chain all the way to the genesis or to the entry we have
already seen & processed.
@@ -276,27 +278,13 @@ Every minute, run the following high-level loop:
1. Fetch the list of providers and their latest advertisements (heads) from
https://cid.contact/providers. (This is **one** HTTP request.)

2. Fetch the state of all providers from our database. (This is **one** SQL
query.)

3. Update the state of each provider as described below, using the name
`LastAdvertisement` for the CID of the latest advertisement provided in the
response from cid.contact. (This is a small bit of computation plus **one**
SQL query.)
2. Update the in-memory info we keep for each provider (address, CID of the last
advertisement).

> **Note:** Instead of running the loop every minute, we can introduce a
> one-minute delay between the iterations instead. It should not matter too much
> in practice, though. I expect each iteration to finish within one minute:
>
> - The three steps outlined above require only three interactions over
> HTTP/SQL.
> - The long chain walks are executed in the background and don't block this
> loop.
For each provider listed in the response:

1. Update `providerAddress` and `next_head` with the values received from IPNI.
Persist this information in the database.
> in practice, though. I expect each iteration to finish within one minute, as
> it's just a single HTTP call to cid.contact.
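
The "delay between iterations" variant from the note can be sketched as an async generator. This is an illustration only: `syncLoop`, `fetchHeads`, `sleep`, `now` and `maxIterations` are hypothetical names, and `fetchHeads` stands in for the single HTTP request to cid.contact so that the loop can be exercised offline.

```javascript
// Sketch of a sync loop that keeps at least `minSyncIntervalInMs` between iterations.
// All parameter names are assumptions made for this example.
async function * syncLoop ({
  fetchHeads, // hypothetical: resolves to the providers info (one HTTP request)
  minSyncIntervalInMs,
  sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms)),
  now = Date.now,
  maxIterations = Infinity
}) {
  for (let i = 0; i < maxIterations; i++) {
    const started = now()
    // Hand the fresh provider info to the consumer of the generator.
    yield await fetchHeads()
    // Sleep only for the part of the interval that has not elapsed yet.
    const elapsed = now() - started
    if (elapsed < minSyncIntervalInMs) await sleep(minSyncIntervalInMs - elapsed)
  }
}
```

A consumer would iterate it with `for await (const providerInfos of syncLoop({ ... }))`. Because the long chain walks run in the background, the loop body itself stays short.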
**Walk advertisement chains (in background)**

@@ -305,27 +293,28 @@ steps.

1. Preparation

- If `tail` is not null, then there is an ongoing walk of the chain we need to
continue.
- If `tail` is not null, then there is an ongoing walk of the chain we need
to continue.

- Otherwise, if `nextHead` is the same as `lastHead`, then there are no new
advertisements to process and the walk immediately returns.
- Otherwise, if `nextHead` is the same as `lastHead`, then there are no new
advertisements to process and the walk immediately returns.

- Otherwise, we are starting a new walk. Update the walker state as follows:
- Otherwise, we are starting a new walk. Update the walker state as follows:

```
head := newHead
tail := newHead
```
```
head := newHead
tail := newHead
```
(`lastHead` does not change until we finish the walk.)
(`lastHead` does not change until we finish the walk.)
2. Take one step
1. Fetch the advertisement identified by `tail` from the index provider.
2. Process the metadata and entries to extract one `(PieceCID, PayloadCID)`
entry.
2. Process the metadata and entries to extract up to one
`(PieceCID, PayloadCID)` entry to be added to the index and `PreviousID`
linking to the next advertisement in the chain to process.
3. Update the worker state
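
The preparation and single-step logic above can be sketched as follows. This is not the code from this commit. `walkOneStep` and `fetchAdvertisement` are hypothetical names, and since the details of the state update are not shown in this excerpt, the rule used here (finish when the chain ends or reaches `lastHead`, otherwise move `tail` to `PreviousID`) is an assumption.

```javascript
// Sketch of one walker step over the state { head, tail, lastHead }.
// `fetchAdvertisement(cid)` is a hypothetical helper resolving to
// { previousId, entry }, where `entry` may be null ("up to one" entry).
async function walkOneStep ({ state, nextHead, fetchAdvertisement }) {
  let { head, tail, lastHead } = state

  // 1. Preparation
  if (!tail) {
    // No walk in progress and no new head announced: nothing to do.
    if (nextHead === lastHead) return { state, entry: null, finished: true }
    // Start a new walk. `lastHead` does not change until the walk finishes.
    head = nextHead
    tail = nextHead
  }

  // 2. Take one step
  const { previousId, entry } = await fetchAdvertisement(tail)

  // 3. Update the state (ASSUMPTION: this rule is not spelled out in the excerpt)
  if (!previousId || previousId === lastHead) {
    // Reached the genesis or an advertisement we already processed.
    return { state: { head: null, tail: null, lastHead: head }, entry, finished: true }
  }
  return { state: { head, tail: previousId, lastHead }, entry, finished: false }
}
```

Returning a new state object instead of mutating the old one keeps each step easy to persist in a single write.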
14 changes: 7 additions & 7 deletions indexer/bin/piece-indexer.js
@@ -1,6 +1,6 @@
import assert from 'assert'
import { Redis } from 'ioredis'
import { walkProviderChain } from '../lib/advertisement-walker.js'
import { walkChain } from '../lib/advertisement-walker.js'
import { runIpniSync } from '../lib/ipni-watcher.js'
import { RedisRepository } from '../lib/redis-repository.js'

@@ -25,8 +25,8 @@ const redis = new Redis({
await redis.connect()
const repository = new RedisRepository(redis)

/** @type {Map<string, boolean>} */
const providerWalkers = new Map()
/** @type {Set<string>} */
const providerIdsActivelyWalked = new Set()

/** @type {ProviderToInfoMap} */
const recentProvidersInfo = new Map()
@@ -43,16 +43,16 @@ const getProviderInfo = async (providerId) => {
for await (const providerInfos of runIpniSync({ minSyncIntervalInMs: 60_000 })) {
for (const [providerId, providerInfo] of providerInfos.entries()) {
recentProvidersInfo.set(providerId, providerInfo)
if (providerWalkers.get(providerId)) continue
if (providerIdsActivelyWalked.has(providerId)) continue

providerWalkers.set(providerId, true)
walkProviderChain({
providerIdsActivelyWalked.add(providerId)
walkChain({
repository,
providerId,
getProviderInfo,
minStepIntervalInMs: 100
}).finally(
() => providerWalkers.set(providerId, false)
() => providerIdsActivelyWalked.delete(providerId)
)
}
}
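
The `Set`-based guard introduced in this hunk can be isolated into a small helper, sketched below. `startWalkIfIdle` is a hypothetical name that does not appear in the repository, and the error logging is an addition of this sketch rather than behaviour of the code above.

```javascript
// Sketch: start at most one background walk per provider.
// Returns true when a new walk was started, false when one is already running.
function startWalkIfIdle (activeIds, providerId, walk) {
  if (activeIds.has(providerId)) return false
  activeIds.add(providerId)
  // Defer the call so a synchronous throw in `walk` becomes a rejection.
  Promise.resolve()
    .then(() => walk(providerId))
    // Always release the slot, whether the walk succeeded or failed.
    .finally(() => activeIds.delete(providerId))
    // Assumption of this sketch: log failures instead of leaving them unhandled.
    .catch(err => console.error('Walk for %s failed:', providerId, err))
  return true
}
```

Compared with a `Map<string, boolean>`, a `Set` holds only the providers that are being walked right now, so finished walkers leave no stale `false` entries behind.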
6 changes: 3 additions & 3 deletions indexer/lib/advertisement-walker.js
@@ -10,7 +10,7 @@ import { assertOkResponse } from './http-assertions.js'
/** @import { ProviderInfo, WalkerState } from './typings.js' */
/** @import { RedisRepository as Repository } from './redis-repository.js' */

const debug = createDebug('spark-piece-indexer:observer')
const debug = createDebug('spark-piece-indexer:advertisement-walker')

/**
* @param {object} args
@@ -20,7 +20,7 @@ const debug = createDebug('spark-piece-indexer:observer')
* @param {number} args.minStepIntervalInMs
* @param {AbortSignal} [args.signal]
*/
export async function walkProviderChain ({
export async function walkChain ({
repository,
providerId,
getProviderInfo,
@@ -179,7 +179,7 @@ export async function processNextAdvertisement ({
}
}

console.error(
debug(
'Cannot process provider %s (%s) advertisement %s: %s',
providerId,
providerInfo.providerAddress,
2 changes: 1 addition & 1 deletion indexer/lib/ipni-watcher.js
@@ -3,7 +3,7 @@ import timers from 'node:timers/promises'
import { assertOkResponse } from './http-assertions.js'
import { multiaddrToHttpUrl } from './vendored/multiaddr.js'

const debug = createDebug('spark-piece-indexer:observer')
const debug = createDebug('spark-piece-indexer:ipni-watcher')

/** @import { ProviderToInfoMap, ProviderInfo } from './typings.js' */
