From 9bbe3bc588cf919dd07917233a2d4bfc1161e260 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Wed, 4 Sep 2024 16:45:04 +0200 Subject: [PATCH] start working in the advertisement walker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- indexer/bin/piece-indexer.js | 85 ++++++++++--- indexer/lib/advertisement-walker.js | 146 ++++++++++++++++++++++ indexer/lib/http-assertions.js | 3 +- indexer/lib/ipni-watcher.js | 59 +++++++++ indexer/lib/redis-repository.js | 82 ++++++++---- indexer/lib/typings.d.ts | 22 ++-- indexer/lib/vendored/multiaddr.js | 58 +++++++++ indexer/package.json | 2 + indexer/test/advertisement-walker.test.js | 86 +++++++++++++ indexer/test/helpers/test-data.js | 4 + indexer/test/ipni-watcher.test.js | 54 ++++++++ package-lock.json | 31 +++++ 12 files changed, 582 insertions(+), 50 deletions(-) create mode 100644 indexer/lib/advertisement-walker.js create mode 100644 indexer/lib/ipni-watcher.js create mode 100644 indexer/lib/vendored/multiaddr.js create mode 100644 indexer/test/advertisement-walker.test.js create mode 100644 indexer/test/helpers/test-data.js create mode 100644 indexer/test/ipni-watcher.test.js diff --git a/indexer/bin/piece-indexer.js b/indexer/bin/piece-indexer.js index 780a1fd..c2bc993 100644 --- a/indexer/bin/piece-indexer.js +++ b/indexer/bin/piece-indexer.js @@ -2,6 +2,7 @@ import { Redis } from 'ioredis' import { RedisRepository } from '../lib/redis-repository.js' import { syncProvidersFromIPNI } from '../lib/ipni-watcher.js' import timers from 'node:timers/promises' +import { processNextAdvertisement } from '../lib/advertisement-walker.js' const { REDIS_URL: redisUrl = 'redis://localhost:6379' @@ -21,23 +22,75 @@ const redis = new Redis({ await redis.connect() -while (true) { - const started = Date.now() - try { - console.log('Syncing from IPNI') - const providers = await syncProvidersFromIPNI(new RedisRepository(redis)) - console.log( - 'Found %s providers, %s support(s) HTTP(s)', - providers.length, - providers.filter(p => p.providerAddress.match(/^https?:\/\//)).length - ) - } catch (err) { - console.error('Cannot sync from IPNI.', err) +await Promise.all([ + runIpniSync(), + runWalkers() +]) + +async function runIpniSync () { + const repository = new RedisRepository(redis) + while (true) { + const started = Date.now() + try { + console.log('Syncing from IPNI') + const providers = await syncProvidersFromIPNI(repository) + console.log( + 'Found %s providers, %s support(s) HTTP(s)', + providers.size, + Array.from(providers.values()).filter(p => p.providerAddress.match(/^https?:\/\//)).length + ) + } catch (err) { + console.error('Cannot sync from IPNI.', err) // TODO: log to Sentry + } + const delay = 60_000 - (Date.now() - started) + if (delay > 0) { + console.log('Waiting for %sms before the next sync from IPNI', delay) + await timers.setTimeout(delay) + } } - const delay = 6_000 - (Date.now() - started) - if (delay > 0) { - console.log('Waiting for %sms before the next sync from IPNI', delay) - await timers.setTimeout(delay) +} + +async function runWalkers () { + const repository = new RedisRepository(redis) + while (true) { + const started = Date.now() + + // EVERYTHING BELOW IS TEMPORARY AND WILL BE SIGNIFICANTLY REWORKED + try { + console.log('Walking one step') + const ipniInfoMap = await repository.getIpniInfoForAllProviders() + const walkerStateMap = await repository.getWalkerStateForAllProviders() + + // FIXME: run this concurrently + for (const [providerId, info] of ipniInfoMap.entries()) { + const state = walkerStateMap.get(providerId) + + if (!info.providerAddress?.match(/^https?:\/\//)) { + console.log('Skipping provider %s address %s', providerId, info.providerAddress) + continue + } + if (['12D3KooWKF2Qb8s4gFXsVB1jb98HpcwhWf12b1TA51VqrtY3PmMC'].includes(providerId)) { + console.log('Skipping unreachable provider %s', providerId) + continue + } + + try { + const result = await processNextAdvertisement(providerId, info, state) + console.log('%s %o\n -> %o', providerId, result.newState, result.indexEntry) + } catch (err) { + console.error('Cannot process the next advertisement.', err) + // TODO: log to Sentry + } + } + } catch (err) { + console.error('Walking step failed.', err) + } + + const delay = 100 - (Date.now() - started) + if (delay > 0) { + console.log('Waiting for %sms before the next walk', delay) + await timers.setTimeout(delay) + } } } diff --git a/indexer/lib/advertisement-walker.js b/indexer/lib/advertisement-walker.js new file mode 100644 index 0000000..b6550c6 --- /dev/null +++ b/indexer/lib/advertisement-walker.js @@ -0,0 +1,146 @@ +import createDebug from 'debug' +import { assertOkResponse } from './http-assertions.js' +import { CID } from 'multiformats/cid' +import * as multihash from 'multiformats/hashes/digest' +import { varint } from 'multiformats' +import * as cbor from '@ipld/dag-cbor' + +/** @import { ProviderInfo, WalkerState } from './typings.js' */ +/** @import { RedisRepository as Repository } from './redis-repository.js' */ + +const debug = createDebug('spark-piece-indexer:observer') + +/** + * @param {string} providerId + * @param {ProviderInfo} providerInfo + * @param {WalkerState | undefined} currentWalkerState + */ +export async function processNextAdvertisement (providerId, providerInfo, currentWalkerState) { + const nextHead = providerInfo.lastAdvertisementCID + + /** @type {WalkerState} */ + let state + + if (!currentWalkerState?.lastHead) { + console.log('Initial walk for provider %s (%s): %s', providerId, providerInfo.providerAddress, providerInfo.lastAdvertisementCID) + + /** @type {WalkerState} */ + state = { + lastHead: nextHead, + head: nextHead, + tail: nextHead, + status: 'placeholder' + } + } else { + console.log('WALK NOT IMPLEMENTED YET %s %o', providerId, currentWalkerState) + return {} + } + + // if (state.tail === state.lastHead || state.tail === undefined) { + // console.log('WALK FINISHED: %s %o', state) + // return { } + // } + if (!state || !state.tail) { + console.log('NOTHING TO DO for %s %o', providerId, currentWalkerState) + return {} + } + + // TODO: handle networking errors, Error: connect ENETUNREACH 154.42.3.42:3104 + + const { previousAdvertisementCid, ...entry } = await fetchAdvertisedPayload(providerInfo.providerAddress, state.tail) + state.tail = previousAdvertisementCid + state.status = `Walking the advertisements from ${state.head}, next step: ${state.tail}` + + const indexEntry = entry.pieceCid ? entry : undefined + return { + newState: state, + indexEntry + } +} + +/** + * @param {string} providerAddress + * @param {string} advertisementCid + */ +export async function fetchAdvertisedPayload (providerAddress, advertisementCid) { + const advertisement = + /** @type {{ + Addresses: string[], + ContextID: { '/': { bytes: string } }, + Entries: { '/': string }, + IsRm: false, + Metadata: { '/': { bytes: string } }, + PreviousID?: { '/': string , + Provider: string + Signature: { + '/': { + bytes: string + } + } + }} */( + await fetchCid(providerAddress, advertisementCid) + ) + const previousAdvertisementCid = advertisement.PreviousID?.['/'] + debug('advertisement %s %j', advertisementCid, advertisement) + + const entriesCid = advertisement.Entries['/'] + const entriesChunk = + /** @type {{ + Entries: { '/' : { bytes: string } }[] + }} */( + await fetchCid(providerAddress, entriesCid) + ) + debug('entriesChunk %s %j', entriesCid, entriesChunk.Entries.slice(0, 5)) + const entryHash = entriesChunk.Entries[0]['/'].bytes + const payloadCid = CID.create(1, 0x55 /* raw */, multihash.decode(Buffer.from(entryHash, 'base64'))).toString() + + const meta = parseMetadata(advertisement.Metadata['/'].bytes) + const pieceCid = meta.deal?.PieceCID.toString() + + return { + previousAdvertisementCid, + pieceCid, + payloadCid + } +} + +/** + * @param {string} providerBaseUrl + * @param {string} cid + * @returns {Promise} + */ +async function fetchCid (providerBaseUrl, cid) { + const url = new URL(cid, new URL('/ipni/v1/ad/_cid_placeholder_', providerBaseUrl)) + debug('Fetching %s', url) + // const res = await fetch(url) + const res = await fetch(url, { signal: AbortSignal.timeout(30_000) }) + await assertOkResponse(res) + return await res.json() +} + +/** + * @param {string} meta + */ +export function parseMetadata (meta) { + const bytes = Buffer.from(meta, 'base64') + const [protocolCode, nextOffset] = varint.decode(bytes) + + const protocol = { + 0x900: 'bitswap', + 0x910: 'graphsync', + 0x0920: 'http' + }[protocolCode] ?? '0x' + protocolCode.toString(16) + + if (protocol === 'graphsync') { + // console.log(bytes.subarray(nextOffset).toString('hex')) + /** @type {{ + PieceCID: import('multiformats/cid').CID, + VerifiedDeal: boolean, + FastRetrieval: boolean + }} */ + const deal = cbor.decode(bytes.subarray(nextOffset)) + return { protocol, deal } + } else { + return { protocol } + } +} diff --git a/indexer/lib/http-assertions.js b/indexer/lib/http-assertions.js index 4c4d8b6..5ff9cfe 100644 --- a/indexer/lib/http-assertions.js +++ b/indexer/lib/http-assertions.js @@ -9,7 +9,8 @@ export async function assertOkResponse (res, errorMsg) { try { body = await res.text() } catch {} - const err = new Error(`${errorMsg ?? `Cannot fetch ${res.url}`} (${res.status}): ${body?.trimEnd()}`) + body = body?.trimEnd() + const err = new Error(`${errorMsg ?? `Cannot fetch ${res.url}`} (${res.status}): ${body}`) Object.assign(err, { statusCode: res.status, serverMessage: body diff --git a/indexer/lib/ipni-watcher.js b/indexer/lib/ipni-watcher.js new file mode 100644 index 0000000..9fcf506 --- /dev/null +++ b/indexer/lib/ipni-watcher.js @@ -0,0 +1,59 @@ +import createDebug from 'debug' +import { assertOkResponse } from './http-assertions.js' +import { multiaddrToHttpUrl } from './vendored/multiaddr.js' + +const debug = createDebug('spark-piece-indexer:observer') + +/** @import { ProviderToInfoMap, ProviderInfo } from './typings.js' */ +/** @import { RedisRepository as Repository } from './redis-repository.js' */ + +/** + * @returns {Promise} + */ +export async function getProvidersWithMetadata () { + const res = await fetch('https://cid.contact/providers') + assertOkResponse(res) + + const providers = /** @type {{ + AddrInfo: { + ID: string; + Addrs: string[]; + }, + LastAdvertisement: { + "/": string; + }, + LastAdvertisementTime: string; + Publisher: { + ID: string; + Addrs: string[]; + }, + // Ignored: ExtendedProviders, FrozenAt + * }[]} + */(await res.json()) + + /** @type {[string, ProviderInfo][]} */ + const entries = providers.map(p => { + const providerId = p.Publisher.ID + const lastAdvertisementCID = p.LastAdvertisement['/'] + + // FIXME: handle empty Addrs[] + let providerAddress = p.Publisher.Addrs[0] + try { + providerAddress = multiaddrToHttpUrl(providerAddress) + } catch (err) { + debug('Cannot convert address to HTTP(s) URL (provider: %s): %s', providerId, err) + } + + return [providerId, { providerAddress, lastAdvertisementCID }] + }) + return new Map(entries) +} + +/** + * @param {Repository} repository + */ +export async function syncProvidersFromIPNI (repository) { + const providerInfos = await getProvidersWithMetadata() + await repository.setIpniInfoForAllProviders(providerInfos) + return providerInfos +} diff --git a/indexer/lib/redis-repository.js b/indexer/lib/redis-repository.js index 50480fa..c96419b 100644 --- a/indexer/lib/redis-repository.js +++ b/indexer/lib/redis-repository.js @@ -1,4 +1,4 @@ -/** @import { ProviderToIpniStateMap } from "./typings.js" */ +/** @import { ProviderInfo, ProviderToInfoMap, ProviderToWalkerStateMap, WalkerState } from "./typings.js" */ export class RedisRepository { #redis @@ -11,44 +11,80 @@ export class RedisRepository { } /** - * @returns {Promise} + * @returns {Promise} */ - async getIpniStateForAllProviders () { + async getIpniInfoForAllProviders () { + const stringEntries = await this.#scanEntries('ipni-state') + /** @type {[string, ProviderInfo][]} */ + const entries = stringEntries.map( + ([providerId, stateJson]) => (([providerId, JSON.parse(stateJson)])) + ) + return new Map(entries) + } + + /** + * @param {ProviderToInfoMap} keyValueMap + */ + async setIpniInfoForAllProviders (keyValueMap) { + const serialized = new Map( + Array.from(keyValueMap.entries()).map(([key, value]) => ([`ipni-state:${key}`, JSON.stringify(value)])) + ) + await this.#redis.mset(serialized) + } + + /** + * @returns {Promise} + */ + async getWalkerStateForAllProviders () { + const stringEntries = await this.#scanEntries('walker-state') + /** @type {[string, WalkerState][]} */ + const entries = stringEntries.map( + ([providerId, stateJson]) => (([providerId, JSON.parse(stateJson)])) + ) + return new Map(entries) + } + + /** + * + * @param {string} providerId + * @param {WalkerState} state + */ + async setWalkerState (providerId, state) { + const data = JSON.stringify(state) + await this.#redis.set(`walker-state:${providerId}`, data) + } + + /** + * @param {"ipni-state" | "walker-state"} keyPrefix "ipni-state" or "walker-state" + */ + async #scanEntries (keyPrefix) { /** @type {string[]} */ const redisKeys = [] const keyStream = this.#redis.scanStream({ - match: 'ipni-state:*', + match: `${keyPrefix}:*`, count: 1000 }) for await (const chunk of keyStream) { redisKeys.push(...chunk) } - const stateList = await this.#redis.mget(redisKeys) + if (!redisKeys.length) return [] + + const stringValues = await this.#redis.mget(redisKeys) - /** @type {ProviderToIpniStateMap} */ - const result = new Map() + /** @type {[string, string][]} */ + const result = [] for (let ix = 0; ix < redisKeys.length; ix++) { - const key = redisKeys[ix] - const stateStr = stateList[ix] - if (!stateStr) { - console.error('Unexpected Redis state: the existing key %s does not have any value', key) + const prefixedKey = redisKeys[ix] + const value = stringValues[ix] + if (!value) { + console.error('Unexpected Redis state: the existing key %s does not have any value', prefixedKey) continue } - const providerId = key.split(':')[1] - result.set(providerId, JSON.parse(stateStr)) + const key = prefixedKey.split(':')[1] + result.push([key, value]) } return result } - - /** - * @param {ProviderToIpniStateMap} stateMap - */ - async setIpniStateForAllProviders (stateMap) { - const serialized = new Map( - Array.from(stateMap.entries()).map(([key, value]) => ([`ipni-state:${key}`, JSON.stringify(value)])) - ) - await this.#redis.mset(serialized) - } } diff --git a/indexer/lib/typings.d.ts b/indexer/lib/typings.d.ts index f226fad..891a8ca 100644 --- a/indexer/lib/typings.d.ts +++ b/indexer/lib/typings.d.ts @@ -1,19 +1,21 @@ /** - * Data extracted from IPNI response + * Data synced from IPNi */ -export interface IpniProviderInfo { - providerId: string; +export interface ProviderInfo { providerAddress: string; lastAdvertisementCID: string; } -/** - * Data stored in our database - */ -export interface ProviderIpniState { - providerAddress: string; - lastAdvertisementCID: string; +export type ProviderToInfoMap = Map; + +export interface WalkerState { + lastHead: string; + head: string; + tail: string | undefined + status: string; } -export type ProviderToIpniStateMap = Map; +export type ProviderToWalkerStateMap = Map + +export type PiecePayloadCIDs = string[]; diff --git a/indexer/lib/vendored/multiaddr.js b/indexer/lib/vendored/multiaddr.js new file mode 100644 index 0000000..217078c --- /dev/null +++ b/indexer/lib/vendored/multiaddr.js @@ -0,0 +1,58 @@ +// MIRRORED FROM https://github.com/filecoin-station/spark/blob/main/lib/multiaddr.js +// @ts-nocheck + +/** + * @param {string} addr Multiaddr, e.g. `/ip4/127.0.0.1/tcp/80/http` + * @returns {string} Parsed URI, e.g. `http://127.0.0.1:80` + */ +export function multiaddrToHttpUrl (addr) { + const [, hostType, hostValue, ipProtocol, port, scheme, ...rest] = addr.split('/') + + if (ipProtocol !== 'tcp') { + throw Object.assign( + new Error(`Cannot parse "${addr}": unsupported protocol "${ipProtocol}"`), + { code: 'UNSUPPORTED_MULTIADDR_PROTO' } + ) + } + + if (scheme !== 'http' && scheme !== 'https') { + throw Object.assign( + new Error(`Cannot parse "${addr}": unsupported scheme "${scheme}"`), + { code: 'UNSUPPORTED_MULTIADDR_SCHEME' } + ) + } + + if (rest.length) { + throw Object.assign( + new Error(`Cannot parse "${addr}": too many parts`), + { code: 'MULTIADDR_HAS_TOO_MANY_PARTS' } + ) + } + + return `${scheme}://${getUriHost(hostType, hostValue)}${getUriPort(scheme, port)}` +} + +function getUriHost (hostType, hostValue) { + switch (hostType) { + case 'ip4': + case 'dns': + case 'dns4': + case 'dns6': + return hostValue + case 'ip6': + // See https://superuser.com/a/367788/135774: + // According to RFC2732, literal IPv6 addresses should be put inside square brackets in URLs + return `[${hostValue}]` + } + + throw Object.assign( + new Error(`Unsupported multiaddr host type "${hostType}"`), + { code: 'UNSUPPORTED_MULTIADDR_HOST_TYPE' } + ) +} + +function getUriPort (scheme, port) { + if (scheme === 'http' && port === '80') return '' + if (scheme === 'https' && port === '443') return '' + return `:${port}` +} diff --git a/indexer/package.json b/indexer/package.json index 1c4ac8c..30cb327 100644 --- a/indexer/package.json +++ b/indexer/package.json @@ -8,6 +8,8 @@ "test": "node --test" }, "devDependencies": { + "@ipld/dag-cbor": "^9.2.1", + "multiformats": "^13.2.2", "standard": "^17.1.0" }, "dependencies": { diff --git a/indexer/test/advertisement-walker.test.js b/indexer/test/advertisement-walker.test.js new file mode 100644 index 0000000..85b8ee3 --- /dev/null +++ b/indexer/test/advertisement-walker.test.js @@ -0,0 +1,86 @@ +import createDebug from 'debug' +import { Redis } from 'ioredis' +import assert from 'node:assert' +import { after, before, beforeEach, describe, it } from 'node:test' +import { RedisRepository } from '../lib/redis-repository.js' +import { fetchAdvertisedPayload, processNextAdvertisement } from '../lib/advertisement-walker.js' +import { FRISBII_ADDRESS, FRISBII_AD_CID } from './helpers/test-data.js' + +/** @import { ProviderInfo, WalkerState } from '../lib/typings.js' */ + +const debug = createDebug('test') + +// TODO(bajtos) We may need to replace this with a mock index provider +const providerId = '12D3KooWDYiKtcxTrjNFtR6UqKRkJpESYHmmFznQAAkDX2ZHQ49t' +const providerAddress = 'http://222.214.219.200:3104' +const knownAdvertisementCID = 'baguqeeradb34kxwvi5fs3gj6wrxfkcqntzklq4qdallcejqfhyryftnpd25a' +const knownPrevAdvertisementCID = 'baguqeerawqvze5suesscwzsmpgemthwv6hx2yi2rg35zt7jdlmxapjf5qfdq' + +describe('processNextAdvertisement', () => { + it('handles a new index provider not seen before', async () => { + /** @type {ProviderInfo} */ + const providerInfo = { + providerAddress, + lastAdvertisementCID: knownAdvertisementCID + } + const walkerState = undefined + const { indexEntry, newState } = await processNextAdvertisement(providerId, providerInfo, walkerState) + assert.deepStrictEqual(newState, /** @type {WalkerState} */({ + head: providerInfo.lastAdvertisementCID, + lastHead: providerInfo.lastAdvertisementCID, + tail: knownPrevAdvertisementCID, + status: `Walking the advertisements from ${knownAdvertisementCID}, next step: ${knownPrevAdvertisementCID}` + })) + + assert.deepStrictEqual(indexEntry, { + payloadCid: 'bafk2bzaceaybhh2uenrbiuv4x6xywbv6oxizamydggd5r2xgnnvr53uwnjqea', + pieceCid: 'baga6ea4seaqjk25ts2kekzqa5jplj6uyzk7qpiigg4koiqjz26dtmzooiocwuoa' + }) + }) + + it('does nothing when the last advertisement has been already processed', async () => { + /** @type {ProviderInfo} */ + const providerInfo = { + providerAddress, + lastAdvertisementCID: knownAdvertisementCID + } + + let result = await processNextAdvertisement(providerId, providerInfo, undefined) + assert.strictEqual(result.newState?.lastHead, providerInfo.lastAdvertisementCID) + + result = await processNextAdvertisement(providerId, providerInfo, result.newState) + assert(result.newState === undefined) + }) +}) + +/** @typedef {Awaited>} AdvertisedPayload */ + +describe('fetchAdvertisedPayload', () => { + it('returns previousAdvertisementCid, pieceCid and payloadCid for Graphsync retrievals', async () => { + const result = await fetchAdvertisedPayload(providerAddress, knownAdvertisementCID) + assert.deepStrictEqual(result, /** @type {AdvertisedPayload} */({ + payloadCid: 'bafk2bzaceaybhh2uenrbiuv4x6xywbv6oxizamydggd5r2xgnnvr53uwnjqea', + pieceCid: 'baga6ea4seaqjk25ts2kekzqa5jplj6uyzk7qpiigg4koiqjz26dtmzooiocwuoa', + previousAdvertisementCid: 'baguqeerawqvze5suesscwzsmpgemthwv6hx2yi2rg35zt7jdlmxapjf5qfdq' + })) + }) + + it('returns undefined pieceCid for HTTP retrievals', async () => { + const result = await fetchAdvertisedPayload(FRISBII_ADDRESS, FRISBII_AD_CID) + assert.deepStrictEqual(result, /** @type {AdvertisedPayload} */({ + payloadCid: 'bafkreih5zasorm4tlfga4ztwvm2dlnw6jxwwuvgnokyt3mjamfn3svvpyy', + pieceCid: undefined, + // Our Frisbii instance announced only one advertisement + // That's unrelated to HTTP vs Graphsync retrievals + previousAdvertisementCid: undefined + })) + }) +}) + +it.only('exploratory testing', async () => { + const result = await fetchAdvertisedPayload( + 'http://filswan.soundchina.net:3105', + 'baguqeeras2wvxglslzbl7fbyh6q4wbwi6nompdghd4vpnbdo3yiqq4zxhfiq' + ) + console.log(result) +}) diff --git a/indexer/test/helpers/test-data.js b/indexer/test/helpers/test-data.js new file mode 100644 index 0000000..5ac3539 --- /dev/null +++ b/indexer/test/helpers/test-data.js @@ -0,0 +1,4 @@ +// See https://github.com/filecoin-station/frisbii-on-fly +export const FRISBII_ID = '12D3KooWC8gXxg9LoJ9h3hy3jzBkEAxamyHEQJKtRmAuBuvoMzpr' +export const FRISBII_ADDRESS = 'https://frisbii.fly.dev' +export const FRISBII_AD_CID = 'baguqeeraacf25e7glnmzgtocthwavkhq7z7jrv576aofwg3jr5mn3qqomp5a' diff --git a/indexer/test/ipni-watcher.test.js b/indexer/test/ipni-watcher.test.js new file mode 100644 index 0000000..20b5d91 --- /dev/null +++ b/indexer/test/ipni-watcher.test.js @@ -0,0 +1,54 @@ +import createDebug from 'debug' +import { Redis } from 'ioredis' +import assert from 'node:assert' +import { after, before, beforeEach, describe, it } from 'node:test' +import { getProvidersWithMetadata, syncProvidersFromIPNI } from '../lib/ipni-watcher.js' +import { RedisRepository } from '../lib/redis-repository.js' +import { FRISBII_ADDRESS, FRISBII_ID } from './helpers/test-data.js' + +/** @import { ProviderInfo, WalkerState } from '../lib/typings.js' */ + +const debug = createDebug('test') + +describe('getProvidersWithMetadata', () => { + it('returns response including known providers', async () => { + const providers = await getProvidersWithMetadata() + debug(JSON.stringify(providers, null, 2)) + + const frisbiiOnFly = providers.find( + p => p.providerId === FRISBII_ID && p.providerAddress === FRISBII_ADDRESS + ) + + assert(frisbiiOnFly) + assert.match(frisbiiOnFly.lastAdvertisementCID, /^bagu/) + }) +}) + +describe('syncProvidersFromIPNI', () => { + /** @type {Redis} */ + let redis + + before(async () => { + redis = new Redis({ db: 1 }) + }) + + beforeEach(async () => { + await redis.flushall() + }) + + after(async () => { + await redis?.disconnect() + }) + + it('downloads metadata from IPNI and stores it in our DB', async () => { + const repository = new RedisRepository(redis) + await syncProvidersFromIPNI(repository) + + const stateMap = await repository.getIpniInfoForAllProviders() + + const frisbiiOnFly = stateMap.get(FRISBII_ID) + assert(frisbiiOnFly, 'Frisbii index provider was not found in our state') + assert.equal(frisbiiOnFly.providerAddress, FRISBII_ADDRESS) + assert.match(frisbiiOnFly.lastAdvertisementCID, /^bagu/) + }) +}) diff --git a/package-lock.json b/package-lock.json index eaa57c5..0b701cf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -26,6 +26,8 @@ "ioredis": "^5.4.1" }, "devDependencies": { + "@ipld/dag-cbor": "^9.2.1", + "multiformats": "^13.2.2", "standard": "^17.1.0" } }, @@ -129,6 +131,20 @@ "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==" }, + "node_modules/@ipld/dag-cbor": { + "version": "9.2.1", + "resolved": "https://registry.npmjs.org/@ipld/dag-cbor/-/dag-cbor-9.2.1.tgz", + "integrity": "sha512-nyY48yE7r3dnJVlxrdaimrbloh4RokQaNRdI//btfTkcTEZbpmSrbYcBQ4VKTf8ZxXAOUJy4VsRpkJo+y9RTnA==", + "dev": true, + "dependencies": { + "cborg": "^4.0.0", + "multiformats": "^13.1.0" + }, + "engines": { + "node": ">=16.0.0", + "npm": ">=7.0.0" + } + }, "node_modules/@nodelib/fs.scandir": { "version": "2.1.5", "resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz", @@ -503,6 +519,15 @@ "node": ">=6" } }, + "node_modules/cborg": { + "version": "4.2.3", + "resolved": "https://registry.npmjs.org/cborg/-/cborg-4.2.3.tgz", + "integrity": "sha512-XBFbEJ6WMfn9L7woc2t+EzOxF8vGqddoopKBbrhIvZBt2WIUgSlT8xLmM6Aq1xv8eWt4yOSjwxWjYeuHU3CpJA==", + "dev": true, + "bin": { + "cborg": "lib/bin.js" + } + }, "node_modules/chalk": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", @@ -2335,6 +2360,12 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, + "node_modules/multiformats": { + "version": "13.2.2", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-13.2.2.tgz", + "integrity": "sha512-RWI+nyf0q64vyOxL8LbKtjJMki0sogRL/8axvklNtiTM0iFCVtHwME9w6+0P1/v4dQvsIg8A45oT3ka1t/M/+A==", + "dev": true + }, "node_modules/natural-compare": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz",