Skip to content

Commit

Permalink
TEZ-4547: Add Tez AM JobID to the JobConf (#339) (Venkatasubrahmanian…
Browse files Browse the repository at this point in the history
… Narayanan reviewed by Laszlo Bodor)
  • Loading branch information
VenkatSNarayanan authored Aug 5, 2024
1 parent 70d79b2 commit 563b494
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,6 @@ public interface OutputCommitterContext {
*/
public int getVertexIndex();

/**
 * Get the integer identifier of the DAG this committer context belongs to.
 * Together with the application id it can be used to build the full DAG ID.
 *
 * @return the DAG identifier
 */
public int getDagIdentifier();

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
private final String dagName;
private final String vertexName;
private final int vertexIdx;
private final int dagIdentifier;
private final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output;

public OutputCommitterContextImpl(ApplicationId applicationId,
int dagAttemptNumber,
String dagName,
String vertexName,
RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output,
int vertexIdx) {
int dagAttemptNumber,
String dagName,
String vertexName,
int dagIdentifier,
int vertexIdx,
RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output) {
Objects.requireNonNull(applicationId, "applicationId is null");
Objects.requireNonNull(dagName, "dagName is null");
Objects.requireNonNull(vertexName, "vertexName is null");
Expand All @@ -52,6 +54,7 @@ public OutputCommitterContextImpl(ApplicationId applicationId,
this.vertexName = vertexName;
this.output = output;
this.vertexIdx = vertexIdx;
this.dagIdentifier = dagIdentifier;
}

@Override
Expand Down Expand Up @@ -94,4 +97,9 @@ public int getVertexIndex() {
return vertexIdx;
}

/**
 * Returns the DAG identifier that was supplied when this context was constructed.
 *
 * @return the stored DAG identifier
 */
@Override
public int getDagIdentifier() {
  return this.dagIdentifier;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2560,8 +2560,10 @@ public Void run() throws Exception {
appContext.getApplicationAttemptId().getAttemptId(),
appContext.getCurrentDAG().getName(),
vertexName,
od,
vertexId.getId());
appContext.getCurrentDAG().getID().getId(),
vertexId.getId(),
od
);
OutputCommitter outputCommitter = ReflectionUtils
.createClazzInstance(od.getControllerDescriptor().getClassName(),
new Class[]{OutputCommitterContext.class},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.tez.mapreduce.committer;

import org.apache.tez.mapreduce.common.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
Expand Down Expand Up @@ -78,6 +79,7 @@ public void initialize() throws IOException {
jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
getContext().getDAGAttemptNumber());
jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext()));
jobConf.setInt(MRJobConfig.VERTEX_ID, getContext().getVertexIndex());
committer = getOutputCommitter(getContext());
jobContext = getJobContextFromVertexContext(getContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.OutputContext;

@Private
public final class Utils {
Expand Down Expand Up @@ -63,5 +66,12 @@ public static Counter getMRCounter(TezCounter tezCounter) {
Objects.requireNonNull(tezCounter);
return new MRCounters.MRCounter(tezCounter);
}


/**
 * Builds the string form of the DAG ID for an output committer context.
 *
 * @param context committer context supplying the application id and the DAG identifier
 * @return the {@code toString()} of the {@link TezDAGID} built from those two values
 */
public static String getDAGID(OutputCommitterContext context) {
  final TezDAGID dagId =
      TezDAGID.getInstance(context.getApplicationId(), context.getDagIdentifier());
  return dagId.toString();
}

/**
 * Builds the string form of the DAG ID for a task-side output context.
 *
 * @param context output context supplying the application id and the DAG identifier
 * @return the {@code toString()} of the {@link TezDAGID} built from those two values
 */
public static String getDAGID(OutputContext context) {
  final TezDAGID dagId =
      TezDAGID.getInstance(context.getApplicationId(), context.getDagIdentifier());
  return dagId.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ public interface MRJobConfig {

public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";

/**
 * Configuration key used by committers to set a job-wide UUID.
 * Tez populates it with the string form of the current DAG ID.
 */
public static final String JOB_COMMITTER_UUID = "job.committer.uuid";

public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = "mapreduce.fileoutputcommitter.algorithm.version";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.tez.common.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.tez.mapreduce.common.Utils;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -413,6 +414,7 @@ protected List<Event> initializeBase() throws IOException, InterruptedException
}
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
getContext().getDAGAttemptNumber());
jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext()));
TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
.createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(),
getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.mapreduce.TestUmbilical;
import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.common.Utils;
import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.ProcessorContext;
Expand Down Expand Up @@ -131,6 +133,26 @@ public void testMergeConfig() throws Exception {
assertEquals("base-value", mergedConf.get("base-key"));
}

/**
 * Verifies that initializing an {@code MROutput} stores the DAG ID in the job
 * configuration under {@code MRJobConfig.JOB_COMMITTER_UUID}, and that the
 * stored value differs from the task attempt ID.
 */
@Test
public void testJobUUIDSet() throws Exception {
  Configuration conf = new Configuration();
  conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
  DataSinkDescriptor dataSink = MROutput
      .createConfigBuilder(conf, TextOutputFormat.class,
          tmpDir.getPath())
      .build();

  OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(),
      new Configuration(false));
  MROutput output = new MROutput(outputContext, 2);
  output.initialize();

  // Sentinel default: if the key was never set, the lookup returns this value.
  String invalidDAGID = "invalid default";
  String dagID = output.jobConf.get(MRJobConfig.JOB_COMMITTER_UUID, invalidDAGID);

  // JUnit expects (unexpected, actual) and (expected, actual); keeping that order
  // makes failure messages report the right value as the one under test.
  assertNotEquals(invalidDAGID, dagID);
  assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID), dagID);
  assertEquals(Utils.getDAGID(outputContext), dagID);
}

@Test(timeout = 5000)
public void testOldAPI_TextOutputFormat() throws Exception {
Configuration conf = new Configuration();
Expand Down

0 comments on commit 563b494

Please sign in to comment.