From 87958915726b88a4f0a27d3edd3b3a430e7f888e Mon Sep 17 00:00:00 2001 From: Anton Malinskiy Date: Wed, 19 May 2021 13:06:50 +1000 Subject: [PATCH] fix(vertx): reads should not wait for requested bytes when receiving a chunk of bytes socket should check requested as an upper bound rather than wait for it to be available --- .../transport/vertx/VariableSizeRecordParser.kt | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/com/malinskiy/adam/transport/vertx/VariableSizeRecordParser.kt b/src/main/kotlin/com/malinskiy/adam/transport/vertx/VariableSizeRecordParser.kt index 5f0b8ca28..9ea0d83c5 100644 --- a/src/main/kotlin/com/malinskiy/adam/transport/vertx/VariableSizeRecordParser.kt +++ b/src/main/kotlin/com/malinskiy/adam/transport/vertx/VariableSizeRecordParser.kt @@ -21,6 +21,7 @@ import io.vertx.core.Handler import io.vertx.core.buffer.Buffer import io.vertx.core.impl.Arguments import io.vertx.core.streams.ReadStream +import kotlin.math.min class VariableSizeRecordParser( private val stream: ReadStream? = null @@ -31,7 +32,7 @@ class VariableSizeRecordParser( private val bufferLock = Object() private var pos = 0 // Current position in buffer private var start = 0 // Position of beginning of current record - private var requested = 0 + private var requestedAtMost = 0 private var maxRecordSize = 0 private var demand = Long.MAX_VALUE private var eventHandler: Handler? = null @@ -63,7 +64,7 @@ class VariableSizeRecordParser( fun request(size: Int) { Arguments.require(size > 0, "Size must be > 0") - requested = size + requestedAtMost = size handleParsing() } @@ -83,7 +84,7 @@ class VariableSizeRecordParser( break } } - requested = 0 + requestedAtMost = 0 if (demand != Long.MAX_VALUE) { demand-- } @@ -117,11 +118,12 @@ class VariableSizeRecordParser( private fun parseFixed(): Int { val length = buff.length() val available = length - start - if (available >= requested && requested > 0) { - val end = start + requested + if (available > 0 && requestedAtMost > 0) { + val toSend = min(requestedAtMost, available) + val end = start + toSend pos = end return end - } else if (streamEnded && available > 0 && available < requested) { + } else if (streamEnded && available > 0 && available < requestedAtMost) { val end = start + length pos = end return end