Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vnarayanan committed May 28, 2024
1 parent 5cef95a commit 83cd25f
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
private final RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output;

public OutputCommitterContextImpl(ApplicationId applicationId,
int dagAttemptNumber,
String dagName,
String vertexName,
RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output,
int vertexIdx,
int dagIdentifier) {
int dagAttemptNumber,
String dagName,
String vertexName,
int dagIdentifier, int vertexIdx, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output) {
Objects.requireNonNull(applicationId, "applicationId is null");
Objects.requireNonNull(dagName, "dagName is null");
Objects.requireNonNull(vertexName, "vertexName is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2560,9 +2560,10 @@ public Void run() throws Exception {
appContext.getApplicationAttemptId().getAttemptId(),
appContext.getCurrentDAG().getName(),
vertexName,
od,
appContext.getCurrentDAG().getID().getId(),
vertexId.getId(),
appContext.getCurrentDAG().getID().getId());
od
);
OutputCommitter outputCommitter = ReflectionUtils
.createClazzInstance(od.getControllerDescriptor().getClassName(),
new Class[]{OutputCommitterContext.class},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void initialize() throws IOException {
jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
getContext().getDAGAttemptNumber());
jobConf.set(MRJobConfig.FS_S3A_COMMITTER_UUID, Utils.getDAGID(
jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(
getContext().getApplicationId(),
getContext().getDagIdentifier()));
jobConf.setInt(MRJobConfig.VERTEX_ID, getContext().getVertexIndex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ public interface MRJobConfig {
public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";

/**
* Used by Hadoop's MagicS3Guard and Staging committers to set a job-wide UUID
* Used by committers to set a job-wide UUID
*/
public static final String FS_S3A_COMMITTER_UUID = "fs.s3a.committer.uuid";
public static final String JOB_COMMITTER_UUID = "job.committer.uuid";

public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION = "mapreduce.fileoutputcommitter.algorithm.version";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ protected List<Event> initializeBase() throws IOException, InterruptedException
}
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
getContext().getDAGAttemptNumber());
jobConf.set(MRJobConfig.FS_S3A_COMMITTER_UUID, Utils.getDAGID(getContext().getApplicationId(),
jobConf.set(MRJobConfig.JOB_COMMITTER_UUID, Utils.getDAGID(getContext().getApplicationId(),
getContext().getDagIdentifier()));
TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
.createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void testJobUUIDSet() throws Exception {
MROutput output = new MROutput(outputContext, 2);
output.initialize();
String invalidDAGID = "invalid default";
String dagID = output.jobConf.get(MRJobConfig.FS_S3A_COMMITTER_UUID, invalidDAGID);
String dagID = output.jobConf.get(MRJobConfig.JOB_COMMITTER_UUID, invalidDAGID);
assertNotEquals(dagID, invalidDAGID);
assertNotEquals(output.jobConf.get(org.apache.hadoop.mapred.JobContext.TASK_ATTEMPT_ID), dagID);
assertEquals(dagID, Utils.getDAGID(outputContext.getApplicationId(), outputContext.getDagIdentifier()));
Expand Down

0 comments on commit 83cd25f

Please sign in to comment.