Skip to content

Commit

Permalink
fix(transport): send large streams via 64k chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton Malinskiy committed Jan 14, 2021
1 parent 31a5cc4 commit 8426f87
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 4 deletions.
23 changes: 23 additions & 0 deletions src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,29 @@ suspend fun <T> ByteReadChannel.copyTo(transformer: ResponseTransformer<T>, buff
return processed
}

/**
* Copies up to limit bytes into transformer using buffer. If limit is null - copy until EOF
*/
suspend fun ByteReadChannel.copyTo(buffer: ByteArray, offset: Int, limit: Int): Int {
var processed = 0
loop@ while (true) {
val toRead = (buffer.size - offset) - processed
val available = readAvailable(buffer, offset + processed, toRead)
when {
processed == limit -> break@loop
available < 0 && processed != 0 -> {
break@loop
}
available < 0 -> return available
available > 0 -> {
processed += available
}
else -> continue@loop
}
}
return processed
}

/**
* TODO: rewrite
* Assumes buffer hasArray == true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ import io.ktor.utils.io.ByteReadChannel
class ExecInRequest(private val cmd: String, private val channel: ByteReadChannel) : ComplexRequest<Unit>() {
override suspend fun readElement(readChannel: AndroidReadChannel, writeChannel: AndroidWriteChannel) {
val buffer = ByteArray(Const.MAX_FILE_PACKET_LENGTH)
channel.copyTo(writeChannel, buffer)
while (true) {
val available = channel.copyTo(buffer, 0, buffer.size)
when {
available > 0 -> writeChannel.writeFully(buffer, 0, available)
else -> break
}
}
//Have to poll
readChannel.readStatus()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,13 @@ class StreamingPackageInstallRequest(
var fileChannel: ByteReadChannel? = null
try {
val fileChannel = pkg.readChannel(coroutineContext = coroutineContext)
fileChannel.copyTo(writeChannel, buffer)
while (true) {
val available = fileChannel.copyTo(buffer, 0, buffer.size)
when {
available > 0 -> writeChannel.writeFully(buffer, 0, available)
else -> break
}
}
} finally {
fileChannel?.cancel()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.malinskiy.adam.request.sync.base

import com.malinskiy.adam.Const
import com.malinskiy.adam.exception.PushFailedException
import com.malinskiy.adam.extension.copyTo
import com.malinskiy.adam.extension.toByteArray
import com.malinskiy.adam.request.AsyncChannelRequest
import com.malinskiy.adam.request.ValidationResponse
Expand All @@ -44,7 +45,7 @@ abstract class BasePushFileRequest(
get() = mode.toInt(8) and "0777".toInt(8)

override suspend fun readElement(readChannel: AndroidReadChannel, writeChannel: AndroidWriteChannel): Double? {
val available = fileReadChannel.readAvailable(buffer, 8, Const.MAX_FILE_PACKET_LENGTH)
val available = fileReadChannel.copyTo(buffer, 8, Const.MAX_FILE_PACKET_LENGTH)
return when {
available < 0 -> {
Const.Message.DONE.copyInto(buffer)
Expand All @@ -63,11 +64,14 @@ abstract class BasePushFileRequest(
available > 0 -> {
Const.Message.DATA.copyInto(buffer)
available.toByteArray().reversedArray().copyInto(buffer, destinationOffset = 4)
/**
* USB devices are very picky about the size of the DATA buffer. Using the adb's default
*/
writeChannel.writeFully(buffer, 0, available + 8)
currentPosition += available
currentPosition.toDouble() / totalBytes
}
else -> currentPosition.toDouble() / totalBytes
else -> null
}
}

Expand Down

0 comments on commit 8426f87

Please sign in to comment.