Rewriting YARNScalar
- We now scale up in steps, until we reach maxNodes capacity (sketched below)
- We never scale down if we're already at maxNodes and demand > 0
ashwanthkumar committed Apr 24, 2018
1 parent 4f988f0 commit 0050814
Showing 2 changed files with 62 additions and 51 deletions.
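
Before reading the diff, here is a minimal standalone sketch of the decision rule described in the commit message. It mirrors the requiredNodes logic in YARNScalar.scala below, but the ScaleDecisionSketch object and the flattened signature (plain Int/Double parameters instead of vamana2's Context, CDemand and CSupply types) are illustrative only, not part of this commit.

// A sketch only: mirrors the requiredNodes rule introduced in this commit,
// with plain parameters standing in for vamana2's Context/CDemand/CSupply.
object ScaleDecisionSketch {
  def requiredNodes(demandQuantity: Double, supplyAvailable: Double,
                    currentSize: Int, minNodes: Int, maxNodes: Int): Int = {
    if (demandQuantity == 0.0) minNodes                      // nothing pending, shrink to the floor
    else if (demandQuantity > supplyAvailable) {
      // scale up in steps: never below what we already run, never above maxNodes
      val required = math.max(currentSize, demandQuantity.toInt)
      math.min(maxNodes, required)
    }
    else if (currentSize > maxNodes) maxNodes                 // maxNodes was lowered, clamp down
    else currentSize                                          // demand is met, hold steady
  }

  def main(args: Array[String]): Unit = {
    // demand for 4 nodes with 1 running and a cap of 10: grow to 4, not straight to 10
    println(requiredNodes(demandQuantity = 4, supplyAvailable = 1, currentSize = 1, minNodes = 1, maxNodes = 10))    // 4
    // already at the cap with demand still unmet: stay at 10, never scale down
    println(requiredNodes(demandQuantity = 12, supplyAvailable = 10, currentSize = 10, minNodes = 1, maxNodes = 10)) // 10
  }
}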
56 changes: 34 additions & 22 deletions src/main/scala/in/ashwanthkumar/vamana2/apps/YARNScalar.scala
@@ -1,53 +1,65 @@
package in.ashwanthkumar.vamana2.apps

import com.typesafe.scalalogging.slf4j.Logger
import in.ashwanthkumar.vamana2.Vamana._
import in.ashwanthkumar.vamana2.core._
import org.joda.time.DateTime
import org.slf4j.LoggerFactory

import scala.concurrent.duration.Duration

-case class CDemand(containersPending: Double) extends Demand {
-  def quantity = containersPending
+case class CDemand(containersPending: Double, containersAllocated: Double, activeNodes: Double) extends Demand {
+  def quantity = {
+    val nodes = math.max(activeNodes, 1.0)
+    val containersPerNode = containersAllocated / nodes
+    math.ceil((containersPending + containersAllocated) / math.max(containersPerNode, 1.0))
+  }
}
-case class CSupply(containersAllocated: Double) extends Supply {
-  def available = containersAllocated
+case class CSupply(activeNodes: Double) extends Supply {
+  def available = activeNodes
}

/**
 * Initial version of YARN Scalar that scales up the cluster to ClusterConfiguration.maxSize (if there's demand)
 * or scales down the cluster to ClusterConfiguration.minSize.
 *
 */
class YARNScalar extends Scalar[CDemand, CSupply] {
  private val log = Logger(LoggerFactory.getLogger(getClass))
  /**
   * @inheritdoc
   */
  override def requiredNodes(demand: CDemand, supply: CSupply, ctx: Context): Int = {
    log.info(s"Demand found is $demand")
    log.info(s"Supply found is $supply")

    if (demand.quantity == 0.0) ctx.cluster.minNodes
-    else if (demand.quantity > supply.available || ctx.currentSize > ctx.cluster.maxNodes) ctx.cluster.maxNodes
+    else if (demand.quantity > supply.available) {
+      // don't scale down if we're already scaled up further than required (causes jobs to fail if they're using Datanodes)
+      // if we're just starting to scale up, scale up slowly
+      val maxOfCurrentAndRequired = math.max(ctx.currentSize, demand.quantity.toInt)
+      math.min(ctx.cluster.maxNodes, maxOfCurrentAndRequired)
+    }
+    else if (ctx.currentSize > ctx.cluster.maxNodes) ctx.cluster.maxNodes
    else ctx.currentSize
  }

  /**
   * @inheritdoc
   */
  override def demand(metrics: List[Metric]): CDemand = {
    val containersPending = containerMetrics(metrics, "containers_pending")
-    CDemand(containersPending.sum)
+    val containersAllocated = containerMetrics(metrics, "containers_allocated")
+    val activeNodes = containerMetrics(metrics, "active_nodes")
+    CDemand(
+      containersPending.headOption.getOrElse(0.0),
+      containersAllocated.headOption.getOrElse(0.0),
+      activeNodes.headOption.getOrElse(0.0)
+    )
  }

  /**
   * @inheritdoc
   */
  override def supply(metrics: List[Metric]): CSupply = {
-    val containersAllocated = containerMetrics(metrics, "containers_allocated")
-    CSupply(containersAllocated.sum)
+    val activeNodes = containerMetrics(metrics, "active_nodes")
+    CSupply(activeNodes.headOption.getOrElse(0.0))
  }

  private[apps] def containerMetrics(metrics: List[Metric], metricName: String): List[Double] = {
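For readers checking the new quantity formula by hand, the numbers from the last CDemand test below work out as follows. This is a throwaway calculation whose local val names simply mirror the fields in the diff; it is not code from the repository.

// Worked example of the new quantity formula, using the values from the
// "return allocated + pending / nodes" test in YARNScalarTest below.
val containersPending   = 19995.0
val containersAllocated = 5.0
val activeNodes         = 1.0
val containersPerNode   = containersAllocated / math.max(activeNodes, 1.0)                             // 5.0
val quantity = math.ceil((containersPending + containersAllocated) / math.max(containersPerNode, 1.0)) // ceil(20000 / 5) = 4000
// Demand is now expressed in nodes' worth of work, which is what requiredNodes
// compares against CSupply.available (the number of active nodes).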
57 changes: 28 additions & 29 deletions src/test/scala/in/ashwanthkumar/vamana2/apps/YARNScalarTest.scala
@@ -9,53 +9,52 @@ class YARNScalarTest extends FlatSpec {
"YARNScalar" should "return maxNodes when there's demand and cluster is not in full capacity" in {
val scalar = new YARNScalar()
val mockContext = context(currentSize = 1, minNodes = 1, maxNodes = 10)
val numberOfNodes = scalar.requiredNodes(CDemand(1000.0), CSupply(100.0), mockContext)
val numberOfNodes = scalar.requiredNodes(CDemand(containersPending = 10000, containersAllocated = 10, activeNodes = 1), CSupply(activeNodes = 1), mockContext)
numberOfNodes should be(10)
}

it should "return the minNodes when there's no demand" in {
val scalar = new YARNScalar()
val mockContext = context(currentSize = 5, minNodes = 1, maxNodes = 10)
val numberOfNodes = scalar.requiredNodes(CDemand(0.0), CSupply(100.0), mockContext)
val numberOfNodes = scalar.requiredNodes(CDemand(containersPending = 0.0, containersAllocated = 0.0, activeNodes = 10), CSupply(activeNodes = 10.0), mockContext)
numberOfNodes should be(1)
}

it should "return the currentSize when there's demand and cluster is already at full capacity" in {
val scalar = new YARNScalar()
val mockContext = context(currentSize = 10, minNodes = 1, maxNodes = 10)
val numberOfNodes = scalar.requiredNodes(CDemand(100.0), CSupply(100.0), mockContext)
val numberOfNodes = scalar.requiredNodes(CDemand(containersPending = 100.0, containersAllocated = 100.0, activeNodes = 10.0), CSupply(activeNodes = 10.0), mockContext)
numberOfNodes should be(10)
}

it should "scale down when max nodes is reduced with demand being high" in {
val scalar = new YARNScalar()
val mockContext = context(currentSize = 15, minNodes = 1, maxNodes = 10)
val numberOfNodes = scalar.requiredNodes(CDemand(100.0), CSupply(100.0), mockContext)
numberOfNodes should be(10)
}
it should "scale down when max nodes is reduced with demand being high" in {
val scalar = new YARNScalar()
val mockContext = context(currentSize = 15, minNodes = 1, maxNodes = 10)
val numberOfNodes = scalar.requiredNodes(CDemand(containersPending = 0.0, containersAllocated = 100.0, activeNodes = 15.0), CSupply(activeNodes = 15.0), mockContext)
numberOfNodes should be(10)
}

it should "scale down when max nodes is reduced with demand being reduced as well" in {
val scalar = new YARNScalar()
val mockContext = context(currentSize = 15, minNodes = 1, maxNodes = 10)
val numberOfNodes = scalar.requiredNodes(CDemand(130.0), CSupply(150.0), mockContext)
numberOfNodes should be(10)
}
it should "scale down when max nodes is reduced with demand being reduced as well" in {
val scalar = new YARNScalar()
val mockContext = context(currentSize = 15, minNodes = 1, maxNodes = 10)
val numberOfNodes = scalar.requiredNodes(CDemand(containersPending = 0.0, containersAllocated = 10.0, activeNodes = 15.0), CSupply(activeNodes = 15.0), mockContext)
numberOfNodes should be(10)
}

it should "compute sum for demand from metrics" in {
val scalar = new YARNScalar()
val metrics = List(
Metric("containers_pending", List(Point(10.0, 1l), Point(15.0, 2l)))
)
scalar.demand(metrics) should be(CDemand(25.0))
}
"CDemand" should "return 0 when there's no pending or allocated" in {
val demand = CDemand(containersPending = 0.0, containersAllocated = 0.0, activeNodes = 1.0)
demand.quantity should be(0.0)
}

it should "pick the latest metrics for supply" in {
val scalar = new YARNScalar()
val metrics = List(
Metric("containers_allocated", List(Point(10.0, 1l), Point(15.0, 2l)))
)
scalar.supply(metrics) should be(CSupply(25.0))
}
it should "return pending by allocated when nodes = 1" in {
val demand = CDemand(containersPending = 0.0, containersAllocated = 5.0, activeNodes = 1.0)
demand.quantity should be(1.0)
}

it should "return allocated + pending / nodes when nodes > 1" in {
val demand = CDemand(containersPending = 19995.0, containersAllocated = 5.0, activeNodes = 1.0)
demand.quantity should be(4000.0)
}

def context(currentSize: Int, minNodes: Int, maxNodes: Int): Context = {
Context(currentSize,
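Tracing the first requiredNodes test above through the new code shows how the pieces combine. Again, this is a scratch calculation using values taken from that test, not repository code.

// First test above: CDemand(10000 pending, 10 allocated, 1 active node), CSupply(1), maxNodes = 10.
val quantity = math.ceil((10000.0 + 10.0) / math.max(10.0 / 1.0, 1.0)) // 1001 nodes' worth of demand
val required = math.min(10, math.max(1, quantity.toInt))               // step up from currentSize = 1, clamped to maxNodes
// required == 10, which is what the test asserts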
