From bc0823d175de5dceb71fadac75fffdbdeb407422 Mon Sep 17 00:00:00 2001 From: Anton Malinskiy Date: Thu, 18 Apr 2024 16:50:42 +1000 Subject: [PATCH] fix(core): verbose coroutinecontext closures --- .../kotlin/com/malinskiy/marathon/actor/Actor.kt | 7 +++++-- .../marathon/coroutines/ThreadPoolDispatcher.kt | 13 +++++++++++++ .../malinskiy/marathon/execution/DevicePoolActor.kt | 4 +--- .../marathon/execution/device/DeviceActor.kt | 4 +--- .../marathon/execution/queue/QueueActor.kt | 4 +--- .../marathon/android/adam/AdamAndroidDevice.kt | 7 ++++--- .../marathon/android/adam/AdamDeviceProvider.kt | 7 +++++-- .../marathon/android/adam/LogcatManager.kt | 8 +++++--- .../executor/listeners/video/ScreenRecorder.kt | 2 +- .../listeners/video/ScreenRecorderStopper.kt | 1 + 10 files changed, 37 insertions(+), 20 deletions(-) create mode 100644 core/src/main/kotlin/com/malinskiy/marathon/coroutines/ThreadPoolDispatcher.kt diff --git a/core/src/main/kotlin/com/malinskiy/marathon/actor/Actor.kt b/core/src/main/kotlin/com/malinskiy/marathon/actor/Actor.kt index ed515b514..712ea4e9d 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/actor/Actor.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/actor/Actor.kt @@ -1,5 +1,6 @@ package com.malinskiy.marathon.actor +import com.malinskiy.marathon.coroutines.newCoroutineExceptionHandler import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job @@ -9,16 +10,18 @@ import kotlinx.coroutines.channels.ChannelResult import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.channels.actor import kotlinx.coroutines.selects.SelectClause2 +import mu.KLogger import kotlin.coroutines.CoroutineContext abstract class Actor( parent: Job? = null, - val context: CoroutineContext + val context: CoroutineContext, + protected val logger: KLogger, ) : SendChannel, CoroutineScope { protected abstract suspend fun receive(msg: T) final override val coroutineContext: CoroutineContext - get() = context + actorJob + get() = context + actorJob + newCoroutineExceptionHandler(logger) private val actorJob = Job(parent) diff --git a/core/src/main/kotlin/com/malinskiy/marathon/coroutines/ThreadPoolDispatcher.kt b/core/src/main/kotlin/com/malinskiy/marathon/coroutines/ThreadPoolDispatcher.kt new file mode 100644 index 000000000..a12652b6e --- /dev/null +++ b/core/src/main/kotlin/com/malinskiy/marathon/coroutines/ThreadPoolDispatcher.kt @@ -0,0 +1,13 @@ +package com.malinskiy.marathon.coroutines + +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineExceptionHandler +import mu.KLogger + +fun newCoroutineExceptionHandler(logger: KLogger) = CoroutineExceptionHandler { context, exception -> + when (exception) { + null -> logger.debug { "CoroutineContext finished" } + is CancellationException -> logger.debug(exception) { "CoroutineContext cancelled" } + else -> logger.error(exception) { "CoroutineContext closing due to unrecoverable error" } + } +} diff --git a/core/src/main/kotlin/com/malinskiy/marathon/execution/DevicePoolActor.kt b/core/src/main/kotlin/com/malinskiy/marathon/execution/DevicePoolActor.kt index 06bebdbef..cccfdb718 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/execution/DevicePoolActor.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/execution/DevicePoolActor.kt @@ -34,9 +34,7 @@ class DevicePoolActor( context: CoroutineContext, testBundleIdentifier: TestBundleIdentifier?, ) : - Actor(parent = parent, context = context) { - - private val logger = MarathonLogging.logger("DevicePoolActor[${poolId.name}]") + Actor(parent = parent, context = context, logger = MarathonLogging.logger("DevicePoolActor[${poolId.name}]")) { override suspend fun receive(msg: DevicePoolMessage) { when (msg) { diff --git a/core/src/main/kotlin/com/malinskiy/marathon/execution/device/DeviceActor.kt b/core/src/main/kotlin/com/malinskiy/marathon/execution/device/DeviceActor.kt index 43a135f99..91e1cf0b5 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/execution/device/DeviceActor.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/execution/device/DeviceActor.kt @@ -31,7 +31,7 @@ class DeviceActor( parent: Job, context: CoroutineContext ) : - Actor(parent = parent, context = context) { + Actor(parent = parent, context = context, logger = MarathonLogging.logger("DevicePool[${devicePoolId.name}]_DeviceActor[${device.serialNumber}]")) { private val state = StateMachine.create { initialState(DeviceState.Connected) @@ -124,8 +124,6 @@ class DeviceActor( } } } - private val logger = MarathonLogging.logger("DevicePool[${devicePoolId.name}]_DeviceActor[${device.serialNumber}]") - val isAvailable: Boolean get() { return !isClosedForSend && state.state == DeviceState.Ready diff --git a/core/src/main/kotlin/com/malinskiy/marathon/execution/queue/QueueActor.kt b/core/src/main/kotlin/com/malinskiy/marathon/execution/queue/QueueActor.kt index fdbc51aa8..88375f035 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/execution/queue/QueueActor.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/execution/queue/QueueActor.kt @@ -39,9 +39,7 @@ class QueueActor( poolJob: Job, coroutineContext: CoroutineContext ) : - Actor(parent = poolJob, context = coroutineContext) { - - private val logger = MarathonLogging.logger("QueueActor[$poolId]") + Actor(parent = poolJob, context = coroutineContext, logger = MarathonLogging.logger("QueueActor[$poolId]")) { private val sortingStrategy = configuration.sortingStrategy.toSortingStrategy() diff --git a/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/adam/AdamAndroidDevice.kt b/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/adam/AdamAndroidDevice.kt index bb9cd3f70..dc94ef6d0 100644 --- a/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/adam/AdamAndroidDevice.kt +++ b/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/adam/AdamAndroidDevice.kt @@ -48,6 +48,7 @@ import com.malinskiy.marathon.config.Configuration import com.malinskiy.marathon.config.vendor.VendorConfiguration import com.malinskiy.marathon.config.vendor.android.SerialStrategy import com.malinskiy.marathon.config.vendor.android.VideoConfiguration +import com.malinskiy.marathon.coroutines.newCoroutineExceptionHandler import com.malinskiy.marathon.device.DevicePoolId import com.malinskiy.marathon.device.NetworkState import com.malinskiy.marathon.device.file.measureFileTransfer @@ -71,7 +72,6 @@ import java.awt.image.BufferedImage import java.io.File import java.time.Duration import java.util.concurrent.CopyOnWriteArrayList -import kotlin.coroutines.CoroutineContext import com.malinskiy.marathon.android.model.ShellCommandResult as MarathonShellCommandResult class AdamAndroidDevice( @@ -117,7 +117,8 @@ class AdamAndroidDevice( private val dispatcher by lazy { newFixedThreadPoolContext(2, "AndroidDevice - execution - ${client.host.hostAddress}:${client.port}:${adbSerial}") } - override val coroutineContext: CoroutineContext = dispatcher + + override val coroutineContext = dispatcher + newCoroutineExceptionHandler(logger) private var props: Map = emptyMap() @@ -373,7 +374,7 @@ class AdamAndroidDevice( } } } catch (e: CancellationException) { - //Ignore + logger.warn(e) { "screenrecord start was interrupted" } } catch (e: Exception) { logger.error("Unable to start screenrecord", e) } diff --git a/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/adam/AdamDeviceProvider.kt b/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/adam/AdamDeviceProvider.kt index 1ebc072c0..1ad0ddf6e 100644 --- a/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/adam/AdamDeviceProvider.kt +++ b/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/adam/AdamDeviceProvider.kt @@ -13,6 +13,7 @@ import com.malinskiy.marathon.analytics.internal.pub.Track import com.malinskiy.marathon.android.AndroidTestBundleIdentifier import com.malinskiy.marathon.config.Configuration import com.malinskiy.marathon.config.vendor.VendorConfiguration +import com.malinskiy.marathon.coroutines.newCoroutineExceptionHandler import com.malinskiy.marathon.device.DeviceProvider import com.malinskiy.marathon.exceptions.NoDevicesException import com.malinskiy.marathon.log.MarathonLogging @@ -36,7 +37,6 @@ import kotlinx.coroutines.withTimeoutOrNull import java.net.ConnectException import java.net.InetAddress import java.util.concurrent.ConcurrentHashMap -import kotlin.coroutines.CoroutineContext const val DEFAULT_WAIT_FOR_DEVICES_SLEEP_TIME = 500L @@ -51,7 +51,10 @@ class AdamDeviceProvider( private val logger = MarathonLogging.logger("AdamDeviceProvider") private val channel: Channel = unboundedChannel() - override val coroutineContext: CoroutineContext by lazy { newFixedThreadPoolContext(1, "DeviceMonitor") } + + private val dispatcher = newFixedThreadPoolContext(1, "DeviceMonitor") + override val coroutineContext = dispatcher + newCoroutineExceptionHandler(logger) + private val setupSupervisor = SupervisorJob() private var providerJob: Job? = null diff --git a/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/adam/LogcatManager.kt b/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/adam/LogcatManager.kt index 696d3890b..7270a7055 100644 --- a/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/adam/LogcatManager.kt +++ b/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/adam/LogcatManager.kt @@ -3,18 +3,20 @@ package com.malinskiy.marathon.android.adam import com.malinskiy.adam.request.logcat.ChanneledLogcatRequest import com.malinskiy.adam.request.logcat.LogcatReadMode import com.malinskiy.marathon.android.adam.log.LogCatMessageParser +import com.malinskiy.marathon.coroutines.newCoroutineExceptionHandler +import com.malinskiy.marathon.log.MarathonLogging import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.launch import kotlinx.coroutines.newFixedThreadPoolContext import kotlinx.coroutines.supervisorScope import java.util.concurrent.ConcurrentHashMap -import kotlin.coroutines.CoroutineContext class LogcatManager : CoroutineScope { + private val logger = MarathonLogging.logger {} + private val dispatcher = newFixedThreadPoolContext(4, "LogcatManager") - override val coroutineContext: CoroutineContext - get() = dispatcher + override val coroutineContext = dispatcher + newCoroutineExceptionHandler(logger) private val devices: ConcurrentHashMap = ConcurrentHashMap() diff --git a/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/executor/listeners/video/ScreenRecorder.kt b/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/executor/listeners/video/ScreenRecorder.kt index 261b175f1..08491b6c5 100644 --- a/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/executor/listeners/video/ScreenRecorder.kt +++ b/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/executor/listeners/video/ScreenRecorder.kt @@ -16,7 +16,7 @@ internal class ScreenRecorder( try { startRecordingTestVideo() } catch (e: CancellationException) { - //Ignore + logger.warn(e) { "screenrecord start was interrupted" } } catch (e: Exception) { logger.error("Something went wrong while screen recording", e) } diff --git a/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/executor/listeners/video/ScreenRecorderStopper.kt b/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/executor/listeners/video/ScreenRecorderStopper.kt index d5ee8e54d..6f2c54af5 100644 --- a/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/executor/listeners/video/ScreenRecorderStopper.kt +++ b/vendor/vendor-android/src/main/kotlin/com/malinskiy/marathon/android/executor/listeners/video/ScreenRecorderStopper.kt @@ -51,6 +51,7 @@ internal class ScreenRecorderStopper(private val device: AndroidDevice) { try { Thread.sleep(PAUSE_BETWEEN_RECORDER_PROCESS_KILL.toLong()) } catch (ignored: InterruptedException) { + logger.warn(ignored) { "screenrecord stop was interrupted" } } }