Skip to content

Commit

Permalink
fixup! finish "update our state from IPNI state"
Browse files Browse the repository at this point in the history
Signed-off-by: Miroslav Bajtoš <[email protected]>
  • Loading branch information
bajtos committed Sep 4, 2024
1 parent 2ec7716 commit fe0e9cc
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 453 deletions.
26 changes: 26 additions & 0 deletions indexer/bin/piece-indexer.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import { Redis } from 'ioredis'
import { RedisRepository } from '../lib/redis-repository.js'
import { syncProvidersFromIPNI } from '../lib/ipni-watcher.js'
import timers from 'node:timers/promises'

const {
REDIS_URL: redisUrl = 'redis://localhost:6379'
} = process.env

// TODO: setup Sentry

const redisUrlParsed = new URL(redisUrl)
const redis = new Redis({
host: redisUrlParsed.hostname,
Expand All @@ -15,3 +20,24 @@ const redis = new Redis({
})

await redis.connect()

while (true) {
const started = Date.now()
try {
console.log('Syncing from IPNI')
const providers = await syncProvidersFromIPNI(new RedisRepository(redis))
console.log(
'Found %s providers, %s support(s) HTTP(s)',
providers.length,
providers.filter(p => p.providerAddress.match(/^https?:\/\//)).length
)
} catch (err) {
console.error('Cannot sync from IPNI.', err)
// TODO: log to Sentry
}
const delay = 6_000 - (Date.now() - started)
if (delay > 0) {
console.log('Waiting for %sms before the next sync from IPNI', delay)
await timers.setTimeout(delay)
}
}
88 changes: 0 additions & 88 deletions indexer/lib/observer.js

This file was deleted.

44 changes: 29 additions & 15 deletions indexer/lib/redis-repository.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/** @import { Repository, ProvidersWithState} from "./typings.js" */
/** @import { ProviderToIpniStateMap } from "./typings.js" */

/** @implements {Repository} */
export class RedisRepository {
#redis

Expand All @@ -12,29 +11,44 @@ export class RedisRepository {
}

/**
* @returns {Promise<ProvidersWithState>}
* @returns {Promise<ProviderToIpniStateMap>}
*/
async getProvidersWithState () {
async getIpniStateForAllProviders () {
/** @type {string[]} */
const providerIds = []
const redisKeys = []
const keyStream = this.#redis.scanStream({
match: 'provider-state:*',
match: 'ipni-state:*',
count: 1000
})
for await (const key of keyStream) {
const [, id] = key.split(':')
providerIds.push(id)
for await (const chunk of keyStream) {
redisKeys.push(...chunk)
}

const rawStates = this.#redis.
// TODO
return new Map()
const stateList = await this.#redis.mget(redisKeys)

/** @type {ProviderToIpniStateMap} */
const result = new Map()
for (let ix = 0; ix < redisKeys.length; ix++) {
const key = redisKeys[ix]
const stateStr = stateList[ix]
if (!stateStr) {
console.error('Unexpected Redis state: the existing key %s does not have any value', key)
continue
}
const providerId = key.split(':')[1]
result.set(providerId, JSON.parse(stateStr))
}

return result
}

/**
* @param {ProvidersWithState} updates
* @param {ProviderToIpniStateMap} stateMap
*/
async updateProvidersWithState (updates) {
throw new Error('Method not implemented.')
async setIpniStateForAllProviders (stateMap) {
const serialized = new Map(
Array.from(stateMap.entries()).map(([key, value]) => ([`ipni-state:${key}`, JSON.stringify(value)]))
)
await this.#redis.mset(serialized)
}
}
33 changes: 12 additions & 21 deletions indexer/lib/typings.d.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,19 @@
export interface ProviderIndexingState {
/**
* Data extracted from IPNI response
*/
export interface IpniProviderInfo {
providerId: string;
providerAddress: string;
lastHead: string;
nextHead: string;
head: string;
tail: string;
status: string;
}

export type PiecePayloadCIDs = string[];

// Mapping providerIds to the indexing state
export type ProvidersWithState = Map<string, ProviderIndexingState>

export interface Repository {
getProvidersWithState(): Promise<ProvidersWithState>;
updateProvidersWithState(updates: ProvidersWithState): Promise<void>;

// addPiecePayloadCID(provider: string, pieceCid: string, payloadCid: string): Promise<void>;
// getPiecePayloadCIDs(provider: string, pieceCid: string): Promise<PiecePayloadCIDs | undefined>;
lastAdvertisementCID: string;
}

export interface IpniProviderInfo {
providerId: string;
/**
* Data stored in our database
*/
export interface ProviderIpniState {
providerAddress: string;
lastAdvertisementCID: string;
}

export type ProviderToIpniStateMap = Map<string, ProviderIpniState>;

1 change: 0 additions & 1 deletion indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
"standard": "^17.1.0"
},
"dependencies": {
"@multiformats/multiaddr-to-uri": "^10.1.0",
"debug": "^4.3.6",
"ioredis": "^5.4.1"
}
Expand Down
80 changes: 0 additions & 80 deletions indexer/test/observer.test.js

This file was deleted.

Loading

0 comments on commit fe0e9cc

Please sign in to comment.