From d96af3bfc902db2e609a0e046d06c25de2eba470 Mon Sep 17 00:00:00 2001 From: Anton Malinskiy Date: Wed, 26 Jun 2024 20:37:34 +1000 Subject: [PATCH] fix(core): wait for cancellation of jobs before terminating DeviceActor (#951) --- .../marathon/execution/device/DeviceActor.kt | 31 ++++++++----------- 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/core/src/main/kotlin/com/malinskiy/marathon/execution/device/DeviceActor.kt b/core/src/main/kotlin/com/malinskiy/marathon/execution/device/DeviceActor.kt index 7c01a7cca..d717840ab 100644 --- a/core/src/main/kotlin/com/malinskiy/marathon/execution/device/DeviceActor.kt +++ b/core/src/main/kotlin/com/malinskiy/marathon/execution/device/DeviceActor.kt @@ -19,6 +19,7 @@ import kotlinx.coroutines.CompletionHandler import kotlinx.coroutines.Job import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.async +import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.isActive import kotlinx.coroutines.launch @@ -155,11 +156,12 @@ class DeviceActor( } } - private var job: Job? = null + private var initializeJob: Job? = null + private var executeJob: Job? = null private fun initialize() { logger.debug { "initialize ${device.serialNumber}" } - job = async { + initializeJob = async { try { withRetry(30, 10000) { if (isActive) { @@ -183,7 +185,7 @@ class DeviceActor( private fun executeBatch(batch: TestBatch, result: CompletableDeferred) { logger.debug { "executeBatch ${device.serialNumber}" } - job = async { + executeJob = async { try { device.execute(configuration, devicePoolId, batch, result) state.transition(DeviceEvent.Complete) @@ -195,28 +197,18 @@ class DeviceActor( state.transition(DeviceEvent.Terminate) } catch (e: TestBatchExecutionException) { logger.warn(e) { "Test batch failed execution" } - pool.send( - DevicePoolMessage.FromDevice.ReturnTestBatch( - device, - batch, - "Test batch failed execution:\n${e.stackTraceToString()}" - ) - ) + returnBatchAnd(batch, "Test batch failed execution:\n" + e.stackTraceToString()) {} state.transition(DeviceEvent.Complete) } catch (e: Throwable) { logger.error(e) { "Unknown vendor exception caught. Considering this a recoverable error" } - pool.send( - DevicePoolMessage.FromDevice.ReturnTestBatch( - device, batch, "Unknown vendor exception caught. \n" + - "${e.stackTraceToString()}" - ) - ) + returnBatchAnd(batch, "Unknown vendor exception caught:\n" + e.stackTraceToString()) {} state.transition(DeviceEvent.Complete) } } } private fun returnBatchAnd(batch: TestBatch, reason: String, completionHandler: CompletionHandler = {}): Job { + logger.debug { "Returning batch ${batch.id}. Reason: $reason" } return launch { withContext(NonCancellable) { pool.send(DevicePoolMessage.FromDevice.ReturnTestBatch(device, batch, reason)) @@ -228,8 +220,11 @@ class DeviceActor( private fun terminate() { logger.debug { "terminate ${device.serialNumber}" } - job?.cancel() - close() + launch { + initializeJob?.cancelAndJoin() + executeJob?.cancelAndJoin() + close() + } } }