Skip to content

Commit

Permalink
[CELEBORN-1634][FOLLOWUP] Add rpc metrics into grafana dashboard
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

1. Rename the RPC metrics from `${name}_${metric}` to `Rpc${metric}{name=$name}` so that they are easy to add to the Grafana dashboard.
2. Use the MASTER/WORKER/CLIENT `Role` for the RPC env.
3. Add the RPC metrics to the Grafana dashboard.

### Why are the changes needed?

For monitoring

### Does this PR introduce _any_ user-facing change?
No, the RPC metrics have not been released yet.

### How was this patch tested?
Unit test for the metrics source `instance` label.

<img width="1456" alt="image" src="https://github.com/user-attachments/assets/90284390-54ad-49ef-a868-fa537d2301b8">

<img width="1880" alt="image" src="https://github.com/user-attachments/assets/e8101e47-d649-4c66-9978-1efb4faa047f">

Closes #2990 from turboFei/rpc_metrics.

Lead-authored-by: Wang, Fei <[email protected]>
Co-authored-by: Fei Wang <[email protected]>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
2 people authored and RexXiong committed Dec 24, 2024
1 parent 2eb4c23 commit 03656b5
Show file tree
Hide file tree
Showing 14 changed files with 707 additions and 23 deletions.
654 changes: 654 additions & 0 deletions assets/grafana/celeborn-dashboard.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.metrics.source.Role;
import org.apache.celeborn.common.network.TransportContext;
import org.apache.celeborn.common.network.buffer.NettyManagedBuffer;
import org.apache.celeborn.common.network.client.RpcResponseCallback;
Expand Down Expand Up @@ -201,6 +202,7 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u
Utils.localHostName(conf),
0,
conf,
Role.CLIENT(),
scala.None$.empty());

String module = TransportModuleConstants.DATA_MODULE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ApplicationMeta, ShufflePartitionLocationInfo, WorkerInfo}
import org.apache.celeborn.common.metrics.source.Role
import org.apache.celeborn.common.network.sasl.registration.RegistrationInfo
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.RpcNameConstants.WORKER_EP
Expand Down Expand Up @@ -170,6 +171,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
lifecycleHost,
conf.shuffleManagerPort,
conf,
Role.CLIENT,
None)
rpcEnv.setupEndpoint(RpcNameConstants.LIFECYCLE_MANAGER_EP, this)

