Skip to content

Commit

Permalink
reduce application metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
zaynt4606 committed Nov 28, 2024
1 parent f5d21ac commit 3828d2e
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// Value configured for METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD.
def metricsWorkerForceAppendPauseSpentTimeThreshold: Int =
get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
// Value configured for METRICS_JSON_PRETTY_ENABLED.
def metricsJsonPrettyEnabled: Boolean = get(METRICS_JSON_PRETTY_ENABLED)
// Value configured for METRICS_APP_ENABLED (`celeborn.metrics.application.enabled`).
def metricsAppEnabled: Boolean = get(METRICS_APP_ENABLED)

// //////////////////////////////////////////////////////
// Quota //
Expand Down Expand Up @@ -5329,6 +5330,14 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)

// Switch for application-level metrics, exposed through CelebornConf.metricsAppEnabled.
// Defaults to true. Note that the doc text below is duplicated in
// docs/configuration/metrics.md, so both places have to be edited together.
val METRICS_APP_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.metrics.application.enabled")
.categories("metrics")
.doc("When false, the metrics of application won't return to reduce the num of metrics.")
.version("0.6.0")
.booleanConf
.createWithDefault(true)

val QUOTA_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.quota.enabled")
.categories("quota", "master", "client")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import java.lang
import java.util.{Map => JMap}
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}

import scala.collection.{breakOut, mutable}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

Expand All @@ -35,10 +35,18 @@ import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils, Utils}
// Can Remove this if celeborn don't support scala211 in future
import org.apache.celeborn.common.util.FunctionConverter._

/**
 * A counter together with the name and labels it is reported under.
 *
 * @param name    metric name
 * @param counter the underlying counter
 * @param labels  labels attached to the metric
 * @param isApp   true when the counter is an application-level metric; such counters are
 *                collected separately from the other metrics when a snapshot is built.
 *                Defaults to false so existing three-argument call sites keep working.
 */
case class NamedCounter(
    name: String,
    counter: Counter,
    labels: Map[String, String],
    isApp: Boolean = false)
  extends MetricLabels

/**
 * A gauge together with the name and labels it is reported under.
 *
 * @param name   metric name
 * @param gauge  the underlying gauge
 * @param labels labels attached to the metric
 * @param isApp  true when the gauge is an application-level metric; such gauges are
 *               collected separately from the other metrics when a snapshot is built.
 *               Defaults to false so existing three-argument call sites keep working.
 * @tparam T type of the value produced by the gauge
 */
case class NamedGauge[T](
    name: String,
    gauge: Gauge[T],
    labels: Map[String, String],
    isApp: Boolean = false)
  extends MetricLabels

case class NamedMeter(name: String, meter: Meter, labels: Map[String, String])
Expand Down Expand Up @@ -77,6 +85,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
// Labels shared by every metric of this source: the extra labels from the configuration
// combined with roleLabel and instanceLabel.
val staticLabels: Map[String, String] = conf.metricsExtraLabels + roleLabel ++ instanceLabel
// String form of staticLabels as produced by MetricLabels.labelString.
val staticLabelsString: String = MetricLabels.labelString(staticLabels)

// Read once from the configuration; getMetrics() only appends application-level metrics
// when this is true.
val metricsAppEnabled: Boolean = conf.metricsAppEnabled

// Label key used to tag a metric with the id of the application it belongs to.
val applicationLabel = "applicationId"

val timerMetricsMap: ConcurrentHashMap.KeySetView[String, lang.Boolean] =
Expand All @@ -103,30 +113,42 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
/**
 * Registers `gauge` under `name` and `labels`, unless an entry with the same labelled
 * name already exists.
 *
 * Only gauges whose current value is a `Number` are accepted; any other value (including
 * null) is rejected with a warning and nothing is registered.
 *
 * @param name         metric name
 * @param labels       custom labels; `staticLabels` are appended to them on registration
 * @param gauge        the gauge to register
 * @param isAppMetrics whether the gauge is an application-level metric
 */
