Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1743] [FOLLOWUP]Introduces a configuration option to determine whether application metrics should be included #2964

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def metricsWorkerForceAppendPauseSpentTimeThreshold: Int =
get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
def metricsJsonPrettyEnabled: Boolean = get(METRICS_JSON_PRETTY_ENABLED)
def metricsAppEnabled: Boolean = get(METRICS_APP_ENABLED)

// //////////////////////////////////////////////////////
// Quota //
Expand Down Expand Up @@ -5340,6 +5341,14 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)

val METRICS_APP_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.metrics.application.enabled")
.categories("metrics")
.doc("When false, the metrics of application won't return to reduce the num of metrics.")
.version("0.6.0")
.booleanConf
.createWithDefault(true)

val IDENTITY_PROVIDER: ConfigEntry[String] =
buildConf("celeborn.identity.provider")
.withAlternative("celeborn.quota.identity.provider")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,18 @@ import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils, Utils}
// Can Remove this if celeborn don't support scala211 in future
import org.apache.celeborn.common.util.FunctionConverter._

case class NamedCounter(name: String, counter: Counter, labels: Map[String, String])
case class NamedCounter(
name: String,
counter: Counter,
labels: Map[String, String],
isApp: Boolean = false)
extends MetricLabels

case class NamedGauge[T](name: String, gauge: Gauge[T], labels: Map[String, String])
case class NamedGauge[T](
name: String,
gauge: Gauge[T],
labels: Map[String, String],
isApp: Boolean = false)
extends MetricLabels

case class NamedMeter(name: String, meter: Meter, labels: Map[String, String])
Expand Down Expand Up @@ -76,6 +84,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
val staticLabels: Map[String, String] = conf.metricsExtraLabels + roleLabel ++ instanceLabel
val staticLabelsString: String = MetricLabels.labelString(staticLabels)

val metricsAppEnabled: Boolean = conf.metricsAppEnabled

val applicationLabel = "applicationId"

val timerMetrics: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]()
Expand All @@ -101,30 +111,42 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
def addGauge[T](
name: String,
labels: Map[String, String],
gauge: Gauge[T]): Unit = {
gauge: Gauge[T],
isAppMetrics: Boolean): Unit = {
// filter out non-number type gauges
if (gauge.getValue.isInstanceOf[Number]) {
namedGauges.putIfAbsent(
metricNameWithCustomizedLabels(name, labels),
NamedGauge(name, gauge, labels ++ staticLabels))
NamedGauge(name, gauge, labels ++ staticLabels, isAppMetrics))
} else {
logWarning(
s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number")
}
}

def addGauge[T](
name: String,
labels: Map[String, String],
gauge: Gauge[T]): Unit = {
addGauge(name, labels, gauge, false)
}

def addGauge[T](
name: String,
labels: JMap[String, String],
gauge: Gauge[T]): Unit = {
addGauge(name, labels.asScala.toMap, gauge)
}

def addGauge[T](name: String, labels: Map[String, String] = Map.empty)(f: () => T): Unit = {
def addGauge[T](
name: String,
labels: Map[String, String] = Map.empty,
isAppMetrics: Boolean = false)(f: () => T): Unit = {
addGauge(
name,
labels,
metricRegistry.gauge(metricNameWithCustomizedLabels(name, labels), new GaugeSupplier[T](f)))
metricRegistry.gauge(metricNameWithCustomizedLabels(name, labels), new GaugeSupplier[T](f)),
isAppMetrics)
}

