Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4548: InputDataInformationEvent to be read from serialized payload from filesystem #341

Merged
merged 6 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ public interface InputInitializerContext {
* @return Resource
*/
Resource getVertexTaskResource();


/**
 * Get the id, as an integer, of the vertex that this input belongs to.
 *
 * @return the integer vertex id
 */
int getVertexId();
abstractdog marked this conversation as resolved.
Show resolved Hide resolved

/**
* Get the total resource allocated to this vertex. If the DAG is running in
* a busy cluster then it may have no resources available dedicated to it. The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public final class InputDataInformationEvent extends Event {
private final int sourceIndex;
private int targetIndex; // TODO Likely to be multiple at a later point.
private final ByteBuffer userPayload;
private String serializedPath;
private final Object userPayloadObject;


private InputDataInformationEvent(int srcIndex, ByteBuffer userPayload) {
this.sourceIndex = srcIndex;
Expand Down Expand Up @@ -79,6 +79,12 @@ public static InputDataInformationEvent createWithObjectPayload(int srcIndex,
return new InputDataInformationEvent(srcIndex, userPayloadDeserialized, null);
}

public static InputDataInformationEvent createWithSerializedPath(int srcIndex, String serializedPath) {
InputDataInformationEvent event = new InputDataInformationEvent(srcIndex, null);
event.serializedPath = serializedPath;
abstractdog marked this conversation as resolved.
Show resolved Hide resolved
return event;
}

public int getSourceIndex() {
return this.sourceIndex;
}
Expand All @@ -90,19 +96,29 @@ public int getTargetIndex() {
public void setTargetIndex(int target) {
this.targetIndex = target;
}


/**
 * Returns the serialized path stored on this event.
 *
 * @return the path, or null if none has been set
 */
public String getSerializedPath() {
  return this.serializedPath;
}

/**
 * Returns a read-only view of the serialized user payload.
 *
 * @return a read-only buffer over the payload, or null if this event has no serialized payload
 */
public ByteBuffer getUserPayload() {
  if (userPayload == null) {
    return null;
  }
  return userPayload.asReadOnlyBuffer();
}

/**
 * Returns the user payload object held by this event, as is (no copy is made).
 *
 * @return the payload object, or null if this event holds none
 */
public Object getDeserializedUserPayload() {
  return userPayloadObject;
}

@Override
public String toString() {
return "InputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex="
+ targetIndex + ", serializedUserPayloadExists=" + (userPayload != null)
+ ", deserializedUserPayloadExists=" + (userPayloadObject != null) + "]";
}
StringBuilder sb = new StringBuilder();
sb.append("InputDataInformationEvent [sourceIndex=").append(sourceIndex)
.append(", targetIndex=").append(targetIndex)
.append(", serializedUserPayloadExists=").append(userPayload != null)
.append(", deserializedUserPayloadExists=").append(userPayloadObject != null);
if (serializedPath != null) {
sb.append(", serializedPath=").append(serializedPath);
}
sb.append("]");
return sb.toString();
}
}
1 change: 1 addition & 0 deletions tez-api/src/main/proto/Events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ message RootInputDataInformationEventProto {
optional int32 source_index = 1;
optional int32 target_index = 2;
optional bytes user_payload = 3;
optional bytes serialized_path = 4;
}

message CompositeEventProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@
* limitations under the License.
*/

package org.apache.tez.runtime.api.event;
package org.apache.tez.runtime.api.events;

import java.nio.ByteBuffer;

import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.junit.Assert;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.api.events;

import java.nio.ByteBuffer;

import org.junit.Assert;
import org.junit.Test;

import com.google.common.base.Charsets;

/**
 * Checks that each factory method of {@link InputDataInformationEvent} populates exactly one of
 * serialized payload, deserialized payload, or serialized path, and leaves the other two null.
 */
public class TestInputDataInformationEvent {