def addGauge[T](
    name: String,
    labels: Map[String, String],
    gauge: Gauge[T],
    isAppMetrics: Boolean): Unit = {
  // Sample the gauge once so the type check and the warning describe the same value.
  gauge.getValue match {
    case _: Number =>
      namedGauges.putIfAbsent(
        metricNameWithCustomizedLabels(name, labels),
        NamedGauge(name, gauge, labels ++ staticLabels, isAppMetrics))
    case other =>
      // A null value has no class to report; calling getClass on it would throw.
      val valueType = Option(other).fold("null")(_.getClass.toString)
      logWarning(s"Add gauge $name failed, the value type $valueType is not a number")
  }
}

/**
 * Registers `gauge` as a regular (non application-level) metric.
 *
 * Delegates to the four-argument overload with the application flag switched off.
 */
def addGauge[T](name: String, labels: Map[String, String], gauge: Gauge[T]): Unit =
  addGauge(name, labels, gauge, isAppMetrics = false)

/**
 * Java-friendly variant of `addGauge`: copies the Java label map into an immutable Scala
 * map and delegates to the Scala-map overload.
 */
def addGauge[T](name: String, labels: JMap[String, String], gauge: Gauge[T]): Unit = {
  val scalaLabels: Map[String, String] = labels.asScala.toMap
  addGauge(name, scalaLabels, gauge)
}

/**
 * Registers a gauge whose value is produced by the supplier `f`.
 *
 * The supplier is wrapped in a `GaugeSupplier`, handed to the metric registry under the
 * labelled metric name, and the resulting gauge is then registered through the
 * four-argument `addGauge` overload.
 *
 * @param name         metric name
 * @param labels       custom labels, empty by default
 * @param isAppMetrics whether the gauge is an application-level metric, false by default
 * @param f            supplier evaluated to obtain the gauge value
 */
def addGauge[T](
    name: String,
    labels: Map[String, String] = Map.empty,
    isAppMetrics: Boolean = false)(f: () => T): Unit = {
  addGauge(
    name,
    labels,
    metricRegistry.gauge(metricNameWithCustomizedLabels(name, labels), new GaugeSupplier[T](f)),
    isAppMetrics)
}

