From 8426f87972c6e614d8e9260f8aee607869010f58 Mon Sep 17 00:00:00 2001 From: Anton Malinskiy Date: Thu, 14 Jan 2021 23:35:21 +1100 Subject: [PATCH 1/7] fix(transport): send large streams via 64k chunks --- .../adam/extension/ByteReadChannel.kt | 23 +++++++++++++++++++ .../adam/request/misc/ExecInRequest.kt | 8 ++++++- .../pkg/StreamingPackageInstallRequest.kt | 8 ++++++- .../request/sync/base/BasePushFileRequest.kt | 8 +++++-- 4 files changed, 43 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt b/src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt index f5fd2e97a..57682e450 100644 --- a/src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt +++ b/src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt @@ -68,6 +68,29 @@ suspend fun ByteReadChannel.copyTo(transformer: ResponseTransformer, buff return processed } +/** + * Copies up to limit bytes into transformer using buffer. If limit is null - copy until EOF + */ +suspend fun ByteReadChannel.copyTo(buffer: ByteArray, offset: Int, limit: Int): Int { + var processed = 0 + loop@ while (true) { + val toRead = (buffer.size - offset) - processed + val available = readAvailable(buffer, offset + processed, toRead) + when { + processed == limit -> break@loop + available < 0 && processed != 0 -> { + break@loop + } + available < 0 -> return available + available > 0 -> { + processed += available + } + else -> continue@loop + } + } + return processed +} + /** * TODO: rewrite * Assumes buffer hasArray == true diff --git a/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt index 12c3312d0..e5e93a883 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt @@ -29,7 +29,13 @@ import io.ktor.utils.io.ByteReadChannel class ExecInRequest(private val cmd: String, private val channel: ByteReadChannel) : ComplexRequest() { override suspend fun readElement(readChannel: AndroidReadChannel, writeChannel: AndroidWriteChannel) { val buffer = ByteArray(Const.MAX_FILE_PACKET_LENGTH) - channel.copyTo(writeChannel, buffer) + while (true) { + val available = channel.copyTo(buffer, 0, buffer.size) + when { + available > 0 -> writeChannel.writeFully(buffer, 0, available) + else -> break + } + } //Have to poll readChannel.readStatus() } diff --git a/src/main/kotlin/com/malinskiy/adam/request/pkg/StreamingPackageInstallRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/pkg/StreamingPackageInstallRequest.kt index e03746f7d..dafcf9354 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/pkg/StreamingPackageInstallRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/pkg/StreamingPackageInstallRequest.kt @@ -115,7 +115,13 @@ class StreamingPackageInstallRequest( var fileChannel: ByteReadChannel? = null try { val fileChannel = pkg.readChannel(coroutineContext = coroutineContext) - fileChannel.copyTo(writeChannel, buffer) + while (true) { + val available = fileChannel.copyTo(buffer, 0, buffer.size) + when { + available > 0 -> writeChannel.writeFully(buffer, 0, available) + else -> break + } + } } finally { fileChannel?.cancel() } diff --git a/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt index 21170ed98..eb576c963 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt @@ -18,6 +18,7 @@ package com.malinskiy.adam.request.sync.base import com.malinskiy.adam.Const import com.malinskiy.adam.exception.PushFailedException +import com.malinskiy.adam.extension.copyTo import com.malinskiy.adam.extension.toByteArray import com.malinskiy.adam.request.AsyncChannelRequest import com.malinskiy.adam.request.ValidationResponse @@ -44,7 +45,7 @@ abstract class BasePushFileRequest( get() = mode.toInt(8) and "0777".toInt(8) override suspend fun readElement(readChannel: AndroidReadChannel, writeChannel: AndroidWriteChannel): Double? { - val available = fileReadChannel.readAvailable(buffer, 8, Const.MAX_FILE_PACKET_LENGTH) + val available = fileReadChannel.copyTo(buffer, 8, Const.MAX_FILE_PACKET_LENGTH) return when { available < 0 -> { Const.Message.DONE.copyInto(buffer) @@ -63,11 +64,14 @@ abstract class BasePushFileRequest( available > 0 -> { Const.Message.DATA.copyInto(buffer) available.toByteArray().reversedArray().copyInto(buffer, destinationOffset = 4) + /** + * USB devices are very picky about the size of the DATA buffer. Using the adb's default + */ writeChannel.writeFully(buffer, 0, available + 8) currentPosition += available currentPosition.toDouble() / totalBytes } - else -> currentPosition.toDouble() / totalBytes + else -> null } } From 81a3e65f1b4c951bc1f22f52ffbd550867d4f99b Mon Sep 17 00:00:00 2001 From: Anton Malinskiy Date: Thu, 21 Jan 2021 00:24:49 +1100 Subject: [PATCH 2/7] feat(transport): optimize buffer pooling + yield when possible Refrain from using ByteArray constructors ByteBuffer.allocate Also throw exceptions for in NioSocket when makes sense --- src/main/kotlin/com/malinskiy/adam/Const.kt | 1 - .../com/malinskiy/adam/extension/Socket.kt | 74 ++++++--- .../adam/request/SynchronousRequest.kt | 34 ++-- .../device/AsyncDeviceMonitorRequest.kt | 12 +- .../device/FetchDeviceFeaturesRequest.kt | 12 +- .../adam/request/device/ListDevicesRequest.kt | 12 +- .../forwarding/ListPortForwardsRequest.kt | 12 +- .../framebuffer/ScreenCaptureRequest.kt | 147 +++++++++--------- .../request/mdns/ListMdnsServicesRequest.kt | 11 +- .../adam/request/mdns/MdnsCheckRequest.kt | 11 +- .../adam/request/misc/ConnectDeviceRequest.kt | 13 +- .../request/misc/DisconnectDeviceRequest.kt | 13 +- .../adam/request/misc/ExecInRequest.kt | 11 +- .../request/misc/FetchHostFeaturesRequest.kt | 11 +- .../adam/request/misc/PairDeviceRequest.kt | 13 +- .../adam/request/misc/ReconnectRequest.kt | 33 ++-- .../adam/request/pkg/LegacySideloadRequest.kt | 22 +-- .../adam/request/pkg/SideloadRequest.kt | 3 +- .../pkg/StreamingPackageInstallRequest.kt | 24 +-- .../reverse/ListReversePortForwardsRequest.kt | 12 +- .../shell/v1/ChanneledShellCommandRequest.kt | 16 +- .../shell/v2/ChanneledShellCommandRequest.kt | 11 +- .../shell/v2/SyncShellCommandRequest.kt | 43 ++--- .../request/sync/base/BasePullFileRequest.kt | 65 ++++---- .../request/sync/base/BasePushFileRequest.kt | 49 +++--- .../adam/request/sync/v1/ListFileRequest.kt | 48 +++--- .../adam/request/sync/v1/PushFileRequest.kt | 18 +-- .../adam/request/sync/v1/StatFileRequest.kt | 22 ++- .../adam/request/sync/v2/ListFileRequest.kt | 72 +++++---- .../adam/request/sync/v2/StatFileRequest.kt | 35 +++-- .../request/testrunner/TestRunnerRequest.kt | 28 ++-- .../malinskiy/adam/transport/BufferFactory.kt | 23 ++- .../com/malinskiy/adam/transport/NioSocket.kt | 44 ++++-- .../async/AsyncDeviceMonitorRequestTest.kt | 4 +- .../forwarding/ListPortForwardsRequestTest.kt | 1 + .../malinskiy/adam/transport/NioSocketTest.kt | 32 ++++ 36 files changed, 536 insertions(+), 456 deletions(-) create mode 100644 src/test/kotlin/com/malinskiy/adam/transport/NioSocketTest.kt diff --git a/src/main/kotlin/com/malinskiy/adam/Const.kt b/src/main/kotlin/com/malinskiy/adam/Const.kt index ec53f2cfe..c92d4da05 100644 --- a/src/main/kotlin/com/malinskiy/adam/Const.kt +++ b/src/main/kotlin/com/malinskiy/adam/Const.kt @@ -26,7 +26,6 @@ object Const { const val SERVER_PORT_ENV_VAR = "ANDROID_ADB_SERVER_PORT" const val MAX_PACKET_LENGTH = 16384 const val MAX_FILE_PACKET_LENGTH = 64 * 1024 - const val KTOR_INTERNAL_BUFFER_LENGTH = 4088 const val MAX_PROTOBUF_LOGCAT_LENGTH = 10_000 const val MAX_PROTOBUF_PACKET_LENGTH = 10 * 1024 * 1024L //10Mb diff --git a/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt b/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt index 6c0b88e9c..afec31482 100644 --- a/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt +++ b/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt @@ -17,6 +17,7 @@ package com.malinskiy.adam.extension import com.malinskiy.adam.Const +import com.malinskiy.adam.exception.RequestRejectedException import com.malinskiy.adam.request.transform.ResponseTransformer import com.malinskiy.adam.request.transform.StringResponseTransformer import com.malinskiy.adam.transport.Socket @@ -26,6 +27,7 @@ import io.ktor.util.cio.* import io.ktor.utils.io.* import io.ktor.utils.io.bits.* import io.ktor.utils.io.core.* +import kotlinx.coroutines.yield import java.io.File import java.nio.ByteBuffer import kotlin.coroutines.CoroutineContext @@ -43,7 +45,10 @@ suspend fun Socket.copyTo(channel: ByteWriteChannel, buffer: ByteArray): Long { channel.writeFully(buffer, 0, available) processed += available } - else -> continue@loop + else -> { + yield() + continue@loop + } } } return processed @@ -73,7 +78,10 @@ suspend fun Socket.copyTo(transformer: ResponseTransformer, buffer: ByteA transformer.process(buffer, 0, available) processed += available } - else -> continue@loop + else -> { + yield() + continue@loop + } } } return processed @@ -98,23 +106,47 @@ suspend fun Socket.readOptionalProtocolString(): String? { return if (errorMessageLength == null) { readStatus() } else { - val errorBytes = ByteArray(errorMessageLength) - readFully(errorBytes, 0, errorMessageLength) - String(errorBytes, Const.DEFAULT_TRANSPORT_ENCODING) + withDefaultBuffer { + val transformer = StringResponseTransformer() + this@readOptionalProtocolString.copyTo(transformer, this, limit = errorMessageLength.toLong()) + transformer.transform() + } } } -suspend fun Socket.read(): TransportResponse { - val bytes = ByteArray(4) - readFully(bytes, 0, 4) +/** + * @throws RequestRejectedException + */ +suspend fun Socket.readProtocolString(): String { + withDefaultBuffer { + val transformer = StringResponseTransformer() + val copied = copyTo(transformer, this, limit = 4L) + println(copied) + val length = transformer.transform() + if (length.length != 4) { + throw RequestRejectedException("Unexpected string length: $length") + } + val messageLength = length.toIntOrNull(16) ?: throw RequestRejectedException("Unexpected string length: $length") + + clear() + compatLimit(messageLength) + val read = readFully(this) + if (read != messageLength) throw RequestRejectedException("Incomplete string received") + return String(array(), 0, read, Const.DEFAULT_TRANSPORT_ENCODING) + } +} - val ok = bytes.isOkay() +suspend fun Socket.read(): TransportResponse { + val ok = withDefaultBuffer { + compatLimit(4) + readFully(this) + array().isOkay() + } val message = if (!ok) { readOptionalProtocolString() } else { null } - return TransportResponse(ok, message) } @@ -146,12 +178,13 @@ suspend fun Socket.writeSyncRequest(type: ByteArray, remotePath: String) { val path = remotePath.toByteArray(Const.DEFAULT_TRANSPORT_ENCODING) val size = path.size.toByteArray().reversedArray() - val cmd = ByteArray(8 + path.size) - - type.copyInto(cmd) - size.copyInto(cmd, 4) - path.copyInto(cmd, 8) - write(cmd) + withDefaultBuffer { + put(type) + put(size) + put(path) + flip() + writeFully(this) + } } suspend fun Socket.writeSyncV2Request(type: ByteArray, remotePath: String, flags: Int, mode: Int? = null) { @@ -177,10 +210,11 @@ suspend fun Socket.writeSyncV2Request(type: ByteArray, remotePath: String, flags } suspend fun Socket.readTransportResponse(): TransportResponse { - val bytes = ByteArray(4) - readFully(bytes, 0, 4) - - val ok = bytes.isOkay() + val ok = withDefaultBuffer { + compatLimit(4) + readFully(this) + array().isOkay() + } val message = if (!ok) { readOptionalProtocolString() } else { diff --git a/src/main/kotlin/com/malinskiy/adam/request/SynchronousRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/SynchronousRequest.kt index 76775e3a4..fdda7da07 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/SynchronousRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/SynchronousRequest.kt @@ -16,27 +16,31 @@ package com.malinskiy.adam.request -import com.malinskiy.adam.Const import com.malinskiy.adam.request.transform.ResponseTransformer import com.malinskiy.adam.transport.Socket +import com.malinskiy.adam.transport.withMaxPacketBuffer +import kotlinx.coroutines.yield abstract class SynchronousRequest(target: Target = NonSpecifiedTarget) : ComplexRequest(target), ResponseTransformer { override suspend fun readElement(socket: Socket): T { - val data = ByteArray(Const.MAX_PACKET_LENGTH) - loop@ do { - if (socket.isClosedForWrite || socket.isClosedForRead) break@loop + withMaxPacketBuffer { + loop@ do { + if (socket.isClosedForWrite || socket.isClosedForRead) break@loop - val count = socket.readAvailable(data, 0, Const.MAX_PACKET_LENGTH) - when { - count == 0 -> { - continue@loop + val data = array() + val count = socket.readAvailable(data, 0, data.size) + when { + count == 0 -> { + yield() + continue@loop + } + count > 0 -> { + process(data, 0, count) + } } - count > 0 -> { - process(data, 0, count) - } - } - } while (count >= 0) + } while (count >= 0) - return transform() + return transform() + } } -} \ No newline at end of file +} diff --git a/src/main/kotlin/com/malinskiy/adam/request/device/AsyncDeviceMonitorRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/device/AsyncDeviceMonitorRequest.kt index 21660d5e4..2e64d5d02 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/device/AsyncDeviceMonitorRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/device/AsyncDeviceMonitorRequest.kt @@ -16,23 +16,15 @@ package com.malinskiy.adam.request.device -import com.malinskiy.adam.Const +import com.malinskiy.adam.extension.readProtocolString import com.malinskiy.adam.request.AsyncChannelRequest import com.malinskiy.adam.request.HostTarget import com.malinskiy.adam.transport.Socket import kotlinx.coroutines.channels.SendChannel -import java.nio.ByteBuffer class AsyncDeviceMonitorRequest : AsyncChannelRequest, Unit>(target = HostTarget) { override suspend fun readElement(socket: Socket, sendChannel: SendChannel>): Boolean { - val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4) - socket.readFully(sizeBuffer) - val size = String(sizeBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) - - val payloadBuffer = ByteBuffer.allocate(size) - socket.readFully(payloadBuffer) - val payload = String(payloadBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING) - sendChannel.send(payload.lines() + sendChannel.send(socket.readProtocolString().lines() .filter { it.isNotEmpty() } .map { val line = it.trim() diff --git a/src/main/kotlin/com/malinskiy/adam/request/device/FetchDeviceFeaturesRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/device/FetchDeviceFeaturesRequest.kt index 13a0d12d0..57fdb8a1d 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/device/FetchDeviceFeaturesRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/device/FetchDeviceFeaturesRequest.kt @@ -16,25 +16,17 @@ package com.malinskiy.adam.request.device -import com.malinskiy.adam.Const +import com.malinskiy.adam.extension.readProtocolString import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.Feature import com.malinskiy.adam.request.SerialTarget import com.malinskiy.adam.transport.Socket -import java.nio.ByteBuffer - class FetchDeviceFeaturesRequest(serial: String) : ComplexRequest>(target = SerialTarget(serial)) { override fun serialize() = createBaseRequest("features") override suspend fun readElement(socket: Socket): List { - val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4) - socket.readFully(sizeBuffer) - val size = String(sizeBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) - - val payloadBuffer = ByteBuffer.allocate(size) - socket.readFully(payloadBuffer) - return String(payloadBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING).split(',').mapNotNull { Feature.of(it) } + return socket.readProtocolString().split(',').mapNotNull { Feature.of(it) } } } diff --git a/src/main/kotlin/com/malinskiy/adam/request/device/ListDevicesRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/device/ListDevicesRequest.kt index 3a33f2929..fedbfb9fc 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/device/ListDevicesRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/device/ListDevicesRequest.kt @@ -16,24 +16,16 @@ package com.malinskiy.adam.request.device -import com.malinskiy.adam.Const +import com.malinskiy.adam.extension.readProtocolString import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.HostTarget import com.malinskiy.adam.transport.Socket -import java.nio.ByteBuffer class ListDevicesRequest : ComplexRequest>(target = HostTarget) { override fun serialize() = createBaseRequest("devices") override suspend fun readElement(socket: Socket): List { - val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4) - socket.readFully(sizeBuffer) - val size = String(sizeBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) - - val payloadBuffer = ByteBuffer.allocate(size) - socket.readFully(payloadBuffer) - val payload = String(payloadBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING) - return payload.lines() + return socket.readProtocolString().lines() .filter { it.isNotEmpty() } .map { val line = it.trim() diff --git a/src/main/kotlin/com/malinskiy/adam/request/forwarding/ListPortForwardsRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/forwarding/ListPortForwardsRequest.kt index e6bb64a1d..ace551099 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/forwarding/ListPortForwardsRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/forwarding/ListPortForwardsRequest.kt @@ -16,22 +16,14 @@ package com.malinskiy.adam.request.forwarding -import com.malinskiy.adam.Const +import com.malinskiy.adam.extension.readProtocolString import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.SerialTarget import com.malinskiy.adam.transport.Socket -import java.nio.ByteBuffer class ListPortForwardsRequest(serial: String) : ComplexRequest>(target = SerialTarget(serial)) { override suspend fun readElement(socket: Socket): List { - val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4) - socket.readFully(sizeBuffer) - val size = String(sizeBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) - - val payloadBuffer = ByteBuffer.allocate(size) - socket.readFully(payloadBuffer) - val payload = String(payloadBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING) - return payload.lines().mapNotNull { line -> + return socket.readProtocolString().lines().mapNotNull { line -> if (line.isNotEmpty()) { val split = line.split(" ") PortForwardingRule( diff --git a/src/main/kotlin/com/malinskiy/adam/request/framebuffer/ScreenCaptureRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/framebuffer/ScreenCaptureRequest.kt index fb90cf66a..cc0b356ab 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/framebuffer/ScreenCaptureRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/framebuffer/ScreenCaptureRequest.kt @@ -17,87 +17,90 @@ package com.malinskiy.adam.request.framebuffer import com.malinskiy.adam.exception.UnsupportedImageProtocolException +import com.malinskiy.adam.extension.compatLimit import com.malinskiy.adam.extension.compatRewind import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.transport.Socket -import java.nio.ByteBuffer +import com.malinskiy.adam.transport.withDefaultBuffer import java.nio.ByteOrder class ScreenCaptureRequest(private val adapter: ScreenCaptureAdapter) : ComplexRequest() { override suspend fun readElement(socket: Socket): T { - val protocolBuffer: ByteBuffer = ByteBuffer.allocate(4) - socket.readFully(protocolBuffer) - protocolBuffer.compatRewind() + withDefaultBuffer { + compatLimit(4) + socket.readFully(this) + compatRewind() - val protocolVersion = protocolBuffer.order(ByteOrder.LITTLE_ENDIAN).int - val headerSize = when (protocolVersion) { - 1 -> 12 // bpp, size, width, height, 4*(length, offset) - 2 -> 13 // bpp, colorSpace, size, width, height, 4*(length, offset) - 16 -> 3 // compatibility mode: size, width, height. used previously to denote framebuffer depth - /** - * See https://android.googlesource.com/platform/packages/modules/adb/+/refs/heads/master/daemon/framebuffer_service.cpp#42 - * for a possible new value for DDMS_RAWIMAGE_VERSION - */ - else -> throw UnsupportedImageProtocolException(protocolVersion) - } - val headerBuffer = ByteBuffer.allocate(headerSize * 4) - socket.readFully(headerBuffer) - headerBuffer.compatRewind() - socket.writeFully(ByteArray(1) { 0.toByte() }, 0, 1) + val protocolVersion = order(ByteOrder.LITTLE_ENDIAN).int + val headerSize = when (protocolVersion) { + 1 -> 12 // bpp, size, width, height, 4*(length, offset) + 2 -> 13 // bpp, colorSpace, size, width, height, 4*(length, offset) + 16 -> 3 // compatibility mode: size, width, height. used previously to denote framebuffer depth + /** + * See https://android.googlesource.com/platform/packages/modules/adb/+/refs/heads/master/daemon/framebuffer_service.cpp#42 + * for a possible new value for DDMS_RAWIMAGE_VERSION + */ + else -> throw UnsupportedImageProtocolException(protocolVersion) + } + clear() + compatLimit(headerSize * 4) + socket.readFully(this) + socket.writeFully(ByteArray(1) { 0.toByte() }, 0, 1) - headerBuffer.order(ByteOrder.LITTLE_ENDIAN) - headerBuffer.compatRewind() - return when (protocolVersion) { - 16 -> adapter.process( - version = protocolVersion, - bitsPerPixel = 16, - size = headerBuffer.int, - width = headerBuffer.int, - height = headerBuffer.int, - redOffset = 11, - redLength = 5, - greenOffset = 5, - greenLength = 6, - blueOffset = 0, - blueLength = 5, - alphaOffset = 0, - alphaLength = 0, - socket = socket - ) - 1 -> adapter.process( - version = protocolVersion, - bitsPerPixel = headerBuffer.int, - size = headerBuffer.int, - width = headerBuffer.int, - height = headerBuffer.int, - redOffset = headerBuffer.int, - redLength = headerBuffer.int, - blueOffset = headerBuffer.int, - blueLength = headerBuffer.int, - greenOffset = headerBuffer.int, - greenLength = headerBuffer.int, - alphaOffset = headerBuffer.int, - alphaLength = headerBuffer.int, - socket = socket - ) - 2 -> adapter.process( - version = protocolVersion, - bitsPerPixel = headerBuffer.int, - colorSpace = ColorSpace.from(headerBuffer.int), - size = headerBuffer.int, - width = headerBuffer.int, - height = headerBuffer.int, - redOffset = headerBuffer.int, - redLength = headerBuffer.int, - blueOffset = headerBuffer.int, - blueLength = headerBuffer.int, - greenOffset = headerBuffer.int, - greenLength = headerBuffer.int, - alphaOffset = headerBuffer.int, - alphaLength = headerBuffer.int, - socket = socket - ) - else -> throw UnsupportedImageProtocolException(protocolVersion) + order(ByteOrder.LITTLE_ENDIAN) + flip() + return when (protocolVersion) { + 16 -> adapter.process( + version = protocolVersion, + bitsPerPixel = 16, + size = int, + width = int, + height = int, + redOffset = 11, + redLength = 5, + greenOffset = 5, + greenLength = 6, + blueOffset = 0, + blueLength = 5, + alphaOffset = 0, + alphaLength = 0, + socket = socket + ) + 1 -> adapter.process( + version = protocolVersion, + bitsPerPixel = int, + size = int, + width = int, + height = int, + redOffset = int, + redLength = int, + blueOffset = int, + blueLength = int, + greenOffset = int, + greenLength = int, + alphaOffset = int, + alphaLength = int, + socket = socket + ) + 2 -> adapter.process( + version = protocolVersion, + bitsPerPixel = int, + colorSpace = ColorSpace.from(int), + size = int, + width = int, + height = int, + redOffset = int, + redLength = int, + blueOffset = int, + blueLength = int, + greenOffset = int, + greenLength = int, + alphaOffset = int, + alphaLength = int, + socket = socket + ) + else -> throw UnsupportedImageProtocolException(protocolVersion) + } } } diff --git a/src/main/kotlin/com/malinskiy/adam/request/mdns/ListMdnsServicesRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/mdns/ListMdnsServicesRequest.kt index c53eff455..287090be3 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/mdns/ListMdnsServicesRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/mdns/ListMdnsServicesRequest.kt @@ -16,22 +16,15 @@ package com.malinskiy.adam.request.mdns -import com.malinskiy.adam.Const +import com.malinskiy.adam.extension.readProtocolString import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.HostTarget import com.malinskiy.adam.transport.Socket -import java.nio.ByteBuffer class ListMdnsServicesRequest : ComplexRequest>(target = HostTarget) { override suspend fun readElement(socket: Socket): List { - val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4) - socket.readFully(sizeBuffer) - val size = String(sizeBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) - val payloadBuffer = ByteBuffer.allocate(size) - socket.readFully(payloadBuffer) - return String(payloadBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING) - .lines() + return socket.readProtocolString().lines() .filterNot { it.isEmpty() } .map { val split = it.split(' ', '\t') diff --git a/src/main/kotlin/com/malinskiy/adam/request/mdns/MdnsCheckRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/mdns/MdnsCheckRequest.kt index 31a93b231..cb479f710 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/mdns/MdnsCheckRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/mdns/MdnsCheckRequest.kt @@ -16,24 +16,17 @@ package com.malinskiy.adam.request.mdns -import com.malinskiy.adam.Const +import com.malinskiy.adam.extension.readProtocolString import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.HostTarget import com.malinskiy.adam.transport.Socket -import java.nio.ByteBuffer /** * check if mdns discovery is available */ class MdnsCheckRequest : ComplexRequest(target = HostTarget) { override suspend fun readElement(socket: Socket): MdnsStatus { - val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4) - socket.readFully(sizeBuffer) - val size = String(sizeBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) - - val payloadBuffer = ByteBuffer.allocate(size) - socket.readFully(payloadBuffer) - val string = String(payloadBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING) + val string = socket.readProtocolString() return if (string.contains("mdns daemon unavailable")) { MdnsStatus(false) } else { diff --git a/src/main/kotlin/com/malinskiy/adam/request/misc/ConnectDeviceRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/misc/ConnectDeviceRequest.kt index 5a6050ab7..026219b4e 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/misc/ConnectDeviceRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/misc/ConnectDeviceRequest.kt @@ -16,11 +16,10 @@ package com.malinskiy.adam.request.misc -import com.malinskiy.adam.Const +import com.malinskiy.adam.extension.readProtocolString import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.HostTarget import com.malinskiy.adam.transport.Socket -import java.nio.ByteBuffer /** * Connects a remote device @@ -32,13 +31,5 @@ class ConnectDeviceRequest( override fun serialize() = createBaseRequest("connect:$host:$port") - override suspend fun readElement(socket: Socket): String { - val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4) - socket.readFully(sizeBuffer) - val size = String(sizeBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) - - val payloadBuffer = ByteBuffer.allocate(size) - socket.readFully(payloadBuffer) - return String(payloadBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING) - } + override suspend fun readElement(socket: Socket) = socket.readProtocolString() } diff --git a/src/main/kotlin/com/malinskiy/adam/request/misc/DisconnectDeviceRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/misc/DisconnectDeviceRequest.kt index c3348101f..8f04fbb6a 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/misc/DisconnectDeviceRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/misc/DisconnectDeviceRequest.kt @@ -16,11 +16,10 @@ package com.malinskiy.adam.request.misc -import com.malinskiy.adam.Const +import com.malinskiy.adam.extension.readProtocolString import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.HostTarget import com.malinskiy.adam.transport.Socket -import java.nio.ByteBuffer /** * Disconnects a device previously connected using ConnectDeviceRequest @@ -42,13 +41,5 @@ class DisconnectDeviceRequest( }" ) - override suspend fun readElement(socket: Socket): String { - val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4) - socket.readFully(sizeBuffer) - val size = String(sizeBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) - - val payloadBuffer = ByteBuffer.allocate(size) - socket.readFully(payloadBuffer) - return String(payloadBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING) - } + override suspend fun readElement(socket: Socket) = socket.readProtocolString() } diff --git a/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt index fad9c0802..5459d0a32 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt @@ -16,11 +16,11 @@ package com.malinskiy.adam.request.misc -import com.malinskiy.adam.Const import com.malinskiy.adam.extension.copyTo import com.malinskiy.adam.extension.readStatus import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.transport.Socket +import com.malinskiy.adam.transport.withMaxFilePacketBuffer import io.ktor.utils.io.* /** @@ -28,10 +28,11 @@ import io.ktor.utils.io.* */ class ExecInRequest(private val cmd: String, private val channel: ByteReadChannel) : ComplexRequest() { override suspend fun readElement(socket: Socket) { - val buffer = ByteArray(Const.MAX_FILE_PACKET_LENGTH) - channel.copyTo(socket, buffer) - //Have to poll - socket.readStatus() + withMaxFilePacketBuffer { + channel.copyTo(socket, this) + //Have to poll + socket.readStatus() + } } override fun serialize() = createBaseRequest("exec:$cmd") diff --git a/src/main/kotlin/com/malinskiy/adam/request/misc/FetchHostFeaturesRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/misc/FetchHostFeaturesRequest.kt index bebb2f9be..b91a614de 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/misc/FetchHostFeaturesRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/misc/FetchHostFeaturesRequest.kt @@ -16,24 +16,17 @@ package com.malinskiy.adam.request.misc -import com.malinskiy.adam.Const +import com.malinskiy.adam.extension.readProtocolString import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.Feature import com.malinskiy.adam.request.HostTarget import com.malinskiy.adam.transport.Socket -import java.nio.ByteBuffer class FetchHostFeaturesRequest : ComplexRequest>(target = HostTarget) { override fun serialize() = createBaseRequest("host-features") override suspend fun readElement(socket: Socket): List { - val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4) - socket.readFully(sizeBuffer) - val size = String(sizeBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) - - val payloadBuffer = ByteBuffer.allocate(size) - socket.readFully(payloadBuffer) - return String(payloadBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING).split(',').mapNotNull { Feature.of(it) } + return socket.readProtocolString().split(',').mapNotNull { Feature.of(it) } } } diff --git a/src/main/kotlin/com/malinskiy/adam/request/misc/PairDeviceRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/misc/PairDeviceRequest.kt index 51d4c7187..40b23e9d5 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/misc/PairDeviceRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/misc/PairDeviceRequest.kt @@ -16,11 +16,10 @@ package com.malinskiy.adam.request.misc -import com.malinskiy.adam.Const +import com.malinskiy.adam.extension.readProtocolString import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.HostTarget import com.malinskiy.adam.transport.Socket -import java.nio.ByteBuffer /** * Pairs adb server with device over WiFi connection @@ -33,15 +32,7 @@ class PairDeviceRequest( private val pairingCode: String ) : ComplexRequest(target = HostTarget) { - override suspend fun readElement(socket: Socket): String { - val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4) - socket.readFully(sizeBuffer) - val size = String(sizeBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) - - val payloadBuffer = ByteBuffer.allocate(size) - socket.readFully(payloadBuffer) - return String(payloadBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING) - } + override suspend fun readElement(socket: Socket) = socket.readProtocolString() override fun serialize() = createBaseRequest("pair:$pairingCode:$url") } \ No newline at end of file diff --git a/src/main/kotlin/com/malinskiy/adam/request/misc/ReconnectRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/misc/ReconnectRequest.kt index 20208aae9..67a127410 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/misc/ReconnectRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/misc/ReconnectRequest.kt @@ -17,12 +17,12 @@ package com.malinskiy.adam.request.misc import com.malinskiy.adam.Const -import com.malinskiy.adam.extension.compatRewind +import com.malinskiy.adam.extension.compatLimit import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.NonSpecifiedTarget import com.malinskiy.adam.request.Target import com.malinskiy.adam.transport.Socket -import java.nio.ByteBuffer +import com.malinskiy.adam.transport.withDefaultBuffer /** * This request is quite tricky to use since the target of the request varies with the reconnection target @@ -37,21 +37,22 @@ class ReconnectRequest( private val reconnectTarget: ReconnectTarget? = null, target: Target = NonSpecifiedTarget ) : ComplexRequest(target = target) { - private val buffer = ByteBuffer.allocate(4) - override suspend fun readElement(socket: Socket): String { - - socket.readFully(buffer) - val array = buffer.array() - return if (array.contentEquals(done)) { - "done" - } else { - //This is length of a response string - val size = String(array, Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) - val payloadBuffer = ByteBuffer.allocate(size) - socket.readFully(payloadBuffer) - payloadBuffer.compatRewind() - String(payloadBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING) + withDefaultBuffer { + compatLimit(4) + socket.readFully(this) + flip() + return if (array().contentEquals(done)) { + "done" + } else { + //This is length of a response string + val size = String(array(), Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) + clear() + compatLimit(size) + socket.readFully(this) + flip() + String(array(), Const.DEFAULT_TRANSPORT_ENCODING) + } } } diff --git a/src/main/kotlin/com/malinskiy/adam/request/pkg/LegacySideloadRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/pkg/LegacySideloadRequest.kt index 304ac0a6e..53b0f4bf5 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/pkg/LegacySideloadRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/pkg/LegacySideloadRequest.kt @@ -16,12 +16,12 @@ package com.malinskiy.adam.request.pkg -import com.malinskiy.adam.Const import com.malinskiy.adam.extension.copyTo import com.malinskiy.adam.extension.readTransportResponse import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.ValidationResponse import com.malinskiy.adam.transport.Socket +import com.malinskiy.adam.transport.withMaxFilePacketBuffer import io.ktor.util.cio.* import io.ktor.utils.io.* import kotlinx.coroutines.Dispatchers @@ -51,15 +51,17 @@ class LegacySideloadRequest( override fun serialize() = createBaseRequest("sideload:${pkg.length()}") override suspend fun readElement(socket: Socket): Boolean { - val buffer = ByteArray(Const.MAX_FILE_PACKET_LENGTH) - var fileChannel: ByteReadChannel? = null - try { - val fileChannel = pkg.readChannel(coroutineContext = coroutineContext) - fileChannel.copyTo(socket, buffer) - } finally { - fileChannel?.cancel() - } + withMaxFilePacketBuffer { + var fileChannel: ByteReadChannel? = null + try { + val fileChannel = pkg.readChannel(coroutineContext = coroutineContext) + fileChannel.copyTo(socket, this) + } finally { + fileChannel?.cancel() + } - return socket.readTransportResponse().okay + return socket.readTransportResponse().okay + + } } } diff --git a/src/main/kotlin/com/malinskiy/adam/request/pkg/SideloadRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/pkg/SideloadRequest.kt index 6138a1647..8467e3b12 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/pkg/SideloadRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/pkg/SideloadRequest.kt @@ -28,8 +28,9 @@ class SideloadRequest( private val pkg: File, private var blockSize: Int = Const.MAX_FILE_PACKET_LENGTH ) : ComplexRequest() { + val buffer = ByteArray(blockSize) + override suspend fun readElement(socket: Socket): Boolean { - val buffer = ByteArray(blockSize) var pkgChannel: ByteReadChannel? = null try { pkgChannel = pkg.readChannel() diff --git a/src/main/kotlin/com/malinskiy/adam/request/pkg/StreamingPackageInstallRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/pkg/StreamingPackageInstallRequest.kt index 5903e2b01..a7487fb27 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/pkg/StreamingPackageInstallRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/pkg/StreamingPackageInstallRequest.kt @@ -16,7 +16,6 @@ package com.malinskiy.adam.request.pkg -import com.malinskiy.adam.Const import com.malinskiy.adam.annotation.Features import com.malinskiy.adam.extension.bashEscape import com.malinskiy.adam.extension.copyTo @@ -26,6 +25,7 @@ import com.malinskiy.adam.request.ValidationResponse import com.malinskiy.adam.request.abb.AbbExecRequest import com.malinskiy.adam.request.transform.StringResponseTransformer import com.malinskiy.adam.transport.Socket +import com.malinskiy.adam.transport.withMaxFilePacketBuffer import io.ktor.util.cio.* import io.ktor.utils.io.* import kotlinx.coroutines.Dispatchers @@ -110,17 +110,19 @@ class StreamingPackageInstallRequest( } override suspend fun readElement(socket: Socket): Boolean { - val buffer = ByteArray(Const.MAX_FILE_PACKET_LENGTH) - var fileChannel: ByteReadChannel? = null - try { - val fileChannel = pkg.readChannel(coroutineContext = coroutineContext) - fileChannel.copyTo(socket, buffer) - } finally { - fileChannel?.cancel() - } + withMaxFilePacketBuffer { + var fileChannel: ByteReadChannel? = null + try { + val fileChannel = pkg.readChannel(coroutineContext = coroutineContext) + fileChannel.copyTo(socket, this) + } finally { + fileChannel?.cancel() + } - socket.copyTo(transformer, buffer) - return transformer.transform().startsWith("Success") + clear() + socket.copyTo(transformer, this) + return transformer.transform().startsWith("Success") + } } companion object { diff --git a/src/main/kotlin/com/malinskiy/adam/request/reverse/ListReversePortForwardsRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/reverse/ListReversePortForwardsRequest.kt index 92c388810..e1f78cc2d 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/reverse/ListReversePortForwardsRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/reverse/ListReversePortForwardsRequest.kt @@ -16,27 +16,19 @@ package com.malinskiy.adam.request.reverse -import com.malinskiy.adam.Const +import com.malinskiy.adam.extension.readProtocolString import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.NonSpecifiedTarget import com.malinskiy.adam.request.forwarding.LocalPortSpec import com.malinskiy.adam.request.forwarding.RemotePortSpec import com.malinskiy.adam.transport.Socket -import java.nio.ByteBuffer /** * Doesn't work with SerialTarget, have to use the serial as a parameter for the execute method */ class ListReversePortForwardsRequest : ComplexRequest>(target = NonSpecifiedTarget) { override suspend fun readElement(socket: Socket): List { - val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4) - socket.readFully(sizeBuffer) - val size = String(sizeBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) - - val payloadBuffer = ByteBuffer.allocate(size) - socket.readFully(payloadBuffer) - val payload = String(payloadBuffer.array(), Const.DEFAULT_TRANSPORT_ENCODING) - return payload.lines().mapNotNull { line -> + return socket.readProtocolString().lines().mapNotNull { line -> if (line.isNotEmpty()) { val split = line.split(" ") ReversePortForwardingRule( diff --git a/src/main/kotlin/com/malinskiy/adam/request/shell/v1/ChanneledShellCommandRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/shell/v1/ChanneledShellCommandRequest.kt index ec21102db..11deb5e40 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/shell/v1/ChanneledShellCommandRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/shell/v1/ChanneledShellCommandRequest.kt @@ -21,6 +21,7 @@ import com.malinskiy.adam.request.AsyncChannelRequest import com.malinskiy.adam.request.NonSpecifiedTarget import com.malinskiy.adam.request.Target import com.malinskiy.adam.transport.Socket +import com.malinskiy.adam.transport.withMaxFilePacketBuffer import kotlinx.coroutines.channels.SendChannel open class ChanneledShellCommandRequest( @@ -28,15 +29,16 @@ open class ChanneledShellCommandRequest( target: Target = NonSpecifiedTarget ) : AsyncChannelRequest(target = target) { - val data = ByteArray(Const.MAX_PACKET_LENGTH) - override suspend fun readElement(socket: Socket, sendChannel: SendChannel): Boolean { - val count = socket.readAvailable(data, 0, Const.MAX_PACKET_LENGTH) - when { - count > 0 -> sendChannel.send(String(data, 0, count, Const.DEFAULT_TRANSPORT_ENCODING)) - else -> Unit + withMaxFilePacketBuffer { + val data = array() + val count = socket.readAvailable(data, 0, data.size) + when { + count > 0 -> sendChannel.send(String(data, 0, count, Const.DEFAULT_TRANSPORT_ENCODING)) + else -> Unit + } + return false } - return false } override fun serialize() = createBaseRequest("shell:$cmd") diff --git a/src/main/kotlin/com/malinskiy/adam/request/shell/v2/ChanneledShellCommandRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/shell/v2/ChanneledShellCommandRequest.kt index 60d7f653d..5299fbeb9 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/shell/v2/ChanneledShellCommandRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/shell/v2/ChanneledShellCommandRequest.kt @@ -21,7 +21,7 @@ import com.malinskiy.adam.request.AsyncChannelRequest import com.malinskiy.adam.request.NonSpecifiedTarget import com.malinskiy.adam.request.Target import com.malinskiy.adam.transport.Socket -import com.malinskiy.adam.transport.withDefaultBuffer +import com.malinskiy.adam.transport.withMaxPacketBuffer import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.SendChannel @@ -31,17 +31,16 @@ open class ChanneledShellCommandRequest( target: Target = NonSpecifiedTarget ) : AsyncChannelRequest(target = target, channel = channel) { - val data = ByteArray(Const.MAX_PACKET_LENGTH) - override suspend fun readElement(socket: Socket, sendChannel: SendChannel): Boolean { - withDefaultBuffer { - val readAvailable = socket.readAvailable(this.array(), 0, 1) + withMaxPacketBuffer { + val data = array() + val readAvailable = socket.readAvailable(data, 0, 1) when (readAvailable) { //Skip as if nothing is happening 0, -1 -> return false } - val readByte = this.get(0) + val readByte = data[0] when (MessageType.of(readByte.toInt())) { MessageType.STDOUT -> { val length = socket.readIntLittleEndian() diff --git a/src/main/kotlin/com/malinskiy/adam/request/shell/v2/SyncShellCommandRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/shell/v2/SyncShellCommandRequest.kt index 7897a178a..044df0e5f 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/shell/v2/SyncShellCommandRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/shell/v2/SyncShellCommandRequest.kt @@ -22,6 +22,7 @@ import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.NonSpecifiedTarget import com.malinskiy.adam.request.Target import com.malinskiy.adam.transport.Socket +import com.malinskiy.adam.transport.withMaxPacketBuffer import io.ktor.utils.io.* /** @@ -30,7 +31,6 @@ import io.ktor.utils.io.* abstract class SyncShellCommandRequest(val cmd: String, target: Target = NonSpecifiedTarget) : ComplexRequest(target) { - private val data = ByteArray(Const.MAX_PACKET_LENGTH) private val stdoutBuilder = StringBuilder() private val stderrBuilder = StringBuilder() private var exitCode: Int = -1 @@ -42,25 +42,28 @@ abstract class SyncShellCommandRequest(val cmd: String, target: Target override fun serialize() = createBaseRequest("shell,v2,raw:$cmd") override suspend fun readElement(socket: Socket): T { - loop@ while (true) { - when (val messageType = MessageType.of(socket.readByte().toInt())) { - MessageType.STDOUT -> { - val length = socket.readIntLittleEndian() - socket.readFully(data, 0, length) - stdoutBuilder.append(String(data, 0, length, Const.DEFAULT_TRANSPORT_ENCODING)) - } - MessageType.STDERR -> { - val length = socket.readIntLittleEndian() - socket.readFully(data, 0, length) - stderrBuilder.append(String(data, 0, length, Const.DEFAULT_TRANSPORT_ENCODING)) - } - MessageType.EXIT -> { - val length = socket.readIntLittleEndian() - exitCode = socket.readByte().toInt() - break@loop - } - MessageType.STDIN, MessageType.CLOSE_STDIN, MessageType.WINDOW_SIZE_CHANGE, MessageType.INVALID -> { - throw RequestValidationException("Unsupported message $messageType") + withMaxPacketBuffer { + val data = array() + loop@ while (true) { + when (val messageType = MessageType.of(socket.readByte().toInt())) { + MessageType.STDOUT -> { + val length = socket.readIntLittleEndian() + socket.readFully(data, 0, length) + stdoutBuilder.append(String(data, 0, length, Const.DEFAULT_TRANSPORT_ENCODING)) + } + MessageType.STDERR -> { + val length = socket.readIntLittleEndian() + socket.readFully(data, 0, length) + stderrBuilder.append(String(data, 0, length, Const.DEFAULT_TRANSPORT_ENCODING)) + } + MessageType.EXIT -> { + val length = socket.readIntLittleEndian() + exitCode = socket.readByte().toInt() + break@loop + } + MessageType.STDIN, MessageType.CLOSE_STDIN, MessageType.WINDOW_SIZE_CHANGE, MessageType.INVALID -> { + throw RequestValidationException("Unsupported message $messageType") + } } } } diff --git a/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePullFileRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePullFileRequest.kt index f6c9fcac3..960eb9689 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePullFileRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePullFileRequest.kt @@ -19,11 +19,13 @@ package com.malinskiy.adam.request.sync.base import com.malinskiy.adam.Const import com.malinskiy.adam.exception.PullFailedException import com.malinskiy.adam.exception.UnsupportedSyncProtocolException +import com.malinskiy.adam.extension.compatLimit import com.malinskiy.adam.extension.toInt import com.malinskiy.adam.request.AsyncChannelRequest import com.malinskiy.adam.request.ValidationResponse import com.malinskiy.adam.request.sync.v1.StatFileRequest import com.malinskiy.adam.transport.Socket +import com.malinskiy.adam.transport.withMaxFilePacketBuffer import io.ktor.util.cio.* import io.ktor.utils.io.* import kotlinx.coroutines.Dispatchers @@ -58,41 +60,48 @@ abstract class BasePullFileRequest( totalBytes = size ?: StatFileRequest(remotePath).readElement(socket).size.toLong() } - private val headerBuffer = ByteArray(8) - private val dataBuffer = ByteArray(Const.MAX_FILE_PACKET_LENGTH) - override suspend fun readElement(socket: Socket, sendChannel: SendChannel): Boolean { - socket.readFully(headerBuffer, 0, 8) + withMaxFilePacketBuffer { + val data = array() + socket.readFully(data, 0, 8) - val header = headerBuffer.copyOfRange(0, 4) - when { - header.contentEquals(Const.Message.DONE) -> { - fileWriteChannel.close() - return true - } - header.contentEquals(Const.Message.DATA) -> { - val available = headerBuffer.copyOfRange(4, 8).toInt() - if (available > Const.MAX_FILE_PACKET_LENGTH) { - throw UnsupportedSyncProtocolException() + val header = data.copyOfRange(0, 4) + when { + header.contentEquals(Const.Message.DONE) -> { + fileWriteChannel.close() + return true } - socket.readFully(dataBuffer, 0, available) - fileWriteChannel.writeFully(dataBuffer, 0, available) + header.contentEquals(Const.Message.DATA) -> { + val available = data.copyOfRange(4, 8).toInt() + if (available > Const.MAX_FILE_PACKET_LENGTH) { + throw UnsupportedSyncProtocolException() + } + clear() + compatLimit(available) + socket.readFully(this) + flip() + fileWriteChannel.writeFully(this) - currentPosition += available + currentPosition += available - sendChannel.send(currentPosition.toDouble() / totalBytes) - } - header.contentEquals(Const.Message.FAIL) -> { - val size = headerBuffer.copyOfRange(4, 8).toInt() - socket.readFully(dataBuffer, 0, size) - val errorMessage = String(dataBuffer, 0, size) - throw PullFailedException("Failed to pull file $remotePath: $errorMessage") - } - else -> { - throw UnsupportedSyncProtocolException("Unexpected header message ${String(header, Const.DEFAULT_TRANSPORT_ENCODING)}") + sendChannel.send(currentPosition.toDouble() / totalBytes) + } + header.contentEquals(Const.Message.FAIL) -> { + val size = data.copyOfRange(4, 8).toInt() + clear() + compatLimit(size) + socket.readFully(this) + flip() + array() + val errorMessage = String(array(), 0, size) + throw PullFailedException("Failed to pull file $remotePath: $errorMessage") + } + else -> { + throw UnsupportedSyncProtocolException("Unexpected header message ${String(header, Const.DEFAULT_TRANSPORT_ENCODING)}") + } } + return false } - return false } override suspend fun close(channel: SendChannel) { diff --git a/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt index 9b6fab852..a2a33b669 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt @@ -24,6 +24,7 @@ import com.malinskiy.adam.extension.write import com.malinskiy.adam.request.AsyncChannelRequest import com.malinskiy.adam.request.ValidationResponse import com.malinskiy.adam.transport.Socket +import com.malinskiy.adam.transport.withMaxPacketBuffer import io.ktor.util.cio.* import io.ktor.utils.io.* import kotlinx.coroutines.Dispatchers @@ -38,39 +39,41 @@ abstract class BasePushFileRequest( coroutineContext: CoroutineContext = Dispatchers.IO ) : AsyncChannelRequest() { protected val fileReadChannel = local.readChannel(coroutineContext = coroutineContext) - protected val buffer = ByteArray(Const.KTOR_INTERNAL_BUFFER_LENGTH) protected var totalBytes = local.length() protected var currentPosition = 0L protected val modeValue: Int get() = mode.toInt(8) and "0777".toInt(8) override suspend fun readElement(socket: Socket, sendChannel: SendChannel): Boolean { - val available = fileReadChannel.readAvailable(buffer, 8, Const.KTOR_INTERNAL_BUFFER_LENGTH - 8) - when { - available < 0 -> { - Const.Message.DONE.copyInto(buffer) - (local.lastModified() / 1000).toInt().toByteArray().copyInto(buffer, destinationOffset = 4) - socket.write(request = buffer, length = 8) - val transportResponse = socket.readTransportResponse() - fileReadChannel.cancel() + withMaxPacketBuffer { + val data = array() + val available = fileReadChannel.readAvailable(data, 8, data.size - 8) + when { + available < 0 -> { + Const.Message.DONE.copyInto(data) + (local.lastModified() / 1000).toInt().toByteArray().copyInto(data, destinationOffset = 4) + socket.write(request = data, length = 8) + val transportResponse = socket.readTransportResponse() + fileReadChannel.cancel() - if (transportResponse.okay) { - sendChannel.send(1.0) - return true - } else { - throw PushFailedException("adb didn't acknowledge the file transfer: ${transportResponse.message ?: ""}") + if (transportResponse.okay) { + sendChannel.send(1.0) + return true + } else { + throw PushFailedException("adb didn't acknowledge the file transfer: ${transportResponse.message ?: ""}") + } } + available > 0 -> { + Const.Message.DATA.copyInto(data) + available.toByteArray().reversedArray().copyInto(data, destinationOffset = 4) + socket.writeFully(data, 0, available + 8) + currentPosition += available + sendChannel.send(currentPosition.toDouble() / totalBytes) + } + else -> Unit } - available > 0 -> { - Const.Message.DATA.copyInto(buffer) - available.toByteArray().reversedArray().copyInto(buffer, destinationOffset = 4) - socket.writeFully(buffer, 0, available + 8) - currentPosition += available - sendChannel.send(currentPosition.toDouble() / totalBytes) - } - else -> Unit + return false } - return false } override fun serialize() = createBaseRequest("sync:") diff --git a/src/main/kotlin/com/malinskiy/adam/request/sync/v1/ListFileRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/sync/v1/ListFileRequest.kt index 7b4351f3c..d097718e4 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/sync/v1/ListFileRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/sync/v1/ListFileRequest.kt @@ -23,6 +23,7 @@ import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.ValidationResponse import com.malinskiy.adam.request.sync.model.FileEntryV1 import com.malinskiy.adam.transport.Socket +import com.malinskiy.adam.transport.withDefaultBuffer import java.time.Instant class ListFileRequest( @@ -40,32 +41,37 @@ class ListFileRequest( override suspend fun readElement(socket: Socket): List { socket.writeSyncRequest(Const.Message.LIST_V1, remotePath) - val bytes = ByteArray(16) - val stringBytes = ByteArray(Const.MAX_REMOTE_PATH_LENGTH) - val result = mutableListOf() - loop@ while (true) { - socket.readFully(bytes, 0, 4) - when { - bytes.copyOfRange(0, 4).contentEquals(Const.Message.DENT_V1) -> { - socket.readFully(bytes, 0, 16) - val nameLength = bytes.copyOfRange(12, 16).toInt() - socket.readFully(stringBytes, 0, nameLength) + withDefaultBuffer { + val data = array() + val result = mutableListOf() + loop@ while (true) { - result.add( - FileEntryV1( - mode = bytes.copyOfRange(0, 4).toInt().toUInt(), - size = bytes.copyOfRange(4, 8).toInt().toUInt(), - mtime = Instant.ofEpochSecond(bytes.copyOfRange(8, 12).toInt().toLong()), - name = String(stringBytes, 0, nameLength, Const.DEFAULT_TRANSPORT_ENCODING) + socket.readFully(data, 0, 4) + when { + data.copyOfRange(0, 4).contentEquals(Const.Message.DENT_V1) -> { + socket.readFully(data, 0, 16) + val mode = data.copyOfRange(0, 4).toInt().toUInt() + val size = data.copyOfRange(4, 8).toInt().toUInt() + val mtime = Instant.ofEpochSecond(data.copyOfRange(8, 12).toInt().toLong()) + val nameLength = data.copyOfRange(12, 16).toInt() + socket.readFully(data, 0, nameLength) + + result.add( + FileEntryV1( + mode = mode, + size = size, + mtime = mtime, + name = String(data, 0, nameLength, Const.DEFAULT_TRANSPORT_ENCODING) + ) ) - ) + } + data.copyOfRange(0, 4).contentEquals(Const.Message.DONE) -> break@loop + else -> break@loop } - bytes.copyOfRange(0, 4).contentEquals(Const.Message.DONE) -> break@loop - else -> break@loop } - } - return result + return result + } } override fun serialize() = createBaseRequest("sync:") diff --git a/src/main/kotlin/com/malinskiy/adam/request/sync/v1/PushFileRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/sync/v1/PushFileRequest.kt index b746bbc24..1fa8ed7dc 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/sync/v1/PushFileRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/sync/v1/PushFileRequest.kt @@ -18,9 +18,9 @@ package com.malinskiy.adam.request.sync.v1 import com.malinskiy.adam.Const import com.malinskiy.adam.extension.toByteArray -import com.malinskiy.adam.extension.write import com.malinskiy.adam.request.sync.base.BasePushFileRequest import com.malinskiy.adam.transport.Socket +import com.malinskiy.adam.transport.withDefaultBuffer import kotlinx.coroutines.Dispatchers import java.io.File import kotlin.coroutines.CoroutineContext @@ -42,13 +42,13 @@ class PushFileRequest( val packetLength = (path.size + mode.size) val size = packetLength.toByteArray().reversedArray() - val cmd = ByteArray(8 + path.size + 4) - - type.copyInto(cmd) - size.copyInto(cmd, 4) - path.copyInto(cmd, 8) - mode.copyInto(cmd, 8 + path.size) - - socket.write(cmd) + withDefaultBuffer { + put(type) + put(size) + put(path) + put(mode) + flip() + socket.writeFully(this) + } } } diff --git a/src/main/kotlin/com/malinskiy/adam/request/sync/v1/StatFileRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/sync/v1/StatFileRequest.kt index e837dd186..4d1f98314 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/sync/v1/StatFileRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/sync/v1/StatFileRequest.kt @@ -18,6 +18,7 @@ package com.malinskiy.adam.request.sync.v1 import com.malinskiy.adam.Const import com.malinskiy.adam.exception.UnsupportedSyncProtocolException +import com.malinskiy.adam.extension.compatLimit import com.malinskiy.adam.extension.toInt import com.malinskiy.adam.extension.toUInt import com.malinskiy.adam.extension.writeSyncRequest @@ -25,6 +26,7 @@ import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.ValidationResponse import com.malinskiy.adam.request.sync.model.FileEntryV1 import com.malinskiy.adam.transport.Socket +import com.malinskiy.adam.transport.withDefaultBuffer import java.time.Instant class StatFileRequest( @@ -33,16 +35,20 @@ class StatFileRequest( override suspend fun readElement(socket: Socket): FileEntryV1 { socket.writeSyncRequest(Const.Message.LSTAT_V1, remotePath) - val bytes = ByteArray(16) - socket.readFully(bytes, 0, 16) + withDefaultBuffer { + compatLimit(16) + socket.readFully(this) + flip() - if (!bytes.copyOfRange(0, 4).contentEquals(Const.Message.LSTAT_V1)) throw UnsupportedSyncProtocolException() + val bytes = array() + if (!bytes.copyOfRange(0, 4).contentEquals(Const.Message.LSTAT_V1)) throw UnsupportedSyncProtocolException() - return FileEntryV1( - mode = bytes.copyOfRange(4, 8).toUInt(), - size = bytes.copyOfRange(8, 12).toUInt(), - mtime = Instant.ofEpochSecond(bytes.copyOfRange(12, 16).toInt().toLong()) - ) + return FileEntryV1( + mode = bytes.copyOfRange(4, 8).toUInt(), + size = bytes.copyOfRange(8, 12).toUInt(), + mtime = Instant.ofEpochSecond(bytes.copyOfRange(12, 16).toInt().toLong()) + ) + } } override fun validate(): ValidationResponse { diff --git a/src/main/kotlin/com/malinskiy/adam/request/sync/v2/ListFileRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/sync/v2/ListFileRequest.kt index ba417d178..2a40ee18a 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/sync/v2/ListFileRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/sync/v2/ListFileRequest.kt @@ -24,6 +24,7 @@ import com.malinskiy.adam.request.Feature import com.malinskiy.adam.request.ValidationResponse import com.malinskiy.adam.request.sync.model.FileEntryV2 import com.malinskiy.adam.transport.Socket +import com.malinskiy.adam.transport.withDefaultBuffer import java.time.Instant @Features(Feature.LS_V2) @@ -48,40 +49,53 @@ class ListFileRequest( override suspend fun readElement(socket: Socket): List { socket.writeSyncRequest(Const.Message.LIST_V2, remotePath) - val stringBytes = ByteArray(Const.MAX_REMOTE_PATH_LENGTH) + withDefaultBuffer { + val data = array() +// val bytes = ByteArray(72) + val result = mutableListOf() + loop@ while (true) { + socket.readFully(data, 0, 4) + when { + data.copyOfRange(0, 4).contentEquals(Const.Message.DENT_V2) -> { + socket.readFully(data, 0, 72) + val nameLength = data.copyOfRange(68, 72).toInt() + val error = data.copyOfRange(0, 4).toUInt() + val dev = data.copyOfRange(4, 12).toULong() + val ino = data.copyOfRange(12, 20).toULong() + val mode = data.copyOfRange(20, 24).toUInt() + val nlink = data.copyOfRange(24, 28).toUInt() + val uid = data.copyOfRange(28, 32).toUInt() + val gid = data.copyOfRange(32, 36).toUInt() + val size = data.copyOfRange(36, 44).toULong() + val atime = Instant.ofEpochSecond(data.copyOfRange(44, 52).toLong()) + val mtime = Instant.ofEpochSecond(data.copyOfRange(52, 60).toLong()) + val ctime = Instant.ofEpochSecond(data.copyOfRange(60, 68).toLong()) - val bytes = ByteArray(72) - val result = mutableListOf() - loop@ while (true) { - socket.readFully(bytes, 0, 4) - when { - bytes.copyOfRange(0, 4).contentEquals(Const.Message.DENT_V2) -> { - socket.readFully(bytes, 0, 72) - val nameLength = bytes.copyOfRange(68, 72).toInt() - socket.readFully(stringBytes, 0, nameLength) - result.add( - FileEntryV2( - error = bytes.copyOfRange(0, 4).toUInt(), - dev = bytes.copyOfRange(4, 12).toULong(), - ino = bytes.copyOfRange(12, 20).toULong(), - mode = bytes.copyOfRange(20, 24).toUInt(), - nlink = bytes.copyOfRange(24, 28).toUInt(), - uid = bytes.copyOfRange(28, 32).toUInt(), - gid = bytes.copyOfRange(32, 36).toUInt(), - size = bytes.copyOfRange(36, 44).toULong(), - atime = Instant.ofEpochSecond(bytes.copyOfRange(44, 52).toLong()), - mtime = Instant.ofEpochSecond(bytes.copyOfRange(52, 60).toLong()), - ctime = Instant.ofEpochSecond(bytes.copyOfRange(60, 68).toLong()), - name = String(stringBytes, 0, nameLength, Const.DEFAULT_TRANSPORT_ENCODING) + socket.readFully(data, 0, nameLength) + result.add( + FileEntryV2( + error = error, + dev = dev, + ino = ino, + mode = mode, + nlink = nlink, + uid = uid, + gid = gid, + size = size, + atime = atime, + mtime = mtime, + ctime = ctime, + name = String(data, 0, nameLength, Const.DEFAULT_TRANSPORT_ENCODING) + ) ) - ) + } + data.copyOfRange(0, 4).contentEquals(Const.Message.DONE) -> break@loop + else -> break@loop } - bytes.copyOfRange(0, 4).contentEquals(Const.Message.DONE) -> break@loop - else -> break@loop } - } - return result + return result + } } override fun serialize() = createBaseRequest("sync:") diff --git a/src/main/kotlin/com/malinskiy/adam/request/sync/v2/StatFileRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/sync/v2/StatFileRequest.kt index 334c9c8da..d968eac88 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/sync/v2/StatFileRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/sync/v2/StatFileRequest.kt @@ -28,6 +28,7 @@ import com.malinskiy.adam.request.Feature import com.malinskiy.adam.request.ValidationResponse import com.malinskiy.adam.request.sync.model.FileEntryV2 import com.malinskiy.adam.transport.Socket +import com.malinskiy.adam.transport.withDefaultBuffer import java.time.Instant @Features(Feature.STAT_V2) @@ -38,24 +39,26 @@ class StatFileRequest( override suspend fun readElement(socket: Socket): FileEntryV2 { socket.writeSyncRequest(Const.Message.LSTAT_V2, remotePath) - val bytes = ByteArray(72) - socket.readFully(bytes, 0, 72) + withDefaultBuffer { + val bytes = array() + socket.readFully(bytes, 0, 72) - if (!bytes.copyOfRange(0, 4).contentEquals(Const.Message.LSTAT_V2)) throw UnsupportedSyncProtocolException() + if (!bytes.copyOfRange(0, 4).contentEquals(Const.Message.LSTAT_V2)) throw UnsupportedSyncProtocolException() - return FileEntryV2( - error = bytes.copyOfRange(4, 8).toUInt(), - dev = bytes.copyOfRange(8, 16).toULong(), - ino = bytes.copyOfRange(16, 24).toULong(), - mode = bytes.copyOfRange(24, 28).toUInt(), - nlink = bytes.copyOfRange(28, 32).toUInt(), - uid = bytes.copyOfRange(32, 36).toUInt(), - gid = bytes.copyOfRange(36, 40).toUInt(), - size = bytes.copyOfRange(40, 48).toULong(), - atime = Instant.ofEpochSecond(bytes.copyOfRange(48, 56).toLong()), - mtime = Instant.ofEpochSecond(bytes.copyOfRange(56, 64).toLong()), - ctime = Instant.ofEpochSecond(bytes.copyOfRange(64, 72).toLong()) - ) + return FileEntryV2( + error = bytes.copyOfRange(4, 8).toUInt(), + dev = bytes.copyOfRange(8, 16).toULong(), + ino = bytes.copyOfRange(16, 24).toULong(), + mode = bytes.copyOfRange(24, 28).toUInt(), + nlink = bytes.copyOfRange(28, 32).toUInt(), + uid = bytes.copyOfRange(32, 36).toUInt(), + gid = bytes.copyOfRange(36, 40).toUInt(), + size = bytes.copyOfRange(40, 48).toULong(), + atime = Instant.ofEpochSecond(bytes.copyOfRange(48, 56).toLong()), + mtime = Instant.ofEpochSecond(bytes.copyOfRange(56, 64).toLong()), + ctime = Instant.ofEpochSecond(bytes.copyOfRange(64, 72).toLong()) + ) + } } override fun validate(): ValidationResponse { diff --git a/src/main/kotlin/com/malinskiy/adam/request/testrunner/TestRunnerRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/testrunner/TestRunnerRequest.kt index fa8a70ca9..95d6341ac 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/testrunner/TestRunnerRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/testrunner/TestRunnerRequest.kt @@ -16,12 +16,12 @@ package com.malinskiy.adam.request.testrunner -import com.malinskiy.adam.Const import com.malinskiy.adam.request.AsyncChannelRequest import com.malinskiy.adam.request.transform.InstrumentationResponseTransformer import com.malinskiy.adam.request.transform.ProgressiveResponseTransformer import com.malinskiy.adam.request.transform.ProtoInstrumentationResponseTransformer import com.malinskiy.adam.transport.Socket +import com.malinskiy.adam.transport.withMaxPacketBuffer import kotlinx.coroutines.channels.SendChannel /** @@ -53,7 +53,6 @@ class TestRunnerRequest( private val outputLogPath: String? = null, private val protobuf: Boolean = false, ) : AsyncChannelRequest, Unit>() { - private val buffer = ByteArray(Const.MAX_PACKET_LENGTH) private val transformer: ProgressiveResponseTransformer?> by lazy { if (protobuf) { @@ -64,19 +63,22 @@ class TestRunnerRequest( } override suspend fun readElement(socket: Socket, sendChannel: SendChannel>): Boolean { - val available = socket.readAvailable(buffer, 0, Const.MAX_PACKET_LENGTH) - - when { - available > 0 -> { - transformer.process(buffer, 0, available)?.let { sendChannel.send(it) } - } - available < 0 -> { - return true + withMaxPacketBuffer { + val buffer = array() + val available = socket.readAvailable(buffer, 0, buffer.size) + + when { + available > 0 -> { + transformer.process(buffer, 0, available)?.let { sendChannel.send(it) } + } + available < 0 -> { + return true + } + else -> null } - else -> null - } - return false + return false + } } override fun serialize() = createBaseRequest(StringBuilder().apply { diff --git a/src/main/kotlin/com/malinskiy/adam/transport/BufferFactory.kt b/src/main/kotlin/com/malinskiy/adam/transport/BufferFactory.kt index ee2553c93..f6b3299a4 100644 --- a/src/main/kotlin/com/malinskiy/adam/transport/BufferFactory.kt +++ b/src/main/kotlin/com/malinskiy/adam/transport/BufferFactory.kt @@ -22,7 +22,9 @@ import java.nio.ByteBuffer internal const val DEFAULT_BUFFER_SIZE = 4088 -val AdamDefaultPool: ObjectPool = ByteBufferPool(Const.DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE) +val AdamDefaultPool: ObjectPool = ByteBufferPool(Const.DEFAULT_BUFFER_SIZE, bufferSize = DEFAULT_BUFFER_SIZE) +val AdamMaxPacketPool: ObjectPool = ByteBufferPool(Const.DEFAULT_BUFFER_SIZE, bufferSize = Const.MAX_PACKET_LENGTH) +val AdamMaxFilePacketPool: ObjectPool = ByteBufferPool(Const.DEFAULT_BUFFER_SIZE, bufferSize = Const.MAX_FILE_PACKET_LENGTH) inline fun withDefaultBuffer(block: ByteBuffer.() -> R): R { val instance = AdamDefaultPool.borrow() @@ -32,3 +34,22 @@ inline fun withDefaultBuffer(block: ByteBuffer.() -> R): R { AdamDefaultPool.recycle(instance) } } + +inline fun withMaxPacketBuffer(block: ByteBuffer.() -> R): R { + val instance = AdamMaxPacketPool.borrow() + return try { + block(instance) + } finally { + AdamMaxPacketPool.recycle(instance) + } +} + +inline fun withMaxFilePacketBuffer(block: ByteBuffer.() -> R): R { + val instance = AdamMaxFilePacketPool.borrow() + return try { + block(instance) + } finally { + AdamMaxFilePacketPool.recycle(instance) + } +} + diff --git a/src/main/kotlin/com/malinskiy/adam/transport/NioSocket.kt b/src/main/kotlin/com/malinskiy/adam/transport/NioSocket.kt index 410729e88..7b29e310e 100644 --- a/src/main/kotlin/com/malinskiy/adam/transport/NioSocket.kt +++ b/src/main/kotlin/com/malinskiy/adam/transport/NioSocket.kt @@ -16,11 +16,13 @@ package com.malinskiy.adam.transport +import com.malinskiy.adam.extension.compatLimit import kotlinx.coroutines.isActive import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withTimeoutOrNull import kotlinx.coroutines.yield +import java.io.EOFException import java.io.IOException import java.net.InetSocketAddress import java.net.SocketTimeoutException @@ -53,13 +55,15 @@ class NioSocket( suspend fun connect() { if (!state.compareAndSet(State.CLOSED, State.SYN_SENT)) return - socketChannel = SelectorProvider.provider().openSocketChannel().apply { + val selectorProvider = SelectorProvider.provider() + + socketChannel = selectorProvider.openSocketChannel().apply { configureBlocking(false) configure(socket()) } val success = socketChannel.connect(socketAddress) - selector = SelectorProvider.provider().openSelector() + selector = selectorProvider.openSelector() if (success) { processAccept(selector) } else { @@ -106,7 +110,7 @@ class NioSocket( yield() } - return@withTimeoutOrNull remaining + return@withTimeoutOrNull buffer.limit() - remaining } ?: throw SocketTimeoutException("Timeout reading") } @@ -117,20 +121,30 @@ class NioSocket( override suspend fun readByte(): Byte { val buffer = ByteBuffer.allocate(1) val read = readFully(buffer) - //TODO: handle EOF + if (read == -1) throw EOFException("Input channel was closed by remote host") return buffer.array()[0] } override suspend fun writeByte(value: Int) { - writeFully(ByteArray(1) { value.toByte() }) + withDefaultBuffer { + put(value.toByte()) + flip() + writeFully(this) + } } override suspend fun readIntLittleEndian(): Int { - val allocate = ByteBuffer.allocate(4) - allocate.order(ByteOrder.LITTLE_ENDIAN) - val read = readFully(allocate) - allocate.flip() - return allocate.int + withDefaultBuffer { + val order = order() + order(ByteOrder.LITTLE_ENDIAN) + compatLimit(4) + val read = readFully(this) + if (read == -1) throw EOFException("Input channel was closed by remote host") + flip() + val result = int + order(order) + return result + } } override suspend fun writeIntLittleEndian(value: Int) { @@ -141,6 +155,7 @@ class NioSocket( writeFully(allocate) } + @Suppress("BlockingMethodInNonBlockingContext") override suspend fun close() { mutex.withLock { val shouldDrain = when { @@ -193,7 +208,12 @@ class NioSocket( while (iterator.hasNext()) { val selectionKey = iterator.next() if (selectionKey.isConnectable) { - socketChannel.finishConnect() + try { + (selectionKey.channel() as SocketChannel).finishConnect() + } catch (e: IOException) { + selectionKey.cancel() + throw e + } selectionKey.interestOps(0) val success = state.compareAndSet(State.SYN_SENT, State.ESTABLISHED) @@ -209,6 +229,7 @@ class NioSocket( if (socketChannel.isConnectionPending) { try { socketChannel.close() + socketChannel.keyFor(selector).cancel() } catch (e: IOException) { //ignore } @@ -255,6 +276,7 @@ class NioSocket( state.set(State.CLOSE_WAIT) } State.CLOSING -> state.set(State.CLOSED) + else -> Unit } } selectionKey.interestOps(0) diff --git a/src/test/kotlin/com/malinskiy/adam/request/async/AsyncDeviceMonitorRequestTest.kt b/src/test/kotlin/com/malinskiy/adam/request/async/AsyncDeviceMonitorRequestTest.kt index b1754a384..23bbf3631 100644 --- a/src/test/kotlin/com/malinskiy/adam/request/async/AsyncDeviceMonitorRequestTest.kt +++ b/src/test/kotlin/com/malinskiy/adam/request/async/AsyncDeviceMonitorRequestTest.kt @@ -28,7 +28,6 @@ import io.ktor.utils.io.* import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.receiveOrNull -import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.junit.Test import kotlin.coroutines.CoroutineContext @@ -48,8 +47,7 @@ class AsyncDeviceMonitorRequestTest : CoroutineScope { output.writeFully(response, 0, response.size) response = ("0015emulator-5554\tdevice\n").toByteArray(Const.DEFAULT_TRANSPORT_ENCODING) output.writeFully(response, 0, response.size) - //Need to delay the server otherwise the client doesn't have time to check the response - delay(10000) + input.discard() output.close() } diff --git a/src/test/kotlin/com/malinskiy/adam/request/forwarding/ListPortForwardsRequestTest.kt b/src/test/kotlin/com/malinskiy/adam/request/forwarding/ListPortForwardsRequestTest.kt index 3325ca590..6909d5947 100644 --- a/src/test/kotlin/com/malinskiy/adam/request/forwarding/ListPortForwardsRequestTest.kt +++ b/src/test/kotlin/com/malinskiy/adam/request/forwarding/ListPortForwardsRequestTest.kt @@ -46,6 +46,7 @@ class ListPortForwardsRequestTest { """.trimIndent() ) + input.discard() output.close() } diff --git a/src/test/kotlin/com/malinskiy/adam/transport/NioSocketTest.kt b/src/test/kotlin/com/malinskiy/adam/transport/NioSocketTest.kt new file mode 100644 index 000000000..586284c48 --- /dev/null +++ b/src/test/kotlin/com/malinskiy/adam/transport/NioSocketTest.kt @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2021 Anton Malinskiy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.malinskiy.adam.transport + +import kotlinx.coroutines.runBlocking +import org.junit.Test +import java.net.ConnectException +import java.net.InetSocketAddress + +class NioSocketTest { + @Test(expected = ConnectException::class) + fun testClosedPort() { + runBlocking { + //This will fail obviously in a scenario where 65535 is actually open + NioSocket(InetSocketAddress("localhost", 65535), 1_000, 1_000).connect() + } + } +} \ No newline at end of file From 6e0d6a9bd89557712edeeb5fe4781db382e4b88f Mon Sep 17 00:00:00 2001 From: Anton Malinskiy Date: Thu, 21 Jan 2021 00:28:14 +1100 Subject: [PATCH 3/7] fix(socket): remove debug println --- src/main/kotlin/com/malinskiy/adam/extension/Socket.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt b/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt index afec31482..af551c126 100644 --- a/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt +++ b/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt @@ -121,9 +121,8 @@ suspend fun Socket.readProtocolString(): String { withDefaultBuffer { val transformer = StringResponseTransformer() val copied = copyTo(transformer, this, limit = 4L) - println(copied) val length = transformer.transform() - if (length.length != 4) { + if (copied != 4L) { throw RequestRejectedException("Unexpected string length: $length") } val messageLength = length.toIntOrNull(16) ?: throw RequestRejectedException("Unexpected string length: $length") From f3f4f4546a40b29e0d56ae7227881dfae56a03a3 Mon Sep 17 00:00:00 2001 From: Anton Malinskiy Date: Thu, 21 Jan 2021 01:29:11 +1100 Subject: [PATCH 4/7] fix(test): fix test failures --- .../malinskiy/adam/extension/ByteBuffer.kt | 1 + .../com/malinskiy/adam/extension/Socket.kt | 15 ++++++++--- .../framebuffer/ScreenCaptureRequest.kt | 3 ++- .../adam/request/misc/ReconnectRequest.kt | 11 ++++---- .../request/sync/base/BasePullFileRequest.kt | 5 ++-- .../adam/request/sync/v1/PushFileRequest.kt | 3 ++- .../adam/request/sync/v1/StatFileRequest.kt | 7 ++--- .../com/malinskiy/adam/transport/NioSocket.kt | 27 +++++++++++-------- 8 files changed, 43 insertions(+), 29 deletions(-) diff --git a/src/main/kotlin/com/malinskiy/adam/extension/ByteBuffer.kt b/src/main/kotlin/com/malinskiy/adam/extension/ByteBuffer.kt index a9cb1d9cb..0928fc3d2 100644 --- a/src/main/kotlin/com/malinskiy/adam/extension/ByteBuffer.kt +++ b/src/main/kotlin/com/malinskiy/adam/extension/ByteBuffer.kt @@ -27,3 +27,4 @@ import java.nio.ByteBuffer fun ByteBuffer.compatRewind() = ((this as Buffer).rewind() as ByteBuffer) fun ByteBuffer.compatLimit(newLimit: Int) = ((this as Buffer).limit(newLimit) as ByteBuffer) fun ByteBuffer.compatPosition(newLimit: Int) = ((this as Buffer).position(newLimit) as ByteBuffer) +fun ByteBuffer.compatFlip() = ((this as Buffer).flip() as ByteBuffer) diff --git a/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt b/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt index af551c126..f2c1c4610 100644 --- a/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt +++ b/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt @@ -139,7 +139,7 @@ suspend fun Socket.read(): TransportResponse { val ok = withDefaultBuffer { compatLimit(4) readFully(this) - array().isOkay() + isOkay() } val message = if (!ok) { readOptionalProtocolString() @@ -149,7 +149,13 @@ suspend fun Socket.read(): TransportResponse { return TransportResponse(ok, message) } -private fun ByteArray.isOkay() = contentEquals(Const.Message.OKAY) +private fun ByteBuffer.isOkay(): Boolean { + if (limit() != 4) return false + for (i in 0..3) { + if (get(i) != Const.Message.OKAY[i]) return false + } + return true +} suspend fun Socket.readStatus(): String { withDefaultBuffer { @@ -181,7 +187,7 @@ suspend fun Socket.writeSyncRequest(type: ByteArray, remotePath: String) { put(type) put(size) put(path) - flip() + compatFlip() writeFully(this) } } @@ -212,7 +218,8 @@ suspend fun Socket.readTransportResponse(): TransportResponse { val ok = withDefaultBuffer { compatLimit(4) readFully(this) - array().isOkay() + compatFlip() + isOkay() } val message = if (!ok) { readOptionalProtocolString() diff --git a/src/main/kotlin/com/malinskiy/adam/request/framebuffer/ScreenCaptureRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/framebuffer/ScreenCaptureRequest.kt index cc0b356ab..96eeeee47 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/framebuffer/ScreenCaptureRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/framebuffer/ScreenCaptureRequest.kt @@ -17,6 +17,7 @@ package com.malinskiy.adam.request.framebuffer import com.malinskiy.adam.exception.UnsupportedImageProtocolException +import com.malinskiy.adam.extension.compatFlip import com.malinskiy.adam.extension.compatLimit import com.malinskiy.adam.extension.compatRewind import com.malinskiy.adam.request.ComplexRequest @@ -48,7 +49,7 @@ class ScreenCaptureRequest(private val adapter: ScreenCaptureAdapter) : Co socket.writeFully(ByteArray(1) { 0.toByte() }, 0, 1) order(ByteOrder.LITTLE_ENDIAN) - flip() + compatFlip() return when (protocolVersion) { 16 -> adapter.process( version = protocolVersion, diff --git a/src/main/kotlin/com/malinskiy/adam/request/misc/ReconnectRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/misc/ReconnectRequest.kt index 67a127410..6c31ec87f 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/misc/ReconnectRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/misc/ReconnectRequest.kt @@ -17,6 +17,7 @@ package com.malinskiy.adam.request.misc import com.malinskiy.adam.Const +import com.malinskiy.adam.extension.compatFlip import com.malinskiy.adam.extension.compatLimit import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.NonSpecifiedTarget @@ -41,17 +42,17 @@ class ReconnectRequest( withDefaultBuffer { compatLimit(4) socket.readFully(this) - flip() - return if (array().contentEquals(done)) { + compatFlip() + return if (array().copyOfRange(0, 4).contentEquals(done)) { "done" } else { //This is length of a response string - val size = String(array(), Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) + val size = String(array(), 0, 4, Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) clear() compatLimit(size) socket.readFully(this) - flip() - String(array(), Const.DEFAULT_TRANSPORT_ENCODING) + compatFlip() + String(array(), 0, size, Const.DEFAULT_TRANSPORT_ENCODING) } } } diff --git a/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePullFileRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePullFileRequest.kt index 960eb9689..4f22d0c7a 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePullFileRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePullFileRequest.kt @@ -19,6 +19,7 @@ package com.malinskiy.adam.request.sync.base import com.malinskiy.adam.Const import com.malinskiy.adam.exception.PullFailedException import com.malinskiy.adam.exception.UnsupportedSyncProtocolException +import com.malinskiy.adam.extension.compatFlip import com.malinskiy.adam.extension.compatLimit import com.malinskiy.adam.extension.toInt import com.malinskiy.adam.request.AsyncChannelRequest @@ -79,7 +80,7 @@ abstract class BasePullFileRequest( clear() compatLimit(available) socket.readFully(this) - flip() + compatFlip() fileWriteChannel.writeFully(this) currentPosition += available @@ -91,7 +92,7 @@ abstract class BasePullFileRequest( clear() compatLimit(size) socket.readFully(this) - flip() + compatFlip() array() val errorMessage = String(array(), 0, size) throw PullFailedException("Failed to pull file $remotePath: $errorMessage") diff --git a/src/main/kotlin/com/malinskiy/adam/request/sync/v1/PushFileRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/sync/v1/PushFileRequest.kt index 1fa8ed7dc..710418077 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/sync/v1/PushFileRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/sync/v1/PushFileRequest.kt @@ -17,6 +17,7 @@ package com.malinskiy.adam.request.sync.v1 import com.malinskiy.adam.Const +import com.malinskiy.adam.extension.compatFlip import com.malinskiy.adam.extension.toByteArray import com.malinskiy.adam.request.sync.base.BasePushFileRequest import com.malinskiy.adam.transport.Socket @@ -47,7 +48,7 @@ class PushFileRequest( put(size) put(path) put(mode) - flip() + compatFlip() socket.writeFully(this) } } diff --git a/src/main/kotlin/com/malinskiy/adam/request/sync/v1/StatFileRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/sync/v1/StatFileRequest.kt index 4d1f98314..66adab279 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/sync/v1/StatFileRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/sync/v1/StatFileRequest.kt @@ -18,10 +18,7 @@ package com.malinskiy.adam.request.sync.v1 import com.malinskiy.adam.Const import com.malinskiy.adam.exception.UnsupportedSyncProtocolException -import com.malinskiy.adam.extension.compatLimit -import com.malinskiy.adam.extension.toInt -import com.malinskiy.adam.extension.toUInt -import com.malinskiy.adam.extension.writeSyncRequest +import com.malinskiy.adam.extension.* import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.ValidationResponse import com.malinskiy.adam.request.sync.model.FileEntryV1 @@ -38,7 +35,7 @@ class StatFileRequest( withDefaultBuffer { compatLimit(16) socket.readFully(this) - flip() + compatFlip() val bytes = array() if (!bytes.copyOfRange(0, 4).contentEquals(Const.Message.LSTAT_V1)) throw UnsupportedSyncProtocolException() diff --git a/src/main/kotlin/com/malinskiy/adam/transport/NioSocket.kt b/src/main/kotlin/com/malinskiy/adam/transport/NioSocket.kt index 7b29e310e..dc5496d2b 100644 --- a/src/main/kotlin/com/malinskiy/adam/transport/NioSocket.kt +++ b/src/main/kotlin/com/malinskiy/adam/transport/NioSocket.kt @@ -16,6 +16,7 @@ package com.malinskiy.adam.transport +import com.malinskiy.adam.extension.compatFlip import com.malinskiy.adam.extension.compatLimit import kotlinx.coroutines.isActive import kotlinx.coroutines.sync.Mutex @@ -119,16 +120,19 @@ class NioSocket( } override suspend fun readByte(): Byte { - val buffer = ByteBuffer.allocate(1) - val read = readFully(buffer) - if (read == -1) throw EOFException("Input channel was closed by remote host") - return buffer.array()[0] + withDefaultBuffer { + compatLimit(1) + val read = readFully(this) + compatFlip() + if (read == -1) throw EOFException("Input channel was closed by remote host") + return this.get() + } } override suspend fun writeByte(value: Int) { withDefaultBuffer { put(value.toByte()) - flip() + compatFlip() writeFully(this) } } @@ -140,7 +144,7 @@ class NioSocket( compatLimit(4) val read = readFully(this) if (read == -1) throw EOFException("Input channel was closed by remote host") - flip() + compatFlip() val result = int order(order) return result @@ -148,11 +152,12 @@ class NioSocket( } override suspend fun writeIntLittleEndian(value: Int) { - val allocate = ByteBuffer.allocate(4) - allocate.order(ByteOrder.LITTLE_ENDIAN) - allocate.putInt(value) - allocate.flip() - writeFully(allocate) + withDefaultBuffer { + order(ByteOrder.LITTLE_ENDIAN) + putInt(value) + compatFlip() + writeFully(this) + } } @Suppress("BlockingMethodInNonBlockingContext") From 4b736e5c847e6bb0784f8b038449e5cccd10bb93 Mon Sep 17 00:00:00 2001 From: Anton Malinskiy Date: Thu, 21 Jan 2021 10:42:50 +1100 Subject: [PATCH 5/7] fix(bytebuffer): yet another incompatibility with JDK 8 --- src/main/kotlin/com/malinskiy/adam/extension/ByteBuffer.kt | 1 + src/main/kotlin/com/malinskiy/adam/extension/Socket.kt | 2 +- .../adam/request/framebuffer/ScreenCaptureRequest.kt | 3 ++- .../com/malinskiy/adam/request/misc/ReconnectRequest.kt | 3 ++- .../adam/request/pkg/StreamingPackageInstallRequest.kt | 3 ++- .../malinskiy/adam/request/sync/base/BasePullFileRequest.kt | 5 +++-- src/main/kotlin/com/malinskiy/adam/transport/NioSocket.kt | 3 ++- 7 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/main/kotlin/com/malinskiy/adam/extension/ByteBuffer.kt b/src/main/kotlin/com/malinskiy/adam/extension/ByteBuffer.kt index 0928fc3d2..6ca8a8d48 100644 --- a/src/main/kotlin/com/malinskiy/adam/extension/ByteBuffer.kt +++ b/src/main/kotlin/com/malinskiy/adam/extension/ByteBuffer.kt @@ -28,3 +28,4 @@ fun ByteBuffer.compatRewind() = ((this as Buffer).rewind() as ByteBuffer) fun ByteBuffer.compatLimit(newLimit: Int) = ((this as Buffer).limit(newLimit) as ByteBuffer) fun ByteBuffer.compatPosition(newLimit: Int) = ((this as Buffer).position(newLimit) as ByteBuffer) fun ByteBuffer.compatFlip() = ((this as Buffer).flip() as ByteBuffer) +fun ByteBuffer.compatClear() = ((this as Buffer).clear() as ByteBuffer) diff --git a/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt b/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt index f2c1c4610..996ac8d94 100644 --- a/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt +++ b/src/main/kotlin/com/malinskiy/adam/extension/Socket.kt @@ -127,7 +127,7 @@ suspend fun Socket.readProtocolString(): String { } val messageLength = length.toIntOrNull(16) ?: throw RequestRejectedException("Unexpected string length: $length") - clear() + compatClear() compatLimit(messageLength) val read = readFully(this) if (read != messageLength) throw RequestRejectedException("Incomplete string received") diff --git a/src/main/kotlin/com/malinskiy/adam/request/framebuffer/ScreenCaptureRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/framebuffer/ScreenCaptureRequest.kt index 96eeeee47..024b157e2 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/framebuffer/ScreenCaptureRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/framebuffer/ScreenCaptureRequest.kt @@ -17,6 +17,7 @@ package com.malinskiy.adam.request.framebuffer import com.malinskiy.adam.exception.UnsupportedImageProtocolException +import com.malinskiy.adam.extension.compatClear import com.malinskiy.adam.extension.compatFlip import com.malinskiy.adam.extension.compatLimit import com.malinskiy.adam.extension.compatRewind @@ -43,7 +44,7 @@ class ScreenCaptureRequest(private val adapter: ScreenCaptureAdapter) : Co */ else -> throw UnsupportedImageProtocolException(protocolVersion) } - clear() + compatClear() compatLimit(headerSize * 4) socket.readFully(this) socket.writeFully(ByteArray(1) { 0.toByte() }, 0, 1) diff --git a/src/main/kotlin/com/malinskiy/adam/request/misc/ReconnectRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/misc/ReconnectRequest.kt index 6c31ec87f..671336b73 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/misc/ReconnectRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/misc/ReconnectRequest.kt @@ -17,6 +17,7 @@ package com.malinskiy.adam.request.misc import com.malinskiy.adam.Const +import com.malinskiy.adam.extension.compatClear import com.malinskiy.adam.extension.compatFlip import com.malinskiy.adam.extension.compatLimit import com.malinskiy.adam.request.ComplexRequest @@ -48,7 +49,7 @@ class ReconnectRequest( } else { //This is length of a response string val size = String(array(), 0, 4, Const.DEFAULT_TRANSPORT_ENCODING).toInt(radix = 16) - clear() + compatClear() compatLimit(size) socket.readFully(this) compatFlip() diff --git a/src/main/kotlin/com/malinskiy/adam/request/pkg/StreamingPackageInstallRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/pkg/StreamingPackageInstallRequest.kt index a7487fb27..ab5347eaa 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/pkg/StreamingPackageInstallRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/pkg/StreamingPackageInstallRequest.kt @@ -18,6 +18,7 @@ package com.malinskiy.adam.request.pkg import com.malinskiy.adam.annotation.Features import com.malinskiy.adam.extension.bashEscape +import com.malinskiy.adam.extension.compatClear import com.malinskiy.adam.extension.copyTo import com.malinskiy.adam.request.ComplexRequest import com.malinskiy.adam.request.Feature @@ -119,7 +120,7 @@ class StreamingPackageInstallRequest( fileChannel?.cancel() } - clear() + compatClear() socket.copyTo(transformer, this) return transformer.transform().startsWith("Success") } diff --git a/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePullFileRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePullFileRequest.kt index 4f22d0c7a..032e0ab44 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePullFileRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePullFileRequest.kt @@ -19,6 +19,7 @@ package com.malinskiy.adam.request.sync.base import com.malinskiy.adam.Const import com.malinskiy.adam.exception.PullFailedException import com.malinskiy.adam.exception.UnsupportedSyncProtocolException +import com.malinskiy.adam.extension.compatClear import com.malinskiy.adam.extension.compatFlip import com.malinskiy.adam.extension.compatLimit import com.malinskiy.adam.extension.toInt @@ -77,7 +78,7 @@ abstract class BasePullFileRequest( if (available > Const.MAX_FILE_PACKET_LENGTH) { throw UnsupportedSyncProtocolException() } - clear() + compatClear() compatLimit(available) socket.readFully(this) compatFlip() @@ -89,7 +90,7 @@ abstract class BasePullFileRequest( } header.contentEquals(Const.Message.FAIL) -> { val size = data.copyOfRange(4, 8).toInt() - clear() + compatClear() compatLimit(size) socket.readFully(this) compatFlip() diff --git a/src/main/kotlin/com/malinskiy/adam/transport/NioSocket.kt b/src/main/kotlin/com/malinskiy/adam/transport/NioSocket.kt index dc5496d2b..f190e041a 100644 --- a/src/main/kotlin/com/malinskiy/adam/transport/NioSocket.kt +++ b/src/main/kotlin/com/malinskiy/adam/transport/NioSocket.kt @@ -16,6 +16,7 @@ package com.malinskiy.adam.transport +import com.malinskiy.adam.extension.compatClear import com.malinskiy.adam.extension.compatFlip import com.malinskiy.adam.extension.compatLimit import kotlinx.coroutines.isActive @@ -182,7 +183,7 @@ class NioSocket( if (shouldDrain) { val buffer = ByteBuffer.allocate(128) while (true) { - buffer.clear() + buffer.compatClear() if (readUnsafe(selector, buffer) == -1 || state.get() == State.CLOSED || isClosedForRead) { break } else { From 2ca535cbc8deb0db29567e254db55eac67cf61a6 Mon Sep 17 00:00:00 2001 From: Anton Malinskiy Date: Thu, 21 Jan 2021 12:06:58 +1100 Subject: [PATCH 6/7] fix(ci): add test results back --- .github/workflows/ci.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 93f1e2750..6ab8c008f 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -13,6 +13,7 @@ jobs: - name: gradle test jacocoTestReport run: ./gradlew test jacocoTestReport - name: archive test results + if: failure() run: (cd build/reports/tests/test; zip -r -X ../../../../test-result.zip .) - name: Save test output uses: actions/upload-artifact@master @@ -55,6 +56,7 @@ jobs: - name: Generate integration code coverage report run: ./gradlew jacocoIntegrationTestReport - name: archive integration test results + if: failure() run: (cd build/reports/tests/integrationTest; zip -r -X ../../../../integration-test-result.zip .) - name: Save integration test output uses: actions/upload-artifact@master From 6a94700373169e2b734f205210090cc8c1a5adeb Mon Sep 17 00:00:00 2001 From: Anton Malinskiy Date: Thu, 21 Jan 2021 16:35:34 +1100 Subject: [PATCH 7/7] fix(usb): make sure usb + ktor works --- .../malinskiy/adam/integration/ApkE2ETest.kt | 5 ++++ .../adam/extension/ByteReadChannel.kt | 25 ++++++++++++++++++- .../adam/request/misc/ExecInRequest.kt | 1 - .../request/sync/base/BasePushFileRequest.kt | 23 ++++++++++------- .../malinskiy/adam/transport/KtorSocket.kt | 2 +- 5 files changed, 44 insertions(+), 12 deletions(-) diff --git a/src/integrationTest/kotlin/com/malinskiy/adam/integration/ApkE2ETest.kt b/src/integrationTest/kotlin/com/malinskiy/adam/integration/ApkE2ETest.kt index 60d42f977..3bcc33c80 100644 --- a/src/integrationTest/kotlin/com/malinskiy/adam/integration/ApkE2ETest.kt +++ b/src/integrationTest/kotlin/com/malinskiy/adam/integration/ApkE2ETest.kt @@ -25,6 +25,7 @@ import com.malinskiy.adam.request.shell.v1.ShellCommandRequest import com.malinskiy.adam.request.sync.v1.PushFileRequest import com.malinskiy.adam.rule.AdbDeviceRule import kotlinx.coroutines.channels.receiveOrNull +import kotlinx.coroutines.debug.junit4.CoroutinesTimeout import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.junit.After @@ -41,6 +42,10 @@ class ApkE2ETest { val adb = AdbDeviceRule() val client = adb.adb + @Rule + @JvmField + val timeout = CoroutinesTimeout.seconds(60) + @Before fun setup() { runBlocking { diff --git a/src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt b/src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt index f82c021c3..118f1a2e7 100644 --- a/src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt +++ b/src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt @@ -37,4 +37,27 @@ suspend fun ByteReadChannel.copyTo(socket: Socket, buffer: ByteArray): Long { } } return processed -} \ No newline at end of file +} + +/** + * Copies up to limit bytes into transformer using buffer. If limit is null - copy until EOF + */ +suspend fun ByteReadChannel.copyTo(buffer: ByteArray, offset: Int, limit: Int): Int { + var processed = 0 + loop@ while (true) { + val toRead = (buffer.size - offset) - processed + val available = readAvailable(buffer, offset + processed, toRead) + when { + processed == limit -> break@loop + available < 0 && processed != 0 -> { + break@loop + } + available < 0 -> return available + available > 0 -> { + processed += available + } + else -> continue@loop + } + } + return processed +} diff --git a/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt index bb8a4434a..fcb20681a 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt @@ -16,7 +16,6 @@ package com.malinskiy.adam.request.misc -import com.malinskiy.adam.Const import com.malinskiy.adam.extension.copyTo import com.malinskiy.adam.extension.readStatus import com.malinskiy.adam.request.ComplexRequest diff --git a/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt index 0b25a6c8e..a20762da9 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt @@ -25,7 +25,7 @@ import com.malinskiy.adam.extension.write import com.malinskiy.adam.request.AsyncChannelRequest import com.malinskiy.adam.request.ValidationResponse import com.malinskiy.adam.transport.Socket -import com.malinskiy.adam.transport.withMaxPacketBuffer +import com.malinskiy.adam.transport.withMaxFilePacketBuffer import io.ktor.util.cio.* import io.ktor.utils.io.* import kotlinx.coroutines.Dispatchers @@ -46,10 +46,10 @@ abstract class BasePushFileRequest( get() = mode.toInt(8) and "0777".toInt(8) override suspend fun readElement(socket: Socket, sendChannel: SendChannel): Boolean { - withMaxPacketBuffer { + withMaxFilePacketBuffer { val data = array() - val available = fileReadChannel.copyTo(data, 8, Const.MAX_FILE_PACKET_LENGTH) - when { + val available = fileReadChannel.copyTo(data, 0, data.size) + return when { available < 0 -> { Const.Message.DONE.copyInto(data) (local.lastModified() / 1000).toInt().toByteArray().copyInto(data, destinationOffset = 4) @@ -59,22 +59,27 @@ abstract class BasePushFileRequest( if (transportResponse.okay) { sendChannel.send(1.0) - return true + true } else { throw PushFailedException("adb didn't acknowledge the file transfer: ${transportResponse.message ?: ""}") } } available > 0 -> { - Const.Message.DATA.copyInto(data) - available.toByteArray().reversedArray().copyInto(data, destinationOffset = 4) + socket.writeFully(Const.Message.DATA) + socket.writeFully(available.toByteArray().reversedArray()) /** * USB devices are very picky about the size of the DATA buffer. Using the adb's default */ - socket.writeFully(data, 0, available + 8) + /** + * USB devices are very picky about the size of the DATA buffer. Using the adb's default + */ + socket.writeFully(data, 0, available) currentPosition += available sendChannel.send(currentPosition.toDouble() / totalBytes) + false } - else -> null + else -> false + } } } diff --git a/src/main/kotlin/com/malinskiy/adam/transport/KtorSocket.kt b/src/main/kotlin/com/malinskiy/adam/transport/KtorSocket.kt index ba5a2721b..4cbb69ba6 100644 --- a/src/main/kotlin/com/malinskiy/adam/transport/KtorSocket.kt +++ b/src/main/kotlin/com/malinskiy/adam/transport/KtorSocket.kt @@ -35,7 +35,7 @@ class KtorSocket(private val ktorSocket: RealKtorSocket) : Socket { override suspend fun writeFully(byteBuffer: ByteBuffer) = writeChannel.writeFully(byteBuffer) override suspend fun writeFully(toByteArray: ByteArray, offset: Int, limit: Int) = writeChannel.writeFully(toByteArray, offset, limit) override suspend fun readAvailable(buffer: ByteArray, offset: Int, limit: Int): Int { - if (readChannel.availableForRead == 0) return 0 + if (!readChannel.isClosedForRead && readChannel.availableForRead == 0) return 0 return readChannel.readAvailable(buffer, offset, limit) }