Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: initial implementation of the indexer #7

Merged
merged 31 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c53055f
feat: update our state from IPNI state
bajtos Sep 4, 2024
2ec7716
Merge branch 'main' into indexer-impl
bajtos Sep 4, 2024
fe0e9cc
fixup! finish "update our state from IPNI state"
bajtos Sep 4, 2024
9bbe3bc
start working in the advertisement walker
bajtos Sep 4, 2024
ef48905
Update indexer/lib/typings.d.ts
bajtos Sep 4, 2024
df599e1
handle advertisement with no entries
bajtos Sep 5, 2024
7482b0b
small tweaks, fix tests
bajtos Sep 10, 2024
8c612a1
feat: finish processNextAdvertisement
bajtos Sep 10, 2024
3e0d552
refactor: cleanup
bajtos Sep 10, 2024
459a14c
docs: update the design doc
bajtos Sep 10, 2024
36174d8
Merge branch 'main' into indexer-impl
bajtos Sep 10, 2024
17a05e8
walk one step
bajtos Sep 11, 2024
9a93803
refactor: move loop runners to lib
bajtos Sep 11, 2024
72da9eb
wip: walk providers independently from each other
bajtos Sep 11, 2024
39e257a
wip: don't retry failed op too often
bajtos Sep 12, 2024
a22bc7a
don't retry failed op too often, keep the IPNI state in memory only
bajtos Sep 12, 2024
c86f645
feat: detect HTTP request errors
bajtos Sep 16, 2024
a6cb5e2
Merge branch 'main' into indexer-impl
bajtos Sep 16, 2024
778f3e9
cleanup
bajtos Sep 16, 2024
d14a7f0
feat: repository support for REST API
bajtos Sep 16, 2024
89ec9b2
Merge branch 'main' into indexer-impl
bajtos Sep 16, 2024
a0a153a
ci: setup Redis for tests
bajtos Sep 16, 2024
1e31450
test: remove race condition in the timeout test
bajtos Sep 16, 2024
cafe142
feat: deploy to Fly.io
bajtos Sep 16, 2024
455b1ae
perf: remember walker state between steps
bajtos Sep 18, 2024
d89d33a
Update indexer/bin/piece-indexer.js
bajtos Sep 18, 2024
0755f2a
Update indexer/bin/piece-indexer.js
bajtos Sep 18, 2024
fa18f10
fixup! providerIdsBeingWalked
bajtos Sep 18, 2024
052ed7a
fixup! last_head -> lastHead
bajtos Sep 18, 2024
cc70be0
fixup! fix walker state reuse
bajtos Sep 18, 2024
faaa285
Update indexer/package.json
bajtos Sep 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,10 @@
A lightweight IPNI node mapping Filecoin PieceCID → payload block CID.

- [Design doc](./docs/design.md)

## Development

```bash
docker run --name redis -p 6379:6379 -d redis
npm start -w indexer
```
96 changes: 96 additions & 0 deletions indexer/bin/piece-indexer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { Redis } from 'ioredis'
import { RedisRepository } from '../lib/redis-repository.js'
import { syncProvidersFromIPNI } from '../lib/ipni-watcher.js'
import timers from 'node:timers/promises'
import { processNextAdvertisement } from '../lib/advertisement-walker.js'

const {
REDIS_URL: redisUrl = 'redis://localhost:6379'
} = process.env

// TODO: setup Sentry

const redisUrlParsed = new URL(redisUrl)
const redis = new Redis({
host: redisUrlParsed.hostname,
port: Number(redisUrlParsed.port),
username: redisUrlParsed.username,
password: redisUrlParsed.password,
lazyConnect: true, // call connect() explicitly so that we can exit on connection error
family: 6 // required for upstash
})

await redis.connect()

await Promise.all([
runIpniSync(),
runWalkers()
])

