Skip to content

Commit

Permalink
make maxDurationInNonPriorityQueue configurable and none by default
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Feb 13, 2024
1 parent 38c0af4 commit f7ae9fb
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
4 changes: 3 additions & 1 deletion libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
enablePX: false,
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]),
disconnectPeerAboveRateLimit: false
disconnectPeerAboveRateLimit: false,
maxDurationInNonPriorityQueue: Opt.none(Duration),
)

proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
Expand Down Expand Up @@ -746,4 +747,5 @@ method getOrCreatePeer*(
let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos)
g.parameters.overheadRateLimit.withValue(overheadRateLimit):
peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval))
peer.rpcmessagequeue.maxDurationInNonPriorityQueue = g.parameters.maxDurationInNonPriorityQueue
return peer
4 changes: 4 additions & 0 deletions libp2p/protocols/pubsub/gossipsub/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ type
overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
disconnectPeerAboveRateLimit*: bool

# The maximum duration a message can stay in the non-priority queue. If it exceeds this duration, it will be discarded
# as soon as it is dequeued, instead of being sent to the remote peer. The default value is none, i.e., no maximum duration.
maxDurationInNonPriorityQueue*: Opt[Duration]

BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]

Expand Down
20 changes: 11 additions & 9 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type
# Task for processing non-priority message queue.
sendNonPriorityTask: Future[void]
# The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms.
maxDurationInNonPriorityQueue: Duration
maxDurationInNonPriorityQueue*: Opt[Duration]

PubSubPeer* = ref object of RootObj
getConn*: GetConn # callback to establish a new send connection
Expand All @@ -90,7 +90,7 @@ type
behaviourPenalty*: float64 # the eventual penalty score
overheadRateLimitOpt*: Opt[TokenBucket]

rpcmessagequeue: RpcMessageQueue
rpcmessagequeue*: RpcMessageQueue

RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
{.gcsafe, raises: [].}
Expand Down Expand Up @@ -388,10 +388,11 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
let ttlMsg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
if Moment.now() - ttlMsg.addedAt >= p.rpcmessagequeue.maxDurationInNonPriorityQueue:
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId])
continue
p.rpcmessagequeue.maxDurationInNonPriorityQueue.withValue(maxDurationInNonPriorityQueue):
if Moment.now() - ttlMsg.addedAt >= maxDurationInNonPriorityQueue:
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId])
continue
await p.sendMsg(ttlMsg.msg)

proc startSendNonPriorityTask(p: PubSubPeer) =
Expand All @@ -410,7 +411,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)

proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 1.seconds): T =
proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = Opt.none(Duration)): T =
return T(
sendPriorityQueue: initDeque[Future[void]](),
nonPriorityQueue: newAsyncQueue[QueuedMessage](),
Expand All @@ -424,7 +425,8 @@ proc new*(
onEvent: OnEvent,
codec: string,
maxMessageSize: int,
overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =
overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket),
maxDurationInNonPriorityQueue = Opt.none(Duration)): T =

result = T(
getConn: getConn,
Expand All @@ -434,7 +436,7 @@ proc new*(
connectedFut: newFuture[void](),
maxMessageSize: maxMessageSize,
overheadRateLimitOpt: overheadRateLimitOpt,
rpcmessagequeue: RpcMessageQueue.new(),
rpcmessagequeue: RpcMessageQueue.new(maxDurationInNonPriorityQueue),
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.heDontWants.addFirst(default(HashSet[MessageId]))
Expand Down

0 comments on commit f7ae9fb

Please sign in to comment.