Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add webtransport listener #2422

Draft
wants to merge 22 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c722ce1
feat: add webtransport listener
achingbrain Feb 23, 2024
e7c33fb
Merge remote-tracking branch 'origin/main' into feat/add-webtransport…
achingbrain Apr 3, 2024
016da0a
chore: fix up tests
achingbrain Apr 3, 2024
19e5dd0
chore: fix deps
achingbrain Apr 3, 2024
c31c054
chore: most tests passing
achingbrain Apr 8, 2024
1f6f0f1
chore: remove log
achingbrain Apr 8, 2024
dac49e6
Merge remote-tracking branch 'origin/main' into feat/add-webtransport…
achingbrain Apr 8, 2024
2581673
chore: update go version
achingbrain Apr 8, 2024
74d0a7d
chore: update go version
achingbrain Apr 8, 2024
a9ffb09
Merge remote-tracking branch 'origin/main' into feat/add-webtransport…
achingbrain Apr 28, 2024
0eadc67
chore: update version
achingbrain Apr 28, 2024
825a297
chore: missed merge
achingbrain Apr 28, 2024
c9df7a2
chore: consistent logging
achingbrain Apr 28, 2024
5f35c6a
chore: install go on node runners
achingbrain Apr 28, 2024
9fe17b0
chore: install go on electron runners
achingbrain Apr 28, 2024
8b5e9b7
chore: generate certs on listen
achingbrain Apr 28, 2024
0b4ec75
Merge branch 'main' into feat/add-webtransport-listener
achingbrain Apr 29, 2024
c7a552f
Merge branch 'main' into feat/add-webtransport-listener
achingbrain Apr 30, 2024
f453592
chore: run webtransport integration tests
achingbrain Apr 30, 2024
7862f36
Merge remote-tracking branch 'origin/main' into feat/add-webtransport…
achingbrain May 1, 2024
670e96f
chore: unskip tests
achingbrain May 1, 2024
ec7add6
Merge remote-tracking branch 'origin/main' into feat/add-webtransport…
achingbrain May 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ jobs:
fail-fast: true
steps:
- uses: actions/checkout@v4
# needed for webtransport tests, remove after https://github.com/libp2p/js-libp2p/pull/2422
- uses: actions/setup-go@0c52d547c9bc32b1aa3301fd7a9cb496313a4491 # v5.0.0
with:
go-version: '1.22'
- uses: actions/setup-node@v4
with:
node-version: ${{ matrix.node }}
Expand Down Expand Up @@ -147,6 +151,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
# needed for webtransport tests, remove after https://github.com/libp2p/js-libp2p/pull/2422
- uses: actions/setup-go@0c52d547c9bc32b1aa3301fd7a9cb496313a4491 # v5.0.0
with:
go-version: '1.22'
- uses: actions/setup-node@v4
with:
node-version: lts/*
Expand Down
2 changes: 1 addition & 1 deletion packages/integration-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
"@libp2p/interface": "^1.3.1",
"@libp2p/interface-compliance-tests": "^5.4.4",
"@libp2p/interface-internal": "^1.2.1",
"@libp2p/interop": "^11.0.0",
"@libp2p/interop": "^12.0.2",
"@libp2p/kad-dht": "^12.0.16",
"@libp2p/logger": "^4.0.12",
"@libp2p/mdns": "^10.0.23",
Expand Down
17 changes: 14 additions & 3 deletions packages/integration-tests/test/interop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { mplex } from '@libp2p/mplex'
import { peerIdFromKeys } from '@libp2p/peer-id'
import { tcp } from '@libp2p/tcp'
import { tls } from '@libp2p/tls'
import { webTransport } from '@libp2p/webtransport'
import { multiaddr } from '@multiformats/multiaddr'
import { execa } from 'execa'
import { path as p2pd } from 'go-libp2p'
Expand Down Expand Up @@ -46,7 +47,11 @@ async function createGoPeer (options: SpawnOptions): Promise<Daemon> {
if (options.noListen === true) {
opts.push('-noListenAddrs')
} else {
opts.push('-hostAddrs=/ip4/127.0.0.1/tcp/0')
if (options.transport === 'webtransport') {
opts.push('-hostAddrs=/ip4/127.0.0.1/udp/0/quic-v1/webtransport')
} else {
opts.push('-hostAddrs=/ip4/127.0.0.1/tcp/0')
}
}

if (options.encryption != null) {
Expand Down Expand Up @@ -122,16 +127,22 @@ async function createJsPeer (options: SpawnOptions): Promise<Daemon> {
const opts: Libp2pOptions<ServiceMap> = {
peerId,
addresses: {
listen: options.noListen === true ? [] : ['/ip4/127.0.0.1/tcp/0']
listen: []
},
transports: [tcp(), circuitRelayTransport()],
transports: [tcp(), circuitRelayTransport(), webTransport()],
streamMuxers: [],
connectionEncryption: [noise()],
connectionManager: {
minConnections: 0
}
}

if (options.transport === 'webtransport') {
opts.addresses?.listen?.push('/ip4/127.0.0.1/udp/0/quic-v1/webtransport')
} else if (options.transport === 'tcp') {
opts.addresses?.listen?.push('/ip4/127.0.0.1/tcp/0')
}

const services: ServiceFactoryMap = {
identify: identify()
}
Expand Down
8 changes: 6 additions & 2 deletions packages/transport-webtransport/.aegir.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { createClient } from '@libp2p/daemon-client'
import { multiaddr } from '@multiformats/multiaddr'
import { execa } from 'execa'
import { path as p2pd } from 'go-libp2p'
import pDefer from 'p-defer'
Expand Down Expand Up @@ -27,6 +25,12 @@ export default {
}

async function createGoLibp2p () {
// have to import these dynamically as they have a transitive dependency on
// @libp2p/interface which is part of this monorepo so may not have been built
// yet
const { multiaddr } = await import('@multiformats/multiaddr')
const { createClient } = await import('@libp2p/daemon-client')

const controlPort = Math.floor(Math.random() * (50000 - 10000 + 1)) + 10000
const apiAddr = multiaddr(`/ip4/127.0.0.1/tcp/${controlPort}`)
const deferred = pDefer()
Expand Down
15 changes: 11 additions & 4 deletions packages/transport-webtransport/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,36 @@
"scripts": {
"clean": "aegir clean",
"lint": "aegir lint",
"dep-check": "aegir dep-check",
"dep-check": "aegir dep-check -i @fails-components/webtransport-transport-http3-quiche",
"doc-check": "aegir doc-check",
"build": "aegir build",
"test": "aegir test -t browser -t webworker",
"test": "aegir test",
"test:node": "aegir test -t node --cov",
"test:chrome": "aegir test -t browser --cov",
"test:chrome-webworker": "aegir test -t webworker"
},
"dependencies": {
"@chainsafe/libp2p-noise": "^15.0.0",
"@fails-components/webtransport": "^1.1.0",
"@fails-components/webtransport-transport-http3-quiche": "^1.1.0",
"@libp2p/interface": "^1.3.1",
"@libp2p/peer-id": "^4.1.1",
"@libp2p/utils": "^5.4.1",
"@multiformats/multiaddr": "^12.2.3",
"@multiformats/multiaddr-matcher": "^1.2.1",
"@peculiar/x509": "^1.9.7",
"browser-readablestream-to-it": "^2.0.5",
"it-stream-types": "^2.0.1",
"multiformats": "^13.1.0",
"race-signal": "^1.0.2",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.1.0"
},
"devDependencies": {
"@libp2p/daemon-client": "^8.0.5",
"@libp2p/interface-compliance-tests": "^5.4.4",
"@libp2p/logger": "^4.0.12",
"@libp2p/peer-id-factory": "^4.1.1",
"@libp2p/daemon-client": "^8.0.5",
"@libp2p/ping": "^1.0.18",
"@noble/hashes": "^1.4.0",
"aegir": "^42.2.11",
Expand All @@ -75,7 +81,8 @@
"it-to-buffer": "^4.0.7",
"libp2p": "^1.5.2",
"p-defer": "^4.0.1",
"p-wait-for": "^5.0.2"
"p-wait-for": "^5.0.2",
"sinon": "^18.0.0"
},
"browser": {
"./dist/src/listener.js": "./dist/src/listener.browser.js",
Expand Down
150 changes: 150 additions & 0 deletions packages/transport-webtransport/src/create-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import { Http3Server } from '@fails-components/webtransport'
import { TypedEventEmitter } from '@libp2p/interface'
import { raceSignal } from 'race-signal'
import type { HttpServerInit, WebTransportSession } from '@fails-components/webtransport'
import type { ComponentLogger, Logger, TypedEventTarget } from '@libp2p/interface'

interface WebTransportServerEvents extends Record<string, any> {
listening: CustomEvent
session: CustomEvent<WebTransportSession>
close: CustomEvent
error: CustomEvent<Error>
}

export interface WebTransportServer extends TypedEventTarget<WebTransportServerEvents> {
listening: boolean
sessionTimeout: number

close(callback?: () => void): void
listen(): void
address(): { port: number, host: string, family: 'IPv4' | 'IPv6' } | null
}

export interface WebTransportServerComponents {
logger: ComponentLogger
}

class DefaultWebTransportServer extends TypedEventEmitter<WebTransportServerEvents> implements WebTransportServer {
private readonly server: Http3Server
public listening: boolean
/**
* How long in ms to wait for an incoming session to be ready
*/
public sessionTimeout: number
private readonly log: Logger

