Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/crash stop socket #1658

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import android.widget.TextView
import androidx.annotation.RequiresApi
import androidx.fragment.app.Fragment
import com.pedro.common.ConnectChecker
import com.pedro.library.base.recording.RecordController
import com.pedro.library.generic.GenericStream
import com.pedro.encoder.input.sources.video.Camera1Source
import com.pedro.encoder.input.sources.video.Camera2Source
import com.pedro.extrasources.CameraXSource
import com.pedro.library.base.recording.RecordController
import com.pedro.library.generic.GenericStream
import com.pedro.library.util.BitrateAdapter
import com.pedro.streamer.R
import com.pedro.streamer.utils.PathUtils
Expand Down
25 changes: 11 additions & 14 deletions common/src/main/java/com/pedro/common/socket/TcpStreamSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,19 @@ package com.pedro.common.socket
import io.ktor.network.selector.SelectorManager
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.ByteWriteChannel
import io.ktor.utils.io.readByte
import io.ktor.utils.io.readFully
import io.ktor.utils.io.readUTF8Line
import io.ktor.utils.io.writeByte
import io.ktor.utils.io.writeFully
import io.ktor.utils.io.writeStringUtf8
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withTimeout
import java.net.ConnectException

/**
* Created by pedro on 22/9/24.
*/
abstract class TcpStreamSocket: StreamSocket {

private val timeout = 5000L
protected val timeout = 5000L
protected var input: ByteReadChannel? = null
protected var output: ByteWriteChannel? = null
protected var selectorManager = SelectorManager(Dispatchers.IO)
Expand All @@ -45,16 +42,16 @@ abstract class TcpStreamSocket: StreamSocket {
output?.flush()
}

suspend fun write(b: Int) = withTimeout(timeout) {
suspend fun write(b: Int) {
output?.writeByte(b.toByte())
}

suspend fun write(b: ByteArray) = withTimeout(timeout) {
suspend fun write(b: ByteArray) {
output?.writeFully(b)
}

suspend fun write(b: ByteArray, offset: Int, size: Int) = withTimeout(timeout) {
output?.writeFully(b, offset, offset + size)
suspend fun write(b: ByteArray, offset: Int, size: Int) {
output?.writeFully(b, offset, size)
}

suspend fun writeUInt16(b: Int) {
Expand All @@ -73,13 +70,13 @@ abstract class TcpStreamSocket: StreamSocket {
writeUInt32(Integer.reverseBytes(b))
}

suspend fun write(string: String) = withTimeout(timeout) {
suspend fun write(string: String) {
output?.writeStringUtf8(string)
}

suspend fun read(): Int = withTimeout(timeout) {
suspend fun read(): Int {
val input = input ?: throw ConnectException("Read with socket closed, broken pipe")
input.readByte().toInt()
return input.readByte().toInt()
}

suspend fun readUInt16(): Int {
Expand All @@ -104,13 +101,13 @@ abstract class TcpStreamSocket: StreamSocket {
return Integer.reverseBytes(readUInt32())
}

suspend fun readUntil(b: ByteArray) = withTimeout(timeout) {
suspend fun readUntil(b: ByteArray) {
val input = input ?: throw ConnectException("Read with socket closed, broken pipe")
input.readFully(b)
}

suspend fun readLine(): String? = withTimeout(timeout) {
suspend fun readLine(): String? {
val input = input ?: throw ConnectException("Read with socket closed, broken pipe")
input.readUTF8Line()
return input.readUTF8Line()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ class TcpStreamSocketImp(

override suspend fun connect() {
selectorManager = SelectorManager(Dispatchers.IO)
val builder = aSocket(selectorManager).tcp().connect(remoteAddress = InetSocketAddress(host, port))
val builder = aSocket(selectorManager).tcp().connect(
remoteAddress = InetSocketAddress(host, port),
configure = {
if (!secured) socketTimeout = timeout
}
)
val socket = if (secured) {
builder.tls(Dispatchers.Default) {
trustManager = certificate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.isClosed
import io.ktor.utils.io.core.ByteReadPacket
import io.ktor.utils.io.core.remaining
import io.ktor.utils.io.core.readBytes
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.io.readByteArray
import java.net.ConnectException
import java.net.InetAddress

Expand Down Expand Up @@ -78,7 +77,7 @@ class UdpStreamSocket(
val socket = socket ?: throw ConnectException("Read with socket closed, broken pipe")
val packet = socket.receive().packet
val length = packet.remaining.toInt()
return packet.readByteArray().sliceArray(0 until length)
return packet.readBytes().sliceArray(0 until length)
}

suspend fun writePacket(bytes: ByteArray) {
Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ annotation = "1.9.1"
coroutines = "1.9.0"
junit = "4.13.2"
mockito = "5.4.0"
ktor = "3.0.1"
ktor = "2.3.13"
uvcandroid = "1.0.7"
media3 = "1.5.0"

Expand Down