Skip to content

Commit

Permalink
addressing PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
abstractdog committed Apr 2, 2024
1 parent 40694b7 commit 5649494
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,16 @@ public Object getDeserializedUserPayload() {
return this.userPayloadObject;
}

@Override
public String toString() {
return "InputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex="
+ targetIndex + ", serializedUserPayloadExists=" + (userPayload != null)
+ ", deserializedUserPayloadExists=" + (userPayloadObject != null)
+ serializedPath != null ? (", serializedPath=" + serializedPath) : "" + "]";
StringBuilder sb = new StringBuilder();
sb.append("InputDataInformationEvent [sourceIndex=").append(sourceIndex)
.append(", targetIndex=").append(targetIndex)
.append(", serializedUserPayloadExists=").append(userPayload != null)
.append(", deserializedUserPayloadExists=").append(userPayloadObject != null);
if (serializedPath != null) {
sb.append(", serializedPath=").append(serializedPath);
}
sb.append("]");
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent,
LOG.info("Reading InputDataInformationEvent from path: {}", filePath);

MRSplitProto splitProto = null;
FileSystem fs = FileSystem.get(filePath.toUri(), jobConf);
FileSystem fs = filePath.getFileSystem(jobConf);

try (FSDataInputStream in = fs.open(filePath)) {
splitProto = MRSplitProto.parseFrom(in);
Expand All @@ -917,7 +917,7 @@ private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent,

private static MRSplitProto readProtoFromPayload(InputDataInformationEvent initEvent) throws IOException {
ByteBuffer payload = initEvent.getUserPayload();
LOG.info("Reading InputDataInformationEvent from payload, size: {} bytes}", payload.limit());
LOG.info("Reading InputDataInformationEvent from payload: {}}", payload);
return MRSplitProto.parseFrom(ByteString.copyFrom(payload));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tez.mapreduce.hadoop;

import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
Expand All @@ -29,6 +30,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
Expand Down Expand Up @@ -58,20 +60,22 @@ public class TestMRInputHelpers {

private static Configuration conf = new Configuration();
private static FileSystem remoteFs;
private static LocalFileSystem localFs;
private static Path testFilePath;
private static Path oldSplitsDir;
private static Path newSplitsDir;

private static final String TEST_ROOT_DIR = "target"
+ Path.SEPARATOR + TestMRHelpers.class.getName() + "-tmpDir";

private static final Path LOCAL_TEST_ROOT_DIR = new Path("target"
+ Path.SEPARATOR + TestMRHelpers.class.getName() + "-localtmpDir");
private static Path TEST_ROOT_DIR;
private static Path LOCAL_TEST_ROOT_DIR;

@BeforeClass
public static void setup() throws IOException {
TEST_ROOT_DIR = new Path(Files.createTempDirectory(TestMRHelpers.class.getName()).toString());
LOCAL_TEST_ROOT_DIR = new Path(Files.createTempDirectory(TestMRHelpers.class.getName() + "-local").toString());

try {
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.format(true).racks(null).build();
remoteFs = dfsCluster.getFileSystem();
Expand Down Expand Up @@ -107,6 +111,8 @@ public static void setup() throws IOException {

oldSplitsDir = remoteFs.makeQualified(new Path("/tmp/splitsDirOld/"));
newSplitsDir = remoteFs.makeQualified(new Path("/tmp/splitsDirNew/"));

localFs = FileSystem.getLocal(conf);
}


Expand Down Expand Up @@ -297,13 +303,11 @@ public void testInputSplitLocalResourceCreationWithDifferentFS() throws Exceptio

@Before
public void before() throws IOException {
FileSystem localFs = FileSystem.getLocal(conf);
localFs.mkdirs(LOCAL_TEST_ROOT_DIR);
}

@After
public void after() throws IOException {
FileSystem localFs = FileSystem.getLocal(conf);
localFs.delete(LOCAL_TEST_ROOT_DIR, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public static InputDataInformationEvent convertRootInputDataInformationEventFrom
EventProtos.RootInputDataInformationEventProto proto) {
ByteBuffer payload = proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null;
InputDataInformationEvent diEvent = null;
if (!proto.getSerializedPath().isEmpty()) {
if (proto.hasSerializedPath()) {
diEvent = InputDataInformationEvent.createWithSerializedPath(proto.getSourceIndex(),
proto.getSerializedPath().toStringUtf8());
} else {
Expand Down

0 comments on commit 5649494

Please sign in to comment.