Expand All @@ -189,6 +191,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
lifecycleHost,
0,
conf,
Role.CLIENT,
createRpcSecurityContext(
appSecret,
addClientRegistrationBootstrap = true,
Expand All @@ -200,6 +203,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
lifecycleHost,
0,
conf,
Role.CLIENT,
createRpcSecurityContext(appSecret))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ package org.apache.celeborn.common.metrics.source
// String identifiers for the kind of Celeborn process. In this change they are
// passed as the `role` argument to `RpcEnv.create`, `RpcEnvConfig` and `RpcSource`.
object Role {
// Role passed by the Master when it creates its RPC envs.
val MASTER = "master"
// Role passed by the Worker when it creates its RPC envs.
val WORKER = "worker"
// Role passed by client-side components (ShuffleClientImpl, LifecycleManager).
val CLIENT = "client"
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ object RpcEnv {
host: String,
port: Int,
conf: CelebornConf,
role: String,
securityContext: Option[RpcSecurityContext]): RpcEnv = {
create(name, transportModule, host, host, port, conf, 0, securityContext)
create(name, transportModule, host, host, port, conf, 0, role, securityContext)
}

def create(
Expand All @@ -50,6 +51,7 @@ object RpcEnv {
port: Int,
conf: CelebornConf,
numUsableCores: Int,
role: String,
securityContext: Option[RpcSecurityContext],
source: Option[AbstractSource]): RpcEnv = {
val bindAddress =
Expand All @@ -63,6 +65,7 @@ object RpcEnv {
port,
conf,
numUsableCores,
role,
securityContext,
source)
}
Expand All @@ -75,6 +78,7 @@ object RpcEnv {
port: Int,
conf: CelebornConf,
numUsableCores: Int,
role: String,
securityContext: Option[RpcSecurityContext] = None,
source: Option[AbstractSource] = None): RpcEnv = {
val config =
Expand All @@ -86,6 +90,7 @@ object RpcEnv {
advertiseAddress,
port,
numUsableCores,
role,
securityContext,
source)
new NettyRpcEnvFactory().create(config)
Expand Down Expand Up @@ -222,6 +227,7 @@ private[celeborn] case class RpcEnvConfig(
advertiseAddress: String,
port: Int,
numUsableCores: Int,
role: String,
securityContext: Option[RpcSecurityContext],
source: Option[AbstractSource]) {
assert(RpcEnvConfig.VALID_TRANSPORT_MODULES.contains(transportModule))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,17 @@ private[celeborn] class RpcMetricsTracker(
} else {
false
}
final private val QUEUE_LENGTH_METRIC = s"${name}_${RpcSource.QUEUE_LENGTH}"
final private val QUEUE_TIME_METRIC = s"${name}_${RpcSource.QUEUE_TIME}"
final private val PROCESS_TIME_METRIC = s"${name}_${RpcSource.PROCESS_TIME}"
final private val NAME_LABEL = Map("name" -> name)

private var queueLengthFunc: () => Long = _

def init(lengthFunc: () => Long): Unit = {
queueLengthFunc = lengthFunc
if (name != null) {
rpcSource.addGauge(QUEUE_LENGTH_METRIC)(queueLengthFunc)
rpcSource.addGauge(RpcSource.QUEUE_LENGTH, NAME_LABEL)(queueLengthFunc)

rpcSource.addTimer(QUEUE_TIME_METRIC)
rpcSource.addTimer(PROCESS_TIME_METRIC)
rpcSource.addTimer(RpcSource.QUEUE_TIME, NAME_LABEL)
rpcSource.addTimer(RpcSource.PROCESS_TIME, NAME_LABEL)
}
}

Expand Down Expand Up @@ -115,12 +113,12 @@ private[celeborn] class RpcMetricsTracker(
val msgName = messageName(message)

if (useHistogram) {
updateHistogram(QUEUE_TIME_METRIC, queueTime)
updateHistogram(PROCESS_TIME_METRIC, processTime)
updateHistogram(RpcSource.QUEUE_TIME, queueTime)
updateHistogram(RpcSource.PROCESS_TIME, processTime)
updateHistogram(msgName, processTime)
} else {
rpcSource.updateTimer(QUEUE_TIME_METRIC, queueTime)
rpcSource.updateTimer(PROCESS_TIME_METRIC, processTime)
rpcSource.updateTimer(RpcSource.QUEUE_TIME, queueTime, NAME_LABEL)
rpcSource.updateTimer(RpcSource.PROCESS_TIME, processTime, NAME_LABEL)
rpcSource.updateTimer(msgName, processTime)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@ package org.apache.celeborn.common.rpc
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.metrics.source.AbstractSource

class RpcSource(conf: CelebornConf) extends AbstractSource(conf, RpcSource.ROLE_RPC) {
override def sourceName: String = RpcSource.ROLE_RPC
class RpcSource(conf: CelebornConf, role: String) extends AbstractSource(conf, role) {
override def sourceName: String = "RPC"

startCleaner()
}

object RpcSource {
val ROLE_RPC = "RPC"

val QUEUE_LENGTH = "QueueLength"
val QUEUE_TIME = "QueueTime"
val PROCESS_TIME = "ProcessTime"
val QUEUE_LENGTH = "RpcQueueLength"
val QUEUE_TIME = "RpcQueueTime"
val PROCESS_TIME = "RpcProcessTime"
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ class NettyRpcEnv(
config.transportModule,
celebornConf.rpcIoThreads.getOrElse(config.numUsableCores))

private val source: RpcSource = new RpcSource(celebornConf)
private val _rpcSource: RpcSource = new RpcSource(celebornConf, config.role)

private val dispatcher: Dispatcher = new Dispatcher(this, source)
private val dispatcher: Dispatcher = new Dispatcher(this, _rpcSource)

private var worker: RpcEndpoint = null

Expand Down Expand Up @@ -364,7 +364,7 @@ class NettyRpcEnv(
}
}

override def rpcSource(): RpcSource = source
override def rpcSource(): RpcSource = _rpcSource
}

private[celeborn] object NettyRpcEnv extends Logging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.junit.Assert.{assertEquals, assertNotEquals, assertNotNull}
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.metrics.source.Role
import org.apache.celeborn.common.protocol.TransportModuleConstants
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
Expand Down Expand Up @@ -283,6 +284,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
12345,
conf,
64,
Role.WORKER,
None,
None)
val worker4 = new WorkerInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.metrics.source.Role
import org.apache.celeborn.common.rpc.{RpcAddress, RpcMetricsTracker, RpcSource, TestRpcEndpoint}
import org.apache.celeborn.common.util.ThreadUtils

Expand All @@ -38,7 +39,7 @@ class InboxSuite extends CelebornFunSuite with BeforeAndAfter {
onDropOverride: Option[InboxMessage => T]): Inbox = {
val rpcEnvRef = mock(classOf[NettyRpcEndpointRef])
val conf = new CelebornConf()
val source: RpcSource = new RpcSource(conf)
val source: RpcSource = new RpcSource(conf, Role.CLIENT)
if (onDropOverride.isEmpty) {
new Inbox(
rpcEnvRef,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.exception.CelebornException
import org.apache.celeborn.common.metrics.source.Role
import org.apache.celeborn.common.network.client.TransportClient
import org.apache.celeborn.common.protocol.TransportModuleConstants
import org.apache.celeborn.common.rpc._
Expand All @@ -49,6 +50,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
"localhost",
port,
0,
Role.CLIENT,
None,
None)
new NettyRpcEnvFactory().create(config)
Expand All @@ -73,6 +75,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
"example.com",
0,
0,
Role.CLIENT,
None,
None)
val env = new NettyRpcEnvFactory().create(config)
Expand Down Expand Up @@ -123,6 +126,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
"localhost",
0,
numUsableCores,
Role.CLIENT,
None,
None)
val anotherEnv = new NettyRpcEnvFactory().create(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private[celeborn] class Master(
masterArgs.port,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()),
Role.MASTER,
None,
None)
} else {
Expand All @@ -118,6 +119,7 @@ private[celeborn] class Master(
masterArgs.port,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()),
Role.MASTER,
Some(externalSecurityContext),
None)
}
Expand All @@ -137,6 +139,7 @@ private[celeborn] class Master(
masterArgs.internalPort,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()),
Role.MASTER,
None,
None)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,15 @@ abstract class ApiBaseResourceSuite extends HttpTestHelper {
test("metrics") {
var response = webTarget.path("metrics/prometheus").request(MediaType.APPLICATION_JSON).get()
assert(HttpServletResponse.SC_OK == response.getStatus)
assert(response.readEntity(classOf[String]).contains("metrics_jvm_memory_heap_max_Value"))
val metricLines = response.readEntity(classOf[String]).split("\n")
Seq(
"metrics_jvm_memory_heap_max_Value",
"metrics_RpcQueueLength_Value",
"metrics_RpcQueueTime_Max",
"metrics_RpcProcessTime_Max").foreach { metric =>
assert(metricLines.exists(l =>
l.contains(metric) && l.contains(s"""instance="${httpService.connectionUrl}"""")))
}

response = webTarget.path("metrics/json").request(MediaType.APPLICATION_JSON).get()
assert(HttpServletResponse.SC_OK == response.getStatus)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ private[celeborn] class Worker(
workerArgs.port,
conf,
Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())),
Role.WORKER,
None,
Some(workerSource))
} else {
Expand All @@ -121,6 +122,7 @@ private[celeborn] class Worker(
workerArgs.port,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()),
Role.WORKER,
Some(externalSecurityContext),
Some(workerSource))
}
Expand All @@ -137,6 +139,7 @@ private[celeborn] class Worker(
workerArgs.internalPort,
conf,
Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())),
Role.WORKER,
None,
Some(workerSource))
}
Expand Down

0 comments on commit 03656b5

Please sign in to comment.