Skip to content

Commit

Permalink
start working in the advertisement walker
Browse files Browse the repository at this point in the history
Signed-off-by: Miroslav Bajtoš <[email protected]>
  • Loading branch information
bajtos committed Sep 4, 2024
1 parent fe0e9cc commit 9bbe3bc
Show file tree
Hide file tree
Showing 12 changed files with 582 additions and 50 deletions.
85 changes: 69 additions & 16 deletions indexer/bin/piece-indexer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Redis } from 'ioredis'
import { RedisRepository } from '../lib/redis-repository.js'
import { syncProvidersFromIPNI } from '../lib/ipni-watcher.js'
import timers from 'node:timers/promises'
import { processNextAdvertisement } from '../lib/advertisement-walker.js'

const {
REDIS_URL: redisUrl = 'redis://localhost:6379'
Expand All @@ -21,23 +22,75 @@ const redis = new Redis({

await redis.connect()

while (true) {
const started = Date.now()
try {
console.log('Syncing from IPNI')
const providers = await syncProvidersFromIPNI(new RedisRepository(redis))
console.log(
'Found %s providers, %s support(s) HTTP(s)',
providers.length,
providers.filter(p => p.providerAddress.match(/^https?:\/\//)).length
)
} catch (err) {
console.error('Cannot sync from IPNI.', err)
await Promise.all([
runIpniSync(),
runWalkers()
])

async function runIpniSync () {
const repository = new RedisRepository(redis)
while (true) {
const started = Date.now()
try {
console.log('Syncing from IPNI')
const providers = await syncProvidersFromIPNI(repository)
console.log(
'Found %s providers, %s support(s) HTTP(s)',
providers.size,
Array.from(providers.values()).filter(p => p.providerAddress.match(/^https?:\/\//)).length
)
} catch (err) {
console.error('Cannot sync from IPNI.', err)
// TODO: log to Sentry
}
const delay = 60_000 - (Date.now() - started)
if (delay > 0) {
console.log('Waiting for %sms before the next sync from IPNI', delay)
await timers.setTimeout(delay)
}
}
const delay = 6_000 - (Date.now() - started)
if (delay > 0) {
console.log('Waiting for %sms before the next sync from IPNI', delay)
await timers.setTimeout(delay)
}

async function runWalkers () {
const repository = new RedisRepository(redis)
while (true) {
const started = Date.now()

// EVERYTHING BELOW IS TEMPORARY AND WILL BE SIGNIFICANTLY REWORKED
try {
console.log('Walking one step')
const ipniInfoMap = await repository.getIpniInfoForAllProviders()
const walkerStateMap = await repository.getWalkerStateForAllProviders()

// FIXME: run this concurrently
for (const [providerId, info] of ipniInfoMap.entries()) {
const state = walkerStateMap.get(providerId)

if (!info.providerAddress?.match(/^https?:\/\//)) {
console.log('Skipping provider %s address %s', providerId, info.providerAddress)
continue
}
if (['12D3KooWKF2Qb8s4gFXsVB1jb98HpcwhWf12b1TA51VqrtY3PmMC'].includes(providerId)) {
console.log('Skipping unreachable provider %s', providerId)
continue
}

try {
const result = await processNextAdvertisement(providerId, info, state)
console.log('%s %o\n -> %o', providerId, result.newState, result.indexEntry)
} catch (err) {
console.error('Cannot process the next advertisement.', err)
// TODO: log to Sentry
}
}
} catch (err) {
console.error('Walking step failed.', err)
}

const delay = 100 - (Date.now() - started)
if (delay > 0) {
console.log('Waiting for %sms before the next walk', delay)
await timers.setTimeout(delay)
}
}
}
146 changes: 146 additions & 0 deletions indexer/lib/advertisement-walker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import createDebug from 'debug'
import { assertOkResponse } from './http-assertions.js'
import { CID } from 'multiformats/cid'
import * as multihash from 'multiformats/hashes/digest'
import { varint } from 'multiformats'
import * as cbor from '@ipld/dag-cbor'

/** @import { ProviderInfo, WalkerState } from './typings.js' */
/** @import { RedisRepository as Repository } from './redis-repository.js' */

const debug = createDebug('spark-piece-indexer:observer')

/**
* @param {string} providerId
* @param {ProviderInfo} providerInfo
* @param {WalkerState | undefined} currentWalkerState
*/
export async function processNextAdvertisement (providerId, providerInfo, currentWalkerState) {
const nextHead = providerInfo.lastAdvertisementCID

/** @type {WalkerState} */
let state

if (!currentWalkerState?.lastHead) {
console.log('Initial walk for provider %s (%s): %s', providerId, providerInfo.providerAddress, providerInfo.lastAdvertisementCID)

/** @type {WalkerState} */
state = {
lastHead: nextHead,
head: nextHead,
tail: nextHead,
status: 'placeholder'
}
} else {
console.log('WALK NOT IMPLEMENTED YET %s %o', providerId, currentWalkerState)
return {}
}

// if (state.tail === state.lastHead || state.tail === undefined) {
// console.log('WALK FINISHED: %s %o', state)
// return { }
// }
if (!state || !state.tail) {
console.log('NOTHING TO DO for %s %o', providerId, currentWalkerState)
return {}
}

// TODO: handle networking errors, Error: connect ENETUNREACH 154.42.3.42:3104

const { previousAdvertisementCid, ...entry } = await fetchAdvertisedPayload(providerInfo.providerAddress, state.tail)
state.tail = previousAdvertisementCid
state.status = `Walking the advertisements from ${state.head}, next step: ${state.tail}`

const indexEntry = entry.pieceCid ? entry : undefined
return {
newState: state,
indexEntry
}
}

/**
* @param {string} providerAddress
* @param {string} advertisementCid
*/
export async function fetchAdvertisedPayload (providerAddress, advertisementCid) {
const advertisement =
/** @type {{
Addresses: string[],
ContextID: { '/': { bytes: string } },
Entries: { '/': string },
IsRm: false,
Metadata: { '/': { bytes: string } },
PreviousID?: { '/': string ,
Provider: string
Signature: {
'/': {
bytes: string
}
}
}} */(
await fetchCid(providerAddress, advertisementCid)
)
const previousAdvertisementCid = advertisement.PreviousID?.['/']
debug('advertisement %s %j', advertisementCid, advertisement)

const entriesCid = advertisement.Entries['/']
const entriesChunk =
/** @type {{
Entries: { '/' : { bytes: string } }[]
}} */(
await fetchCid(providerAddress, entriesCid)
)
debug('entriesChunk %s %j', entriesCid, entriesChunk.Entries.slice(0, 5))
const entryHash = entriesChunk.Entries[0]['/'].bytes
const payloadCid = CID.create(1, 0x55 /* raw */, multihash.decode(Buffer.from(entryHash, 'base64'))).toString()

const meta = parseMetadata(advertisement.Metadata['/'].bytes)
const pieceCid = meta.deal?.PieceCID.toString()

return {
previousAdvertisementCid,
pieceCid,
payloadCid
}
}

/**
* @param {string} providerBaseUrl
* @param {string} cid
* @returns {Promise<unknown>}
*/
async function fetchCid (providerBaseUrl, cid) {
const url = new URL(cid, new URL('/ipni/v1/ad/_cid_placeholder_', providerBaseUrl))
debug('Fetching %s', url)
// const res = await fetch(url)
const res = await fetch(url, { signal: AbortSignal.timeout(30_000) })
await assertOkResponse(res)
return await res.json()
}

/**
* @param {string} meta
*/
export function parseMetadata (meta) {
const bytes = Buffer.from(meta, 'base64')
const [protocolCode, nextOffset] = varint.decode(bytes)

const protocol = {
0x900: 'bitswap',
0x910: 'graphsync',
0x0920: 'http'
}[protocolCode] ?? '0x' + protocolCode.toString(16)

if (protocol === 'graphsync') {
// console.log(bytes.subarray(nextOffset).toString('hex'))
/** @type {{
PieceCID: import('multiformats/cid').CID,
VerifiedDeal: boolean,
FastRetrieval: boolean
}} */
const deal = cbor.decode(bytes.subarray(nextOffset))
return { protocol, deal }
} else {
return { protocol }
}
}
3 changes: 2 additions & 1 deletion indexer/lib/http-assertions.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ export async function assertOkResponse (res, errorMsg) {
try {
body = await res.text()
} catch {}
const err = new Error(`${errorMsg ?? `Cannot fetch ${res.url}`} (${res.status}): ${body?.trimEnd()}`)
body = body?.trimEnd()
const err = new Error(`${errorMsg ?? `Cannot fetch ${res.url}`} (${res.status}): ${body}`)
Object.assign(err, {
statusCode: res.status,
serverMessage: body
Expand Down
59 changes: 59 additions & 0 deletions indexer/lib/ipni-watcher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import createDebug from 'debug'
import { assertOkResponse } from './http-assertions.js'
import { multiaddrToHttpUrl } from './vendored/multiaddr.js'

const debug = createDebug('spark-piece-indexer:observer')

/** @import { ProviderToInfoMap, ProviderInfo } from './typings.js' */
/** @import { RedisRepository as Repository } from './redis-repository.js' */

/**
* @returns {Promise<ProviderToInfoMap>}
*/
export async function getProvidersWithMetadata () {
const res = await fetch('https://cid.contact/providers')
assertOkResponse(res)

const providers = /** @type {{
AddrInfo: {
ID: string;
Addrs: string[];
},
LastAdvertisement: {
"/": string;
},
LastAdvertisementTime: string;
Publisher: {
ID: string;
Addrs: string[];
},
// Ignored: ExtendedProviders, FrozenAt
* }[]}
*/(await res.json())

/** @type {[string, ProviderInfo][]} */
const entries = providers.map(p => {
const providerId = p.Publisher.ID
const lastAdvertisementCID = p.LastAdvertisement['/']

// FIXME: handle empty Addrs[]
let providerAddress = p.Publisher.Addrs[0]
try {
providerAddress = multiaddrToHttpUrl(providerAddress)
} catch (err) {
debug('Cannot convert address to HTTP(s) URL (provider: %s): %s', providerId, err)
}

return [providerId, { providerAddress, lastAdvertisementCID }]
})
return new Map(entries)
}

/**
* @param {Repository} repository
*/
export async function syncProvidersFromIPNI (repository) {
const providerInfos = await getProvidersWithMetadata()
await repository.setIpniInfoForAllProviders(providerInfos)
return providerInfos
}
Loading

0 comments on commit 9bbe3bc

Please sign in to comment.