Skip to content

Commit

Permalink
[GOBBLIN-1848] Add tags to dagmanager metrics for extensibility (#3712)
Browse files Browse the repository at this point in the history
* Add tags to dagmanager metrics for extensibility

* Fix concurrency bug in test

* Add job level metrics in dagmanager test

* Test not cleaning dm threads

* Only clean up metrics if the threads were started by the DagManager
  • Loading branch information
Will-Lo authored Jul 10, 2023
1 parent 2c68476 commit 2c2ffac
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.metrics;

/**
 * Holder for the names (keys) of tags that can be attached to metrics and metric contexts.
 *
 * <p>This class only exposes constants; it is not meant to be instantiated or extended.
 */
public final class MetricTagNames {
  /** Tag key under which a metric's backend representation is recorded. */
  public static final String METRIC_BACKEND_REPRESENTATION = "metricBackendRepresentation";

  private MetricTagNames() {
    // Constants holder: prevent instantiation.
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, boolean
} else {
this.eventSubmitter = Optional.absent();
}
this.dagManagerMetrics = new DagManagerMetrics(metricContext);
this.dagManagerMetrics = new DagManagerMetrics();
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
this.jobStatusRetriever = jobStatusRetriever;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,22 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.GobblinMetricsKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.MetricTagNames;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.metric.filter.MetricNameRegexFilter;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.RequesterService;
Expand Down Expand Up @@ -75,6 +81,13 @@ public DagManagerMetrics(MetricContext metricContext) {
this.metricContext = metricContext;
}

public DagManagerMetrics() {
// Create a new metric context for the DagManagerMetrics tagged appropriately
List<Tag<?>> tags = new ArrayList<>();
tags.add(new Tag<>(MetricTagNames.METRIC_BACKEND_REPRESENTATION, GobblinMetrics.MetricType.COUNTER));
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), this.getClass(), tags);
}

public void activate() {
if (this.metricContext != null) {
allSuccessfulMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
Expand Down Expand Up @@ -249,7 +262,7 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() {

public void cleanup() {
// Add null check so that unit test will not affect each other when we de-active non-instrumented DagManager
if(this.metricContext != null) {
if(this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManager.class.getSimpleName())) {
// The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
// To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,9 @@ public void testResumeCancelledDag() throws URISyntaxException, IOException {

@Test (dependsOnMethods = "testResumeCancelledDag")
public void testJobStartSLAKilledDag() throws URISyntaxException, IOException {
String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
long slaKilledMeterCount = metricContext.getParent().get().getMeters().get(slakilledMeterName) == null? 0 :
metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount();
long flowExecutionId = System.currentTimeMillis();
String flowGroupId = "0";
String flowGroup = "group" + flowGroupId;
Expand Down Expand Up @@ -780,8 +783,7 @@ public void testJobStartSLAKilledDag() throws URISyntaxException, IOException {
Assert.assertEquals(this.dagToJobs.size(), 1);
Assert.assertTrue(this.dags.containsKey(dagId1));

String slakilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "job0", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), 1);
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName).getCount(), slaKilledMeterCount + 1);

// Cleanup
this._dagManagerThread.run();
Expand Down Expand Up @@ -1162,10 +1164,13 @@ public void testJobSlaKilledMetrics() throws URISyntaxException, IOException {
Config executorOneConfig = ConfigFactory.empty()
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorOne"))
.withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))
.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10));
.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10))
.withValue(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, ConfigValueFactory.fromAnyRef(true));
Config executorTwoConfig = ConfigFactory.empty()
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorTwo"))
.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10));
.withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10))
.withValue(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS, ConfigValueFactory.fromAnyRef(true));

List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "newUser", executorOneConfig);
dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "newUser", executorTwoConfig));

Expand Down Expand Up @@ -1229,7 +1234,6 @@ public void testJobSlaKilledMetrics() throws URISyntaxException, IOException {
String slakilledMeterName2 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorTwo", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
String failedFlowGauge = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group1","flow1", ServiceMetricNames.RUNNING_STATUS);

String slakilledGroupName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(), 2);
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(), 1);
// Cleanup
Expand Down

0 comments on commit 2c2ffac

Please sign in to comment.