Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Controlling num spills via config #2

Open
wants to merge 3 commits into
base: fdp-master-0.9.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ public class TezRuntimeConfiguration {
public static final int TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT =
1024 * 1024;

@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT = TEZ_RUNTIME_PREFIX +
"spill.files.count.limit";
public static final int TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT_DEFAULT = -1;


// TODO Use the default value
@ConfigurationProperty(type = "integer")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS;
import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT_DEFAULT;

@SuppressWarnings({"unchecked", "rawtypes"})
public class PipelinedSorter extends ExternalSorter {
Expand Down Expand Up @@ -110,6 +111,8 @@ public class PipelinedSorter extends ExternalSorter {
private final ArrayList<TezSpillRecord> indexCacheList =
new ArrayList<TezSpillRecord>();

private final int spillFilesCountLimit;

private final boolean pipelinedShuffle;

private long currentAllocatableMemory;
Expand All @@ -123,6 +126,8 @@ public class PipelinedSorter extends ExternalSorter {
private final Deflater deflater;
private final String auxiliaryService;

private static final int SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1;

// TODO Set additional countesr - total bytes written, spills etc.

public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
Expand Down Expand Up @@ -164,6 +169,14 @@ public PipelinedSorter(OutputContext outputContext, Configuration conf, int numO
pipelinedShuffle = !isFinalMergeEnabled() && confPipelinedShuffle;
auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);

spillFilesCountLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT,
TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT_DEFAULT);
Preconditions.checkArgument(spillFilesCountLimit == SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
|| spillFilesCountLimit > 0,
TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT
+ " should be greater than 0 or unbounded");

//sanity checks
final long sortmb = this.availableMemoryMb;

Expand Down Expand Up @@ -532,7 +545,7 @@ private void spillSingleRecord(final Object key, final Object value,
spillRec.writeToFile(indexFilename, conf);
//TODO: honor cache limits
indexCacheList.add(spillRec);
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
//No final merge. Set the number of files offered via shuffle-handler
Expand Down Expand Up @@ -622,7 +635,7 @@ public boolean spill(boolean ignoreEmptySpills) throws IOException {
spillRec.writeToFile(indexFilename, conf);
//TODO: honor cache limits
indexCacheList.add(spillRec);
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
//No final merge. Set the number of files offered via shuffle-handler
Expand Down Expand Up @@ -1179,7 +1192,7 @@ public int size() {
public int compareTo(SpanIterator other) {
return span.compareInternal(other.getKey(), other.getPartition(), kvindex);
}

@Override
public String toString() {
return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString());
Expand Down Expand Up @@ -1482,4 +1495,18 @@ public TezRawKeyValueIterator filter(int partition) {
}

}

/**
* Increments numSpills local counter by taking into consideration
* the max limit on spill files being generated by the job.
* If limit is reached, this function throws an IOException
*/
private void incrementNumSpills() throws IOException {
++numSpills;
if(spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE && numSpills > spillFilesCountLimit) {
throw new IOException("Too many spill files got created, control it with " +
"tez.runtime.spill.files.count.limit, current value: " + spillFilesCountLimit +
", current spill count: " + numSpills);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import com.google.common.base.Preconditions;

import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS;
import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT_DEFAULT;

@SuppressWarnings({"unchecked", "rawtypes"})
public final class DefaultSorter extends ExternalSorter implements IndexedSortable {
Expand Down Expand Up @@ -124,12 +125,14 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
final ArrayList<TezSpillRecord> indexCacheList =
new ArrayList<TezSpillRecord>();
private final int indexCacheMemoryLimit;
private final int spillFilesCountLimit;
private int totalIndexCacheMemory;

private long totalKeys = 0;
private long sameKey = 0;

public static final int MAX_IO_SORT_MB = 1800;
private static final int SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1;


public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs,
Expand All @@ -149,6 +152,13 @@ public DefaultSorter(OutputContext outputContext, Configuration conf, int numOut
indexCacheMemoryLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT);

spillFilesCountLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT,
TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT_DEFAULT);
Preconditions.checkArgument(spillFilesCountLimit == SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
|| spillFilesCountLimit > 0,
TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT
+ " should be greater than 0 or unbounded");

boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
Expand Down Expand Up @@ -979,7 +989,7 @@ protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCoun
}
LOG.info(outputContext.getDestinationVertexName() + ": " + "Finished spill " + numSpills
+ " at " + filename.toString());
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
numShuffleChunks.setValue(numSpills);
} else if (numSpills > 1) {
Expand Down Expand Up @@ -1059,7 +1069,7 @@ private void spillSingleRecord(final Object key, final Object value,
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
numShuffleChunks.setValue(numSpills);
} else if (numSpills > 1) {
Expand Down Expand Up @@ -1313,7 +1323,7 @@ private void mergeParts() throws IOException, InterruptedException {
} finally {
finalOut.close();
}
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
List<Event> events = Lists.newLinkedList();
maybeSendEventForSpill(events, true, sr, 0, true);
Expand Down Expand Up @@ -1399,4 +1409,18 @@ private void mergeParts() throws IOException, InterruptedException {
}
}
}

/**
* Increments numSpills local counter by taking into consideration
* the max limit on spill files being generated by the job.
* If limit is reached, this function throws an IOException
*/
private void incrementNumSpills() throws IOException {
++numSpills;
if(spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE && numSpills > spillFilesCountLimit) {
throw new IOException("Too many spill files got created, control it with " +
"tez.runtime.spill.files.count.limit, current value: " + spillFilesCountLimit +
", current spill count: " + numSpills);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,23 @@
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
import org.apache.tez.runtime.library.common.sort.impl.dflt.TestDefaultSorter;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.testutils.RandomTextGenerator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -115,6 +122,9 @@ public void setup() throws IOException {
this.outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
}

@Rule
public ExpectedException exception = ExpectedException.none();

public static Configuration getConf() {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
Expand Down Expand Up @@ -827,6 +837,75 @@ public void testWithLargeKeyValueWithMinBlockSize() throws IOException {
basicTest(1, 5, (2 << 20), (48 * 1024l * 1024l), 16 << 20);
}

@Test(timeout = 60000)
@SuppressWarnings("unchecked")
public void testSpillFilesCountLimitInvalidValue() throws IOException, NoSuchMethodException, NoSuchFieldException, IllegalAccessException {
this.numOutputs = 10;

//128 MB. Do not pre-allocate.
// Get 32 MB buffer first and the another buffer with 96 on filling up
// the 32 MB buffer.
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
conf.setBoolean(TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
conf.setInt(TezRuntimeConfiguration
.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT, -2);

exception.expect(IllegalArgumentException.class);
exception.expectMessage("tez.runtime.spill.files.count.limit should be greater than 0 or unbounded");

PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
numOutputs, (128l << 20));

closeSorter(sorter);
}

@Test(timeout = 60000)
@SuppressWarnings("unchecked")
public void testSpillFilesCountBreach() throws IOException, NoSuchMethodException, NoSuchFieldException, IllegalAccessException {
this.numOutputs = 10;

//128 MB. Do not pre-allocate.
// Get 32 MB buffer first and the another buffer with 96 on filling up
// the 32 MB buffer.
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
conf.setBoolean(TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
conf.setInt(TezRuntimeConfiguration
.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT, 2);

PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
numOutputs, (128l << 20));

Field numSpillsField = ExternalSorter.class.getDeclaredField("numSpills");
numSpillsField.setAccessible(true);
numSpillsField.set(sorter, 2);

Method method = sorter.getClass().getDeclaredMethod("incrementNumSpills");
method.setAccessible(true);
boolean gotExceptionWithMessage = false;
try {
method.invoke(sorter);
} catch(InvocationTargetException e) {
Throwable targetException = e.getTargetException();
if (targetException != null) {
String errorMessage = targetException.getMessage();
if (errorMessage != null) {
if(errorMessage.equals("Too many spill files got created, control it with " +
"tez.runtime.spill.files.count.limit, current value: 2, current spill count: 3")) {
gotExceptionWithMessage = true;
}
}
}
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}

closeSorter(sorter);

Assert.assertTrue(gotExceptionWithMessage);
}

private void verifyOutputPermissions(String spillId) throws IOException {
String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId
+ "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import static org.mockito.internal.verification.VerificationModeFactory.times;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
Expand Down Expand Up @@ -82,7 +85,9 @@
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -130,6 +135,9 @@ public void reset() throws IOException {
localFs.mkdirs(workingDir);
}

@Rule
public ExpectedException exception = ExpectedException.none();

@Test(timeout = 5000)
public void testSortSpillPercent() throws Exception {
OutputContext context = createTezOutputContext();
Expand Down Expand Up @@ -578,6 +586,71 @@ public void testWithMultipleSpillsWithFinalMergeDisabled() throws IOException {
verifyCounters(sorter, context);
}

@Test(timeout = 60000)
@SuppressWarnings("unchecked")
public void testSpillFilesCountLimitInvalidValue() throws IOException, NoSuchMethodException, NoSuchFieldException, IllegalAccessException {
OutputContext context = createTezOutputContext();

conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 1);
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT, -2);
MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
context.getTotalMemoryAvailableToTask()), handler);

exception.expect(IllegalArgumentException.class);
exception.expectMessage("tez.runtime.spill.files.count.limit should be greater than 0 or unbounded");

SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, handler.getMemoryAssigned());
sorterWrapper.getSorter();
sorterWrapper.close();
}

@Test(timeout = 60000)
@SuppressWarnings("unchecked")
public void testSpillFilesCountBreach() throws IOException, NoSuchMethodException, NoSuchFieldException, IllegalAccessException {
OutputContext context = createTezOutputContext();

conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 1);
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT, 2);
MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
context.getTotalMemoryAvailableToTask()), handler);
SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, handler.getMemoryAssigned());
DefaultSorter sorter = sorterWrapper.getSorter();

Field numSpillsField = ExternalSorter.class.getDeclaredField("numSpills");
numSpillsField.setAccessible(true);
numSpillsField.set(sorter, 2);

Method method = sorter.getClass().getDeclaredMethod("incrementNumSpills");
method.setAccessible(true);
boolean gotExceptionWithMessage = false;
try {
method.invoke(sorter);
} catch(InvocationTargetException e) {
Throwable targetException = e.getTargetException();
if (targetException != null) {
String errorMessage = targetException.getMessage();
if (errorMessage != null) {
if(errorMessage.equals("Too many spill files got created, control it with " +
"tez.runtime.spill.files.count.limit, current value: 2, current spill count: 3")) {
gotExceptionWithMessage = true;
}
}
}
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}

sorterWrapper.close();

Assert.assertTrue(gotExceptionWithMessage);
}

private void verifyOutputPermissions(String spillId) throws IOException {
String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId
+ "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
Expand Down