async function runIpniSync () {
const repository = new RedisRepository(redis)
while (true) {
const started = Date.now()
try {
console.log('Syncing from IPNI')
const providers = await syncProvidersFromIPNI(repository)
console.log(
'Found %s providers, %s support(s) HTTP(s)',
providers.size,
Array.from(providers.values()).filter(p => p.providerAddress.match(/^https?:\/\//)).length
)
} catch (err) {
console.error('Cannot sync from IPNI.', err)
// TODO: log to Sentry
}
const delay = 60_000 - (Date.now() - started)
if (delay > 0) {
console.log('Waiting for %sms before the next sync from IPNI', delay)
await timers.setTimeout(delay)
}
}
}

async function runWalkers () {
const repository = new RedisRepository(redis)
while (true) {
const started = Date.now()

// EVERYTHING BELOW IS TEMPORARY AND WILL BE SIGNIFICANTLY REWORKED
try {
console.log('Walking one step')
const ipniInfoMap = await repository.getIpniInfoForAllProviders()
const walkerStateMap = await repository.getWalkerStateForAllProviders()

// FIXME: run this concurrently
for (const [providerId, info] of ipniInfoMap.entries()) {
const state = walkerStateMap.get(providerId)

if (!info.providerAddress?.match(/^https?:\/\//)) {
console.log('Skipping provider %s address %s', providerId, info.providerAddress)
continue
}
if (['12D3KooWKF2Qb8s4gFXsVB1jb98HpcwhWf12b1TA51VqrtY3PmMC'].includes(providerId)) {
console.log('Skipping unreachable provider %s', providerId)
continue
}

try {
const result = await processNextAdvertisement(providerId, info, state)
console.log('%s %o\n -> %o', providerId, result.newState, result.indexEntry)
} catch (err) {
console.error('Cannot process the next advertisement.', err)
// TODO: log to Sentry
}
}
} catch (err) {
console.error('Walking step failed.', err)
}

const delay = 100 - (Date.now() - started)
if (delay > 0) {
console.log('Waiting for %sms before the next walk', delay)
await timers.setTimeout(delay)
}
}
}
146 changes: 146 additions & 0 deletions indexer/lib/advertisement-walker.js
bajtos marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import createDebug from 'debug'
import { assertOkResponse } from './http-assertions.js'
import { CID } from 'multiformats/cid'
import * as multihash from 'multiformats/hashes/digest'
import { varint } from 'multiformats'
import * as cbor from '@ipld/dag-cbor'

/** @import { ProviderInfo, WalkerState } from './typings.js' */
/** @import { RedisRepository as Repository } from './redis-repository.js' */

const debug = createDebug('spark-piece-indexer:observer')

/**
* @param {string} providerId
* @param {ProviderInfo} providerInfo
* @param {WalkerState | undefined} currentWalkerState
*/
export async function processNextAdvertisement (providerId, providerInfo, currentWalkerState) {
const nextHead = providerInfo.lastAdvertisementCID

/** @type {WalkerState} */
let state

if (!currentWalkerState?.lastHead) {
console.log('Initial walk for provider %s (%s): %s', providerId, providerInfo.providerAddress, providerInfo.lastAdvertisementCID)

/** @type {WalkerState} */
state = {
lastHead: nextHead,
head: nextHead,
tail: nextHead,
status: 'placeholder'
}
} else {
console.log('WALK NOT IMPLEMENTED YET %s %o', providerId, currentWalkerState)
return {}
}

// if (state.tail === state.lastHead || state.tail === undefined) {
// console.log('WALK FINISHED: %s %o', state)
// return { }
// }
if (!state || !state.tail) {
console.log('NOTHING TO DO for %s %o', providerId, currentWalkerState)
return {}
}

// TODO: handle networking errors, Error: connect ENETUNREACH 154.42.3.42:3104

const { previousAdvertisementCid, ...entry } = await fetchAdvertisedPayload(providerInfo.providerAddress, state.tail)
state.tail = previousAdvertisementCid
state.status = `Walking the advertisements from ${state.head}, next step: ${state.tail}`

const indexEntry = entry.pieceCid ? entry : undefined
return {
newState: state,
indexEntry
}
}

/**
* @param {string} providerAddress
* @param {string} advertisementCid
*/
export async function fetchAdvertisedPayload (providerAddress, advertisementCid) {
const advertisement =
/** @type {{
Addresses: string[],
ContextID: { '/': { bytes: string } },
Entries: { '/': string },
IsRm: false,
Metadata: { '/': { bytes: string } },
PreviousID?: { '/': string ,
Provider: string
Signature: {
'/': {
bytes: string
}
}
}} */(
await fetchCid(providerAddress, advertisementCid)
)
const previousAdvertisementCid = advertisement.PreviousID?.['/']
debug('advertisement %s %j', advertisementCid, advertisement)

const entriesCid = advertisement.Entries['/']
const entriesChunk =
/** @type {{
Entries: { '/' : { bytes: string } }[]
}} */(
await fetchCid(providerAddress, entriesCid)
)
debug('entriesChunk %s %j', entriesCid, entriesChunk.Entries.slice(0, 5))
const entryHash = entriesChunk.Entries[0]['/'].bytes
const payloadCid = CID.create(1, 0x55 /* raw */, multihash.decode(Buffer.from(entryHash, 'base64'))).toString()

const meta = parseMetadata(advertisement.Metadata['/'].bytes)
const pieceCid = meta.deal?.PieceCID.toString()

return {
previousAdvertisementCid,
pieceCid,
payloadCid
}
}

/**
* @param {string} providerBaseUrl
* @param {string} cid
* @returns {Promise<unknown>}
*/
async function fetchCid (providerBaseUrl, cid) {
const url = new URL(cid, new URL('/ipni/v1/ad/_cid_placeholder_', providerBaseUrl))
debug('Fetching %s', url)
// const res = await fetch(url)
const res = await fetch(url, { signal: AbortSignal.timeout(30_000) })
await assertOkResponse(res)
return await res.json()
}

/**
* @param {string} meta
*/
export function parseMetadata (meta) {
const bytes = Buffer.from(meta, 'base64')
const [protocolCode, nextOffset] = varint.decode(bytes)

const protocol = {
0x900: 'bitswap',
0x910: 'graphsync',
0x0920: 'http'
}[protocolCode] ?? '0x' + protocolCode.toString(16)

if (protocol === 'graphsync') {
// console.log(bytes.subarray(nextOffset).toString('hex'))
/** @type {{
PieceCID: import('multiformats/cid').CID,
VerifiedDeal: boolean,
FastRetrieval: boolean
}} */
const deal = cbor.decode(bytes.subarray(nextOffset))
return { protocol, deal }
} else {
return { protocol }
}
}
19 changes: 19 additions & 0 deletions indexer/lib/http-assertions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* @param {Response} res
* @param {string} [errorMsg]
*/
export async function assertOkResponse (res, errorMsg) {
if (res.ok) return

let body
try {
body = await res.text()
} catch {}
body = body?.trimEnd()
const err = new Error(`${errorMsg ?? `Cannot fetch ${res.url}`} (${res.status}): ${body}`)
Object.assign(err, {
statusCode: res.status,
serverMessage: body
})
throw err
}
59 changes: 59 additions & 0 deletions indexer/lib/ipni-watcher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import createDebug from 'debug'
import { assertOkResponse } from './http-assertions.js'
import { multiaddrToHttpUrl } from './vendored/multiaddr.js'

const debug = createDebug('spark-piece-indexer:observer')

/** @import { ProviderToInfoMap, ProviderInfo } from './typings.js' */
/** @import { RedisRepository as Repository } from './redis-repository.js' */

/**
* @returns {Promise<ProviderToInfoMap>}
*/
export async function getProvidersWithMetadata () {
const res = await fetch('https://cid.contact/providers')
assertOkResponse(res)

const providers = /** @type {{
AddrInfo: {
ID: string;
Addrs: string[];
},
LastAdvertisement: {
"/": string;
},
LastAdvertisementTime: string;
Publisher: {
ID: string;
Addrs: string[];
},
// Ignored: ExtendedProviders, FrozenAt
* }[]}
*/(await res.json())

/** @type {[string, ProviderInfo][]} */
const entries = providers.map(p => {
const providerId = p.Publisher.ID
const lastAdvertisementCID = p.LastAdvertisement['/']

// FIXME: handle empty Addrs[]
let providerAddress = p.Publisher.Addrs[0]
try {
providerAddress = multiaddrToHttpUrl(providerAddress)
} catch (err) {
debug('Cannot convert address to HTTP(s) URL (provider: %s): %s', providerId, err)
}

return [providerId, { providerAddress, lastAdvertisementCID }]
})
return new Map(entries)
}

/**
* @param {Repository} repository
*/
export async function syncProvidersFromIPNI (repository) {
const providerInfos = await getProvidersWithMetadata()
await repository.setIpniInfoForAllProviders(providerInfos)
return providerInfos
}
Loading
Loading