From c5db35d9b006b597886440293ff5cbd86162452c Mon Sep 17 00:00:00 2001 From: Etan Kissling Date: Thu, 21 Mar 2024 08:19:57 +0100 Subject: [PATCH] annotate `upgrademngrs` with `{.async: (raises).}` (#1068) --- libp2p/multistream.nim | 10 +++-- libp2p/transports/transport.nim | 18 ++++----- libp2p/upgrademngrs/muxedupgrade.nim | 58 +++++++++++++++------------- libp2p/upgrademngrs/upgrade.nim | 37 +++++++++++------- 4 files changed, 69 insertions(+), 54 deletions(-) diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index 155c9c1da3..2cd9ce2963 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -261,20 +261,22 @@ proc addHandler*(m: MultistreamSelect, match: matcher)) proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} = - let - handlers = m.handlers - futs = handlers.mapIt(it.protocol.start()) + # Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([])` + var futs = newSeqOfCap[Future[void].Raising([CancelledError])](m.handlers.len) + for it in m.handlers: + futs.add it.protocol.start() try: await allFutures(futs) for fut in futs: await fut except CancelledError as exc: var pending: seq[Future[void].Raising([])] + doAssert m.handlers.len == futs.len, "Handlers modified while starting" for i, fut in futs: if not fut.finished: pending.add noCancel fut.cancelAndWait() elif fut.completed: - pending.add handlers[i].protocol.stop() + pending.add m.handlers[i].protocol.stop() else: static: doAssert typeof(fut).E is (CancelledError,) await noCancel allFutures(pending) diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index 40b130a8cd..ee13addd8b 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -81,25 +81,25 @@ proc dial*( self.dial("", address) method upgrade*( - self: Transport, - conn: Connection, - peerId: Opt[PeerId]): Future[Muxer] {.base, gcsafe.} = + self: Transport, + conn: Connection, + peerId: Opt[PeerId] +): Future[Muxer] {.base, async: (raises: [ + CancelledError, LPError], raw: true).} = ## base upgrade method that the transport uses to perform ## transport specific upgrades ## - self.upgrader.upgrade(conn, peerId) method handles*( - self: Transport, - address: MultiAddress): bool {.base, gcsafe.} = + self: Transport, + address: MultiAddress): bool {.base, gcsafe.} = ## check if transport supports the multiaddress ## - # by default we skip circuit addresses to avoid # having to repeat the check in every transport let protocols = address.protocols.valueOr: return false - return protocols + protocols .filterIt( it == multiCodec("p2p-circuit") ).len == 0 diff --git a/libp2p/upgrademngrs/muxedupgrade.nim b/libp2p/upgrademngrs/muxedupgrade.nim index 400cd6cabe..1a3052ab84 100644 --- a/libp2p/upgrademngrs/muxedupgrade.nim +++ b/libp2p/upgrademngrs/muxedupgrade.nim @@ -25,53 +25,61 @@ type muxers*: seq[MuxerProvider] streamHandler*: StreamHandler -proc getMuxerByCodec(self: MuxedUpgrade, muxerName: string): MuxerProvider = +func getMuxerByCodec( + self: MuxedUpgrade, muxerName: string): Opt[MuxerProvider] = + if muxerName.len == 0 or muxerName == "na": + return Opt.none(MuxerProvider) for m in self.muxers: if muxerName == m.codec: - return m + return Opt.some(m) + Opt.none(MuxerProvider) -proc mux*( +proc mux( self: MuxedUpgrade, - conn: Connection): Future[Muxer] {.async.} = + conn: Connection +): Future[Opt[Muxer]] {.async: (raises: [ + CancelledError, LPStreamError, MultiStreamError]).} = ## mux connection - trace "Muxing connection", conn if self.muxers.len == 0: warn "no muxers registered, skipping upgrade flow", conn - return - - let muxerName = - if conn.dir == Out: await self.ms.select(conn, self.muxers.mapIt(it.codec)) - else: await MultistreamSelect.handle(conn, self.muxers.mapIt(it.codec)) - - if muxerName.len == 0 or muxerName == "na": - debug "no muxer available, early exit", conn - return + return Opt.none(Muxer) + + let + muxerName = + case conn.dir + of Direction.Out: + await self.ms.select(conn, self.muxers.mapIt(it.codec)) + of Direction.In: + await MultistreamSelect.handle(conn, self.muxers.mapIt(it.codec)) + muxerProvider = self.getMuxerByCodec(muxerName).valueOr: + debug "no muxer available, early exit", conn, muxerName + return Opt.none(Muxer) trace "Found a muxer", conn, muxerName # create new muxer for connection - let muxer = self.getMuxerByCodec(muxerName).newMuxer(conn) + let muxer = muxerProvider.newMuxer(conn) # install stream handler muxer.streamHandler = self.streamHandler muxer.handler = muxer.handle() - return muxer + Opt.some(muxer) method upgrade*( self: MuxedUpgrade, conn: Connection, - peerId: Opt[PeerId]): Future[Muxer] {.async.} = + peerId: Opt[PeerId] +): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} = trace "Upgrading connection", conn, direction = conn.dir - let sconn = await self.secure(conn, peerId) # secure the connection + let sconn = await self.secure(conn, peerId) # secure the connection if sconn == nil: - raise newException(UpgradeFailedError, + raise (ref UpgradeFailedError)(msg: "unable to secure connection, stopping upgrade") - let muxer = await self.mux(sconn) # mux it if possible - if muxer == nil: - raise newException(UpgradeFailedError, + let muxer = (await self.mux(sconn)).valueOr: # mux it if possible + raise (ref UpgradeFailedError)(msg: "a muxer is required for outgoing connections") when defined(libp2p_agents_metrics): @@ -79,11 +87,11 @@ method upgrade*( if sconn.closed(): await sconn.close() - raise newException(UpgradeFailedError, + raise (ref UpgradeFailedError)(msg: "Connection closed or missing peer info, stopping upgrade") trace "Upgraded connection", conn, sconn, direction = conn.dir - return muxer + muxer proc new*( T: type MuxedUpgrade, @@ -101,8 +109,6 @@ proc new*( await upgrader.ms.handle(conn) # handle incoming connection except CancelledError as exc: return - except CatchableError as exc: - trace "exception in stream handler", conn, msg = exc.msg finally: await conn.closeWithEOF() trace "Stream handler done", conn diff --git a/libp2p/upgrademngrs/upgrade.nim b/libp2p/upgrademngrs/upgrade.nim index 78ae438cc6..cb120a4375 100644 --- a/libp2p/upgrademngrs/upgrade.nim +++ b/libp2p/upgrademngrs/upgrade.nim @@ -1,5 +1,5 @@ # Nim-LibP2P -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -24,8 +24,10 @@ import ../stream/connection, export connmanager, connection, identify, secure, multistream -declarePublicCounter(libp2p_failed_upgrades_incoming, "incoming connections failed upgrades") -declarePublicCounter(libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades") +declarePublicCounter(libp2p_failed_upgrades_incoming, + "incoming connections failed upgrades") +declarePublicCounter(libp2p_failed_upgrades_outgoing, + "outgoing connections failed upgrades") logScope: topics = "libp2p upgrade" @@ -38,23 +40,28 @@ type secureManagers*: seq[Secure] method upgrade*( - self: Upgrade, - conn: Connection, - peerId: Opt[PeerId]): Future[Muxer] {.base.} = - doAssert(false, "Not implemented!") + self: Upgrade, + conn: Connection, + peerId: Opt[PeerId] +): Future[Muxer] {.async: (raises: [ + CancelledError, LPError], raw: true), base.} = + raiseAssert("Not implemented!") proc secure*( - self: Upgrade, - conn: Connection, - peerId: Opt[PeerId]): Future[Connection] {.async.} = + self: Upgrade, + conn: Connection, + peerId: Opt[PeerId] +): Future[Connection] {.async: (raises: [CancelledError, LPError]).} = if self.secureManagers.len <= 0: - raise newException(UpgradeFailedError, "No secure managers registered!") + raise (ref UpgradeFailedError)(msg: "No secure managers registered!") let codec = - if conn.dir == Out: await self.ms.select(conn, self.secureManagers.mapIt(it.codec)) - else: await MultistreamSelect.handle(conn, self.secureManagers.mapIt(it.codec)) + if conn.dir == Out: + await self.ms.select(conn, self.secureManagers.mapIt(it.codec)) + else: + await MultistreamSelect.handle(conn, self.secureManagers.mapIt(it.codec)) if codec.len == 0: - raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!") + raise (ref UpgradeFailedError)(msg: "Unable to negotiate a secure channel!") trace "Securing connection", conn, codec let secureProtocol = self.secureManagers.filterIt(it.codec == codec) @@ -63,4 +70,4 @@ proc secure*( # let's avoid duplicating checks but detect if it fails to do it properly doAssert(secureProtocol.len > 0) - return await secureProtocol[0].secure(conn, peerId) + await secureProtocol[0].secure(conn, peerId)