
[SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Scheduler Backend #19468

Closed · wants to merge 29 commits

Changes from 1 commit

Commits (29):
f6fdd6a  Spark on Kubernetes - basic scheduler backend (foxish, Sep 15, 2017)
75e31a9  Adding to modules.py and SparkBuild.scala (foxish, Oct 17, 2017)
cf82b21  Exclude from unidoc, update travis (foxish, Oct 17, 2017)
488c535  Address a bunch of style and other comments (foxish, Oct 17, 2017)
82b79a7  Fix some style concerns (foxish, Oct 18, 2017)
c052212  Clean up YARN constants, unit test updates (foxish, Oct 20, 2017)
c565c9f  Couple of more style comments (foxish, Oct 20, 2017)
2fb596d  Address CR comments. (mccheah, Oct 25, 2017)
992acbe  Extract initial executor count to utils class (mccheah, Oct 25, 2017)
b0a5839  Fix scalastyle (mccheah, Oct 25, 2017)
a4f9797  Fix more scalastyle (mccheah, Oct 25, 2017)
2b5dcac  Pin down app ID in tests. Fix test style. (mccheah, Oct 26, 2017)
018f4d8  Address comments. (mccheah, Nov 1, 2017)
4b32134  Various fixes to the scheduler (mccheah, Nov 1, 2017)
6cf4ed7  Address comments (mccheah, Nov 4, 2017)
1f271be  Update fabric8 client version to 3.0.0 (foxish, Nov 13, 2017)
71a971f  Addressed more comments (liyinan926, Nov 13, 2017)
0ab9ca7  One more round of comments (liyinan926, Nov 14, 2017)
7f14b71  Added a comment regarding how failed executor pods are handled (liyinan926, Nov 15, 2017)
7afce3f  Addressed more comments (liyinan926, Nov 21, 2017)
b75b413  Fixed Scala style error (liyinan926, Nov 21, 2017)
3b587b4  Removed unused parameter in parsePrefixedKeyValuePairs (liyinan926, Nov 22, 2017)
cb12fec  Another round of comments (liyinan926, Nov 22, 2017)
ae396cf  Addressed latest comments (liyinan926, Nov 27, 2017)
f8e3249  Addressed comments around licensing on new dependencies (liyinan926, Nov 27, 2017)
a44c29e  Fixed unit tests and made maximum executor lost reason checks configu… (liyinan926, Nov 27, 2017)
4bed817  Removed default value for executor Docker image (liyinan926, Nov 27, 2017)
c386186  Close the executor pod watcher before deleting the executor pods (liyinan926, Nov 27, 2017)
b85cfc4  Addressed more comments (liyinan926, Nov 28, 2017)
@@ -111,5 +111,14 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS =
+    ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts")
+      .doc("Maximum number of attempts allowed for checking the reason of an executor loss " +
+        "before it is assumed that the executor failed.")
+      .intConf
+      .checkValue(value => value > 0, "Maximum attempts of checks of executor lost reason " +
+        "must be a positive integer")
+      .createWithDefault(5)
Review thread on the new default (.createWithDefault(5)):

Contributor: Wasn't this 10 earlier?

Contributor: Yes, but I think 5 is a more sensible default than 10. @mccheah @foxish WDYT?

Contributor: We should probably keep it the same as before, as it's what we've been running with already.

Contributor: Sure, changed back to 10.

+
   val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
 }
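For context, the new key is set like any other Spark configuration property. A minimal usage sketch, where the master URL and app name are placeholders rather than values from this PR:

    import org.apache.spark.SparkConf

    // Hypothetical usage sketch: override the loss-reason check limit.
    // The master URL and app name below are illustrative placeholders.
    val conf = new SparkConf()
      .setMaster("k8s://https://kubernetes.example.com:6443")
      .setAppName("lost-reason-check-demo")
      // Key added in this diff; values <= 0 are rejected by the checkValue guard.
      .set("spark.kubernetes.executor.lostCheck.maxAttempts", "10")

Equivalently, the same key can be passed on the command line via spark-submit's --conf flag.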
@@ -90,6 +90,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
 
+  private val executorLostReasonCheckMaxAttempts = conf.get(
+    KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS)
+
   private val allocatorRunnable = new Runnable {
 
     // Maintains a map of executor id to count of checks performed to learn the loss reason
@@ -174,7 +177,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
     def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
       val reasonCheckCount = executorReasonCheckAttemptCounts.getOrElse(executorId, 0)
-      if (reasonCheckCount >= MAX_EXECUTOR_LOST_REASON_CHECKS) {
+      if (reasonCheckCount >= executorLostReasonCheckMaxAttempts) {
         removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons."))
         deleteExecutorFromClusterAndDataStructures(executorId)
       } else {
@@ -427,7 +430,4 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
 private object KubernetesClusterSchedulerBackend {
   private val UNKNOWN_EXIT_CODE = -1
-  // Number of times we are allowed check for the loss reason for an executor before we give up
-  // and assume the executor failed for good, and attribute it to a framework fault.
-  val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
 }
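To make the control flow above concrete, here is a minimal, self-contained sketch of the bounded check-count pattern; all names are illustrative stand-ins, not the PR's actual code:

    import scala.collection.mutable

    // Illustrative sketch only: mirrors the shape of
    // removeExecutorOrIncrementLossReasonCheckCount, not its real implementation.
    object LossReasonCheckSketch {
      private val checkCounts = mutable.Map.empty[String, Int]
      private val maxAttempts = 10 // in the PR this comes from the new config key

      // Returns true once we give up waiting for a loss reason and the caller
      // should remove the executor; false while more checks are still allowed.
      def recordCheck(executorId: String): Boolean = {
        val count = checkCounts.getOrElse(executorId, 0)
        if (count >= maxAttempts) {
          checkCounts.remove(executorId) // clean up bookkeeping on removal
          true
        } else {
          checkCounts(executorId) = count + 1
          false
        }
      }
    }

In the PR itself, giving up calls removeExecutor(...) with SlaveLost and then deleteExecutorFromClusterAndDataStructures(...); the sketch shows only the counting decision.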
@@ -123,8 +123,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       ENV_EXECUTOR_CORES -> "1",
       ENV_EXECUTOR_MEMORY -> "1g",
       ENV_APPLICATION_ID -> "dummy",
-      ENV_EXECUTOR_POD_IP -> null,
-      ENV_EXECUTOR_PORT -> "10000") ++ additionalEnvVars
+      ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
 
     assert(executor.getSpec.getContainers.size() === 1)
     assert(executor.getSpec.getContainers.get(0).getEnv.size() === defaultEnvs.size)
@@ -330,6 +330,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     sparkConf
       .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
       .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
+    val executorLostReasonCheckMaxAttempts = sparkConf.get(
+      KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS)
 
     val scheduler = newSchedulerBackend()
     scheduler.start()
@@ -346,7 +348,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
       .apply(registerFirstExecutorMessage)
 
     driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
-    1 to KubernetesClusterSchedulerBackend.MAX_EXECUTOR_LOST_REASON_CHECKS foreach { _ =>
+    1 to executorLostReasonCheckMaxAttempts foreach { _ =>
       allocatorRunnable.getValue.run()
       verify(podOperations, never()).delete(FIRST_EXECUTOR_POD)
     }
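Note the design choice in this test change: reading the loop bound from KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS rather than the removed MAX_EXECUTOR_LOST_REASON_CHECKS constant keeps the test in sync with whatever default the config ships with (5 in this commit, 10 after the thread above), and the loop asserts that the executor pod is never deleted while loss-reason checks are still permitted.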