From 223bda858a080a9da53077cd14383de7dda84e63 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Tue, 19 Mar 2024 14:00:20 +0100 Subject: [PATCH 1/6] TEZ-4548: InputDataInformationEvent to be read from serialized payload from filesystem --- .../runtime/api/InputInitializerContext.java | 7 +- .../api/events/InputDataInformationEvent.java | 21 +++-- tez-api/src/main/proto/Events.proto | 1 + .../TestCompositeDataMovementEvent.java | 4 +- .../events/TestInputDataInformationEvent.java | 53 +++++++++++ .../app/dag/RootInputInitializerManager.java | 5 +- .../TezRootInputInitializerContextImpl.java | 5 ++ .../tez/mapreduce/hadoop/MRInputHelpers.java | 31 +++++++ .../apache/tez/mapreduce/input/MRInput.java | 5 +- .../apache/tez/mapreduce/TezTestUtils.java | 5 ++ .../mapreduce/hadoop/TestMRInputHelpers.java | 88 ++++++++++++++----- .../apache/tez/common/ProtoConverters.java | 20 +++-- 12 files changed, 205 insertions(+), 40 deletions(-) rename tez-api/src/test/java/org/apache/tez/runtime/api/{event => events}/TestCompositeDataMovementEvent.java (90%) create mode 100644 tez-api/src/test/java/org/apache/tez/runtime/api/events/TestInputDataInformationEvent.java diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java index 7c9562e7c9..5cb2d221b3 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java @@ -87,7 +87,12 @@ public interface InputInitializerContext { * @return Resource */ Resource getVertexTaskResource(); - + + /** + * Get the vertex id as integer that belongs to this input. + */ + int getVertexId(); + /** * Get the total resource allocated to this vertex. If the DAG is running in * a busy cluster then it may have no resources available dedicated to it. 
The diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java index a62a34154f..b142bf9800 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java @@ -49,8 +49,8 @@ public final class InputDataInformationEvent extends Event { private final int sourceIndex; private int targetIndex; // TODO Likely to be multiple at a later point. private final ByteBuffer userPayload; + private String serializedPath; private final Object userPayloadObject; - private InputDataInformationEvent(int srcIndex, ByteBuffer userPayload) { this.sourceIndex = srcIndex; @@ -79,6 +79,12 @@ public static InputDataInformationEvent createWithObjectPayload(int srcIndex, return new InputDataInformationEvent(srcIndex, userPayloadDeserialized, null); } + public static InputDataInformationEvent createWithSerializedPath(int srcIndex, String serializedPath) { + InputDataInformationEvent event = new InputDataInformationEvent(srcIndex, null); + event.serializedPath = serializedPath; + return event; + } + public int getSourceIndex() { return this.sourceIndex; } @@ -90,11 +96,15 @@ public int getTargetIndex() { public void setTargetIndex(int target) { this.targetIndex = target; } - + + public String getSerializedPath() { + return serializedPath; + } + public ByteBuffer getUserPayload() { return userPayload == null ? 
null : userPayload.asReadOnlyBuffer(); } - + public Object getDeserializedUserPayload() { return this.userPayloadObject; } @@ -103,6 +113,7 @@ public Object getDeserializedUserPayload() { public String toString() { return "InputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex=" + targetIndex + ", serializedUserPayloadExists=" + (userPayload != null) - + ", deserializedUserPayloadExists=" + (userPayloadObject != null) + "]"; - } + + ", deserializedUserPayloadExists=" + (userPayloadObject != null) + + ", serializedPath=" + serializedPath + "]"; + } } diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto index 9949b0bc8c..05896ac62e 100644 --- a/tez-api/src/main/proto/Events.proto +++ b/tez-api/src/main/proto/Events.proto @@ -58,6 +58,7 @@ message RootInputDataInformationEventProto { optional int32 source_index = 1; optional int32 target_index = 2; optional bytes user_payload = 3; + optional bytes serialized_path = 4; } message CompositeEventProto { diff --git a/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java b/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestCompositeDataMovementEvent.java similarity index 90% rename from tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java rename to tez-api/src/test/java/org/apache/tez/runtime/api/events/TestCompositeDataMovementEvent.java index 7dce6991a4..98c238a1dd 100644 --- a/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java +++ b/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestCompositeDataMovementEvent.java @@ -16,12 +16,10 @@ * limitations under the License. 
*/ -package org.apache.tez.runtime.api.event; +package org.apache.tez.runtime.api.events; import java.nio.ByteBuffer; -import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; -import org.apache.tez.runtime.api.events.DataMovementEvent; import org.junit.Assert; import org.junit.Test; diff --git a/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestInputDataInformationEvent.java b/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestInputDataInformationEvent.java new file mode 100644 index 0000000000..6e002e26c4 --- /dev/null +++ b/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestInputDataInformationEvent.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tez.runtime.api.events; + +import java.nio.ByteBuffer; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.base.Charsets; + +public class TestInputDataInformationEvent { + + @Test + public void testApiPayloadOrPath() { + InputDataInformationEvent eventWithSerializedPayload = + InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.wrap("payload1".getBytes())); + // event created by createWithSerializedPayload should contain serialized payload + // but not a path or a deserialized payload + Assert.assertEquals("payload1", Charsets.UTF_8.decode(eventWithSerializedPayload.getUserPayload()).toString()); + Assert.assertNull(eventWithSerializedPayload.getSerializedPath()); + Assert.assertNull(eventWithSerializedPayload.getDeserializedUserPayload()); + + InputDataInformationEvent eventWithObjectPayload = InputDataInformationEvent.createWithObjectPayload(0, "payload2"); + // event created by createWithObjectPayload should contain a deserialized payload + // but not a path or serialized payload + Assert.assertEquals("payload2", eventWithObjectPayload.getDeserializedUserPayload()); + Assert.assertNull(eventWithObjectPayload.getSerializedPath()); + Assert.assertNull(eventWithObjectPayload.getUserPayload()); + + InputDataInformationEvent eventWithPath = InputDataInformationEvent.createWithSerializedPath(0, "file://hello"); + // event created by createWithSerializedPath should contain a path + // but neither serialized nor deserialized payload + Assert.assertEquals("file://hello", eventWithPath.getSerializedPath()); + Assert.assertNull(eventWithPath.getUserPayload()); + Assert.assertNull(eventWithPath.getDeserializedUserPayload()); + } +} diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index cfbdb19e30..f3e94993ca 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -272,9 +272,8 @@ public List call() throws Exception { List events = ugi.doAs(new PrivilegedExceptionAction>() { @Override public List run() throws Exception { - LOG.info( - "Starting InputInitializer for Input: " + initializerWrapper.getInput().getName() + - " on vertex " + initializerWrapper.getVertexLogIdentifier()); + LOG.info("Starting InputInitializer for Input: {} on vertex {}", initializerWrapper.getInput().getName(), + initializerWrapper.getVertexLogIdentifier()); try { TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), initializerWrapper.vertexId); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java index be4ee6068e..1c8c326c54 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java @@ -102,6 +102,11 @@ public Resource getVertexTaskResource() { return vertex.getTaskResource(); } + @Override + public int getVertexId() { + return vertex.getVertexId().getId(); + } + @Override public Resource getTotalAvailableResource() { return appContext.getTaskScheduler().getTotalResources(vertex.getTaskSchedulerIdentifier()); diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java index 26bba4d002..39ac77a873 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java @@ -19,6 +19,7 @@ package org.apache.tez.mapreduce.hadoop; import java.io.IOException; 
+import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -30,12 +31,15 @@ import java.util.Objects; import com.google.common.base.Function; +import com.google.common.base.Strings; + import org.apache.tez.common.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import org.apache.tez.runtime.api.InputContext; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -43,6 +47,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -72,6 +77,7 @@ import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.protos.MRRuntimeProtos; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; @Public @Unstable @@ -889,4 +895,29 @@ public static int getDagAttemptNumber(Configuration conf) { return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER); } + public static MRSplitProto getProto(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException { + return Strings.isNullOrEmpty(initEvent.getSerializedPath()) ? 
readProtoFromPayload(initEvent) + : readProtoFromFs(initEvent, jobConf); + } + + private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException { + String serializedPath = initEvent.getSerializedPath(); + Path filePath = new Path(serializedPath); + LOG.info("Reading InputDataInformationEvent from path: {}", filePath); + + MRSplitProto splitProto = null; + FileSystem fs = FileSystem.get(filePath.toUri(), jobConf); + + try (FSDataInputStream in = fs.open(filePath)) { + splitProto = MRSplitProto.parseFrom(in); + fs.delete(filePath, false); + } + return splitProto; + } + + private static MRSplitProto readProtoFromPayload(InputDataInformationEvent initEvent) throws IOException { + ByteBuffer payload = initEvent.getUserPayload(); + LOG.info("Reading InputDataInformationEvent from payload, size: {} bytes}", payload.limit()); + return MRSplitProto.parseFrom(ByteString.copyFrom(payload)); + } } diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index ee907f5d74..8c3d5d5dbb 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -27,8 +27,6 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import com.google.protobuf.ByteString; - import org.apache.tez.runtime.api.ProgressFailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +70,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.common.Preconditions; + import com.google.common.collect.Lists; /** @@ -672,7 +671,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I LOG.debug(getContext().getInputOutputVertexNames() + " initializing RecordReader from event"); } Objects.requireNonNull(initEvent, "InitEvent must be 
specified"); - MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload())); + MRSplitProto splitProto = MRInputHelpers.getProto(initEvent, jobConf); Object splitObj = null; long splitLength = -1; if (useNewApi) { diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java index 5c90f8a9df..4d5aa4b695 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java @@ -104,6 +104,11 @@ public Resource getVertexTaskResource() { return Resource.newInstance(1024, 1); } + @Override + public int getVertexId() { + return 0; + } + @Override public Resource getTotalAvailableResource() { return Resource.newInstance(10240, 10); diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java index 11b1271a8a..58abe4547c 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java @@ -42,10 +42,16 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.dag.api.DataSourceDescriptor; import org.apache.tez.dag.api.TaskLocationHint; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import com.google.protobuf.ByteString; + public class TestMRInputHelpers { protected static MiniDFSCluster dfsCluster; @@ -56,9 +62,12 @@ public class TestMRInputHelpers { private static Path oldSplitsDir; private static Path newSplitsDir; - private static String TEST_ROOT_DIR = "target" + private 
static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestMRHelpers.class.getName() + "-tmpDir"; + private static final Path LOCAL_TEST_ROOT_DIR = new Path("target" + + Path.SEPARATOR + TestMRHelpers.class.getName() + "-localtmpDir"); + @BeforeClass public static void setup() throws IOException { try { @@ -188,6 +197,43 @@ public void testInputSplitLocalResourceCreation() throws Exception { MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)); } + @Test + public void testInputEventSerializedPayload() throws IOException { + MRSplitProto proto = MRSplitProto.newBuilder().setSplitBytes(ByteString.copyFrom("splits".getBytes())).build(); + + InputDataInformationEvent initEvent = + InputDataInformationEvent.createWithSerializedPayload(0, proto.toByteString().asReadOnlyByteBuffer()); + MRSplitProto protoFromEvent = MRInputHelpers.getProto(initEvent, new JobConf(conf)); + + Assert.assertEquals(proto, protoFromEvent); + } + + @Test + public void testInputEventSerializedPath() throws IOException { + MRSplitProto proto = MRSplitProto.newBuilder().setSplitBytes(ByteString.copyFrom("splits".getBytes())).build(); + + FileSystem localFs = FileSystem.getLocal(conf); + Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR); + + Path serializedPath = new Path(splitsDir + Path.SEPARATOR + "splitpayload"); + + try (FSDataOutputStream out = localFs.create(serializedPath)) { + proto.writeTo(out); + } + + // event file is present on fs + Assert.assertTrue("Event file should be present on fs", localFs.exists(serializedPath)); + + InputDataInformationEvent initEvent = + InputDataInformationEvent.createWithSerializedPath(0, serializedPath.toUri().toString()); + MRSplitProto protoFromEvent = MRInputHelpers.getProto(initEvent, new JobConf(conf)); + + Assert.assertEquals(proto, protoFromEvent); + + // event file is deleted after read + Assert.assertFalse("Event file should be deleted after read", localFs.exists(serializedPath)); + } + private void verifyLocationHints(Path 
inputSplitsDir, List actual) throws Exception { JobID jobId = new JobID("dummy", 1); @@ -232,30 +278,32 @@ private DataSourceDescriptor generateDataSourceDescriptorMapRed(Path inputSplits @Test(timeout = 5000) public void testInputSplitLocalResourceCreationWithDifferentFS() throws Exception { FileSystem localFs = FileSystem.getLocal(conf); - Path LOCAL_TEST_ROOT_DIR = new Path("target" - + Path.SEPARATOR + TestMRHelpers.class.getName() + "-localtmpDir"); + Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR); - try { - localFs.mkdirs(LOCAL_TEST_ROOT_DIR); - - Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR); + DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(splitsDir); - DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(splitsDir); - - Map localResources = dataSource.getAdditionalLocalFiles(); + Map localResources = dataSource.getAdditionalLocalFiles(); - Assert.assertEquals(2, localResources.size()); - Assert.assertTrue(localResources.containsKey( - MRInputHelpers.JOB_SPLIT_RESOURCE_NAME)); - Assert.assertTrue(localResources.containsKey( - MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)); + Assert.assertEquals(2, localResources.size()); + Assert.assertTrue(localResources.containsKey( + MRInputHelpers.JOB_SPLIT_RESOURCE_NAME)); + Assert.assertTrue(localResources.containsKey( + MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)); - for (LocalResource lr : localResources.values()) { - Assert.assertFalse(lr.getResource().getScheme().contains(remoteFs.getScheme())); - } - } finally { - localFs.delete(LOCAL_TEST_ROOT_DIR, true); + for (LocalResource lr : localResources.values()) { + Assert.assertFalse(lr.getResource().getScheme().contains(remoteFs.getScheme())); } } + @Before + public void before() throws IOException { + FileSystem localFs = FileSystem.getLocal(conf); + localFs.mkdirs(LOCAL_TEST_ROOT_DIR); + } + + @After + public void after() throws IOException { + FileSystem localFs = FileSystem.getLocal(conf); + 
localFs.delete(LOCAL_TEST_ROOT_DIR, true); + } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java index 2c9ad86018..eb8e37aeb9 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java @@ -18,8 +18,11 @@ package org.apache.tez.common; +import com.google.common.base.Charsets; import com.google.protobuf.ByteString; +import java.nio.ByteBuffer; + import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.CustomProcessorEvent; import org.apache.tez.runtime.api.events.DataMovementEvent; @@ -135,15 +138,22 @@ public static VertexManagerEvent convertVertexManagerEventFromProto( if (event.getUserPayload() != null) { builder.setUserPayload(ByteString.copyFrom(event.getUserPayload())); } + if (event.getSerializedPath() != null) { + builder.setSerializedPath(ByteString.copyFrom(event.getSerializedPath().getBytes(Charsets.UTF_8))); + } return builder.build(); } - public static InputDataInformationEvent - convertRootInputDataInformationEventFromProto( + public static InputDataInformationEvent convertRootInputDataInformationEventFromProto( EventProtos.RootInputDataInformationEventProto proto) { - InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload( - proto.getSourceIndex(), - proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null); + ByteBuffer payload = proto.hasUserPayload() ? 
proto.getUserPayload().asReadOnlyByteBuffer() : null; + InputDataInformationEvent diEvent = null; + if (!proto.getSerializedPath().isEmpty()) { + diEvent = InputDataInformationEvent.createWithSerializedPath(proto.getSourceIndex(), + proto.getSerializedPath().toStringUtf8()); + } else { + diEvent = InputDataInformationEvent.createWithSerializedPayload(proto.getSourceIndex(), payload); + } diEvent.setTargetIndex(proto.getTargetIndex()); return diEvent; } From 40694b77b1ff7d49e25ed7ed8abda4f0d6195367 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Tue, 2 Apr 2024 08:17:50 +0200 Subject: [PATCH 2/6] InputDataInformationEvent toString fix --- .../tez/runtime/api/events/InputDataInformationEvent.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java index b142bf9800..a81e6482ad 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java @@ -114,6 +114,6 @@ public String toString() { return "InputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex=" + targetIndex + ", serializedUserPayloadExists=" + (userPayload != null) + ", deserializedUserPayloadExists=" + (userPayloadObject != null) - + ", serializedPath=" + serializedPath + "]"; - } + + serializedPath != null ? 
(", serializedPath=" + serializedPath) : "" + "]"; + } } From 564949466ddbd95b00f4314e373ebcf4277ae9e2 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Tue, 2 Apr 2024 10:25:28 +0200 Subject: [PATCH 3/6] addressing PR comments --- .../api/events/InputDataInformationEvent.java | 15 ++++++++++----- .../tez/mapreduce/hadoop/MRInputHelpers.java | 4 ++-- .../mapreduce/hadoop/TestMRInputHelpers.java | 18 +++++++++++------- .../org/apache/tez/common/ProtoConverters.java | 2 +- 4 files changed, 24 insertions(+), 15 deletions(-) diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java index a81e6482ad..8cf0616e6b 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java @@ -109,11 +109,16 @@ public Object getDeserializedUserPayload() { return this.userPayloadObject; } - @Override public String toString() { - return "InputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex=" - + targetIndex + ", serializedUserPayloadExists=" + (userPayload != null) - + ", deserializedUserPayloadExists=" + (userPayloadObject != null) - + serializedPath != null ? 
(", serializedPath=" + serializedPath) : "" + "]"; + StringBuilder sb = new StringBuilder(); + sb.append("InputDataInformationEvent [sourceIndex=").append(sourceIndex) + .append(", targetIndex=").append(targetIndex) + .append(", serializedUserPayloadExists=").append(userPayload != null) + .append(", deserializedUserPayloadExists=").append(userPayloadObject != null); + if (serializedPath != null) { + sb.append(", serializedPath=").append(serializedPath); } + sb.append("]"); + return sb.toString(); + } } diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java index 39ac77a873..bd96302277 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java @@ -906,7 +906,7 @@ private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent, LOG.info("Reading InputDataInformationEvent from path: {}", filePath); MRSplitProto splitProto = null; - FileSystem fs = FileSystem.get(filePath.toUri(), jobConf); + FileSystem fs = filePath.getFileSystem(jobConf); try (FSDataInputStream in = fs.open(filePath)) { splitProto = MRSplitProto.parseFrom(in); @@ -917,7 +917,7 @@ private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent, private static MRSplitProto readProtoFromPayload(InputDataInformationEvent initEvent) throws IOException { ByteBuffer payload = initEvent.getUserPayload(); - LOG.info("Reading InputDataInformationEvent from payload, size: {} bytes}", payload.limit()); + LOG.info("Reading InputDataInformationEvent from payload: {}}", payload); return MRSplitProto.parseFrom(ByteString.copyFrom(payload)); } } diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java index 
58abe4547c..150956e90f 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java @@ -19,6 +19,7 @@ package org.apache.tez.mapreduce.hadoop; import java.io.IOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -29,6 +30,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; @@ -58,20 +60,22 @@ public class TestMRInputHelpers { private static Configuration conf = new Configuration(); private static FileSystem remoteFs; + private static LocalFileSystem localFs; private static Path testFilePath; private static Path oldSplitsDir; private static Path newSplitsDir; - private static final String TEST_ROOT_DIR = "target" - + Path.SEPARATOR + TestMRHelpers.class.getName() + "-tmpDir"; - private static final Path LOCAL_TEST_ROOT_DIR = new Path("target" - + Path.SEPARATOR + TestMRHelpers.class.getName() + "-localtmpDir"); + private static Path TEST_ROOT_DIR; + private static Path LOCAL_TEST_ROOT_DIR; @BeforeClass public static void setup() throws IOException { + TEST_ROOT_DIR = new Path(Files.createTempDirectory(TestMRHelpers.class.getName()).toString()); + LOCAL_TEST_ROOT_DIR = new Path(Files.createTempDirectory(TestMRHelpers.class.getName() + "-local").toString()); + try { - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString()); dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) .format(true).racks(null).build(); remoteFs = dfsCluster.getFileSystem(); @@ -107,6 +111,8 @@ public static void setup() throws IOException { oldSplitsDir = 
remoteFs.makeQualified(new Path("/tmp/splitsDirOld/")); newSplitsDir = remoteFs.makeQualified(new Path("/tmp/splitsDirNew/")); + + localFs = FileSystem.getLocal(conf); } @@ -297,13 +303,11 @@ public void testInputSplitLocalResourceCreationWithDifferentFS() throws Exceptio @Before public void before() throws IOException { - FileSystem localFs = FileSystem.getLocal(conf); localFs.mkdirs(LOCAL_TEST_ROOT_DIR); } @After public void after() throws IOException { - FileSystem localFs = FileSystem.getLocal(conf); localFs.delete(LOCAL_TEST_ROOT_DIR, true); } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java index eb8e37aeb9..bf68944143 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java @@ -148,7 +148,7 @@ public static InputDataInformationEvent convertRootInputDataInformationEventFrom EventProtos.RootInputDataInformationEventProto proto) { ByteBuffer payload = proto.hasUserPayload() ? 
proto.getUserPayload().asReadOnlyByteBuffer() : null; InputDataInformationEvent diEvent = null; - if (!proto.getSerializedPath().isEmpty()) { + if (proto.hasSerializedPath()) { diEvent = InputDataInformationEvent.createWithSerializedPath(proto.getSourceIndex(), proto.getSerializedPath().toStringUtf8()); } else { From 8b5a97c12a850434eae172e24ddf99e8deb86c88 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Tue, 2 Apr 2024 14:55:03 +0200 Subject: [PATCH 4/6] checkstyle --- .../mapreduce/hadoop/TestMRInputHelpers.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java index 150956e90f..c97158d141 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java @@ -65,17 +65,16 @@ public class TestMRInputHelpers { private static Path oldSplitsDir; private static Path newSplitsDir; - - private static Path TEST_ROOT_DIR; - private static Path LOCAL_TEST_ROOT_DIR; + private static Path testRootDir; + private static Path localTestRootDir; @BeforeClass public static void setup() throws IOException { - TEST_ROOT_DIR = new Path(Files.createTempDirectory(TestMRHelpers.class.getName()).toString()); - LOCAL_TEST_ROOT_DIR = new Path(Files.createTempDirectory(TestMRHelpers.class.getName() + "-local").toString()); + testRootDir = new Path(Files.createTempDirectory(TestMRHelpers.class.getName()).toString()); + localTestRootDir = new Path(Files.createTempDirectory(TestMRHelpers.class.getName() + "-local").toString()); try { - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString()); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testRootDir.toString()); dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) 
.format(true).racks(null).build(); remoteFs = dfsCluster.getFileSystem(); @@ -219,7 +218,7 @@ public void testInputEventSerializedPath() throws IOException { MRSplitProto proto = MRSplitProto.newBuilder().setSplitBytes(ByteString.copyFrom("splits".getBytes())).build(); FileSystem localFs = FileSystem.getLocal(conf); - Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR); + Path splitsDir = localFs.resolvePath(localTestRootDir); Path serializedPath = new Path(splitsDir + Path.SEPARATOR + "splitpayload"); @@ -284,7 +283,7 @@ private DataSourceDescriptor generateDataSourceDescriptorMapRed(Path inputSplits @Test(timeout = 5000) public void testInputSplitLocalResourceCreationWithDifferentFS() throws Exception { FileSystem localFs = FileSystem.getLocal(conf); - Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR); + Path splitsDir = localFs.resolvePath(localTestRootDir); DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(splitsDir); @@ -303,11 +302,11 @@ public void testInputSplitLocalResourceCreationWithDifferentFS() throws Exceptio @Before public void before() throws IOException { - localFs.mkdirs(LOCAL_TEST_ROOT_DIR); + localFs.mkdirs(localTestRootDir); } @After public void after() throws IOException { - localFs.delete(LOCAL_TEST_ROOT_DIR, true); + localFs.delete(localTestRootDir, true); } } From 42ab261a53a549a81a4a62721eb23e238223f176 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Tue, 2 Apr 2024 14:58:55 +0200 Subject: [PATCH 5/6] removed useless curly bracket --- .../java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java index bd96302277..3c47d5986d 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java +++ 
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java @@ -917,7 +917,7 @@ private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent, private static MRSplitProto readProtoFromPayload(InputDataInformationEvent initEvent) throws IOException { ByteBuffer payload = initEvent.getUserPayload(); - LOG.info("Reading InputDataInformationEvent from payload: {}}", payload); + LOG.info("Reading InputDataInformationEvent from payload: {}", payload); return MRSplitProto.parseFrom(ByteString.copyFrom(payload)); } } From 9dce61ed71a8384d23fc801acc3d737f994cb824 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Tue, 2 Apr 2024 15:19:43 +0200 Subject: [PATCH 6/6] checkstyle pt 2 --- .../org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java index c97158d141..a7501e8aed 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java @@ -217,7 +217,6 @@ public void testInputEventSerializedPayload() throws IOException { public void testInputEventSerializedPath() throws IOException { MRSplitProto proto = MRSplitProto.newBuilder().setSplitBytes(ByteString.copyFrom("splits".getBytes())).build(); - FileSystem localFs = FileSystem.getLocal(conf); Path splitsDir = localFs.resolvePath(localTestRootDir); Path serializedPath = new Path(splitsDir + Path.SEPARATOR + "splitpayload"); @@ -282,7 +281,6 @@ private DataSourceDescriptor generateDataSourceDescriptorMapRed(Path inputSplits @Test(timeout = 5000) public void testInputSplitLocalResourceCreationWithDifferentFS() throws Exception { - FileSystem localFs = FileSystem.getLocal(conf); Path splitsDir = localFs.resolvePath(localTestRootDir); DataSourceDescriptor 
dataSource = generateDataSourceDescriptorMapRed(splitsDir);