def addGauge[T](name: String, gauge: Gauge[T]): Unit = {
Expand Down Expand Up @@ -178,11 +200,15 @@ abstract class AbstractSource(conf: CelebornConf, role: String)

// Registers a counter under `name` without any custom labels.
def addCounter(name: String): Unit = addCounter(name, Map.empty[String, String])

/**
 * Registers a counter under `name` and `labels`, unless an entry with the same labelled
 * name already exists.
 *
 * @param name         metric name
 * @param labels       custom labels; `staticLabels` are appended to them on registration
 * @param isAppMetrics whether the counter is an application-level metric, false by default
 */
def addCounter(name: String, labels: Map[String, String], isAppMetrics: Boolean = false): Unit = {
  val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
  namedCounters.putIfAbsent(
    metricNameWithLabel,
    NamedCounter(
      name,
      metricRegistry.counter(metricNameWithLabel),
      labels ++ staticLabels,
      isAppMetrics))
}

def counters(): List[NamedCounter] = {
Expand Down Expand Up @@ -439,50 +465,87 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
sb.toString()
}

/**
 * Number of entries currently held across the timer-metrics set and the named timer,
 * meter, gauge and counter collections.
 */
// NOTE(review): no histogram collection is part of this sum although getMetrics() also
// renders histograms() - confirm whether that is intended.
def getAllMetricsNum: Int =
  Seq(
    timerMetricsMap.size(),
    namedTimers.size(),
    namedMeters.size(),
    namedGauges.size(),
    namedCounters.size()).sum

/**
 * Renders the metrics of this source into a single string, bounded by `metricsCapacity`.
 *
 * The metric groups are rendered in a fixed order (timer metrics, timers, histograms,
 * meters, gauges, counters); each group only gets the capacity the previous groups left.
 * Application-level counters and gauges are collected on the side and appended last, and
 * only when `metricsAppEnabled` is true and capacity is left.
 *
 * A warning is logged when the capacity is used up or when application metrics had to be
 * dropped because they did not fit.
 *
 * @return the concatenation of all rendered metric strings
 */
override def getMetrics(): String = {
  val metricsSnapshot = ArrayBuffer[String]()
  val appMetricsSnapshot = ArrayBuffer[String]()

  // Thunks keep the original evaluation order: a group is only read right before it is filled in.
  val metricGroups = Seq[() => List[AnyRef]](
    () => timerMetrics(),
    () => timers(),
    () => histograms(),
    () => meters(),
    () => gauges(),
    () => counters())
  val leftMetricsNum = metricGroups.foldLeft(metricsCapacity) { (left, metricsOf) =>
    fillInnerMetricsSnapshot(metricsOf(), left, metricsSnapshot, appMetricsSnapshot)
  }

  val appMetricsToEmit =
    if (leftMetricsNum > 0 && metricsAppEnabled) appMetricsSnapshot.take(leftMetricsNum)
    else ArrayBuffer.empty[String]
  metricsSnapshot ++= appMetricsToEmit

  // Application metrics cut off by the capacity are a truncation too and must not go unreported.
  val appMetricsTruncated = metricsAppEnabled && appMetricsToEmit.size < appMetricsSnapshot.size
  if (leftMetricsNum <= 0 || appMetricsTruncated) {
    logWarning(
      s"The number of metrics exceed the output metrics strings capacity! Full metrics num: ${getAllMetricsNum}")
  }
  metricsSnapshot.mkString
}

/**
 * Renders at most `leftNum` entries of `metricList` and reports how much capacity remains.
 *
 * Application-level counters and gauges (`isApp`) go to `appMetricsSnapshot`; everything
 * else goes to `metricsSnapshot`. Application counters whose count is zero are appended
 * to `appMetricsSnapshot` after the non-zero ones of this call, so that they are the first
 * to be dropped when the caller truncates the application metrics. Histogram and timer
 * reservoirs are reset after they have been rendered.
 *
 * @param metricList         metrics to render
 * @param leftNum            remaining capacity; nothing is rendered when it is <= 0
 * @param metricsSnapshot    receives the rendered non-application metrics
 * @param appMetricsSnapshot receives the rendered application metrics
 * @return the capacity left after this call, never negative
 */
private def fillInnerMetricsSnapshot(
    metricList: List[AnyRef],
    leftNum: Int,
    metricsSnapshot: ArrayBuffer[String],
    appMetricsSnapshot: ArrayBuffer[String]): Int = {
  if (leftNum <= 0) {
    0
  } else {
    // Bound the work up front. `scala.collection.breakOut` is a CanBuildFrom factory, not a
    // loop exit, so it cannot be used to stop iterating once the capacity is reached.
    val addList = metricList.take(leftNum)
    val appCount0Metrics = ArrayBuffer[String]()
    // NOTE(review): application metrics are charged against the capacity here and are
    // limited again by the remaining capacity in getMetrics() - confirm this double
    // accounting is intended.
    addList.foreach {
      case c: NamedCounter =>
        val counterMetric = getCounterMetrics(c)
        if (!c.isApp) {
          metricsSnapshot += counterMetric
        } else if (c.counter.getCount > 0) {
          appMetricsSnapshot += counterMetric
        } else {
          appCount0Metrics += counterMetric
        }
      case g: NamedGauge[_] =>
        val gaugeMetric = getGaugeMetrics(g)
        if (g.isApp) appMetricsSnapshot += gaugeMetric
        else metricsSnapshot += gaugeMetric
      case m: NamedMeter =>
        metricsSnapshot += getMeterMetrics(m)
      case h: NamedHistogram =>
        metricsSnapshot += getHistogramMetrics(h)
        // NOTE(review): this casts the NamedHistogram wrapper itself, whereas the timer
        // branch below casts the wrapped `t.timer` - verify this cast can succeed.
        h.asInstanceOf[CelebornHistogram].reservoir
          .asInstanceOf[ResettableSlidingWindowReservoir].reset()
      case t: NamedTimer =>
        metricsSnapshot += getTimerMetrics(t)
        t.timer.asInstanceOf[CelebornTimer].reservoir
          .asInstanceOf[ResettableSlidingWindowReservoir].reset()
      case s =>
        metricsSnapshot += s.toString
    }
    appMetricsSnapshot ++= appCount0Metrics
    leftNum - addList.size
  }
}

override def destroy(): Unit = {
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ license: |
| celeborn.metrics.app.topDiskUsage.count | 50 | false | Size for top items about top disk usage applications list. | 0.2.0 | |
| celeborn.metrics.app.topDiskUsage.interval | 10min | false | Time length for a window about top disk usage application list. | 0.2.0 | |
| celeborn.metrics.app.topDiskUsage.windowSize | 24 | false | Window size about top disk usage application list. | 0.2.0 | |
| celeborn.metrics.application.enabled | true | false | When false, the metrics of application won't return to reduce the num of metrics. | 0.6.0 | |
| celeborn.metrics.capacity | 4096 | false | The maximum number of metrics which a source can use to generate output strings. | 0.2.0 | |
| celeborn.metrics.collectPerfCritical.enabled | false | false | It controls whether to collect metrics which may affect performance. When enable, Celeborn collects them. | 0.2.0 | |
| celeborn.metrics.conf | &lt;undefined&gt; | false | Custom metrics configuration file path. Default use `metrics.properties` in classpath. | 0.3.0 | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,22 +707,26 @@ private[celeborn] class Worker(
resourceConsumptionLabel += (resourceConsumptionSource.applicationLabel -> applicationId)
resourceConsumptionSource.addGauge(
ResourceConsumptionSource.DISK_FILE_COUNT,
resourceConsumptionLabel) { () =>
resourceConsumptionLabel,
true) { () =>
computeResourceConsumption(userIdentifier, resourceConsumption).diskFileCount
}
resourceConsumptionSource.addGauge(
ResourceConsumptionSource.DISK_BYTES_WRITTEN,
resourceConsumptionLabel) { () =>
resourceConsumptionLabel,
true) { () =>
computeResourceConsumption(userIdentifier, resourceConsumption).diskBytesWritten
}
resourceConsumptionSource.addGauge(
ResourceConsumptionSource.HDFS_FILE_COUNT,
resourceConsumptionLabel) { () =>
resourceConsumptionLabel,
true) { () =>
computeResourceConsumption(userIdentifier, resourceConsumption).hdfsFileCount
}
resourceConsumptionSource.addGauge(
ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
resourceConsumptionLabel) { () =>
resourceConsumptionLabel,
true) { () =>
computeResourceConsumption(userIdentifier, resourceConsumption).hdfsBytesWritten
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, Role.WORKER)
val applicationIds = appActiveConnections.get(client.getChannel.id().asLongText())
val applicationId = Utils.splitShuffleKey(shuffleKey)._1
if (applicationIds != null && !applicationIds.contains(applicationId)) {
addCounter(ACTIVE_CONNECTION_COUNT, Map(applicationLabel -> applicationId))
addCounter(ACTIVE_CONNECTION_COUNT, Map(applicationLabel -> applicationId), true)
incCounter(ACTIVE_CONNECTION_COUNT, 1, Map(applicationLabel -> applicationId))
applicationIds.add(applicationId)
}
Expand Down

0 comments on commit 3828d2e

Please sign in to comment.