diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java index d254a6a99e..5b8906d520 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java @@ -78,4 +78,6 @@ public interface OutputCommitterContext { */ public int getVertexIndex(); + public int getDagIdentifier(); + } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java index dc89514950..06be989b9e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java @@ -34,14 +34,16 @@ public class OutputCommitterContextImpl implements OutputCommitterContext { private final String dagName; private final String vertexName; private final int vertexIdx; + private final int dagIdentifier; private final RootInputLeafOutput output; public OutputCommitterContextImpl(ApplicationId applicationId, - int dagAttemptNumber, - String dagName, - String vertexName, - RootInputLeafOutput output, - int vertexIdx) { + int dagAttemptNumber, + String dagName, + String vertexName, + int dagIdentifier, + int vertexIdx, + RootInputLeafOutput output) { Objects.requireNonNull(applicationId, "applicationId is null"); Objects.requireNonNull(dagName, "dagName is null"); Objects.requireNonNull(vertexName, "vertexName is null"); @@ -52,6 +54,7 @@ public OutputCommitterContextImpl(ApplicationId applicationId, this.vertexName = vertexName; this.output = output; this.vertexIdx = vertexIdx; + this.dagIdentifier = dagIdentifier; } @Override @@ -94,4 +97,9 @@ public int getVertexIndex() { return vertexIdx; } + @Override + public int getDagIdentifier() { + return dagIdentifier; + } + } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index f8f2750267..c7cf176af7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -2560,8 +2560,10 @@ public Void run() throws Exception { appContext.getApplicationAttemptId().getAttemptId(), appContext.getCurrentDAG().getName(), vertexName, - od, - vertexId.getId()); + appContext.getCurrentDAG().getID().getId(), + vertexId.getId(), + od + ); OutputCommitter outputCommitter = ReflectionUtils .createClazzInstance(od.getControllerDescriptor().getClassName(), new Class[]{OutputCommitterContext.class}, diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java index 71e5681cbf..4a648dc901 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java @@ -18,6 +18,7 @@ package org.apache.tez.mapreduce.committer; +import org.apache.tez.mapreduce.common.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -78,6 +79,7 @@ public void initialize() throws IOException { jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials()); jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); + jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext())); jobConf.setInt(MRJobConfig.VERTEX_ID, getContext().getVertexIndex()); committer = getOutputCommitter(getContext()); jobContext = getJobContextFromVertexContext(getContext()); diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java index 670ee5db4e..85483fc598 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java @@ -30,7 +30,10 @@ import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.mapred.Counters.Counter; import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.mapreduce.hadoop.mapred.MRCounters; +import org.apache.tez.runtime.api.OutputCommitterContext; +import org.apache.tez.runtime.api.OutputContext; @Private public final class Utils { @@ -63,5 +66,12 @@ public static Counter getMRCounter(TezCounter tezCounter) { Objects.requireNonNull(tezCounter); return new MRCounters.MRCounter(tezCounter); } - + + public static String getDAGID(OutputCommitterContext context) { + return TezDAGID.getInstance(context.getApplicationId(), context.getDagIdentifier()).toString(); + } + + public static String getDAGID(OutputContext context) { + return TezDAGID.getInstance(context.getApplicationId(), context.getDagIdentifier()).toString(); + } } diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java index e162460773..f1183742fc 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java @@ -131,6 +131,11 @@ public interface MRJobConfig { public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities"; + /** + * Used by committers to set a job-wide UUID. + */ + public static final String JOB_COMMITTER_UUID = "job.committer.uuid"; + public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = "mapreduce.fileoutputcommitter.algorithm.version"; /** diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java index 9aeae25bd9..b8ac1b3a54 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java @@ -33,6 +33,7 @@ import org.apache.tez.common.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.tez.mapreduce.common.Utils; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -413,6 +414,7 @@ protected List initializeBase() throws IOException, InterruptedException } jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, getContext().getDAGAttemptNumber()); + jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext())); TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl .createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(), getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(), diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java index bfc09dc9b8..3359a6eda2 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java @@ -56,7 +56,9 @@ import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.mapreduce.TestUmbilical; import org.apache.tez.mapreduce.TezTestUtils; +import org.apache.tez.mapreduce.common.Utils; import org.apache.tez.mapreduce.hadoop.MRConfig; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.ProcessorContext; @@ -131,6 +133,26 @@ public void testMergeConfig() throws Exception { assertEquals("base-value", mergedConf.get("base-key")); } + @Test + public void testJobUUIDSet() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, true); + DataSinkDescriptor dataSink = MROutput + .createConfigBuilder(conf, TextOutputFormat.class, + tmpDir.getPath()) + .build(); + + OutputContext outputContext = createMockOutputContext(dataSink.getOutputDescriptor().getUserPayload(), + new Configuration(false)); + MROutput output = new MROutput(outputContext, 2); + output.initialize(); + String invalidDAGID = "invalid default"; + String dagID = output.jobConf.get(MRJobConfig.JOB_COMMITTER_UUID, invalidDAGID); + assertNotEquals(dagID, invalidDAGID); + assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID), dagID); + assertEquals(dagID, Utils.getDAGID(outputContext)); + } + @Test(timeout = 5000) public void testOldAPI_TextOutputFormat() throws Exception { Configuration conf = new Configuration();