Skip to content

Commit c5db35d

Browse files
authored
annotate upgrademngrs with {.async: (raises).} (#1068)
1 parent d1e51be commit c5db35d

File tree

4 files changed

+69
-54
lines changed

4 files changed

+69
-54
lines changed

libp2p/multistream.nim

+6-4
Original file line numberDiff line numberDiff line change
@@ -261,20 +261,22 @@ proc addHandler*(m: MultistreamSelect,
261261
match: matcher))
262262

263263
proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
264-
let
265-
handlers = m.handlers
266-
futs = handlers.mapIt(it.protocol.start())
264+
# Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([])`
265+
var futs = newSeqOfCap[Future[void].Raising([CancelledError])](m.handlers.len)
266+
for it in m.handlers:
267+
futs.add it.protocol.start()
267268
try:
268269
await allFutures(futs)
269270
for fut in futs:
270271
await fut
271272
except CancelledError as exc:
272273
var pending: seq[Future[void].Raising([])]
274+
doAssert m.handlers.len == futs.len, "Handlers modified while starting"
273275
for i, fut in futs:
274276
if not fut.finished:
275277
pending.add noCancel fut.cancelAndWait()
276278
elif fut.completed:
277-
pending.add handlers[i].protocol.stop()
279+
pending.add m.handlers[i].protocol.stop()
278280
else:
279281
static: doAssert typeof(fut).E is (CancelledError,)
280282
await noCancel allFutures(pending)

libp2p/transports/transport.nim

+9-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Nim-LibP2P
2-
# Copyright (c) 2023 Status Research & Development GmbH
2+
# Copyright (c) 2023-2024 Status Research & Development GmbH
33
# Licensed under either of
44
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
55
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -81,25 +81,25 @@ proc dial*(
8181
self.dial("", address)
8282

8383
method upgrade*(
84-
self: Transport,
85-
conn: Connection,
86-
peerId: Opt[PeerId]): Future[Muxer] {.base, gcsafe.} =
84+
self: Transport,
85+
conn: Connection,
86+
peerId: Opt[PeerId]
87+
): Future[Muxer] {.base, async: (raises: [
88+
CancelledError, LPError], raw: true).} =
8789
## base upgrade method that the transport uses to perform
8890
## transport specific upgrades
8991
##
90-
9192
self.upgrader.upgrade(conn, peerId)
9293

9394
method handles*(
94-
self: Transport,
95-
address: MultiAddress): bool {.base, gcsafe.} =
95+
self: Transport,
96+
address: MultiAddress): bool {.base, gcsafe.} =
9697
## check if transport supports the multiaddress
9798
##
98-
9999
# by default we skip circuit addresses to avoid
100100
# having to repeat the check in every transport
101101
let protocols = address.protocols.valueOr: return false
102-
return protocols
102+
protocols
103103
.filterIt(
104104
it == multiCodec("p2p-circuit")
105105
).len == 0

libp2p/upgrademngrs/muxedupgrade.nim

+32-26
Original file line numberDiff line numberDiff line change
@@ -25,65 +25,73 @@ type
2525
muxers*: seq[MuxerProvider]
2626
streamHandler*: StreamHandler
2727

28-
proc getMuxerByCodec(self: MuxedUpgrade, muxerName: string): MuxerProvider =
28+
func getMuxerByCodec(
29+
self: MuxedUpgrade, muxerName: string): Opt[MuxerProvider] =
30+
if muxerName.len == 0 or muxerName == "na":
31+
return Opt.none(MuxerProvider)
2932
for m in self.muxers:
3033
if muxerName == m.codec:
31-
return m
34+
return Opt.some(m)
35+
Opt.none(MuxerProvider)
3236

33-
proc mux*(
37+
proc mux(
3438
self: MuxedUpgrade,
35-
conn: Connection): Future[Muxer] {.async.} =
39+
conn: Connection
40+
): Future[Opt[Muxer]] {.async: (raises: [
41+
CancelledError, LPStreamError, MultiStreamError]).} =
3642
## mux connection
37-
3843
trace "Muxing connection", conn
3944
if self.muxers.len == 0:
4045
warn "no muxers registered, skipping upgrade flow", conn
41-
return
42-
43-
let muxerName =
44-
if conn.dir == Out: await self.ms.select(conn, self.muxers.mapIt(it.codec))
45-
else: await MultistreamSelect.handle(conn, self.muxers.mapIt(it.codec))
46-
47-
if muxerName.len == 0 or muxerName == "na":
48-
debug "no muxer available, early exit", conn
49-
return
46+
return Opt.none(Muxer)
47+
48+
let
49+
muxerName =
50+
case conn.dir
51+
of Direction.Out:
52+
await self.ms.select(conn, self.muxers.mapIt(it.codec))
53+
of Direction.In:
54+
await MultistreamSelect.handle(conn, self.muxers.mapIt(it.codec))
55+
muxerProvider = self.getMuxerByCodec(muxerName).valueOr:
56+
debug "no muxer available, early exit", conn, muxerName
57+
return Opt.none(Muxer)
5058

5159
trace "Found a muxer", conn, muxerName
5260

5361
# create new muxer for connection
54-
let muxer = self.getMuxerByCodec(muxerName).newMuxer(conn)
62+
let muxer = muxerProvider.newMuxer(conn)
5563

