Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] refactor: daemon modules #2

Merged
merged 62 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
e04e145
Refactor tick into class.
saul-jb Mar 27, 2024
4a64494
Simplify argv and config parsing.
saul-jb Mar 27, 2024
0d665f9
Unify config interface.
saul-jb Mar 27, 2024
96f9613
Make stop wait for the loop promise to end in tick.
saul-jb Mar 27, 2024
e9d58e6
Pass signal to method in tick.
saul-jb Mar 27, 2024
0f4703e
Make events public.
saul-jb Mar 27, 2024
cd69540
Split the pin manger from the downloader.
saul-jb Mar 27, 2024
df214a0
Rename downloader.
saul-jb Mar 27, 2024
259d76d
Setup modules.
saul-jb Mar 27, 2024
d221e5d
Add welo.
saul-jb Mar 27, 2024
77ec585
Add groups.
saul-jb Mar 27, 2024
3d5fefc
Add sneakernet.
saul-jb Mar 27, 2024
abca7c1
Add entry tracker.
saul-jb Mar 27, 2024
3a9fdce
Add net server.
saul-jb Mar 27, 2024
8ea20f0
Add components interface.
saul-jb Mar 27, 2024
611ee81
Add welo to components interface.
saul-jb Mar 27, 2024
8963ac6
Fix rpc commands.
saul-jb Mar 27, 2024
e2dd5a1
Update index.
saul-jb Mar 27, 2024
e5b32e5
Update module interface.
saul-jb Mar 27, 2024
b4f5d94
Fix scheduler module.
saul-jb Mar 27, 2024
2ebf493
Fix most of the revisions module.
saul-jb Mar 27, 2024
5c6dda3
Add helia pin manager to components.
saul-jb Mar 27, 2024
dede374
Fix filesystem module.
saul-jb Mar 27, 2024
07a3dcf
Nuke old modules.
saul-jb Mar 27, 2024
c766658
Linting.
saul-jb Mar 27, 2024
204cf19
Fix other filesystem command.
saul-jb Mar 27, 2024
02e2fc4
Fix events.
saul-jb Mar 27, 2024
21380bd
Fix revisions module.
saul-jb Mar 27, 2024
7baa79d
Setup common commands.
saul-jb Mar 27, 2024
139f7c1
Fix config and welo start.
saul-jb Mar 27, 2024
bb7500b
Fix events.
saul-jb Mar 27, 2024
eb8e9cc
Disable daemon tests.
saul-jb Mar 27, 2024
8ff4e69
Nuke the daemon tests.
saul-jb Mar 27, 2024
31bafa2
Linting.
saul-jb Mar 27, 2024
87c9d7a
Linting.
saul-jb Mar 27, 2024
e271e84
Fit stat commands.
saul-jb Mar 27, 2024
693fd1d
Fix pin manager's datastore.
saul-jb Mar 28, 2024
27f8e44
Restore daemon tests.
saul-jb Mar 28, 2024
b60fc4d
Fix group creation util.
saul-jb Mar 28, 2024
744a61a
Fix tick test.
saul-jb Mar 28, 2024
8f021c5
Fix argv test.
saul-jb Mar 28, 2024
9de6337
Fix config test.
saul-jb Mar 28, 2024
81c8437
Make config path optional.
saul-jb Mar 28, 2024
bd3cca2
Replace the abort controller with stop method.
saul-jb Mar 28, 2024
cfbaedb
Make key optional.
saul-jb Mar 28, 2024
e4d4700
Get network test working again.
saul-jb Mar 28, 2024
c2056eb
Fix config test.
saul-jb Mar 28, 2024
eed2234
Stop all the components in network test.
saul-jb Mar 28, 2024
d3bfbbe
Fix revisions test.
saul-jb Mar 28, 2024
1a7f87f
Fix groups test.
saul-jb Mar 28, 2024
4fce740
Fix filesystem test.
saul-jb Mar 28, 2024
ac805ff
Fix download test.
saul-jb Mar 28, 2024
d721d8a
Fix base test.
saul-jb Mar 28, 2024
2427550
Delete old mocks.
saul-jb Mar 28, 2024
e69ba88
Delete old sigint test.
saul-jb Mar 28, 2024
3703d6d
Fix rpc test.
saul-jb Mar 28, 2024
c7d9ce9
Linting.
saul-jb Mar 28, 2024
545701b
Fix libp2p peristent storage test.
saul-jb Mar 28, 2024
1600ac9
Fix datastore/blockstore cleanup.
saul-jb Mar 28, 2024
427e2ed
Add keyManager option to components setup.
saul-jb Mar 28, 2024
738f05b
Fix libp2p persistent storage test.
saul-jb Mar 28, 2024
5b397e1
Add test output to git ignore.
saul-jb Mar 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/data
/config
/packages/*/test-out

build
dist
Expand Down
10 changes: 10 additions & 0 deletions packages/daemon/src/common/commands/addresses.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Addresses } from '@organicdesign/db-rpc-interfaces'
import type { ModuleMethod } from '@/interface.js'

const command: ModuleMethod = ({ net, libp2p }) => {
net.rpc.addMethod(Addresses.name, async (): Promise<Addresses.Return> => {
return libp2p.getMultiaddrs().map(a => a.toString())
})
}

export default command
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import { multiaddr } from '@multiformats/multiaddr'
import { Connect } from '@organicdesign/db-rpc-interfaces'
import type { Provides, Requires } from '../index.js'
import type { ModuleMethod } from '@/interface.js'

const command: ModuleMethod<Provides, Requires> = (context, { rpc }) => {
rpc.addMethod(Connect.name, async (raw: unknown): Promise<Connect.Return> => {
const command: ModuleMethod = ({ net, libp2p }) => {
net.rpc.addMethod(Connect.name, async (raw: unknown): Promise<Connect.Return> => {
const params = Connect.Params.parse(raw)
const address = multiaddr(params.address)

await context.libp2p.dial(address)
await libp2p.dial(address)

return null
})
Expand Down
10 changes: 10 additions & 0 deletions packages/daemon/src/common/commands/connections.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Connections } from '@organicdesign/db-rpc-interfaces'
import type { ModuleMethod } from '@/interface.js'

const command: ModuleMethod = ({ net, libp2p }) => {
net.rpc.addMethod(Connections.name, async (): Promise<Connections.Return> => {
return libp2p.getConnections().map(c => c.remoteAddr.toString())
})
}

export default command
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import { CountPeers } from '@organicdesign/db-rpc-interfaces'
import { CID } from 'multiformats/cid'
import type { Provides, Requires } from '../index.js'
import type { ModuleMethod } from '@/interface.js'

const command: ModuleMethod<Provides, Requires> = (context, { rpc }) => {
rpc.addMethod(CountPeers.name, async (raw: unknown): Promise<CountPeers.Return> => {
const command: ModuleMethod = ({ net, libp2p }) => {
net.rpc.addMethod(CountPeers.name, async (raw: unknown): Promise<CountPeers.Return> => {
const countPeers = async (cid: CID, options?: { timeout: number }): Promise<number> => {
let count = 0

const itr = context.libp2p.contentRouting.findProviders(cid, {
const itr = libp2p.contentRouting.findProviders(cid, {
signal: AbortSignal.timeout(options?.timeout ?? 3000)
})

Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
import { CreateGroup } from '@organicdesign/db-rpc-interfaces'
import { fromString as uint8ArrayFromString } from 'uint8arrays'
import type { Provides, Requires } from '../index.js'
import type { ModuleMethod } from '@/interface.js'

const command: ModuleMethod<Provides, Requires> = (context, { rpc }) => {
rpc.addMethod(CreateGroup.name, async (raw: unknown): Promise<CreateGroup.Return> => {
const command: ModuleMethod = ({ net, welo, groups }) => {
net.rpc.addMethod(CreateGroup.name, async (raw: unknown): Promise<CreateGroup.Return> => {
const params = CreateGroup.Params.parse(raw)
const peerValues = params.peers.map(p => uint8ArrayFromString(p, 'base58btc'))

const manifest = await context.welo.determine({
const manifest = await welo.determine({
name: params.name,
meta: { type: 'group' },
access: {
protocol: '/hldb/access/static',
config: { write: [context.welo.identity.id, ...peerValues] }
config: { write: [welo.identity.id, ...peerValues] }
}
})

await context.groups.add(manifest)
await groups.add(manifest)

return manifest.address.cid.toString()
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import { GetSpeeds } from '@organicdesign/db-rpc-interfaces'
import { CID } from 'multiformats/cid'
import type { Provides, Requires } from '../index.js'
import type { ModuleMethod } from '@/interface.js'

const command: ModuleMethod<Provides, Requires> = (context, { rpc }) => {
rpc.addMethod(GetSpeeds.name, async (raw: unknown): Promise<GetSpeeds.Return> => {
const command: ModuleMethod = ({ net, pinManager }) => {
net.rpc.addMethod(GetSpeeds.name, async (raw: unknown): Promise<GetSpeeds.Return> => {
const params = GetSpeeds.Params.parse(raw)

return Promise.all(params.cids.map(async str => {
const cid = CID.parse(str)
const speed = await context.pinManager.getSpeed(cid, params.range)
const speed = await pinManager.getSpeed(cid, params.range)

return {
cid: str,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import { GetStatus } from '@organicdesign/db-rpc-interfaces'
import { CID } from 'multiformats/cid'
import type { Provides, Requires } from '../index.js'
import type { ModuleMethod } from '@/interface.js'

const command: ModuleMethod<Provides, Requires> = (context, { rpc }) => {
rpc.addMethod(GetStatus.name, async (raw: unknown): Promise<GetStatus.Return> => {
const command: ModuleMethod = ({ pinManager, net }) => {
net.rpc.addMethod(GetStatus.name, async (raw: unknown): Promise<GetStatus.Return> => {
const params = GetStatus.Params.parse(raw)

return Promise.all(params.cids.map(async str => {
const cid = CID.parse(str)

const [state, blocks, size] = await Promise.all([
context.pinManager.getState(cid),
context.pinManager.getBlockCount(cid),
context.pinManager.getSize(cid)
pinManager.getState(cid),
pinManager.getBlockCount(cid),
pinManager.getSize(cid)
])

return {
Expand Down
11 changes: 11 additions & 0 deletions packages/daemon/src/common/commands/id.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { ID } from '@organicdesign/db-rpc-interfaces'
import { toString as uint8ArrayToString } from 'uint8arrays'
import type { ModuleMethod } from '@/interface.js'

const command: ModuleMethod = ({ net, welo }) => {
net.rpc.addMethod(ID.name, async (): Promise<ID.Return> => {
return uint8ArrayToString(welo.identity.id, 'base58btc')
})
}

export default command
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import { JoinGroup } from '@organicdesign/db-rpc-interfaces'
import { Address } from 'welo'
import type { Provides, Requires } from '../index.js'
import type { ModuleMethod } from '@/interface.js'

const command: ModuleMethod<Provides, Requires> = (context, { rpc }) => {
rpc.addMethod(JoinGroup.name, async (raw: unknown): Promise<JoinGroup.Return> => {
const command: ModuleMethod = ({ net, welo, groups }) => {
net.rpc.addMethod(JoinGroup.name, async (raw: unknown): Promise<JoinGroup.Return> => {
const params = JoinGroup.Params.parse(raw)
const manifest = await context.welo.fetch(Address.fromString(`/hldb/${params.group}`))
const manifest = await welo.fetch(Address.fromString(`/hldb/${params.group}`))

try {
await context.groups.add(manifest)
await groups.add(manifest)
} catch (error) {
if ((error as Error).message.includes('is already open')) {
throw new Error('group has already been joined')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { ListGroups } from '@organicdesign/db-rpc-interfaces'
import type { Provides, Requires } from '../index.js'
import type { ModuleMethod } from '@/interface.js'

const command: ModuleMethod<Provides, Requires> = (context, { rpc }) => {
rpc.addMethod(ListGroups.name, async (): Promise<ListGroups.Return> => {
const command: ModuleMethod = ({ net, groups }) => {
net.rpc.addMethod(ListGroups.name, async (): Promise<ListGroups.Return> => {
const promises: Array<{ group: string, name: string }> = []

for (const { key: cid, value: database } of context.groups.all()) {
for (const { key: cid, value: database } of groups.all()) {
promises.push({ group: cid, name: database.manifest.name })
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
import Path from 'path'
import { SetPriority } from '@organicdesign/db-rpc-interfaces'
import type { Provides, Requires } from '../index.js'
import type { ModuleMethod } from '@/interface.js'

const command: ModuleMethod<Provides, Requires> = (context, { rpc }) => {
rpc.addMethod(SetPriority.name, async (raw: unknown): Promise<SetPriority.Return> => {
const command: ModuleMethod = ({ net, pinManager }) => {
net.rpc.addMethod(SetPriority.name, async (raw: unknown): Promise<SetPriority.Return> => {
const params = SetPriority.Params.parse(raw)
const key = Path.join('/', params.group, params.path)
const pinInfo = await context.pinManager.get(key)
const pinInfo = await pinManager.get(key)

if (pinInfo == null) {
throw new Error('no such pin')
}

pinInfo.priority = params.priority

await context.pinManager.put(key, pinInfo)
await pinManager.put(key, pinInfo)

return null
})
Expand Down
14 changes: 14 additions & 0 deletions packages/daemon/src/common/commands/sneakernet-receive.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { SneakernetReveive } from '@organicdesign/db-rpc-interfaces'
import type { ModuleMethod } from '@/interface.js'

const command: ModuleMethod = ({ sneakernet, net }) => {
net.rpc.addMethod(SneakernetReveive.name, async (raw: unknown): Promise<SneakernetReveive.Return> => {
const params = SneakernetReveive.Params.parse(raw)

await sneakernet.import(params.path)

return null
})
}

export default command
14 changes: 14 additions & 0 deletions packages/daemon/src/common/commands/sneakernet-send.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { SneakernetSend } from '@organicdesign/db-rpc-interfaces'
import type { ModuleMethod } from '@/interface.js'

const command: ModuleMethod = ({ sneakernet, net }) => {
net.rpc.addMethod(SneakernetSend.name, async (raw: unknown): Promise<SneakernetSend.Return> => {
const params = SneakernetSend.Params.parse(raw)

await sneakernet.export(params.path, params.peers)

return null
})
}

export default command
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Sync } from '@organicdesign/db-rpc-interfaces'
import { HeadsExchange } from 'welo/utils/heads-exchange'
import { cidstring } from 'welo/utils/index'
import { getHeads, addHeads } from 'welo/utils/replicator'
import type { Provides, Requires } from '../index.js'
import type { ModuleMethod } from '@/interface.js'
import type { Peer, Libp2p } from '@libp2p/interface'
import type { Database } from 'welo'
Expand Down Expand Up @@ -59,18 +58,18 @@ const sync = async (libp2p: Libp2p, peer: Peer, database: Database, options: Par
await stream.close()
}

const command: ModuleMethod<Provides, Requires> = (context, { rpc, network }) => {
rpc.addMethod(Sync.name, async (): Promise<Sync.Return> => {
const peers = network.libp2p.getPeers()
const databases = context.welo.opened.values()
const command: ModuleMethod = ({ libp2p, welo, net }) => {
net.rpc.addMethod(Sync.name, async (): Promise<Sync.Return> => {
const peers = libp2p.getPeers()
const databases = welo.opened.values()

const promises: Array<Promise<void>> = []

for (const peerId of peers) {
const peer = await network.libp2p.peerStore.get(peerId)
const peer = await libp2p.peerStore.get(peerId)

for (const database of databases) {
promises.push(sync(network.libp2p, peer, database))
promises.push(sync(libp2p, peer, database))
}
}

Expand Down
122 changes: 122 additions & 0 deletions packages/daemon/src/common/downloader/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import parallel from 'it-parallel'
import { pipe } from 'it-pipe'
import { collect } from 'streaming-iterables'
import { Event, EventTarget } from 'ts-event-target'
import { linearWeightTranslation } from './utils.js'
import type { PinManager } from '../pin-manager/index.js'
import type { Startable } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'

export class DownloadErrorEvent extends Event<'download:error'> {
readonly error: Error

constructor (error: Error) {
super('download:error')

this.error = error
}
}

export class Downloader implements Startable {
private readonly slots: number
private readonly pinManager: PinManager
private controller: AbortController = new AbortController()
private loopPromise: Promise<void> | null = null
readonly events = new EventTarget<[DownloadErrorEvent]>()

constructor (pinManager: PinManager, slots: number) {
this.slots = slots
this.pinManager = pinManager
}

async start (): Promise<void> {
await this.loopPromise
this.controller = new AbortController()
this.loopPromise = this.loop()
}

async stop (): Promise<void> {
this.controller.abort()
await this.loopPromise
}

private async loop (): Promise<void> {
for (;;) {
if (this.isAborted) {
return
}

await pipe(
() => this.delay(),
i => this.getPins(i),
i => this.batchDownload(i),
i => parallel(i, { concurrency: this.slots, ordered: false }),
i => this.catcher(i),
async i => collect(i)
)

await new Promise(resolve => setTimeout(resolve, 100))
}
}

private async * batchDownload (itr: AsyncIterable<[CID, number]>): AsyncGenerator<() => Promise<{ cid: CID, block: Uint8Array }>, void, undefined> {
for await (const [cid, priority] of itr) {
if (this.isAborted) {
return
}

const weight = Math.floor(linearWeightTranslation(priority / 100) * this.slots) + 1
const downloaders = await this.pinManager.download(cid, { limit: weight })

yield * downloaders
}
}

private async * catcher <T = unknown>(itr: AsyncIterable<T>): AsyncIterable<T> {
try {
yield * itr
} catch (e) {
const error = e instanceof Error ? e : new Error(JSON.stringify(e))

this.events.dispatchEvent(new DownloadErrorEvent(error))

yield * this.catcher(itr)
}
}

private async * delay (): AsyncGenerator<undefined> {
for (;;) {
if (this.isAborted) {
return
}

await new Promise(resolve => setTimeout(resolve, 100))
yield
}
}

async * getPins (loop: AsyncIterable<void>): AsyncGenerator<[CID, number]> {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _ of loop) {
for await (const { value } of this.pinManager.getActive()) {
if (this.isAborted) {
return
}

yield [value.cid, value.priority]
}
}
}

private get isAborted (): boolean {
return this.controller.signal.aborted
}
}

export const createDownloader = async (pinManage: PinManager, slots: number): Promise<Downloader> => {
const downloader = new Downloader(pinManage, slots)

await downloader.start()

return downloader
}
Loading
Loading