diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index cbc66c40493..94bb7bdaa80 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -879,6 +879,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def metricsWorkerForceAppendPauseSpentTimeThreshold: Int = get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD) def metricsJsonPrettyEnabled: Boolean = get(METRICS_JSON_PRETTY_ENABLED) + def metricsAppEnabled: Boolean = get(METRICS_APP_ENABLED) // ////////////////////////////////////////////////////// // Quota // @@ -5340,6 +5341,14 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(true) + val METRICS_APP_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.metrics.application.enabled") + .categories("metrics") + .doc("When false, the metrics of application won't return to reduce the num of metrics.") + .version("0.6.0") + .booleanConf + .createWithDefault(true) + val IDENTITY_PROVIDER: ConfigEntry[String] = buildConf("celeborn.identity.provider") .withAlternative("celeborn.quota.identity.provider") diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index e9c4cfa3b0a..51e6efa71ae 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -34,10 +34,18 @@ import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils, Utils} // Can Remove this if celeborn don't support scala211 in future import org.apache.celeborn.common.util.FunctionConverter._ -case class NamedCounter(name: String, counter: Counter, labels: Map[String, String]) +case class NamedCounter( + name: String, + counter: Counter, + labels: Map[String, String], + isApp: Boolean = false) extends MetricLabels -case class NamedGauge[T](name: String, gauge: Gauge[T], labels: Map[String, String]) +case class NamedGauge[T]( + name: String, + gauge: Gauge[T], + labels: Map[String, String], + isApp: Boolean = false) extends MetricLabels case class NamedMeter(name: String, meter: Meter, labels: Map[String, String]) @@ -76,6 +84,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val staticLabels: Map[String, String] = conf.metricsExtraLabels + roleLabel ++ instanceLabel val staticLabelsString: String = MetricLabels.labelString(staticLabels) + val metricsAppEnabled: Boolean = conf.metricsAppEnabled + val applicationLabel = "applicationId" val timerMetrics: ConcurrentLinkedQueue[String] = new ConcurrentLinkedQueue[String]() @@ -101,18 +111,28 @@ abstract class AbstractSource(conf: CelebornConf, role: String) def addGauge[T]( name: String, labels: Map[String, String], - gauge: Gauge[T]): Unit = { - // filter out non-number type gauges - if (gauge.getValue.isInstanceOf[Number]) { - namedGauges.putIfAbsent( - metricNameWithCustomizedLabels(name, labels), - NamedGauge(name, gauge, labels ++ staticLabels)) - } else { - logWarning( - s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number") + gauge: Gauge[T], + isAppMetrics: Boolean): Unit = { + if (metricsAppEnabled || (!metricsAppEnabled && !isAppMetrics)) { + // filter out non-number type gauges + if (gauge.getValue.isInstanceOf[Number]) { + namedGauges.putIfAbsent( + metricNameWithCustomizedLabels(name, labels), + NamedGauge(name, gauge, labels ++ staticLabels, isAppMetrics)) + } else { + logWarning( + s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number") + } } } + def addGauge[T]( + name: String, + labels: Map[String, String], + gauge: Gauge[T]): Unit = { + addGauge(name, labels, gauge, false) + } + def addGauge[T]( name: String, labels: JMap[String, String], @@ -120,11 +140,15 @@ abstract class AbstractSource(conf: CelebornConf, role: String) addGauge(name, labels.asScala.toMap, gauge) } - def addGauge[T](name: String, labels: Map[String, String] = Map.empty)(f: () => T): Unit = { + def addGauge[T]( + name: String, + labels: Map[String, String] = Map.empty, + isAppMetrics: Boolean = false)(f: () => T): Unit = { addGauge( name, labels, - metricRegistry.gauge(metricNameWithCustomizedLabels(name, labels), new GaugeSupplier[T](f))) + metricRegistry.gauge(metricNameWithCustomizedLabels(name, labels), new GaugeSupplier[T](f)), + isAppMetrics) } def addGauge[T](name: String, gauge: Gauge[T]): Unit = { @@ -176,11 +200,17 @@ abstract class AbstractSource(conf: CelebornConf, role: String) def addCounter(name: String): Unit = addCounter(name, Map.empty[String, String]) - def addCounter(name: String, labels: Map[String, String]): Unit = { - val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels) - namedCounters.putIfAbsent( - metricNameWithLabel, - NamedCounter(name, metricRegistry.counter(metricNameWithLabel), labels ++ staticLabels)) + def addCounter(name: String, labels: Map[String, String], isAppMetrics: Boolean = false): Unit = { + if (metricsAppEnabled || (!metricsAppEnabled && !isAppMetrics)) { + val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels) + namedCounters.putIfAbsent( + metricNameWithLabel, + NamedCounter( + name, + metricRegistry.counter(metricNameWithLabel), + labels ++ staticLabels, + isAppMetrics)) + } } def counters(): List[NamedCounter] = { diff --git a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala index d6eeb23581d..8f90f547c53 100644 --- a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala @@ -22,16 +22,10 @@ import org.apache.celeborn.common.CelebornConf class CelebornSourceSuite extends CelebornFunSuite { - test("test getMetrics with customized label") { - val conf = new CelebornConf() - createAbstractSourceAndCheck(conf, "", Role.MASTER) - createAbstractSourceAndCheck(conf, "", Role.WORKER) - } - - def createAbstractSourceAndCheck( + def createAbstractSource( conf: CelebornConf, extraLabels: String, - role: String = "mock"): Unit = { + role: String = "mock"): (String, List[String]) = { val mockSource = new AbstractSource(conf, role) { override def sourceName: String = "mockSource" } @@ -39,12 +33,13 @@ class CelebornSourceSuite extends CelebornFunSuite { val user2 = Map("user" -> "user2") val user3 = Map("user" -> "user3") mockSource.addGauge("Gauge1") { () => 1000 } - mockSource.addGauge("Gauge2", user1) { () => 2000 } - mockSource.addCounter("Counter1") - mockSource.addCounter("Counter2", user2) + mockSource.addGauge("Gauge2", user1, true) { () => 2000 } + mockSource.addCounter("Counter1", Map.empty[String, String], true) + mockSource.addCounter("Counter2", user2, true) // test operation with and without label mockSource.incCounter("Counter1", 3000) mockSource.incCounter("Counter2", 4000, user2) + mockSource.incCounter("Counter2", -4000, user2) mockSource.addTimer("Timer1") mockSource.addTimer("Timer2", user3) // ditto @@ -54,6 +49,8 @@ class CelebornSourceSuite extends CelebornFunSuite { mockSource.stopTimer("Timer1", "key1") mockSource.stopTimer("Timer2", "key2", user3) + mockSource.timerMetrics.add("testTimerMetricsMap") + val res = mockSource.getMetrics() var extraLabelsStr = extraLabels if (extraLabels.nonEmpty) { @@ -66,37 +63,86 @@ class CelebornSourceSuite extends CelebornFunSuite { s"""metrics_Gauge2_Value{${extraLabelsStr}${instanceLabelStr}role="$role",user="user1"} 2000""" val exp3 = s"""metrics_Counter1_Count{${extraLabelsStr}${instanceLabelStr}role="$role"} 3000""" val exp4 = - s"""metrics_Counter2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user2"} 4000""" + s"""metrics_Counter2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user2"} 0""" val exp5 = s"""metrics_Timer1_Count{${extraLabelsStr}${instanceLabelStr}role="$role"} 1""" val exp6 = s"""metrics_Timer2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user3"} 1""" + val exp7 = "testTimerMetricsMap" - assert(res.contains(exp1)) - assert(res.contains(exp2)) - assert(res.contains(exp3)) - assert(res.contains(exp4)) - assert(res.contains(exp5)) - assert(res.contains(exp6)) + val expList = List[String](exp1, exp2, exp3, exp4, exp5, exp6, exp7) + (res, expList) + } + + def checkMetricsRes(res: String, labelList: List[String]): Unit = { + labelList.foreach { exp => + assert(res.contains(exp)) + } } test("test getMetrics with customized label by conf") { val conf = new CelebornConf() + val (resM, expsM) = createAbstractSource(conf, "", Role.MASTER) + checkMetricsRes(resM, expsM) + val (resW, expsW) = createAbstractSource(conf, "", Role.WORKER) + checkMetricsRes(resW, expsW) + // label's is normal conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=v2,l3=v3") val extraLabels = """l1="v1",l2="v2",l3="v3"""" - createAbstractSourceAndCheck(conf, extraLabels) + val (res, exps) = createAbstractSource(conf, extraLabels) + checkMetricsRes(res, exps) // labels' kv not correct assertThrows[IllegalArgumentException] { conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=") val extraLabels2 = """l1="v1",l2="v2",l3="v3"""" - createAbstractSourceAndCheck(conf, extraLabels2) + val (res2, exps2) = createAbstractSource(conf, extraLabels2) + checkMetricsRes(res2, exps2) } // there are spaces in labels conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, " l1 = v1, l2 =v2 ,l3 =v3 ") val extraLabels3 = """l1="v1",l2="v2",l3="v3"""" - createAbstractSourceAndCheck(conf, extraLabels3) + val (res3, exps3) = createAbstractSource(conf, extraLabels3) + checkMetricsRes(res3, exps3) + } + + test("test getMetrics with full capacity and isAppEnable false") { + val conf = new CelebornConf() + + // metrics won't contain appMetrics + conf.set(CelebornConf.METRICS_APP_ENABLED.key, "false") + conf.set(CelebornConf.METRICS_CAPACITY.key, "7") + val (res1, exps1) = createAbstractSource(conf, "") + List[Int](0, 4, 5, 6).foreach { i => + assert(res1.contains(exps1(i))) + } + List[Int](1, 2, 3).foreach { i => + assert(!res1.contains(exps1(i))) + } + + // metrics contain appMetrics + conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true") + conf.set(CelebornConf.METRICS_CAPACITY.key, "7") + val (res2, exps2) = createAbstractSource(conf, "") + checkMetricsRes(res2, exps2) + } + + test("test getAndClearTimerMetrics in timerMetrics") { + val conf = new CelebornConf() + conf.set(CelebornConf.METRICS_CAPACITY.key, "6") + val role = "mock" + val mockSource = new AbstractSource(conf, role) { + override def sourceName: String = "mockSource" + } + val exp1 = "testTimerMetrics1" + val exp2 = "testTimerMetrics2" + mockSource.timerMetrics.add(exp1) + val res1 = mockSource.getMetrics() + mockSource.timerMetrics.add(exp2) + val res2 = mockSource.getMetrics() + assert(res1.contains(exp1) && !res1.contains(exp2)) + assert(res2.contains(exp2) && !res2.contains(exp1)) } } diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md index a5fe1731875..880598c34af 100644 --- a/docs/configuration/metrics.md +++ b/docs/configuration/metrics.md @@ -19,6 +19,7 @@ license: | | Key | Default | isDynamic | Description | Since | Deprecated | | --- | ------- | --------- | ----------- | ----- | ---------- | +| celeborn.metrics.application.enabled | true | false | When false, the metrics of application won't return to reduce the num of metrics. | 0.6.0 | | | celeborn.metrics.capacity | 4096 | false | The maximum number of metrics which a source can use to generate output strings. | 0.2.0 | | | celeborn.metrics.collectPerfCritical.enabled | false | false | It controls whether to collect metrics which may affect performance. When enable, Celeborn collects them. | 0.2.0 | | | celeborn.metrics.conf | <undefined> | false | Custom metrics configuration file path. Default use `metrics.properties` in classpath. | 0.3.0 | | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 3439a2e86d1..62f3df9f673 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -703,23 +703,27 @@ private[celeborn] class Worker( resourceConsumptionLabel += (resourceConsumptionSource.applicationLabel -> applicationId) resourceConsumptionSource.addGauge( ResourceConsumptionSource.DISK_FILE_COUNT, - resourceConsumptionLabel) { () => + resourceConsumptionLabel, + true) { () => computeResourceConsumption(userIdentifier, resourceConsumption).diskFileCount } resourceConsumptionSource.addGauge( ResourceConsumptionSource.DISK_BYTES_WRITTEN, - resourceConsumptionLabel) { () => + resourceConsumptionLabel, + true) { () => computeResourceConsumption(userIdentifier, resourceConsumption).diskBytesWritten } if (hasHDFSStorage) { resourceConsumptionSource.addGauge( ResourceConsumptionSource.HDFS_FILE_COUNT, - resourceConsumptionLabel) { () => + resourceConsumptionLabel, + true) { () => computeResourceConsumption(userIdentifier, resourceConsumption).hdfsFileCount } resourceConsumptionSource.addGauge( ResourceConsumptionSource.HDFS_BYTES_WRITTEN, - resourceConsumptionLabel) { () => + resourceConsumptionLabel, + true) { () => computeResourceConsumption(userIdentifier, resourceConsumption).hdfsBytesWritten } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index 26532a6bf9a..f3eaf4ee4fb 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -107,7 +107,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, Role.WORKER) val applicationIds = appActiveConnections.get(client.getChannel.id().asLongText()) val applicationId = Utils.splitShuffleKey(shuffleKey)._1 if (applicationIds != null && !applicationIds.contains(applicationId)) { - addCounter(ACTIVE_CONNECTION_COUNT, Map(applicationLabel -> applicationId)) + addCounter(ACTIVE_CONNECTION_COUNT, Map(applicationLabel -> applicationId), true) incCounter(ACTIVE_CONNECTION_COUNT, 1, Map(applicationLabel -> applicationId)) applicationIds.add(applicationId) }