Skip to content

Commit

Permalink
Fix metric emission in the segment generation phase (apache#16146)
Browse files Browse the repository at this point in the history
Fix metric emission in the segment generation phase
  • Loading branch information
abhishekagarwal87 authored Mar 18, 2024
1 parent 3b35fb7 commit 7d307df
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.BatchAppenderators;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.InputSourceProcessor;
Expand All @@ -40,15 +42,13 @@
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
Expand Down Expand Up @@ -179,9 +179,10 @@ private List<DataSegment> generateSegments(
final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();

RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(
Collections.singletonList(fireDepartmentForMetrics),
Collections.singletonMap(DruidMetrics.TASK_ID, new String[]{getId()})
TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(
this,
fireDepartmentForMetrics,
buildSegmentsMeters
);
toolbox.addMonitor(metricsMonitor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.BatchAppenderators;
Expand All @@ -53,7 +55,6 @@
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
Expand All @@ -63,7 +64,6 @@
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
Expand Down Expand Up @@ -373,9 +373,10 @@ private Set<DataSegment> generateAndPushSegments(
new FireDepartment(dataSchema, new RealtimeIOConfig(null, null), null);
final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();

RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(
Collections.singletonList(fireDepartmentForMetrics),
Collections.singletonMap(DruidMetrics.TASK_ID, new String[]{getId()})
TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(
this,
fireDepartmentForMetrics,
rowIngestionMeters
);
toolbox.addMonitor(metricsMonitor);

Expand Down

0 comments on commit 7d307df

Please sign in to comment.