Skip to content

Commit

Permalink
add ut
Browse files Browse the repository at this point in the history
  • Loading branch information
zaynt4606 committed Nov 29, 2024
1 parent 3828d2e commit 00c58fe
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
metricsSnapshot.foreach(metric => sb.append(metric))
if (leftMetricsNum <= 0) {
logWarning(
s"The number of metrics exceed the output metrics strings capacity! Full metrics num: ${getAllMetricsNum}")
s"The number of metrics exceed the output metrics strings capacity! Full metrics num: $getAllMetricsNum")
}
sb.toString()
}
Expand All @@ -514,35 +514,42 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
val appCount0Metrics = ArrayBuffer[String]()
for (m <- metricList) {
if (addNum >= leftNum) breakOut
var strMetrics = ""
var isApp = false
m match {
case c: NamedCounter =>
val counterMetric = getCounterMetrics(c)
strMetrics = getCounterMetrics(c)
if (c.isApp) {
isApp = true
if (c.counter.getCount > 0) {
appMetricsSnapshot += counterMetric
appMetricsSnapshot += strMetrics
} else {
appCount0Metrics += counterMetric
appCount0Metrics += strMetrics
}
} else metricsSnapshot += counterMetric
}
case g: NamedGauge[_] =>
val gaugeMetric = getGaugeMetrics(g)
strMetrics = getGaugeMetrics(g)
if (g.isApp) {
appMetricsSnapshot += gaugeMetric
} else metricsSnapshot += gaugeMetric
appMetricsSnapshot += strMetrics
isApp = true
}
case m: NamedMeter =>
metricsSnapshot += getMeterMetrics(m)
strMetrics = getMeterMetrics(m)
case h: NamedHistogram =>
metricsSnapshot += getHistogramMetrics(h)
strMetrics = getHistogramMetrics(h)
h.asInstanceOf[CelebornHistogram].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
case t: NamedTimer =>
metricsSnapshot += getTimerMetrics(t)
strMetrics = getTimerMetrics(t)
t.timer.asInstanceOf[CelebornTimer].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
case s =>
metricsSnapshot += s.toString
strMetrics = s.toString
}
if (!isApp) {
metricsSnapshot += strMetrics
addNum = addNum + 1
}
addNum = addNum + 1
}
appMetricsSnapshot ++= appCount0Metrics
leftNum - addNum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,24 @@ import org.apache.celeborn.common.CelebornConf

class CelebornSourceSuite extends CelebornFunSuite {

test("test getMetrics with customized label") {
val conf = new CelebornConf()
createAbstractSourceAndCheck(conf, "", Role.MASTER)
createAbstractSourceAndCheck(conf, "", Role.WORKER)
}

def createAbstractSourceAndCheck(
def createAbstractSource(
conf: CelebornConf,
extraLabels: String,
role: String = "mock"): Unit = {
role: String = "mock"): (String, List[String]) = {
val mockSource = new AbstractSource(conf, role) {
override def sourceName: String = "mockSource"
}
val user1 = Map("user" -> "user1")
val user2 = Map("user" -> "user2")
val user3 = Map("user" -> "user3")
mockSource.addGauge("Gauge1") { () => 1000 }
mockSource.addGauge("Gauge2", user1) { () => 2000 }
mockSource.addCounter("Counter1")
mockSource.addCounter("Counter2", user2)
mockSource.addGauge("Gauge2", user1, true) { () => 2000 }
mockSource.addCounter("Counter1", Map.empty[String, String], true)
mockSource.addCounter("Counter2", user2, true)
// test operation with and without label
mockSource.incCounter("Counter1", 3000)
mockSource.incCounter("Counter2", 4000, user2)
mockSource.incCounter("Counter2", -4000, user2)
mockSource.addTimer("Timer1")
mockSource.addTimer("Timer2", user3)
// ditto
Expand All @@ -66,37 +61,83 @@ class CelebornSourceSuite extends CelebornFunSuite {
s"""metrics_Gauge2_Value{${extraLabelsStr}${instanceLabelStr}role="$role",user="user1"} 2000"""
val exp3 = s"""metrics_Counter1_Count{${extraLabelsStr}${instanceLabelStr}role="$role"} 3000"""
val exp4 =
s"""metrics_Counter2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user2"} 4000"""
s"""metrics_Counter2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user2"} 0"""
val exp5 = s"""metrics_Timer1_Count{${extraLabelsStr}${instanceLabelStr}role="$role"} 1"""
val exp6 =
s"""metrics_Timer2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user3"} 1"""

assert(res.contains(exp1))
assert(res.contains(exp2))
assert(res.contains(exp3))
assert(res.contains(exp4))
assert(res.contains(exp5))
assert(res.contains(exp6))
val expList = List[String](exp1, exp2, exp3, exp4, exp5, exp6)
(res, expList)
}

def checkMetricsRes(res: String, labelList: List[String]): Unit = {
labelList.foreach { exp =>
assert(res.contains(exp))
}
}

test("test getMetrics with customized label by conf") {
val conf = new CelebornConf()
val (resM, expsM) = createAbstractSource(conf, "", Role.MASTER)
checkMetricsRes(resM, expsM)
val (resW, expsW) = createAbstractSource(conf, "", Role.WORKER)
checkMetricsRes(resW, expsW)

// label's is normal
conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=v2,l3=v3")
val extraLabels = """l1="v1",l2="v2",l3="v3""""
createAbstractSourceAndCheck(conf, extraLabels)
val (res, exps) = createAbstractSource(conf, extraLabels)
checkMetricsRes(res, exps)

// labels' kv not correct
assertThrows[IllegalArgumentException] {
conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=")
val extraLabels2 = """l1="v1",l2="v2",l3="v3""""
createAbstractSourceAndCheck(conf, extraLabels2)
val (res2, exps2) = createAbstractSource(conf, extraLabels2)
checkMetricsRes(res2, exps2)
}

// there are spaces in labels
conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, " l1 = v1, l2 =v2 ,l3 =v3 ")
val extraLabels3 = """l1="v1",l2="v2",l3="v3""""
createAbstractSourceAndCheck(conf, extraLabels3)
val (res3, exps3) = createAbstractSource(conf, extraLabels3)
checkMetricsRes(res3, exps3)
}

test("test getMetrics with full capacity and isAppEnable false") {
val conf = new CelebornConf()

// metrics won't contain appMetrics
conf.set(CelebornConf.METRICS_APP_ENABLED.key, "false")
conf.set(CelebornConf.METRICS_CAPACITY.key, "6")
val (res1, exps1) = createAbstractSource(conf, "")
List[Int](0, 4, 5).foreach { i =>
assert(res1.contains(exps1(i)))
}
List[Int](1, 2, 3).foreach { i =>
assert(!res1.contains(exps1(i)))
}

// app metrics will fall behind when it reaches capacity
conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true")
conf.set(CelebornConf.METRICS_CAPACITY.key, "3")
val (res2, exps2) = createAbstractSource(conf, "")
List[Int](0, 4, 5).foreach { i =>
assert(res2.contains(exps2(i)))
}
List[Int](1, 2, 3).foreach { i =>
assert(!res2.contains(exps2(i)))
}

// app metrics count0 will fall behind
conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true")
conf.set(CelebornConf.METRICS_CAPACITY.key, "5")
val (res3, exps3) = createAbstractSource(conf, "")
List[Int](0, 4, 5, 1, 2).foreach { i =>
assert(res3.contains(exps3(i)))
}
List[Int](3).foreach { i =>
assert(!res3.contains(exps3(i)))
}
}
}

0 comments on commit 00c58fe

Please sign in to comment.