diff --git a/src/handler.js b/src/handler.js index 19ce6da..57fe366 100644 --- a/src/handler.js +++ b/src/handler.js @@ -184,15 +184,20 @@ async function batchResponse ({ blocks, context, logger }) { try { let message = new Message() + logger.info({ keyList: context.canceled.keyList }, 'check keyList') for (let i = 0; i < blocks.length; i++) { const block = blocks[i] const canceledItem = context.canceled.get(block.key) + logger.info({ key: block.key, type: block.type, wantType: block.wantType }, 'check') + logger.info({ canceled: canceledItem }, 'canceled') + if (canceledItem === block.wantType) { const size = messageSize[block.type](block) telemetry.increaseLabelCount('bitswap-block-success-cancel', [block.type]) telemetry.increaseLabelCount('bitswap-cancel-size', [block.type], size) + logger.info({ key: block.key }, 'delete') context.canceled.delete(block.key) } else { const size = messageSize[block.type](block) diff --git a/src/service.js b/src/service.js index 3d7ef5c..81dba73 100644 --- a/src/service.js +++ b/src/service.js @@ -83,6 +83,7 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec } }) + const cancelsPerPeer = new Map() const handlerOptions = { maxInboundStreams: connectionConfig.handler.maxInboundStreams, maxOutboundStreams: connectionConfig.handler.maxOutboundStreams @@ -109,7 +110,7 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec service.handle(protocol, async ({ connection: dial, stream }) => { try { const connection = new Connection(stream) - const canceled = new LRU({ max: 200 }) + const canceled = cancelsPerPeer.get(dial.remotePeer.toString()) const hrTime = process.hrtime() const connectionId = hrTime[0] * 1000000000 + hrTime[1] @@ -161,20 +162,29 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec // TODO move to networking service.connectionManager.addEventListener('peer:connect', connection => { + cancelsPerPeer.set( + connection.detail.remotePeer.toString(), + new LRU({ max: 200 }) + ) + try { telemetry.increaseCount('bitswap-connections') telemetry.increaseGauge('bitswap-active-connections') } catch (err) { - logger.warn({ err, remotePeer: connection.remotePeer }, 'Error while peer connecting') + logger.warn({ err, remotePeer: connection.detail.remotePeer.toString() }, 'Error while peer connecting') } }) // TODO move to networking service.connectionManager.addEventListener('peer:disconnect', connection => { + cancelsPerPeer.delete( + connection.detail.remotePeer.toString() + ) + try { telemetry.decreaseGauge('bitswap-active-connections') } catch (err) { - logger.warn({ err, remotePeer: connection.remotePeer }, 'Error while peer disconnecting') + logger.warn({ err, remotePeer: connection.detail.remotePeer.toString() }, 'Error while peer disconnecting') } })