Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Exit JVM if an error is thrown or the main loop exits #119

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ akka {
loglevel = "INFO"

logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"


logger-startup-timeout = 30s

http.client {

# The time period within which the TCP connecting process must be completed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class KeepAliveFramework(settings: KeepAliveFrameWorkSettings, authorization: Op
import TaskState._
def activeTask(status: TaskStatus) = Seq(TASK_STAGING, TASK_STARTING, TASK_RUNNING).contains(status.getState)
// We're only interested in the bad task statuses for our pod
val failedTasks = taskStatuses.filterNot { case (id, status) => activeTask(status) }
val failedTasks = taskStatuses.filterNot { case (taskId, status) => activeTask(status) }
if (failedTasks.nonEmpty) {
logger.info(s"Restarting Pod $id")
val newId = KeepAlivePodSpecHelper.createNewIncarnationId(id)
Expand Down Expand Up @@ -110,6 +110,7 @@ class KeepAliveFramework(settings: KeepAliveFrameWorkSettings, authorization: Op
// We let the framework run "forever"
val result = Await.result(end, Duration.Inf)
logger.warn(s"Framework finished with $result")
Await.result(system.terminate(), Duration.Inf)
}

object KeepAliveFramework {
Expand Down Expand Up @@ -202,7 +203,14 @@ object KeepAliveFramework {
MesosClientSettings.fromConfig(conf).withMasters(Seq(mesosUrl.toURL)),
conf.getInt("keep-alive-framework.tasks-started"),
mesosRole)
new KeepAliveFramework(settings, provider)
try {
new KeepAliveFramework(settings, provider)
} catch {
case _: Throwable => {
Await.result(system.terminate(), Duration.Inf)
sys.exit(1)
}
}
case _ =>
sys.exit(1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import java.util.UUID
import com.mesosphere.usi.core.models.resources.{ResourceType, ScalarRequirement}
import com.mesosphere.usi.core.models.template.{RunTemplate, SimpleRunTemplateFactory}
import com.mesosphere.usi.core.models.{PodId, RunningPodSpec}
import com.typesafe.scalalogging.StrictLogging

/**
* This is a helper object that generates pod specs and snapshots.
*/
object KeepAlivePodSpecHelper {
object KeepAlivePodSpecHelper extends StrictLogging {

val runSpec: RunTemplate = SimpleRunTemplateFactory(
resourceRequirements = List(ScalarRequirement(ResourceType.CPUS, 0.001), ScalarRequirement(ResourceType.MEM, 32)),
Expand All @@ -18,8 +19,9 @@ object KeepAlivePodSpecHelper {
)

def generatePodSpec(): RunningPodSpec = {
val podId = PodId(s"hello-world.${UUID.randomUUID()}.1")
val podId = PodId(s"hello-world_${UUID.randomUUID()}_1")

logger.info(s"Generating PodSpec for '${podId}'")
val podSpec = RunningPodSpec(id = podId, runSpec = runSpec)
podSpec
}
Expand All @@ -28,12 +30,13 @@ object KeepAlivePodSpecHelper {
(1 to numberOfPods).map(_ => generatePodSpec())(collection.breakOut)

def createNewIncarnationId(podId: PodId): PodId = {
val idAndIncarnation = """^(.+\..*)\.(\d+)$""".r
val idAndIncarnation = """^(.+_.*)_(\d+)$""".r
val (podIdWithoutIncarnation, currentIncarnation) = podId.value match {
case idAndIncarnation(id, inc) =>
id -> inc.toLong
case _ => throw new IllegalArgumentException(s"Failed to create new incarnation id for ${podId.value}")
}
PodId(s"$podIdWithoutIncarnation.${currentIncarnation + 1}")
PodId(s"${podIdWithoutIncarnation}_${currentIncarnation + 1}")
}

}
15 changes: 14 additions & 1 deletion mesos-client/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,17 @@ akka {
# outgoing connection might be closed if no call is issued to Mesos for a while
idle-timeout = infinite
}
}

stream {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a library, I feel like we shouldn't rely on putting global configuration here for Akka. We should instead be adding the appropriate catch-all error catching (which will expose exceptions properly).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that does make sense, and I think you're correct there. I couldn't really figure out why we actually need that increased timeout; something slows down the whole startup sequence, and we should probably clean that up instead of adjusting the timeout.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@timcharper I've figured it out: the ConnectionPoolSettings creation does a reverse DNS lookup, which is really slow on my MacBook and triggered the timeout. I've added a timeout check there and now throw a reasonable error message, so users of the lib will see something useful.


# Default materializer settings
materializer {

# Cleanup leaked publishers and subscribers when they are not used within a given
# deadline
subscription-timeout {
timeout = 25s
}
}
}
}