Skip to content

Commit

Permalink
Merge pull request #22 from Malinskiy/fix/usb-transport-and-timeouts
Browse files Browse the repository at this point in the history
Fix/usb transport and timeouts
  • Loading branch information
Malinskiy authored Jan 20, 2021
2 parents 31a5cc4 + 46fc897 commit ec31c88
Show file tree
Hide file tree
Showing 84 changed files with 1,260 additions and 849 deletions.
4 changes: 3 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ java {
targetCompatibility = JavaVersion.VERSION_1_8
}

tasks.compileKotlin {
tasks.withType(KotlinCompile::class) {
kotlinOptions.jvmTarget = "1.8"
kotlinOptions.apiVersion = "1.4"
}
Expand All @@ -122,9 +122,11 @@ dependencies {
testImplementation(TestLibraries.assertk)
testImplementation(TestLibraries.junit)
testImplementation(TestLibraries.imageComparison)
testImplementation(TestLibraries.coroutinesDebug)
testImplementation(kotlin("reflect", version = Versions.kotlin))

integrationTestImplementation(TestLibraries.assertk)
integrationTestImplementation(TestLibraries.junit)
integrationTestImplementation(TestLibraries.coroutinesDebug)
integrationTestImplementation(kotlin("reflect", version = Versions.kotlin))
}
2 changes: 2 additions & 0 deletions buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ object Versions {
val imageComparison = "4.3.0"
val dokka = kotlin
val pdbank = "0.9.1"
val coroutinesDebug = "1.4.0"
}

object BuildPlugins {
Expand All @@ -32,4 +33,5 @@ object TestLibraries {
val assertk = "com.willowtreeapps.assertk:assertk:${Versions.assertk}"
val junit = "junit:junit:${Versions.junit}"
val imageComparison = "com.github.romankh3:image-comparison:${Versions.imageComparison}"
val coroutinesDebug = "org.jetbrains.kotlinx:kotlinx-coroutines-debug:${Versions.coroutinesDebug}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.malinskiy.adam.request.device.ListDevicesRequest
import com.malinskiy.adam.request.misc.GetAdbServerVersionRequest
import com.malinskiy.adam.request.prop.GetSinglePropRequest
import com.malinskiy.adam.request.shell.v1.ShellCommandRequest
import com.malinskiy.adam.transport.NioSocketFactory
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.runBlocking
Expand All @@ -46,8 +47,10 @@ class AdbDeviceRule(val deviceType: DeviceType = DeviceType.ANY, vararg val requ
lateinit var deviceSerial: String
lateinit var supportedFeatures: List<Feature>
lateinit var lineSeparator: String

val adb = AndroidDebugBridgeClientFactory().build()

val adb = AndroidDebugBridgeClientFactory().apply {
socketFactory = NioSocketFactory()
}.build()
val initTimeout = Duration.ofSeconds(10)

override fun apply(base: Statement, description: Description): Statement {
Expand Down
71 changes: 12 additions & 59 deletions src/main/kotlin/com/malinskiy/adam/AndroidDebugBridgeClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,17 @@

package com.malinskiy.adam

import com.malinskiy.adam.exception.RequestRejectedException
import com.malinskiy.adam.exception.RequestValidationException
import com.malinskiy.adam.extension.toAndroidChannel
import com.malinskiy.adam.interactor.DiscoverAdbSocketInteractor
import com.malinskiy.adam.log.AdamLogging
import com.malinskiy.adam.request.AsyncChannelRequest
import com.malinskiy.adam.request.ComplexRequest
import com.malinskiy.adam.request.MultiRequest
import com.malinskiy.adam.request.emu.EmulatorCommandRequest
import com.malinskiy.adam.request.misc.SetDeviceRequest
import com.malinskiy.adam.transport.AndroidReadChannel
import com.malinskiy.adam.transport.AndroidWriteChannel
import com.malinskiy.adam.transport.KtorSocketFactory
import com.malinskiy.adam.transport.SocketFactory
import io.ktor.network.sockets.*
import io.ktor.utils.io.*
import com.malinskiy.adam.transport.use
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.NonCancellable
Expand All @@ -57,22 +52,10 @@ class AndroidDebugBridgeClient(
throw RequestValidationException("Request $requestSimpleClassName did not pass validation: ${validationResponse.message}")
}
socketFactory.tcp(socketAddress).use { socket ->
val readChannel = socket.openReadChannel().toAndroidChannel()
var writeChannel: AndroidWriteChannel? = null
try {
writeChannel = socket.openWriteChannel(autoFlush = true).toAndroidChannel()
serial?.let {
processRequest(writeChannel, SetDeviceRequest(it).serialize(), readChannel)
}
return request.process(readChannel, writeChannel)
} finally {
try {
writeChannel?.close()
readChannel.cancel()
} catch (e: Exception) {
log.debug(e) { "Exception during cleanup. Ignoring" }
}
serial?.let {
SetDeviceRequest(it).handshake(socket)
}
return request.process(socket)
}
}

Expand All @@ -84,41 +67,35 @@ class AndroidDebugBridgeClient(
}
return scope.produce {
socketFactory.tcp(socketAddress).use { socket ->
val readChannel = socket.openReadChannel().toAndroidChannel()
var writeChannel: AndroidWriteChannel? = null
var backChannel = request.channel

try {
writeChannel = socket.openWriteChannel(autoFlush = true).toAndroidChannel()
serial?.let {
processRequest(writeChannel, SetDeviceRequest(it).serialize(), readChannel)
SetDeviceRequest(it).handshake(socket)
}

request.handshake(readChannel, writeChannel)
request.handshake(socket)

while (true) {
if (isClosedForSend ||
readChannel.isClosedForRead ||
writeChannel.isClosedForWrite ||
socket.isClosedForRead ||
socket.isClosedForWrite ||
request.channel?.isClosedForReceive == true
) {
break
}
request.readElement(readChannel, writeChannel)?.let {
send(it)
}
val finished = request.readElement(socket, this)
if (finished) break

backChannel?.poll()?.let {
request.writeElement(it, readChannel, writeChannel)
request.writeElement(it, socket)
}
}
} finally {
try {
withContext(NonCancellable) {
request.close(channel)
}
writeChannel?.close()
readChannel.cancel()
} catch (e: Exception) {
log.debug(e) { "Exception during cleanup. Ignoring" }
}
Expand All @@ -129,18 +106,7 @@ class AndroidDebugBridgeClient(

suspend fun execute(request: EmulatorCommandRequest): String {
socketFactory.tcp(request.address).use { socket ->
var readChannel: ByteReadChannel? = null
var writeChannel: ByteWriteChannel? = null

try {
readChannel = socket.openReadChannel()
writeChannel = socket.openWriteChannel(true)

return request.process(readChannel, writeChannel)
} finally {
readChannel?.cancel()
writeChannel?.close()
}
return request.process(socket)
}
}

Expand All @@ -154,19 +120,6 @@ class AndroidDebugBridgeClient(
return request.execute(this, serial)
}

private suspend fun processRequest(
writeChannel: AndroidWriteChannel,
request: ByteArray,
readChannel: AndroidReadChannel
) {
writeChannel.write(request)
val response = readChannel.read()
if (!response.okay) {
log.warn { "adb server rejected command ${String(request, Const.DEFAULT_TRANSPORT_ENCODING)}" }
throw RequestRejectedException(response.message ?: "no message received")
}
}

companion object {
private val log = AdamLogging.logger {}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/com/malinskiy/adam/Const.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ package com.malinskiy.adam
object Const {
const val MAX_REMOTE_PATH_LENGTH = 1024
const val DEFAULT_BUFFER_SIZE = 1024
const val READ_DELAY = 100L
val DEFAULT_TRANSPORT_ENCODING = Charsets.UTF_8
const val DEFAULT_ADB_HOST = "127.0.0.1"
const val DEFAULT_ADB_PORT = 5037

const val SERVER_PORT_ENV_VAR = "ANDROID_ADB_SERVER_PORT"
const val MAX_PACKET_LENGTH = 16384
const val MAX_FILE_PACKET_LENGTH = 64 * 1024
const val KTOR_INTERNAL_BUFFER_LENGTH = 4088

const val MAX_PROTOBUF_LOGCAT_LENGTH = 10_000
const val MAX_PROTOBUF_PACKET_LENGTH = 10 * 1024 * 1024L //10Mb
Expand Down
48 changes: 5 additions & 43 deletions src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

package com.malinskiy.adam.extension

import com.malinskiy.adam.request.transform.ResponseTransformer
import com.malinskiy.adam.transport.Socket
import io.ktor.utils.io.*
import java.nio.ByteBuffer

suspend fun ByteReadChannel.copyTo(channel: ByteWriteChannel, buffer: ByteArray): Long {
suspend fun ByteReadChannel.copyTo(socket: Socket, buffer: ByteBuffer) = copyTo(socket, buffer.array())
suspend fun ByteReadChannel.copyTo(socket: Socket, buffer: ByteArray): Long {
var processed = 0L
loop@ while (true) {
val available = readAvailable(buffer, 0, buffer.size)
Expand All @@ -29,50 +30,11 @@ suspend fun ByteReadChannel.copyTo(channel: ByteWriteChannel, buffer: ByteArray)
break@loop
}
available > 0 -> {
channel.writeFully(buffer, 0, available)
socket.writeFully(buffer, 0, available)
processed += available
}
else -> continue@loop
}
}
return processed
}

/**
* Copies up to limit bytes into transformer using buffer. If limit is null - copy until EOF
*/
suspend fun <T> ByteReadChannel.copyTo(transformer: ResponseTransformer<T>, buffer: ByteArray, limit: Long? = null): Long {
var processed = 0L
loop@ while (true) {
val toRead = when {
limit == null || (limit - processed) > buffer.size -> {
buffer.size
}
else -> {
(limit - processed).toInt()
}
}
val available = readAvailable(buffer, 0, toRead)
when {
processed == limit -> break@loop
available < 0 -> {
break@loop
}
available > 0 -> {
transformer.process(buffer, 0, available)
processed += available
}
else -> continue@loop
}
}
return processed
}

/**
* TODO: rewrite
* Assumes buffer hasArray == true
*/
suspend fun ByteReadChannel.copyTo(channel: ByteWriteChannel, buffer: ByteBuffer) = copyTo(channel, buffer.array())
suspend fun <T> ByteReadChannel.copyTo(transformer: ResponseTransformer<T>, buffer: ByteBuffer) = copyTo(transformer, buffer.array())
suspend fun <T> ByteReadChannel.copyTo(transformer: ResponseTransformer<T>, buffer: ByteBuffer, limit: Long? = null) =
copyTo(transformer, buffer.array(), limit)
}
Loading

0 comments on commit ec31c88

Please sign in to comment.