Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.9.0-RC: test task :compileDemoDebugUnitTestKotlin fails with error "Suspension functions can only be called within coroutine body" #4179

Open
akingyin1987 opened this issue Jul 16, 2024 · 1 comment
Labels

Comments

@akingyin1987
Copy link

I do not encounter this issue when using version 1.8.1.

// Reproduction test from the report: fills a queue with 150 suspending jobs and hands
// them to CoroutineRunBlockUtil.asynchronousBatchProcessing with togetherSize = 5,
// printing progress, completion and a final message.
// NOTE(review): the annotation below reads `@test`; it looks like the page rendering
// altered `@Test` — confirm against the original code.
` @test
fun concurrentExecution1() = runTest {

    // Log which thread the test body runs on.
    println("当前线程=${Thread.currentThread().name}")
    // Queue of pending jobs, capacity 150 (one slot per job added below).
    val queue = ArrayBlockingQueue<doOnBackground<String>>(150)
    // Scope built from runTest's scheduler.
    // NOTE(review): this scope is constructed separately from the runTest scope — confirm
    // that runTest actually waits for the work launched in it.
    val scope = CoroutineScope(testScheduler)
    for (index in 1..150) {
        // Each job calls getDataDelayedTime with a value from RandomUtil.getRandomNum(1, 2000)
        // and the job's index.
        queue.add {
            getDataDelayedTime(RandomUtil.getRandomNum(1, 2000).toLong(), index)
        }
    }
    scope.launch {
        CoroutineRunBlockUtil.asynchronousBatchProcessing(
            coroutineScope = scope,
            jobList = queue,
            // Never ignore a failure.
            onErrorIsIgnore = {false},
            onComplete = {
                println("完成")
            },
            onProgress = {total, progress, error ->
                println("total=$total, progress=$progress, error=$error")
            },
            togetherSize = 5
        ).let {
            // Reached once asynchronousBatchProcessing has returned; the result itself is unused.
            println("获取到最后结果")
        }
    }

}

/**
 * Test helper that stands in for one unit of background work.
 *
 * Prints the requested delay, suspends via `delay(delayedTime)`, prints a completion
 * line (timestamp from `DateUtil.testNowTimeString`, current thread name and id, and
 * whether [index] is not a multiple of 7), then reports success.
 *
 * @param delayedTime value handed to `delay`
 * @param index label of the job, used only in the printed output
 * @return always `Result.Success("")`
 */
` private suspend fun getDataDelayedTime(delayedTime: Long, index: Int): Result {
    println("第${index} 睡眠时长=${delayedTime}")
    delay(delayedTime)

    // Captured after the delay, so this is the thread the coroutine resumed on.
    val worker = Thread.currentThread()
    val notMultipleOfSeven = index.mod(7) != 0
    val finishedLine =
        "第${index}项完成->${DateUtil.testNowTimeString},thread.name=${worker.name},thread.id=${worker.id},返回结果=${notMultipleOfSeven}"
    println(finishedLine)

    // The index-dependent failure is currently disabled; every call reports success:
    // return if (index.mod(7) != 0) Result.Success("") else Result.Failure(Exception())
    return Result.Success("")
}