5664
# install stream handler
5765
muxer.streamHandler = self.streamHandler
5866
muxer.handler = muxer.handle()
59-
return muxer
67+
Opt.some(muxer)
6068

6169
method upgrade*(
6270
self: MuxedUpgrade,
6371
conn: Connection,
64-
peerId: Opt[PeerId]): Future[Muxer] {.async.} =
72+
peerId: Opt[PeerId]
73+
): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} =
6574
trace "Upgrading connection", conn, direction = conn.dir
6675

67-
let sconn = await self.secure(conn, peerId) # secure the connection
76+
let sconn = await self.secure(conn, peerId) # secure the connection
6877
if sconn == nil:
69-
raise newException(UpgradeFailedError,
78+
raise (ref UpgradeFailedError)(msg:
7079
"unable to secure connection, stopping upgrade")
7180

72-
let muxer = await self.mux(sconn) # mux it if possible
73-
if muxer == nil:
74-
raise newException(UpgradeFailedError,
81+
let muxer = (await self.mux(sconn)).valueOr: # mux it if possible
82+
raise (ref UpgradeFailedError)(msg:
7583
"a muxer is required for outgoing connections")
7684

7785
when defined(libp2p_agents_metrics):
7886
conn.shortAgent = muxer.connection.shortAgent
7987

8088
if sconn.closed():
8189
await sconn.close()
82-
raise newException(UpgradeFailedError,
90+
raise (ref UpgradeFailedError)(msg:
8391
"Connection closed or missing peer info, stopping upgrade")
8492

8593
trace "Upgraded connection", conn, sconn, direction = conn.dir
86-
return muxer
94+
muxer
8795

8896
proc new*(
8997
T: type MuxedUpgrade,
@@ -101,8 +109,6 @@ proc new*(
101109
await upgrader.ms.handle(conn) # handle incoming connection
102110
except CancelledError as exc:
103111
return
104-
except CatchableError as exc:
105-
trace "exception in stream handler", conn, msg = exc.msg
106112
finally:
107113
await conn.closeWithEOF()
108114
trace "Stream handler done", conn

libp2p/upgrademngrs/upgrade.nim

+22-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Nim-LibP2P
2-
# Copyright (c) 2023 Status Research & Development GmbH
2+
# Copyright (c) 2023-2024 Status Research & Development GmbH
33
# Licensed under either of
44
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
55
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -24,8 +24,10 @@ import ../stream/connection,
2424

2525
export connmanager, connection, identify, secure, multistream
2626

27-
declarePublicCounter(libp2p_failed_upgrades_incoming, "incoming connections failed upgrades")
28-
declarePublicCounter(libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades")
27+
declarePublicCounter(libp2p_failed_upgrades_incoming,
28+
"incoming connections failed upgrades")
29+
declarePublicCounter(libp2p_failed_upgrades_outgoing,
30+
"outgoing connections failed upgrades")
2931

3032
logScope:
3133
topics = "libp2p upgrade"
@@ -38,23 +40,28 @@ type
3840
secureManagers*: seq[Secure]
3941

4042
method upgrade*(
41-
self: Upgrade,
42-
conn: Connection,
43-
peerId: Opt[PeerId]): Future[Muxer] {.base.} =
44-
doAssert(false, "Not implemented!")
43+
self: Upgrade,
44+
conn: Connection,
45+
peerId: Opt[PeerId]
46+
): Future[Muxer] {.async: (raises: [
47+
CancelledError, LPError], raw: true), base.} =
48+
raiseAssert("Not implemented!")
4549

4650
proc secure*(
47-
self: Upgrade,
48-
conn: Connection,
49-
peerId: Opt[PeerId]): Future[Connection] {.async.} =
51+
self: Upgrade,
52+
conn: Connection,
53+
peerId: Opt[PeerId]
54+
): Future[Connection] {.async: (raises: [CancelledError, LPError]).} =
5055
if self.secureManagers.len <= 0:
51-
raise newException(UpgradeFailedError, "No secure managers registered!")
56+
raise (ref UpgradeFailedError)(msg: "No secure managers registered!")
5257

5358
let codec =
54-
if conn.dir == Out: await self.ms.select(conn, self.secureManagers.mapIt(it.codec))
55-
else: await MultistreamSelect.handle(conn, self.secureManagers.mapIt(it.codec))
59+
if conn.dir == Out:
60+
await self.ms.select(conn, self.secureManagers.mapIt(it.codec))
61+
else:
62+
await MultistreamSelect.handle(conn, self.secureManagers.mapIt(it.codec))
5663
if codec.len == 0:
57-
raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!")
64+
raise (ref UpgradeFailedError)(msg: "Unable to negotiate a secure channel!")
5865

5966
trace "Securing connection", conn, codec
6067
let secureProtocol = self.secureManagers.filterIt(it.codec == codec)
@@ -63,4 +70,4 @@ proc secure*(
6370
# let's avoid duplicating checks but detect if it fails to do it properly
6471
doAssert(secureProtocol.len > 0)
6572

66-
return await secureProtocol[0].secure(conn, peerId)
73+
await secureProtocol[0].secure(conn, peerId)

0 commit comments

Comments
 (0)