constructor (components: WebTransportServerComponents, init: HttpServerInit) {
super()

this.server = new Http3Server(init)
this.listening = false
this.log = components.logger.forComponent('libp2p:webtransport:server')

this.sessionTimeout = 1000
}

close (callback?: () => void): void {
if (callback != null) {
this.addEventListener('close', callback)
}

this.server.stopServer()
this.server.closed
.then(() => {
this.listening = false
this.safeDispatchEvent('close')
})
.catch((err) => {
this.safeDispatchEvent('error', { detail: err })
})
}

listen (): void {
this.server.startServer()
this.server.ready
.then(() => {
this.server.setRequestCallback(async (args: any): Promise<any> => {
const url = args.header[':path']
const [path] = url.split('?')

if (this.server.sessionController[path] == null) {
return {
...args,
path,
status: 404
}
}

return {
...args,
path,
userData: {
search: url.substring(path.length)
},
header: {
...args.header,
':path': path
},
status: 200
}
})

this.listening = true
this.safeDispatchEvent('listening')

this.log('ready, processing incoming sessions')
this._processIncomingSessions()
.catch(err => {
this.safeDispatchEvent('error', { detail: err })
})
})
.catch((err) => {
this.safeDispatchEvent('error', { detail: err })
})
}

address (): { port: number, host: string, family: 'IPv4' | 'IPv6' } | null {
return this.server.address()
}