`

@akingyin1987
Copy link
Author

/**
 * Runs the jobs in [jobList] concurrently in batches and suspends the caller until the
 * batch run finishes or is aborted by a failure.
 *
 * The work is driven by one coroutine launched in [coroutineScope] on `Dispatchers.IO`.
 * It polls jobs from [jobList], starts each with `async`, and awaits them in the order
 * they were started. After every awaited task [onProgress] is invoked on
 * `Dispatchers.Main`.
 *
 * NOTE(review): the signature appears to have lost its generic arguments in the paste
 * (`T` is used in the body but not declared here; `Result` / `kotlin.Result` carry no
 * type arguments) — confirm against the real source.
 *
 * @param coroutineScope scope in which the driving coroutine is launched; its `isActive`
 *   flag is checked before launching and on each loop iteration
 * @param jobList queue of jobs to run; jobs are removed with `poll()` as they are started
 * @param onErrorIsIgnore called with a failed result; returning `false` aborts the run
 * @param togetherSize capacity of the two internal task queues, which bounds how many
 *   tasks are started before they are awaited
 * @param onProgress called with (total, completed so far, failed so far) after each task
 * @param onComplete called with `false` when the run is aborted by a failure, and with
 *   `true` after the main loop ends
 * @return `kotlin.Result.failure(...)` carrying the failed result's exception when
 *   aborted, otherwise `kotlin.Result.success("完成")`
 */
` suspend fun asynchronousBatchProcessing(
coroutineScope: CoroutineScope,
jobList: ArrayBlockingQueue<doOnBackground>,
onErrorIsIgnore: (Result) -> Boolean = { false },
togetherSize: Int = 5,
onProgress: (total: Int, progress: Int, error: Int) -> Unit,
onComplete: (Boolean) -> Unit,
): kotlin.Result {

    /** Started tasks queued to be awaited (polled and awaited in the drain loop below). */
    val currentWaitDoingTaskList = ArrayBlockingQueue<Deferred<Result<T>>>(togetherSize)

    /** Tasks currently running; entries are removed once their result has been awaited. */
    val currentDoingTaskList = ArrayBlockingQueue<Deferred<Result<T>>>(togetherSize)
    /** Stop flag: false while processing continues normally, set to true to end the loops. */
    val currentTaskState = AtomicBoolean(false)

    /** Total number of tasks, captured before any job is polled. */
    val taskTotal: Int = jobList.size

    /** Number of tasks completed so far. */
    val currentCompleteTotal = AtomicInteger(0)

    /** Number of tasks that failed so far. */
    val currentErrorTotal = AtomicInteger(0)

    // Handle of the driving coroutine, so the failure path can cancel it.
    var currentJob: Job? = null


    return suspendCancellableCoroutine { continuation ->

        // NOTE(review): when the scope is not active nothing below runs and no branch
        // resumes `continuation` — confirm this is intended.
        if (coroutineScope.isActive) {
            currentJob = coroutineScope.launch(Dispatchers.IO) {
                // Batch processing is in progress.

                // Keep going while the scope is active, jobs remain and no stop was requested.
                while (coroutineScope.isActive && jobList.isNotEmpty() && !currentTaskState.get()) {
                    println("开始进入循环准备执行---->>>>>>,${currentWaitDoingTaskList.size}")
                    if (taskTotal == currentCompleteTotal.get()) {
                        // All tasks are done.
                        currentTaskState.set(true)

                    }
                    // Start another job only while both queues are below the limit.
                    if (currentWaitDoingTaskList.size < togetherSize && currentDoingTaskList.size < togetherSize) {
                        jobList.poll()?.let { job ->
                            // Started as a child of this launched coroutine.
                            async {
                                println("开启异步执行=${DateUtil.testNowTimeString}")
                                job.invoke()
                            }.let { task ->
                                if (coroutineScope.isActive) {
                                    // Add to the list of tasks waiting to be awaited.
                                    currentWaitDoingTaskList.add(task)
                                    // This size check repeats the one in the enclosing `if`.
                                    if(currentDoingTaskList.size < togetherSize){
                                        currentDoingTaskList.add(task)
                                    }

                                   // task.start()
                                }


                            }
                        }
                    }
                    if (currentWaitDoingTaskList.size == togetherSize || (jobList.isEmpty() && currentWaitDoingTaskList.size< togetherSize)) {

                        // Either the number of queued tasks has reached the maximum, or no
                        // pending jobs are left and the batch is only partially filled.
                        // NOTE(review): currentDoingTaskList was created with capacity
                        // togetherSize, so the last condition looks always true — confirm.
                        while (currentWaitDoingTaskList.isNotEmpty() && !currentTaskState.get() && currentDoingTaskList.size<= togetherSize) {
                            currentWaitDoingTaskList.poll()?.let {
                                // Take the next queued task and wait for its result.
                                val result = it.await()
                                // Remove it from the running tasks.
                                currentDoingTaskList.remove(it)
                                println("移出已完成的任务=${currentDoingTaskList.size},${DateUtil.testNowTimeString}")

                                currentCompleteTotal.incrementAndGet()
                                if (taskTotal == currentCompleteTotal.get()) {
                                    // All tasks are done.
                                    currentTaskState.set(true)

                                }
                                if (result is Result.Failure) {
                                    println("当前为失败-----》")
                                    // This task failed.
                                    currentErrorTotal.incrementAndGet()
                                    // NOTE(review): the callbacks run on Dispatchers.Main; presumably
                                    // a Main dispatcher must be available in the calling environment
                                    // (for example in unit tests) — confirm.
                                    withContext(Dispatchers.Main) {
                                        onProgress.invoke(
                                            taskTotal,
                                            currentCompleteTotal.get(),
                                            currentErrorTotal.get()
                                        )
                                    }

                                    if (!onErrorIsIgnore(result)) {
                                        // The failure is not ignored: abort the whole run.
                                        currentTaskState.set(true)
                                        withContext(Dispatchers.Main) {
                                            onComplete.invoke(false)
                                        }
                                        println("正在执行的任务总数:${currentDoingTaskList.size}")
                                        // Cancel every task still recorded as running.
                                        currentDoingTaskList.forEach { doJob ->
                                            println("终止执行=${doJob.isActive},${doJob.isCancelled},${doJob.isCompleted}")
                                            doJob.cancel()
                                        }
                                        //   continuation.resumeWithException(result.exception)
                                        // Hand the failure to the suspended caller as a value, not as a thrown exception.
                                        continuation.resume(kotlin.Result.failure(result.exception))



                                        // Cancel the driving coroutine and leave it; the success path below is skipped.
                                        currentJob?.cancel()
                                        return@launch
                                    }
                                } else {
                                    withContext(Dispatchers.Main) {
                                        onProgress.invoke(
                                            taskTotal,
                                            currentCompleteTotal.get(),
                                            currentErrorTotal.get()
                                        )
                                    }
                                }
                            }
                        }
                    }
                    println("循环底部----->>>>>>>>${jobList.size},${currentTaskState.get()}")
                }
                // Main loop has ended without an abort: report completion and resume the caller.
                withContext(Dispatchers.Main) {
                    onComplete.invoke(true)
                }
                println("返回结果-----------》》》》》》》")
                continuation.resume(kotlin.Result.success("完成"))

            }


        }
    }


}`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant