diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java index 7c9562e7c9..5cb2d221b3 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java @@ -87,7 +87,12 @@ public interface InputInitializerContext { * @return Resource */ Resource getVertexTaskResource(); - + + /** + * Get the id, as an integer, of the vertex that this input belongs to. + */ + int getVertexId(); + /** * Get the total resource allocated to this vertex. If the DAG is running in * a busy cluster then it may have no resources available dedicated to it. The diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java index a62a34154f..8cf0616e6b 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputDataInformationEvent.java @@ -49,8 +49,8 @@ public final class InputDataInformationEvent extends Event { private final int sourceIndex; private int targetIndex; // TODO Likely to be multiple at a later point. 
private final ByteBuffer userPayload; + private String serializedPath; private final Object userPayloadObject; - private InputDataInformationEvent(int srcIndex, ByteBuffer userPayload) { this.sourceIndex = srcIndex; @@ -79,6 +79,12 @@ public static InputDataInformationEvent createWithObjectPayload(int srcIndex, return new InputDataInformationEvent(srcIndex, userPayloadDeserialized, null); } + public static InputDataInformationEvent createWithSerializedPath(int srcIndex, String serializedPath) { + InputDataInformationEvent event = new InputDataInformationEvent(srcIndex, null); + event.serializedPath = serializedPath; + return event; + } + public int getSourceIndex() { return this.sourceIndex; } @@ -90,19 +96,29 @@ public int getTargetIndex() { public void setTargetIndex(int target) { this.targetIndex = target; } - + + public String getSerializedPath() { + return serializedPath; + } + public ByteBuffer getUserPayload() { return userPayload == null ? null : userPayload.asReadOnlyBuffer(); } - + public Object getDeserializedUserPayload() { return this.userPayloadObject; } - @Override public String toString() { - return "InputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex=" - + targetIndex + ", serializedUserPayloadExists=" + (userPayload != null) - + ", deserializedUserPayloadExists=" + (userPayloadObject != null) + "]"; - } + StringBuilder sb = new StringBuilder(); + sb.append("InputDataInformationEvent [sourceIndex=").append(sourceIndex) + .append(", targetIndex=").append(targetIndex) + .append(", serializedUserPayloadExists=").append(userPayload != null) + .append(", deserializedUserPayloadExists=").append(userPayloadObject != null); + if (serializedPath != null) { + sb.append(", serializedPath=").append(serializedPath); + } + sb.append("]"); + return sb.toString(); + } } diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto index 9949b0bc8c..05896ac62e 100644 --- a/tez-api/src/main/proto/Events.proto +++ 
b/tez-api/src/main/proto/Events.proto @@ -58,6 +58,7 @@ message RootInputDataInformationEventProto { optional int32 source_index = 1; optional int32 target_index = 2; optional bytes user_payload = 3; + optional bytes serialized_path = 4; } message CompositeEventProto { diff --git a/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java b/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestCompositeDataMovementEvent.java similarity index 90% rename from tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java rename to tez-api/src/test/java/org/apache/tez/runtime/api/events/TestCompositeDataMovementEvent.java index 7dce6991a4..98c238a1dd 100644 --- a/tez-api/src/test/java/org/apache/tez/runtime/api/event/TestCompositeDataMovementEvent.java +++ b/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestCompositeDataMovementEvent.java @@ -16,12 +16,10 @@ * limitations under the License. */ -package org.apache.tez.runtime.api.event; +package org.apache.tez.runtime.api.events; import java.nio.ByteBuffer; -import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; -import org.apache.tez.runtime.api.events.DataMovementEvent; import org.junit.Assert; import org.junit.Test; diff --git a/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestInputDataInformationEvent.java b/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestInputDataInformationEvent.java new file mode 100644 index 0000000000..6e002e26c4 --- /dev/null +++ b/tez-api/src/test/java/org/apache/tez/runtime/api/events/TestInputDataInformationEvent.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.api.events; + +import java.nio.ByteBuffer; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.base.Charsets; + +public class TestInputDataInformationEvent { + + @Test + public void testApiPayloadOrPath() { + InputDataInformationEvent eventWithSerializedPayload = + InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.wrap("payload1".getBytes())); + // event created by createWithSerializedPayload should contain serialized payload + // but not a path or a deserialized payload + Assert.assertEquals("payload1", Charsets.UTF_8.decode(eventWithSerializedPayload.getUserPayload()).toString()); + Assert.assertNull(eventWithSerializedPayload.getSerializedPath()); + Assert.assertNull(eventWithSerializedPayload.getDeserializedUserPayload()); + + InputDataInformationEvent eventWithObjectPayload = InputDataInformationEvent.createWithObjectPayload(0, "payload2"); + // event created by createWithObjectPayload should contain a deserialized payload + // but not a path or serialized payload + Assert.assertEquals("payload2", eventWithObjectPayload.getDeserializedUserPayload()); + Assert.assertNull(eventWithObjectPayload.getSerializedPath()); + Assert.assertNull(eventWithObjectPayload.getUserPayload()); + + InputDataInformationEvent eventWithPath = InputDataInformationEvent.createWithSerializedPath(0, "file://hello"); + // event 
created by createWithSerializedPath should contain a path + // but neither serialized nor deserialized payload + Assert.assertEquals("file://hello", eventWithPath.getSerializedPath()); + Assert.assertNull(eventWithPath.getUserPayload()); + Assert.assertNull(eventWithPath.getDeserializedUserPayload()); + } +} diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index cfbdb19e30..f3e94993ca 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -272,9 +272,8 @@ public List call() throws Exception { List events = ugi.doAs(new PrivilegedExceptionAction>() { @Override public List run() throws Exception { - LOG.info( - "Starting InputInitializer for Input: " + initializerWrapper.getInput().getName() + - " on vertex " + initializerWrapper.getVertexLogIdentifier()); + LOG.info("Starting InputInitializer for Input: {} on vertex {}", initializerWrapper.getInput().getName(), + initializerWrapper.getVertexLogIdentifier()); try { TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), initializerWrapper.vertexId); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java index be4ee6068e..1c8c326c54 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java @@ -102,6 +102,11 @@ public Resource getVertexTaskResource() { return vertex.getTaskResource(); } + @Override + public int getVertexId() { + return vertex.getVertexId().getId(); + } + @Override public Resource getTotalAvailableResource() { return 
appContext.getTaskScheduler().getTotalResources(vertex.getTaskSchedulerIdentifier()); diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java index 26bba4d002..3c47d5986d 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java @@ -19,6 +19,7 @@ package org.apache.tez.mapreduce.hadoop; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -30,12 +31,15 @@ import java.util.Objects; import com.google.common.base.Function; +import com.google.common.base.Strings; + import org.apache.tez.common.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import org.apache.tez.runtime.api.InputContext; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -43,6 +47,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -72,6 +77,7 @@ import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.mapreduce.protos.MRRuntimeProtos; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; @Public @Unstable @@ -889,4 +895,29 @@ public static int getDagAttemptNumber(Configuration conf) { return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER); } + public static MRSplitProto 
getProto(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException { + return Strings.isNullOrEmpty(initEvent.getSerializedPath()) ? readProtoFromPayload(initEvent) + : readProtoFromFs(initEvent, jobConf); + } + + private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException { + String serializedPath = initEvent.getSerializedPath(); + Path filePath = new Path(serializedPath); + LOG.info("Reading InputDataInformationEvent from path: {}", filePath); + + MRSplitProto splitProto = null; + FileSystem fs = filePath.getFileSystem(jobConf); + + try (FSDataInputStream in = fs.open(filePath)) { + splitProto = MRSplitProto.parseFrom(in); + fs.delete(filePath, false); + } + return splitProto; + } + + private static MRSplitProto readProtoFromPayload(InputDataInformationEvent initEvent) throws IOException { + ByteBuffer payload = initEvent.getUserPayload(); + LOG.info("Reading InputDataInformationEvent from payload: {}", payload); + return MRSplitProto.parseFrom(ByteString.copyFrom(payload)); + } } diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java index ee907f5d74..8c3d5d5dbb 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java @@ -27,8 +27,6 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import com.google.protobuf.ByteString; - import org.apache.tez.runtime.api.ProgressFailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +70,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.common.Preconditions; + import com.google.common.collect.Lists; /** @@ -672,7 +671,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I 
LOG.debug(getContext().getInputOutputVertexNames() + " initializing RecordReader from event"); } Objects.requireNonNull(initEvent, "InitEvent must be specified"); - MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload())); + MRSplitProto splitProto = MRInputHelpers.getProto(initEvent, jobConf); Object splitObj = null; long splitLength = -1; if (useNewApi) { diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java index 5c90f8a9df..4d5aa4b695 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java @@ -104,6 +104,11 @@ public Resource getVertexTaskResource() { return Resource.newInstance(1024, 1); } + @Override + public int getVertexId() { + return 0; + } + @Override public Resource getTotalAvailableResource() { return Resource.newInstance(10240, 10); diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java index 11b1271a8a..a7501e8aed 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java @@ -19,6 +19,7 @@ package org.apache.tez.mapreduce.hadoop; import java.io.IOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -29,6 +30,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; @@ -42,27 +44,37 @@ import org.apache.hadoop.yarn.api.records.LocalResource; 
import org.apache.tez.dag.api.DataSourceDescriptor; import org.apache.tez.dag.api.TaskLocationHint; +import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import com.google.protobuf.ByteString; + public class TestMRInputHelpers { protected static MiniDFSCluster dfsCluster; private static Configuration conf = new Configuration(); private static FileSystem remoteFs; + private static LocalFileSystem localFs; private static Path testFilePath; private static Path oldSplitsDir; private static Path newSplitsDir; - private static String TEST_ROOT_DIR = "target" - + Path.SEPARATOR + TestMRHelpers.class.getName() + "-tmpDir"; + private static Path testRootDir; + private static Path localTestRootDir; @BeforeClass public static void setup() throws IOException { + testRootDir = new Path(Files.createTempDirectory(TestMRHelpers.class.getName()).toString()); + localTestRootDir = new Path(Files.createTempDirectory(TestMRHelpers.class.getName() + "-local").toString()); + try { - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testRootDir.toString()); dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) .format(true).racks(null).build(); remoteFs = dfsCluster.getFileSystem(); @@ -98,6 +110,8 @@ public static void setup() throws IOException { oldSplitsDir = remoteFs.makeQualified(new Path("/tmp/splitsDirOld/")); newSplitsDir = remoteFs.makeQualified(new Path("/tmp/splitsDirNew/")); + + localFs = FileSystem.getLocal(conf); } @@ -188,6 +202,42 @@ public void testInputSplitLocalResourceCreation() throws Exception { MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)); } + @Test + public void testInputEventSerializedPayload() throws IOException { + MRSplitProto proto = 
MRSplitProto.newBuilder().setSplitBytes(ByteString.copyFrom("splits".getBytes())).build(); + + InputDataInformationEvent initEvent = + InputDataInformationEvent.createWithSerializedPayload(0, proto.toByteString().asReadOnlyByteBuffer()); + MRSplitProto protoFromEvent = MRInputHelpers.getProto(initEvent, new JobConf(conf)); + + Assert.assertEquals(proto, protoFromEvent); + } + + @Test + public void testInputEventSerializedPath() throws IOException { + MRSplitProto proto = MRSplitProto.newBuilder().setSplitBytes(ByteString.copyFrom("splits".getBytes())).build(); + + Path splitsDir = localFs.resolvePath(localTestRootDir); + + Path serializedPath = new Path(splitsDir + Path.SEPARATOR + "splitpayload"); + + try (FSDataOutputStream out = localFs.create(serializedPath)) { + proto.writeTo(out); + } + + // event file is present on fs + Assert.assertTrue("Event file should be present on fs", localFs.exists(serializedPath)); + + InputDataInformationEvent initEvent = + InputDataInformationEvent.createWithSerializedPath(0, serializedPath.toUri().toString()); + MRSplitProto protoFromEvent = MRInputHelpers.getProto(initEvent, new JobConf(conf)); + + Assert.assertEquals(proto, protoFromEvent); + + // event file is deleted after read + Assert.assertFalse("Event file should be deleted after read", localFs.exists(serializedPath)); + } + private void verifyLocationHints(Path inputSplitsDir, List actual) throws Exception { JobID jobId = new JobID("dummy", 1); @@ -231,31 +281,30 @@ private DataSourceDescriptor generateDataSourceDescriptorMapRed(Path inputSplits @Test(timeout = 5000) public void testInputSplitLocalResourceCreationWithDifferentFS() throws Exception { - FileSystem localFs = FileSystem.getLocal(conf); - Path LOCAL_TEST_ROOT_DIR = new Path("target" - + Path.SEPARATOR + TestMRHelpers.class.getName() + "-localtmpDir"); - - try { - localFs.mkdirs(LOCAL_TEST_ROOT_DIR); - - Path splitsDir = localFs.resolvePath(LOCAL_TEST_ROOT_DIR); + Path splitsDir = 
localFs.resolvePath(localTestRootDir); - DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(splitsDir); + DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(splitsDir); - Map localResources = dataSource.getAdditionalLocalFiles(); + Map localResources = dataSource.getAdditionalLocalFiles(); - Assert.assertEquals(2, localResources.size()); - Assert.assertTrue(localResources.containsKey( - MRInputHelpers.JOB_SPLIT_RESOURCE_NAME)); - Assert.assertTrue(localResources.containsKey( - MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)); + Assert.assertEquals(2, localResources.size()); + Assert.assertTrue(localResources.containsKey( + MRInputHelpers.JOB_SPLIT_RESOURCE_NAME)); + Assert.assertTrue(localResources.containsKey( + MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME)); - for (LocalResource lr : localResources.values()) { - Assert.assertFalse(lr.getResource().getScheme().contains(remoteFs.getScheme())); - } - } finally { - localFs.delete(LOCAL_TEST_ROOT_DIR, true); + for (LocalResource lr : localResources.values()) { + Assert.assertFalse(lr.getResource().getScheme().contains(remoteFs.getScheme())); } } + @Before + public void before() throws IOException { + localFs.mkdirs(localTestRootDir); + } + + @After + public void after() throws IOException { + localFs.delete(localTestRootDir, true); + } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java index 2c9ad86018..bf68944143 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java @@ -18,8 +18,11 @@ package org.apache.tez.common; +import com.google.common.base.Charsets; import com.google.protobuf.ByteString; +import java.nio.ByteBuffer; + import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import 
org.apache.tez.runtime.api.events.CustomProcessorEvent; import org.apache.tez.runtime.api.events.DataMovementEvent; @@ -135,15 +138,22 @@ public static VertexManagerEvent convertVertexManagerEventFromProto( if (event.getUserPayload() != null) { builder.setUserPayload(ByteString.copyFrom(event.getUserPayload())); } + if (event.getSerializedPath() != null) { + builder.setSerializedPath(ByteString.copyFrom(event.getSerializedPath().getBytes(Charsets.UTF_8))); + } return builder.build(); } - public static InputDataInformationEvent - convertRootInputDataInformationEventFromProto( + public static InputDataInformationEvent convertRootInputDataInformationEventFromProto( EventProtos.RootInputDataInformationEventProto proto) { - InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload( - proto.getSourceIndex(), - proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null); + ByteBuffer payload = proto.hasUserPayload() ? proto.getUserPayload().asReadOnlyByteBuffer() : null; + InputDataInformationEvent diEvent = null; + if (proto.hasSerializedPath()) { + diEvent = InputDataInformationEvent.createWithSerializedPath(proto.getSourceIndex(), + proto.getSerializedPath().toStringUtf8()); + } else { + diEvent = InputDataInformationEvent.createWithSerializedPayload(proto.getSourceIndex(), payload); + } diEvent.setTargetIndex(proto.getTargetIndex()); return diEvent; }