From 89a5e697d2b90fe742015e9057e5527a665799d9 Mon Sep 17 00:00:00 2001 From: Adrian Matwiejuk <73438286+wrzontek@users.noreply.github.com> Date: Tue, 16 Jul 2024 14:57:03 +0200 Subject: [PATCH] Stop deployment and reschedule actors less gracefully on reload (backport of #6314) (#6382) --- docs/Changelog.md | 5 + .../periodic/PeriodicDeploymentManager.scala | 23 +++-- .../engine/management/periodic/Utils.scala | 57 ++++-------- .../management/periodic/UtilsSpec.scala | 92 +++++++++++++++++-- 4 files changed, 118 insertions(+), 59 deletions(-) diff --git a/docs/Changelog.md b/docs/Changelog.md index b4933d534c6..95af815a11e 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -1,5 +1,10 @@ # Changelog +1.16.1 +------------------------- + +* [#6382](https://github.com/TouK/nussknacker/pull/6382) Avoid timeout on model reload by stopping DeploymentActor and RescheduleFinishedActor non-gracefully. Instead, retry until success while creating new actors. + 1.16.0 (11 July 2024) ------------------------- diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala index 0f4040c4174..d4285ca080d 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/PeriodicDeploymentManager.scala @@ -8,7 +8,7 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.deployment.{CustomActionDefinition, ExternalDeploymentId} import pl.touk.nussknacker.engine.management.FlinkConfig import pl.touk.nussknacker.engine.management.periodic.PeriodicProcessService.PeriodicProcessStatus -import pl.touk.nussknacker.engine.management.periodic.Utils.{gracefulStopActor, runSafely} +import pl.touk.nussknacker.engine.management.periodic.Utils.{createActorWithRetry, runSafely} import pl.touk.nussknacker.engine.management.periodic.db.{DbInitializer, SlickPeriodicProcessesRepository} import pl.touk.nussknacker.engine.management.periodic.flink.FlinkJarManager import pl.touk.nussknacker.engine.management.periodic.service.{ @@ -16,7 +16,7 @@ import pl.touk.nussknacker.engine.management.periodic.service.{ PeriodicProcessListenerFactory, ProcessConfigEnricherFactory } -import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies, newdeployment} +import pl.touk.nussknacker.engine.{BaseModelData, DeploymentManagerDependencies} import slick.jdbc import slick.jdbc.JdbcProfile @@ -60,21 +60,28 @@ object PeriodicDeploymentManager { clock, dependencies.actionService ) - val deploymentActor = dependencies.actorSystem.actorOf( + + // These actors have to be created with retries because they can initially fail to create due to taken names, + // if the actors (with the same names) created before reload aren't fully stopped (and their names freed) yet + val deploymentActor = createActorWithRetry( + s"periodic-${periodicBatchConfig.processingType}-deployer", DeploymentActor.props(service, periodicBatchConfig.deployInterval), - s"periodic-${periodicBatchConfig.processingType}-deployer" + dependencies.actorSystem ) - val rescheduleFinishedActor = dependencies.actorSystem.actorOf( + val rescheduleFinishedActor = createActorWithRetry( + s"periodic-${periodicBatchConfig.processingType}-rescheduler", RescheduleFinishedActor.props(service, periodicBatchConfig.rescheduleCheckInterval), - s"periodic-${periodicBatchConfig.processingType}-rescheduler" + dependencies.actorSystem ) val customActionsProvider = customActionsProviderFactory.create(scheduledProcessesRepository, service) val toClose = () => { runSafely(listener.close()) - runSafely(gracefulStopActor(deploymentActor, dependencies.actorSystem)) - runSafely(gracefulStopActor(rescheduleFinishedActor, dependencies.actorSystem)) + // deploymentActor and rescheduleFinishedActor just call methods from PeriodicProcessService on interval, + // they don't have any internal state, so stopping them non-gracefully is safe + runSafely(dependencies.actorSystem.stop(deploymentActor)) + runSafely(dependencies.actorSystem.stop(rescheduleFinishedActor)) runSafely(db.close()) } new PeriodicDeploymentManager( diff --git a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/Utils.scala b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/Utils.scala index 887a6d6306d..b3fe622b411 100644 --- a/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/Utils.scala +++ b/engine/flink/management/periodic/src/main/scala/pl/touk/nussknacker/engine/management/periodic/Utils.scala @@ -1,20 +1,16 @@ package pl.touk.nussknacker.engine.management.periodic -import akka.actor.{ActorNotFound, ActorPath, ActorRef, ActorSystem} -import akka.pattern.{AskTimeoutException, gracefulStop} +import akka.actor.{ActorRef, ActorSystem, Props} import com.typesafe.scalalogging.LazyLogging -import java.util.concurrent.TimeoutException import scala.concurrent.duration.DurationInt import scala.concurrent.{Await, ExecutionContext, Future} -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} object Utils extends LazyLogging { - private val GracefulStopTimeout = 5 seconds - private val ActorResolutionTimeout = 5 seconds - private val ActorResolutionPause = 50 milliseconds - private val ActorResolutionRetries = 50 + private val ActorCreationPause = 50 milliseconds + private val ActorCreationRetries = 50 def runSafely(action: => Unit): Unit = try { action @@ -22,44 +18,23 @@ object Utils extends LazyLogging { case t: Throwable => logger.error("Error occurred, but skipping it", t) } - def gracefulStopActor(actorRef: ActorRef, actorSystem: ActorSystem): Unit = { - import actorSystem.dispatcher - logger.info(s"Gracefully stopping $actorRef") - - val gracefulStopFuture = for { - _ <- gracefulStop(actorRef, GracefulStopTimeout) - _ <- waitUntilActorNameIsFree( // this step is necessary because gracefulStop does not guarantee that the supervisor is notified of the name being freed - actorRef.path, - actorSystem - ) - } yield {} - - Await.result( - gracefulStopFuture, - GracefulStopTimeout + ActorResolutionRetries * (ActorResolutionTimeout + ActorResolutionPause) + (1 second) - ) - - logger.info(s"Gracefully stopped $actorRef") - } - - private def waitUntilActorNameIsFree(actorPath: ActorPath, actorSystem: ActorSystem)(implicit e: ExecutionContext) = { - retry - .Pause(ActorResolutionRetries, ActorResolutionPause) + def createActorWithRetry(actorName: String, props: Props, actorSystem: ActorSystem)( + implicit ec: ExecutionContext + ): ActorRef = { + val actorRefFuture = retry + .Pause(ActorCreationRetries, ActorCreationPause) .apply { () => - val actorResolutionFuture = - actorSystem - .actorSelection(actorPath) - .resolveOne(ActorResolutionTimeout) - .map(_ => Left(s"Actor path $actorPath is still taken")) - - actorResolutionFuture.recover { case _: ActorNotFound => - Right(s"Actor path $actorPath is free") + Future { + Try(actorSystem.actorOf(props, actorName)) } } .map { - case Left(_) => throw new IllegalStateException(s"Failed to free actor path $actorPath within allowed retries") - case Right(_) => () + case Failure(ex) => + throw new IllegalStateException(s"Failed to create actor '$actorName' within allowed retries: $ex") + case Success(a) => a } + + Await.result(actorRefFuture, ActorCreationRetries * ActorCreationPause + (1 second)) } } diff --git a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/UtilsSpec.scala b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/UtilsSpec.scala index 3fa9ea869e9..a6d53da9c16 100644 --- a/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/UtilsSpec.scala +++ b/engine/flink/management/periodic/src/test/scala/pl/touk/nussknacker/engine/management/periodic/UtilsSpec.scala @@ -2,32 +2,104 @@ package pl.touk.nussknacker.engine.management.periodic import akka.actor.{Actor, ActorSystem, Props} import akka.testkit.TestKit +import com.typesafe.scalalogging.LazyLogging import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike +import pl.touk.nussknacker.engine.management.periodic.Utils.createActorWithRetry -class UtilsSpec extends TestKit(ActorSystem("UtilsSpec")) with AnyWordSpecLike with Matchers with BeforeAndAfterAll { +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} + +class UtilsSpec + extends TestKit(ActorSystem("UtilsSpec")) + with AnyWordSpecLike + with Matchers + with BeforeAndAfterAll + with LazyLogging { override def afterAll(): Unit = { TestKit.shutdownActorSystem(system) } + class TestActor extends Actor { + override def receive: Receive = { case _ => () } + } + "Utils" should { - "gracefully stop actor and free actor path" in { - class TestActor extends Actor { - override def receive: Receive = { case _ => - () + "create an actor if it's name is free" in { + import system.dispatcher + + val actorName = "actorName1" // unique name in each test so that they don't interfere with each other + + createActorWithRetry(actorName, Props(new TestActor), system) + } + + "create an actor if it's name isn't free but is freed before retrying gives up - idle actor" in { + import system.dispatcher + + val actorName = "actorName2" // unique name in each test so that they don't interfere with each other + + val actorRef = createActorWithRetry(actorName, Props(new TestActor), system) + + val futureA = Future { + createActorWithRetry(actorName, Props(new TestActor), system) + } + + val futureB = Future { + Thread.sleep(1000) + system.stop(actorRef) + } + + Await.result(Future.sequence(Seq(futureA, futureB)), Duration.Inf) + } + + "create an actor if it's name isn't free but is freed before retrying gives up - busy actor" in { + class BusyTestActor extends Actor { + override def receive: Receive = { case msg => + logger.info(s"Sleeping on the job '$msg' ...") + Thread.sleep(1000) } } - val actorName = "actorName" - val actorRef = system.actorOf(Props(new TestActor), actorName) + import system.dispatcher + + val actorName = "actorName3" // unique name in each test so that they don't interfere with each other + + val actorRef = createActorWithRetry(actorName, Props(new BusyTestActor), system) + + var messageCounter = 0 + while (messageCounter < 1000) { + actorRef ! s"message number $messageCounter" + messageCounter += 1 + } + + val futureA = Future { + createActorWithRetry(actorName, Props(new BusyTestActor), system) + } + + val futureB = Future { + Thread.sleep(1000) + + // if this was gracefulStop, it would take too long to stop the actor, as it would continue processing it's messages + system.stop(actorRef) + } + + Await.result(Future.sequence(Seq(futureA, futureB)), Duration.Inf) + } + + "fail to create an actor if it's name isn't freed" in { + import system.dispatcher + + val actorName = "actorName4" // unique name in each test so that they don't interfere with each other + + createActorWithRetry(actorName, Props(new TestActor), system) - Utils.gracefulStopActor(actorRef, system) + (the[IllegalStateException] thrownBy { + createActorWithRetry(actorName, Props(new TestActor), system) + }).getMessage shouldEqual s"Failed to create actor '$actorName' within allowed retries: akka.actor.InvalidActorNameException: actor name [$actorName] is not unique!" - // with normal system.stop(actorRef) or akka.pattern.gracefulStop this throws "actor name is not unique" - system.actorOf(Props(new TestActor), actorName) } "ignore exceptions inside runSafely block" in {