Skip to content

Commit

Permalink
Merge branch 'fix/transport-fragmentation' into feat/buffer-pooling
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/main/kotlin/com/malinskiy/adam/extension/ByteReadChannel.kt
#	src/main/kotlin/com/malinskiy/adam/request/misc/ExecInRequest.kt
#	src/main/kotlin/com/malinskiy/adam/request/pkg/StreamingPackageInstallRequest.kt
#	src/main/kotlin/com/malinskiy/adam/request/sync/base/BasePushFileRequest.kt
  • Loading branch information
Anton Malinskiy committed Jan 21, 2021
2 parents 2ca535c + 8426f87 commit 9bb8811
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.malinskiy.adam.request.misc

import com.malinskiy.adam.Const
import com.malinskiy.adam.extension.copyTo
import com.malinskiy.adam.extension.readStatus
import com.malinskiy.adam.request.ComplexRequest
Expand All @@ -29,7 +30,14 @@ import io.ktor.utils.io.*
class ExecInRequest(private val cmd: String, private val channel: ByteReadChannel) : ComplexRequest<Unit>() {
override suspend fun readElement(socket: Socket) {
withMaxFilePacketBuffer {
channel.copyTo(socket, this)
val buffer = array()
while (true) {
val available = channel.copyTo(buffer, 0, buffer.size)
when {
available > 0 -> socket.writeFully(buffer, 0, available)
else -> break
}
}
//Have to poll
socket.readStatus()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,16 @@ class StreamingPackageInstallRequest(
override suspend fun readElement(socket: Socket): Boolean {
withMaxFilePacketBuffer {
var fileChannel: ByteReadChannel? = null
val buffer = array()
try {
val fileChannel = pkg.readChannel(coroutineContext = coroutineContext)
fileChannel.copyTo(socket, this)
while (true) {
val available = fileChannel.copyTo(buffer, 0, buffer.size)
when {
available > 0 -> socket.writeFully(buffer, 0, available)
else -> break
}
}
} finally {
fileChannel?.cancel()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.malinskiy.adam.request.sync.base

import com.malinskiy.adam.Const
import com.malinskiy.adam.exception.PushFailedException
import com.malinskiy.adam.extension.copyTo
import com.malinskiy.adam.extension.readTransportResponse
import com.malinskiy.adam.extension.toByteArray
import com.malinskiy.adam.extension.write
Expand Down Expand Up @@ -47,7 +48,7 @@ abstract class BasePushFileRequest(
override suspend fun readElement(socket: Socket, sendChannel: SendChannel<Double>): Boolean {
withMaxPacketBuffer {
val data = array()
val available = fileReadChannel.readAvailable(data, 8, data.size - 8)
val available = fileReadChannel.copyTo(data, 8, Const.MAX_FILE_PACKET_LENGTH)
when {
available < 0 -> {
Const.Message.DONE.copyInto(data)
Expand All @@ -66,13 +67,14 @@ abstract class BasePushFileRequest(
available > 0 -> {
Const.Message.DATA.copyInto(data)
available.toByteArray().reversedArray().copyInto(data, destinationOffset = 4)
/**
* USB devices are very picky about the size of the DATA buffer. Using the adb's default
*/
socket.writeFully(data, 0, available + 8)
currentPosition += available
sendChannel.send(currentPosition.toDouble() / totalBytes)
}
else -> Unit
}
return false
else -> null
}
}

Expand Down

0 comments on commit 9bb8811

Please sign in to comment.