async _processIncomingSessions (): Promise<void> {
const sessionStream = this.server.sessionStream('/.well-known/libp2p-webtransport')
const sessionReader = sessionStream.getReader()

while (true) {
const { done, value: session } = await sessionReader.read()

if (done) {
this.log('session reader finished')
break
}

this.log('new incoming session')
void Promise.resolve()
.then(async () => {
try {
await raceSignal(session.ready, AbortSignal.timeout(this.sessionTimeout))
this.log('session ready')

this.safeDispatchEvent('session', { detail: session })
} catch (err) {
this.log.error('error waiting for session to become ready', err)
}
})
}
}
}

export interface SessionHandler {
(event: CustomEvent<WebTransportSession>): void
}

export function createServer (components: WebTransportServerComponents, init: HttpServerInit, sessionHandler?: SessionHandler): WebTransportServer {
const server = new DefaultWebTransportServer(components, init)

if (sessionHandler != null) {
server.addEventListener('session', sessionHandler)
}

return server
}
8 changes: 6 additions & 2 deletions packages/transport-webtransport/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@
throw new CodeError('Failed to authenticate webtransport', 'ERR_AUTHENTICATION_FAILED')
}

if (options?.signal?.aborted === true) {
throw new AbortError()
}

Check warning on line 196 in packages/transport-webtransport/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-webtransport/src/index.ts#L195-L196

Added lines #L195 - L196 were not covered by tests

this.metrics?.dialerEvents.increment({ open: true })

maConn = {
Expand Down Expand Up @@ -303,8 +307,8 @@
/**
* Filter check for all Multiaddrs that this transport can listen on
*/
listenFilter (): Multiaddr[] {
return []
listenFilter (multiaddrs: Multiaddr[]): Multiaddr[] {
return multiaddrs.filter(ma => WebTransportMatcher.exactMatch(ma))

Check warning on line 311 in packages/transport-webtransport/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-webtransport/src/index.ts#L311

Added line #L311 was not covered by tests
}

/**
Expand Down
Loading
Loading