Skip to content

Commit

Permalink
feat: update our state from IPNI state
Browse files Browse the repository at this point in the history
Signed-off-by: Miroslav Bajtoš <[email protected]>
  • Loading branch information
bajtos committed Sep 4, 2024
1 parent 80e7740 commit c53055f
Show file tree
Hide file tree
Showing 10 changed files with 558 additions and 2 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,10 @@
A lightweight IPNI node mapping Filecoin PieceCID → payload block CID.

- [Design doc](./docs/design.md)

## Development

```bash
docker run --name redis -p 6379:6379 -d redis
npm start -w indexer
```
17 changes: 17 additions & 0 deletions indexer/bin/piece-indexer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Redis } from 'ioredis'

const {
REDIS_URL: redisUrl = 'redis://localhost:6379'
} = process.env

const redisUrlParsed = new URL(redisUrl)
const redis = new Redis({
host: redisUrlParsed.hostname,
port: Number(redisUrlParsed.port),
username: redisUrlParsed.username,
password: redisUrlParsed.password,
lazyConnect: true, // call connect() explicitly so that we can exit on connection error
family: 6 // required for upstash
})

await redis.connect()
18 changes: 18 additions & 0 deletions indexer/lib/http-assertions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* @param {Response} res
* @param {string} [errorMsg]
*/
export async function assertOkResponse (res, errorMsg) {
if (res.ok) return

let body
try {
body = await res.text()
} catch {}
const err = new Error(`${errorMsg ?? `Cannot fetch ${res.url}`} (${res.status}): ${body?.trimEnd()}`)
Object.assign(err, {
statusCode: res.status,
serverMessage: body
})
throw err
}
88 changes: 88 additions & 0 deletions indexer/lib/observer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import createDebug from 'debug'
import { multiaddrToUri } from '@multiformats/multiaddr-to-uri'
import { assertOkResponse } from './http-assertions.js'

const debug = createDebug('spark-piece-indexer:observer')

/** @import { Repository, IpniProviderInfo } from './typings.js' */

/**
* @returns {Promise<IpniProviderInfo[]>}
*/
export async function getProvidersWithMetadata () {
const res = await fetch('https://cid.contact/providers')
assertOkResponse(res)

const providers = /** @type {{
AddrInfo: {
ID: string;
Addrs: string[];
},
LastAdvertisement: {
"/": string;
},
LastAdvertisementTime: string;
Publisher: {
ID: string;
Addrs: string[];
},
// Ignored: ExtendedProviders, FrozenAt
* }[]}
*/(await res.json())

return providers.map(p => {
const providerId = p.Publisher.ID
const lastAdvertisementCID = p.LastAdvertisement['/']

// FIXME: handle empty Addrs[]
let providerAddress = p.Publisher.Addrs[0]
try {
providerAddress = multiaddrToUri(providerAddress)
} catch (err) {
debug('Cannot convert address to URI (provider: %s): %s', providerId, err)
}

return { providerId, providerAddress, lastAdvertisementCID }
})
}

/**
* @param {Repository} repository
* @param {IpniProviderInfo[]} ipniProviders
*/
export async function updateProviderStateFromIPNI (repository, ipniProviders) {
const providersWithState = await repository.getProvidersWithState()

for (const { providerId, providerAddress, lastAdvertisementCID } of ipniProviders) {
const status = providersWithState.get(providerId)
if (!status) {
const status = {
providerAddress,
lastHead: lastAdvertisementCID,
nextHead: lastAdvertisementCID,
head: lastAdvertisementCID,
tail: lastAdvertisementCID,
status: 'advertisement walk not started yet'
}
providersWithState.set(providerId, status)
debug('Initializing status for provider %s: %o', providerId, status)
continue
}

let updated = false
if (providerAddress !== status.providerAddress) {
debug('Updating provider address from %s to %s', status.providerAddress, providerAddress)
status.providerAddress = providerAddress
updated = true
}

// TODO: update the status

if (!updated) {
debug('No changes for provider %s', providerId)
providersWithState.delete(providerId)
}
}

await repository.updateProvidersWithState(providersWithState)
}
40 changes: 40 additions & 0 deletions indexer/lib/redis-repository.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/** @import { Repository, ProvidersWithState} from "./typings.js" */