def addGauge[T](name: String, gauge: Gauge[T]): Unit = {
Expand Down Expand Up @@ -176,11 +198,15 @@ abstract class AbstractSource(conf: CelebornConf, role: String)

def addCounter(name: String): Unit = addCounter(name, Map.empty[String, String])

def addCounter(name: String, labels: Map[String, String]): Unit = {
def addCounter(name: String, labels: Map[String, String], isAppMetrics: Boolean = false): Unit = {
val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
namedCounters.putIfAbsent(
metricNameWithLabel,
NamedCounter(name, metricRegistry.counter(metricNameWithLabel), labels ++ staticLabels))
NamedCounter(
name,
metricRegistry.counter(metricNameWithLabel),
labels ++ staticLabels,
isAppMetrics))
}

def counters(): List[NamedCounter] = {
Expand Down Expand Up @@ -457,12 +483,19 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
override def getMetrics(): String = {
var leftMetricsNum = metricsCapacity
val sb = new mutable.StringBuilder
leftMetricsNum = fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, sb)
leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, sb)
val appMetricsSnapshot = ArrayBuffer[String]()
leftMetricsNum =
fillInnerMetricsSnapshot(getAndClearTimerMetrics(), leftMetricsNum, sb, appMetricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb, appMetricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb, appMetricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb, appMetricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, sb, appMetricsSnapshot)
leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, sb, appMetricsSnapshot)
if (leftMetricsNum > 0 && metricsAppEnabled) {
appMetricsSnapshot.toList.take(leftMetricsNum).foreach { appMetrics =>
sb.append(appMetrics)
}
}
if (leftMetricsNum <= 0) {
logWarning(
zaynt4606 marked this conversation as resolved.
Show resolved Hide resolved
s"The number of metrics exceed the output metrics strings capacity! All metrics Num: $getAllMetricsNum")
Expand All @@ -473,30 +506,62 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
private def fillInnerMetricsSnapshot(
metricList: List[AnyRef],
leftNum: Int,
sb: mutable.StringBuilder): Int = {
sb: mutable.StringBuilder,
appMetricsSnapshot: ArrayBuffer[String]): Int = {
if (leftNum <= 0) {
return 0
}
val addList = metricList.take(leftNum)
addList.foreach {
case c: NamedCounter =>
sb.append(getCounterMetrics(c))
case g: NamedGauge[_] =>
sb.append(getGaugeMetrics(g))
case m: NamedMeter =>
sb.append(getMeterMetrics(m))
case h: NamedHistogram =>
sb.append(getHistogramMetrics(h))
h.asInstanceOf[CelebornHistogram].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
case t: NamedTimer =>
sb.append(getTimerMetrics(t))
t.timer.asInstanceOf[CelebornTimer].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
case s =>
sb.append(s.toString)
var nonAppMetricsAddNum = 0
val appCount0Metrics = ArrayBuffer[String]()
for (m <- metricList if nonAppMetricsAddNum < leftNum) {
var strMetrics = ""
var isApp = false
var isCount0 = false
m match {
case c: NamedCounter =>
strMetrics = getCounterMetrics(c)
if (c.isApp) {
isApp = true
if (c.counter.getCount <= 0) {
isCount0 = true
}
}
case g: NamedGauge[_] =>
strMetrics = getGaugeMetrics(g)
if (g.isApp) {
isApp = true
}
case m: NamedMeter =>
strMetrics = getMeterMetrics(m)
case h: NamedHistogram =>
strMetrics = getHistogramMetrics(h)
h.asInstanceOf[CelebornHistogram].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
case t: NamedTimer =>
strMetrics = getTimerMetrics(t)
t.timer.asInstanceOf[CelebornTimer].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
case s =>
strMetrics = s.toString
}
if (!isApp) {
sb.append(strMetrics)
nonAppMetricsAddNum = nonAppMetricsAddNum + 1
} else {
if (leftNum - nonAppMetricsAddNum - appMetricsSnapshot.size > 0) {
if (isCount0) {
appCount0Metrics += strMetrics
} else {
appMetricsSnapshot += strMetrics
}
}
}
}
val leftAppMetricsNum = leftNum - nonAppMetricsAddNum - appMetricsSnapshot.size
if (appCount0Metrics.nonEmpty && leftAppMetricsNum > 0) {
appMetricsSnapshot ++= appCount0Metrics.toList.take(leftAppMetricsNum)
}
leftNum - addList.size
leftNum - nonAppMetricsAddNum
}

override def destroy(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,24 @@ import org.apache.celeborn.common.CelebornConf

class CelebornSourceSuite extends CelebornFunSuite {

test("test getMetrics with customized label") {
val conf = new CelebornConf()
createAbstractSourceAndCheck(conf, "", Role.MASTER)
createAbstractSourceAndCheck(conf, "", Role.WORKER)
}

def createAbstractSourceAndCheck(
def createAbstractSource(
conf: CelebornConf,
extraLabels: String,
role: String = "mock"): Unit = {
role: String = "mock"): (String, List[String]) = {
val mockSource = new AbstractSource(conf, role) {
override def sourceName: String = "mockSource"
}
val user1 = Map("user" -> "user1")
val user2 = Map("user" -> "user2")
val user3 = Map("user" -> "user3")
mockSource.addGauge("Gauge1") { () => 1000 }
mockSource.addGauge("Gauge2", user1) { () => 2000 }
mockSource.addCounter("Counter1")
mockSource.addCounter("Counter2", user2)
mockSource.addGauge("Gauge2", user1, true) { () => 2000 }
mockSource.addCounter("Counter1", Map.empty[String, String], true)
mockSource.addCounter("Counter2", user2, true)
// test operation with and without label
mockSource.incCounter("Counter1", 3000)
mockSource.incCounter("Counter2", 4000, user2)
mockSource.incCounter("Counter2", -4000, user2)
mockSource.addTimer("Timer1")
mockSource.addTimer("Timer2", user3)
// ditto
Expand All @@ -54,6 +49,8 @@ class CelebornSourceSuite extends CelebornFunSuite {
mockSource.stopTimer("Timer1", "key1")
mockSource.stopTimer("Timer2", "key2", user3)

mockSource.timerMetrics.add("testTimerMetricsMap")

val res = mockSource.getMetrics()
var extraLabelsStr = extraLabels
if (extraLabels.nonEmpty) {
Expand All @@ -66,37 +63,102 @@ class CelebornSourceSuite extends CelebornFunSuite {
s"""metrics_Gauge2_Value{${extraLabelsStr}${instanceLabelStr}role="$role",user="user1"} 2000"""
val exp3 = s"""metrics_Counter1_Count{${extraLabelsStr}${instanceLabelStr}role="$role"} 3000"""
val exp4 =
s"""metrics_Counter2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user2"} 4000"""
s"""metrics_Counter2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user2"} 0"""
val exp5 = s"""metrics_Timer1_Count{${extraLabelsStr}${instanceLabelStr}role="$role"} 1"""
val exp6 =
s"""metrics_Timer2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user3"} 1"""
val exp7 = "testTimerMetricsMap"

assert(res.contains(exp1))
assert(res.contains(exp2))
assert(res.contains(exp3))
assert(res.contains(exp4))
assert(res.contains(exp5))
assert(res.contains(exp6))
val expList = List[String](exp1, exp2, exp3, exp4, exp5, exp6, exp7)
(res, expList)
}

def checkMetricsRes(res: String, labelList: List[String]): Unit = {
labelList.foreach { exp =>
assert(res.contains(exp))
}
}

test("test getMetrics with customized label by conf") {
val conf = new CelebornConf()
val (resM, expsM) = createAbstractSource(conf, "", Role.MASTER)
checkMetricsRes(resM, expsM)
val (resW, expsW) = createAbstractSource(conf, "", Role.WORKER)
checkMetricsRes(resW, expsW)

// label's is normal
conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=v2,l3=v3")
val extraLabels = """l1="v1",l2="v2",l3="v3""""
createAbstractSourceAndCheck(conf, extraLabels)
val (res, exps) = createAbstractSource(conf, extraLabels)
checkMetricsRes(res, exps)

// labels' kv not correct
assertThrows[IllegalArgumentException] {
conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=")
val extraLabels2 = """l1="v1",l2="v2",l3="v3""""
createAbstractSourceAndCheck(conf, extraLabels2)
val (res2, exps2) = createAbstractSource(conf, extraLabels2)
checkMetricsRes(res2, exps2)
}

// there are spaces in labels
conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, " l1 = v1, l2 =v2 ,l3 =v3 ")
val extraLabels3 = """l1="v1",l2="v2",l3="v3""""
createAbstractSourceAndCheck(conf, extraLabels3)
val (res3, exps3) = createAbstractSource(conf, extraLabels3)
checkMetricsRes(res3, exps3)
}

test("test getMetrics with full capacity and isAppEnable false") {
val conf = new CelebornConf()

// metrics won't contain appMetrics
conf.set(CelebornConf.METRICS_APP_ENABLED.key, "false")
conf.set(CelebornConf.METRICS_CAPACITY.key, "6")
val (res1, exps1) = createAbstractSource(conf, "")
List[Int](0, 4, 5, 6).foreach { i =>
assert(res1.contains(exps1(i)))
}
List[Int](1, 2, 3).foreach { i =>
assert(!res1.contains(exps1(i)))
}

// app metrics will fall behind when it reaches capacity
conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true")
conf.set(CelebornConf.METRICS_CAPACITY.key, "4")
val (res2, exps2) = createAbstractSource(conf, "")
List[Int](0, 4, 5, 6).foreach { i =>
assert(res2.contains(exps2(i)))
}
List[Int](1, 2, 3).foreach { i =>
assert(!res2.contains(exps2(i)))
}

// app metrics count0 will fall behind
conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true")
conf.set(CelebornConf.METRICS_CAPACITY.key, "6")
val (res3, exps3) = createAbstractSource(conf, "")
List[Int](0, 4, 5, 6, 1, 2).foreach { i =>
assert(res3.contains(exps3(i)))
}
List[Int](3).foreach { i =>
assert(!res3.contains(exps3(i)))
}
}

test("test getAndClearTimerMetrics in timerMetrics") {
val conf = new CelebornConf()
conf.set(CelebornConf.METRICS_CAPACITY.key, "6")
val role = "mock"
val mockSource = new AbstractSource(conf, role) {
override def sourceName: String = "mockSource"
}
val exp1 = "testTimerMetrics1"
val exp2 = "testTimerMetrics2"
mockSource.timerMetrics.add(exp1)
val res1 = mockSource.getMetrics()
mockSource.timerMetrics.add(exp2)
val res2 = mockSource.getMetrics()

assert(res1.contains(exp1) && !res1.contains(exp2))
assert(res2.contains(exp2) && !res2.contains(exp1))
}
}
1 change: 1 addition & 0 deletions docs/configuration/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ license: |
<!--begin-include-->
| Key | Default | isDynamic | Description | Since | Deprecated |
| --- | ------- | --------- | ----------- | ----- | ---------- |
| celeborn.metrics.application.enabled | true | false | When false, the metrics of application won't return to reduce the num of metrics. | 0.6.0 | |
| celeborn.metrics.capacity | 4096 | false | The maximum number of metrics which a source can use to generate output strings. | 0.2.0 | |
| celeborn.metrics.collectPerfCritical.enabled | false | false | It controls whether to collect metrics which may affect performance. When enable, Celeborn collects them. | 0.2.0 | |
| celeborn.metrics.conf | &lt;undefined&gt; | false | Custom metrics configuration file path. Default use `metrics.properties` in classpath. | 0.3.0 | |
Expand Down
Loading
Loading