From 6a94700373169e2b734f205210090cc8c1a5adeb Mon Sep 17 00:00:00 2001 From: Anton Malinskiy Date: Thu, 21 Jan 2021 16:35:34 +1100 Subject: [PATCH] fix(usb): make sure usb + ktor works --- .../malinskiy/adam/integration/ApkE2ETest.kt | 5 ++++ .../adam/extension/ByteReadChannel.kt | 25 ++++++++++++++++++- .../adam/request/misc/ExecInRequest.kt | 1 - .../request/sync/base/BasePushFileRequest.kt | 23 ++++++++++------- .../malinskiy/adam/transport/KtorSocket.kt | 2 +- 5 files changed, 44 insertions(+), 12 deletions(-) diff --git a/src/integrationTest/kotlin/com/malinskiy/adam/integration/ApkE2ETest.kt b/src/integrationTest/kotlin/com/malinskiy/adam/integration/ApkE2ETest.kt index 60d42f977..3bcc33c80 100644 --- a/src/integrationTest/kotlin/com/malinskiy/adam/integration/ApkE2ETest.kt +++ b/src/integrationTest/kotlin/com/malinskiy/adam/integration/ApkE2ETest.kt @@ -25,6 +25,7 @@ import com.malinskiy.adam.request.shell.v1.ShellCommandRequest import com.malinskiy.adam.request.sync.v1.PushFileRequest import com.malinskiy.adam.rule.AdbDeviceRule import kotlinx.coroutines.channels.receiveOrNull +import kotlinx.coroutines.debug.junit4.CoroutinesTimeout import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.junit.After @@ -41,6 +42,10 @@ class ApkE2ETest { val adb = AdbDeviceRule() val client = adb.adb + @Rule + @JvmField + val timeout = CoroutinesTimeout.seconds(60) + @Before fun setup() { runBlocking { diff --git a/src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt b/src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt index f82c021c3..118f1a2e7 100644 --- a/src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt +++ b/src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt @@ -37,4 +37,27 @@ suspend fun ByteReadChannel.copyTo(socket: Socket, buffer: ByteArray): Long { } } return processed -} \ No newline at end of file +} + +/** + * Copies up to limit bytes into transformer using buffer. If limit is null - copy until EOF + */ +suspend fun ByteReadChannel.copyTo(buffer: ByteArray, offset: Int, limit: Int): Int { + var processed = 0 + loop@ while (true) { + val toRead = (buffer.size - offset) - processed + val available = readAvailable(buffer, offset + processed, toRead) + when { + processed == limit -> break@loop + available < 0 && processed != 0 -> { + break@loop + } + available < 0 -> return available + available > 0 -> { + processed += available + } + else -> continue@loop + } + } + return processed +} diff --git a/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt index bb8a4434a..fcb20681a 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt @@ -16,7 +16,6 @@ package com.malinskiy.adam.request.misc -import com.malinskiy.adam.Const import com.malinskiy.adam.extension.copyTo import com.malinskiy.adam.extension.readStatus import com.malinskiy.adam.request.ComplexRequest diff --git a/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt b/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt index 0b25a6c8e..a20762da9 100644 --- a/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt +++ b/src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt @@ -25,7 +25,7 @@ import com.malinskiy.adam.extension.write import com.malinskiy.adam.request.AsyncChannelRequest import com.malinskiy.adam.request.ValidationResponse import com.malinskiy.adam.transport.Socket -import com.malinskiy.adam.transport.withMaxPacketBuffer +import com.malinskiy.adam.transport.withMaxFilePacketBuffer import io.ktor.util.cio.* import io.ktor.utils.io.* import kotlinx.coroutines.Dispatchers @@ -46,10 +46,10 @@ abstract class BasePushFileRequest( get() = mode.toInt(8) and "0777".toInt(8) override suspend fun readElement(socket: Socket, sendChannel: SendChannel): Boolean { - withMaxPacketBuffer { + withMaxFilePacketBuffer { val data = array() - val available = fileReadChannel.copyTo(data, 8, Const.MAX_FILE_PACKET_LENGTH) - when { + val available = fileReadChannel.copyTo(data, 0, data.size) + return when { available < 0 -> { Const.Message.DONE.copyInto(data) (local.lastModified() / 1000).toInt().toByteArray().copyInto(data, destinationOffset = 4) @@ -59,22 +59,27 @@ abstract class BasePushFileRequest( if (transportResponse.okay) { sendChannel.send(1.0) - return true + true } else { throw PushFailedException("adb didn't acknowledge the file transfer: ${transportResponse.message ?: ""}") } } available > 0 -> { - Const.Message.DATA.copyInto(data) - available.toByteArray().reversedArray().copyInto(data, destinationOffset = 4) + socket.writeFully(Const.Message.DATA) + socket.writeFully(available.toByteArray().reversedArray()) /** * USB devices are very picky about the size of the DATA buffer. Using the adb's default */ - socket.writeFully(data, 0, available + 8) + /** + * USB devices are very picky about the size of the DATA buffer. Using the adb's default + */ + socket.writeFully(data, 0, available) currentPosition += available sendChannel.send(currentPosition.toDouble() / totalBytes) + false } - else -> null + else -> false + } } } diff --git a/src/main/kotlin/com/malinskiy/adam/transport/KtorSocket.kt b/src/main/kotlin/com/malinskiy/adam/transport/KtorSocket.kt index ba5a2721b..4cbb69ba6 100644 --- a/src/main/kotlin/com/malinskiy/adam/transport/KtorSocket.kt +++ b/src/main/kotlin/com/malinskiy/adam/transport/KtorSocket.kt @@ -35,7 +35,7 @@ class KtorSocket(private val ktorSocket: RealKtorSocket) : Socket { override suspend fun writeFully(byteBuffer: ByteBuffer) = writeChannel.writeFully(byteBuffer) override suspend fun writeFully(toByteArray: ByteArray, offset: Int, limit: Int) = writeChannel.writeFully(toByteArray, offset, limit) override suspend fun readAvailable(buffer: ByteArray, offset: Int, limit: Int): Int { - if (readChannel.availableForRead == 0) return 0 + if (!readChannel.isClosedForRead && readChannel.availableForRead == 0) return 0 return readChannel.readAvailable(buffer, offset, limit) }