/** @implements {Repository} */
export class RedisRepository {
#redis

/**
* @param {import('ioredis').Redis} redis
*/
constructor (redis) {
this.#redis = redis
}

/**
* @returns {Promise<ProvidersWithState>}
*/
async getProvidersWithState () {
/** @type {string[]} */
const providerIds = []
const keyStream = this.#redis.scanStream({
match: 'provider-state:*',
count: 1000
})
for await (const key of keyStream) {
const [, id] = key.split(':')
providerIds.push(id)
}

const rawStates = this.#redis.
// TODO
return new Map()
}

/**
* @param {ProvidersWithState} updates
*/
async updateProvidersWithState (updates) {
throw new Error('Method not implemented.')
}
}
28 changes: 28 additions & 0 deletions indexer/lib/typings.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
export interface ProviderIndexingState {
providerAddress: string;
lastHead: string;
nextHead: string;
head: string;
tail: string;
status: string;
}

export type PiecePayloadCIDs = string[];

// Mapping providerIds to the indexing state
export type ProvidersWithState = Map<string, ProviderIndexingState>

export interface Repository {
getProvidersWithState(): Promise<ProvidersWithState>;
updateProvidersWithState(updates: ProvidersWithState): Promise<void>;

// addPiecePayloadCID(provider: string, pieceCid: string, payloadCid: string): Promise<void>;
// getPiecePayloadCIDs(provider: string, pieceCid: string): Promise<PiecePayloadCIDs | undefined>;
}

export interface IpniProviderInfo {
providerId: string;
providerAddress: string;
lastAdvertisementCID: string;
}

3 changes: 1 addition & 2 deletions indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
"type": "module",
"private": true,
"scripts": {
"dry-run": "node bin/dry-run.js",
"migrate": "node bin/migrate.js",
"start": "node bin/piece-indexer.js",
"lint": "standard",
"test": "node --test"
Expand All @@ -13,6 +11,7 @@
"standard": "^17.1.0"
},
"dependencies": {
"@multiformats/multiaddr-to-uri": "^10.1.0",
"debug": "^4.3.6",
"ioredis": "^5.4.1"
}
Expand Down
80 changes: 80 additions & 0 deletions indexer/test/observer.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import createDebug from 'debug'
import { Redis } from 'ioredis'
import assert from 'node:assert'
import { after, before, beforeEach, describe, it } from 'node:test'
import { getProvidersWithMetadata, updateProviderStateFromIPNI } from '../lib/observer.js'
import { RedisRepository } from '../lib/redis-repository.js'

const debug = createDebug('test')

/** @import { ProvidersWithState, ProviderIndexingState } from '../lib/typings.js' */

// See https://github.com/filecoin-station/frisbii-on-fly
const FRISBII_ID = '12D3KooWC8gXxg9LoJ9h3hy3jzBkEAxamyHEQJKtRmAuBuvoMzpr'
const FRISBII_ADDRESS = 'https://frisbii.fly.dev'

describe('getProvidersWithMetadata', () => {
it('returns response including known providers', async () => {
const providers = await getProvidersWithMetadata()
debug(JSON.stringify(providers, null, 2))

const frisbiiOnFly = providers.find(
p => p.providerId === FRISBII_ID && p.providerAddress === FRISBII_ADDRESS
)

assert(frisbiiOnFly)
assert.match(frisbiiOnFly.lastAdvertisementCID, /^bagu/)
})
})

describe('updateProviderStateFromIPNI', () => {
/** @type {Redis} */
let redis

before(async () => {
redis = new Redis({ db: 1 })
})

beforeEach(async () => {
await redis.flushall()
})

after(async () => {
await redis?.disconnect()
})

it('creates an initial state for a new provider', async () => {
const repository = new RedisRepository(redis)
await updateProviderStateFromIPNI(repository, [
{
providerId: 'peer1',
providerAddress: 'https://example.com',
lastAdvertisementCID: 'bagu1'
}
])

const state = await repository.getProvidersWithState()
assertStateEqual(state, {
peer1: {
providerAddress: 'https://example.com',
lastHead: 'tbd',
nextHead: 'tbd',
head: 'tbd',
tail: 'tbd',
status: 'tbd'
}
})
})
})

/**
*
* @param {ProvidersWithState} actualMap
* @param {Record<string, ProviderIndexingState>} expectedObject
*/
function assertStateEqual (actualMap, expectedObject) {
assert.deepStrictEqual(
Object.fromEntries(actualMap.entries()),
expectedObject
)
}
Loading

0 comments on commit c53055f

Please sign in to comment.