  @Test
  public void testApiPayloadOrPath() {
    // Encode with an explicit charset so the bytes always match the UTF-8 decode below,
    // regardless of the platform default charset.
    InputDataInformationEvent eventWithSerializedPayload = InputDataInformationEvent.createWithSerializedPayload(0,
        ByteBuffer.wrap("payload1".getBytes(Charsets.UTF_8)));
    // event created by createWithSerializedPayload should contain serialized payload
    // but not a path or a deserialized payload
    Assert.assertEquals("payload1", Charsets.UTF_8.decode(eventWithSerializedPayload.getUserPayload()).toString());
    Assert.assertNull(eventWithSerializedPayload.getSerializedPath());
    Assert.assertNull(eventWithSerializedPayload.getDeserializedUserPayload());

    InputDataInformationEvent eventWithObjectPayload = InputDataInformationEvent.createWithObjectPayload(0, "payload2");
    // event created by createWithObjectPayload should contain a deserialized payload
    // but not a path or serialized payload
    Assert.assertEquals("payload2", eventWithObjectPayload.getDeserializedUserPayload());
    Assert.assertNull(eventWithObjectPayload.getSerializedPath());
    Assert.assertNull(eventWithObjectPayload.getUserPayload());

    InputDataInformationEvent eventWithPath = InputDataInformationEvent.createWithSerializedPath(0, "file://hello");
    // event created by createWithSerializedPath should contain a path
    // but neither serialized nor deserialized payload
    Assert.assertEquals("file://hello", eventWithPath.getSerializedPath());
    Assert.assertNull(eventWithPath.getUserPayload());
    Assert.assertNull(eventWithPath.getDeserializedUserPayload());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,8 @@ public List<Event> call() throws Exception {
List<Event> events = ugi.doAs(new PrivilegedExceptionAction<List<Event>>() {
@Override
public List<Event> run() throws Exception {
LOG.info(
"Starting InputInitializer for Input: " + initializerWrapper.getInput().getName() +
" on vertex " + initializerWrapper.getVertexLogIdentifier());
LOG.info("Starting InputInitializer for Input: {} on vertex {}", initializerWrapper.getInput().getName(),
initializerWrapper.getVertexLogIdentifier());
try {
TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(),
initializerWrapper.vertexId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public Resource getVertexTaskResource() {
return vertex.getTaskResource();
}

// Reports the integer id taken from the wrapped vertex's vertex-id object.
@Override
public int getVertexId() {
return vertex.getVertexId().getId();
}

@Override
public Resource getTotalAvailableResource() {
return appContext.getTaskScheduler().getTotalResources(vertex.getTaskSchedulerIdentifier());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tez.mapreduce.hadoop;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -30,19 +31,23 @@
import java.util.Objects;

import com.google.common.base.Function;
import com.google.common.base.Strings;

import org.apache.tez.common.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;

import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -72,6 +77,7 @@
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;

@Public
@Unstable
Expand Down Expand Up @@ -889,4 +895,29 @@ public static int getDagAttemptNumber(Configuration conf) {
return getIntProperty(conf, MRInput.TEZ_MAPREDUCE_DAG_ATTEMPT_NUMBER);
}

/**
 * Obtains the split proto for an init event. When the event has no serialized path
 * (null or empty), the proto is parsed from the event's inline payload; otherwise it is
 * read from the file the path points to.
 *
 * @param initEvent the event describing the split
 * @param jobConf configuration used to resolve the file system when reading from a path
 * @return the parsed split proto
 * @throws IOException if reading or parsing fails
 */
public static MRSplitProto getProto(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
  if (Strings.isNullOrEmpty(initEvent.getSerializedPath())) {
    return readProtoFromPayload(initEvent);
  }
  return readProtoFromFs(initEvent, jobConf);
}

private static MRSplitProto readProtoFromFs(InputDataInformationEvent initEvent, JobConf jobConf) throws IOException {
String serializedPath = initEvent.getSerializedPath();
Path filePath = new Path(serializedPath);
LOG.info("Reading InputDataInformationEvent from path: {}", filePath);

MRSplitProto splitProto = null;
FileSystem fs = filePath.getFileSystem(jobConf);

try (FSDataInputStream in = fs.open(filePath)) {
splitProto = MRSplitProto.parseFrom(in);
fs.delete(filePath, false);
abstractdog marked this conversation as resolved.
Show resolved Hide resolved
}
return splitProto;
}

/**
 * Parses the split proto from the serialized payload carried inline by the event.
 *
 * @param initEvent the event whose user payload holds the serialized proto
 * @return the split proto parsed from the payload bytes
 * @throws IOException if the payload cannot be parsed
 */
private static MRSplitProto readProtoFromPayload(InputDataInformationEvent initEvent) throws IOException {
  final ByteBuffer userPayload = initEvent.getUserPayload();
  LOG.info("Reading InputDataInformationEvent from payload: {}", userPayload);
  final ByteString payloadBytes = ByteString.copyFrom(userPayload);
  return MRSplitProto.parseFrom(payloadBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import com.google.protobuf.ByteString;

import org.apache.tez.runtime.api.ProgressFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -72,6 +70,7 @@
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;

import org.apache.tez.common.Preconditions;

import com.google.common.collect.Lists;

/**
Expand Down Expand Up @@ -672,7 +671,7 @@ private void initFromEventInternal(InputDataInformationEvent initEvent) throws I
LOG.debug(getContext().getInputOutputVertexNames() + " initializing RecordReader from event");
}
Objects.requireNonNull(initEvent, "InitEvent must be specified");
MRSplitProto splitProto = MRSplitProto.parseFrom(ByteString.copyFrom(initEvent.getUserPayload()));
MRSplitProto splitProto = MRInputHelpers.getProto(initEvent, jobConf);
Object splitObj = null;
long splitLength = -1;
if (useNewApi) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ public Resource getVertexTaskResource() {
return Resource.newInstance(1024, 1);
}

// Always reports 0; the value is hard-coded, like the fixed Resource values returned by the neighbouring methods.
@Override
public int getVertexId() {
return 0;
}

@Override
public Resource getTotalAvailableResource() {
return Resource.newInstance(10240, 10);
Expand Down
Loading
Loading