Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4518: Added capability to limit number of spill files being generated #312

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ private TezRuntimeConfiguration() {}
public static final int TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT =
1024 * 1024;

/**
 * Integer value. Upper bound on the number of spill files a sorter instance may create.
 * Read by both PipelinedSorter and DefaultSorter, which accept either -1 (unbounded)
 * or a value greater than 0; any other value is rejected with an IllegalArgumentException
 * when the sorter is constructed. When a bounded limit is set and the spill count
 * exceeds it, the sorter fails with an IOException.
 */
@ConfigurationProperty(type = "integer")
public static final String TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT = TEZ_RUNTIME_PREFIX +
"sort.spill.files.count.limit";
// Default is -1, the value the sorters treat as "no limit on spill files".
public static final int TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT = -1;


// TODO Use the default value
@ConfigurationProperty(type = "integer")
Expand Down Expand Up @@ -616,6 +621,7 @@ private TezRuntimeConfiguration() {}
TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SORT_SPILL_PERCENT);
TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_IO_SORT_MB);
TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT);
TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_COMBINE_MIN_SPILLS);
TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
TEZ_RUNTIME_KEYS.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.google.common.collect.Lists;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -69,6 +68,7 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT;
import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions;

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down Expand Up @@ -110,6 +110,8 @@ public class PipelinedSorter extends ExternalSorter {
private final ArrayList<TezSpillRecord> indexCacheList =
new ArrayList<TezSpillRecord>();

// Maximum number of spill files this sorter may create, read in the constructor from
// TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT.
// Either SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE (no limit) or a value greater than 0.
private final int spillFilesCountLimit;

private final boolean pipelinedShuffle;

private long currentAllocatableMemory;
Expand All @@ -130,6 +132,8 @@ public class PipelinedSorter extends ExternalSorter {
*/
private final List<Event> finalEvents;

// Sentinel value of spillFilesCountLimit meaning the number of spill files is not limited.
private static final int SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1;

// TODO Set additional counters - total bytes written, spills etc.

public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs,
Expand Down Expand Up @@ -171,6 +175,14 @@ public PipelinedSorter(OutputContext outputContext, Configuration conf, int numO
pipelinedShuffle = !isFinalMergeEnabled() && confPipelinedShuffle;
auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);

spillFilesCountLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT,
TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT);
Preconditions.checkArgument(spillFilesCountLimit == SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
|| spillFilesCountLimit > 0,
TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT
+ " should be greater than 0 or unbounded");

//sanity checks
final long sortmb = this.availableMemoryMb;

Expand Down Expand Up @@ -542,7 +554,7 @@ private void spillSingleRecord(final Object key, final Object value,
spillRec.writeToFile(indexFilename, conf, localFs);
//TODO: honor cache limits
indexCacheList.add(spillRec);
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
//No final merge. Set the number of files offered via shuffle-handler
Expand Down Expand Up @@ -633,7 +645,7 @@ public boolean spill(boolean ignoreEmptySpills) throws IOException {
spillRec.writeToFile(indexFilename, conf, localFs);
//TODO: honor cache limits
indexCacheList.add(spillRec);
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen());
//No final merge. Set the number of files offered via shuffle-handler
Expand Down Expand Up @@ -1503,4 +1515,18 @@ public TezRawKeyValueIterator filter(int partition) {
}

}

/**
 * Bumps the {@code numSpills} counter and enforces the configured cap on the
 * number of spill files. The counter is incremented before the check, so the
 * exception message reports the count that breached the limit.
 *
 * @throws IOException if a bounded limit is configured and the incremented
 *         spill count exceeds it
 */
private void incrementNumSpills() throws IOException {
  numSpills++;
  final boolean unbounded = spillFilesCountLimit == SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE;
  if (unbounded || numSpills <= spillFilesCountLimit) {
    // No cap configured, or still within the cap: nothing more to do.
    return;
  }
  throw new IOException("Too many spill files got created, control it with "
      + "tez.runtime.sort.spill.files.count.limit, current value: " + spillFilesCountLimit
      + ", current spill count: " + numSpills);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@

import org.apache.tez.common.Preconditions;

import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT;
import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions;

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down Expand Up @@ -123,12 +124,14 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
final ArrayList<TezSpillRecord> indexCacheList =
new ArrayList<TezSpillRecord>();
private final int indexCacheMemoryLimit;
// Maximum number of spill files this sorter may create, read in the constructor from
// TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT.
// Either SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE (no limit) or a value greater than 0.
private final int spillFilesCountLimit;
private int totalIndexCacheMemory;

private long totalKeys = 0;
private long sameKey = 0;

public static final int MAX_IO_SORT_MB = 1800;
// Sentinel value of spillFilesCountLimit meaning the number of spill files is not limited.
private static final int SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1;


public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs,
Expand All @@ -148,6 +151,13 @@ public DefaultSorter(OutputContext outputContext, Configuration conf, int numOut
indexCacheMemoryLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT);

spillFilesCountLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT,
TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT);
Preconditions.checkArgument(spillFilesCountLimit == SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
|| spillFilesCountLimit > 0,
TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT
+ " should be greater than 0 or unbounded");

boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT);
Expand Down Expand Up @@ -978,7 +988,7 @@ protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCoun
}
LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Finished spill " + numSpills
+ " at " + filename.toString());
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
numShuffleChunks.setValue(numSpills);
} else if (numSpills > 1) {
Expand Down Expand Up @@ -1057,7 +1067,7 @@ private void spillSingleRecord(final Object key, final Object value,
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
numShuffleChunks.setValue(numSpills);
} else if (numSpills > 1) {
Expand Down Expand Up @@ -1312,7 +1322,7 @@ private void mergeParts() throws IOException, InterruptedException {
} finally {
finalOut.close();
}
++numSpills;
incrementNumSpills();
if (!isFinalMergeEnabled()) {
List<Event> events = Lists.newLinkedList();
maybeSendEventForSpill(events, true, sr, 0, true);
Expand Down Expand Up @@ -1399,4 +1409,18 @@ private void mergeParts() throws IOException, InterruptedException {
}
}
}

/**
* Increments numSpills local counter by taking into consideration
* the max limit on spill files being generated by the job.
* If the limit is exceeded, this function throws an IOException.
*/
private void incrementNumSpills() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this duplicated logic go into ExternalSorter instead of being repeated in each child class (PipelinedSorter, DefaultSorter)?

++numSpills;
if(spillFilesCountLimit != SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE && numSpills > spillFilesCountLimit) {
throw new IOException("Too many spill files got created, control it with " +
"tez.runtime.sort.spill.files.count.limit, current value: " + spillFilesCountLimit +
", current spill count: " + numSpills);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,14 @@
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -116,6 +121,9 @@ public void setup() throws IOException {
this.outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
}

@Rule
public ExpectedException exception = ExpectedException.none();

public static Configuration getConf() {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
Expand Down Expand Up @@ -858,6 +866,75 @@ public void testWithLargeKeyValueWithMinBlockSize() throws IOException {
basicTest(1, 5, (2 << 20), (48 * 1024l * 1024l), 16 << 20);
}

/**
 * Verifies that {@link PipelinedSorter} rejects a spill files count limit that is
 * neither greater than 0 nor the unbounded value (-1): constructing the sorter with
 * a limit of -2 must fail with an {@link IllegalArgumentException}.
 */
@Test(timeout = 60000)
public void testSpillFilesCountLimitInvalidValue() throws IOException {
  this.numOutputs = 10;

  //128 MB. Do not pre-allocate.
  // Get 32 MB buffer first and then another buffer with 96 on filling up
  // the 32 MB buffer.
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
  conf.setBoolean(TezRuntimeConfiguration
      .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
  // -2 is neither a positive limit nor the "unbounded" value (-1).
  conf.setInt(TezRuntimeConfiguration
      .TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT, -2);

  exception.expect(IllegalArgumentException.class);
  exception.expectMessage("tez.runtime.sort.spill.files.count.limit should be greater than 0 or unbounded");

  PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
      numOutputs, (128L << 20));

  // Only reached if the constructor unexpectedly succeeds (the rule then fails the test);
  // release the sorter in that case.
  closeSorter(sorter);
}

/**
 * Verifies that {@code PipelinedSorter.incrementNumSpills()} throws an
 * {@link IOException} with the expected message when the spill count goes past the
 * configured limit. The limit is set to 2 and {@code numSpills} is forced to 2 via
 * reflection, so the next increment (to 3) must breach the limit.
 */
@Test(timeout = 60000)
public void testSpillFilesCountBreach() throws IOException, NoSuchMethodException,
    NoSuchFieldException, IllegalAccessException {
  this.numOutputs = 10;

  //128 MB. Do not pre-allocate.
  // Get 32 MB buffer first and then another buffer with 96 on filling up
  // the 32 MB buffer.
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
  conf.setBoolean(TezRuntimeConfiguration
      .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true);
  conf.setInt(TezRuntimeConfiguration
      .TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT, 2);

  PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
      numOutputs, (128L << 20));

  // Pretend the sorter has already produced as many spills as the limit allows.
  Field numSpillsField = ExternalSorter.class.getDeclaredField("numSpills");
  numSpillsField.setAccessible(true);
  numSpillsField.set(sorter, 2);

  // incrementNumSpills() is private, so it is invoked reflectively.
  Method method = sorter.getClass().getDeclaredMethod("incrementNumSpills");
  method.setAccessible(true);
  try {
    method.invoke(sorter);
    Assert.fail("Expected an IOException once the spill count exceeds the configured limit");
  } catch (InvocationTargetException e) {
    // Reflection wraps the exception thrown by the target method.
    Throwable cause = e.getCause();
    Assert.assertTrue("Unexpected exception type: " + cause, cause instanceof IOException);
    Assert.assertEquals("Too many spill files got created, control it with "
        + "tez.runtime.sort.spill.files.count.limit, current value: 2, current spill count: 3",
        cause.getMessage());
  }
}

private void verifyOutputPermissions(String spillId) throws IOException {
String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId
+ "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import static org.mockito.internal.verification.VerificationModeFactory.times;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
Expand Down Expand Up @@ -81,7 +84,9 @@
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -129,6 +134,9 @@ public void reset() throws IOException {
localFs.mkdirs(workingDir);
}

@Rule
public ExpectedException exception = ExpectedException.none();

@Test(timeout = 5000)
public void testSortSpillPercent() throws Exception {
OutputContext context = createTezOutputContext();
Expand Down Expand Up @@ -577,6 +585,71 @@ public void testWithMultipleSpillsWithFinalMergeDisabled() throws IOException {
verifyCounters(sorter, context);
}

/**
 * Verifies that {@link DefaultSorter} rejects a spill files count limit that is
 * neither greater than 0 nor the unbounded value (-1): creating the sorter with
 * a limit of -2 must fail with an {@link IllegalArgumentException}.
 */
@Test(timeout = 60000)
public void testSpillFilesCountLimitInvalidValue() throws IOException {
  OutputContext context = createTezOutputContext();

  conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
  conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 1);
  // -2 is neither a positive limit nor the "unbounded" value (-1).
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT, -2);
  MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
  context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
      context.getTotalMemoryAvailableToTask()), handler);

  exception.expect(IllegalArgumentException.class);
  exception.expectMessage("tez.runtime.sort.spill.files.count.limit should be greater than 0 or unbounded");

  SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, handler.getMemoryAssigned());
  // Only reached if sorter creation unexpectedly succeeds (the rule then fails the test);
  // release the sorter in that case.
  sorterWrapper.getSorter();
  sorterWrapper.close();
}

/**
 * Verifies that {@code DefaultSorter.incrementNumSpills()} throws an
 * {@link IOException} with the expected message when the spill count goes past the
 * configured limit. The limit is set to 2 and {@code numSpills} is forced to 2 via
 * reflection, so the next increment (to 3) must breach the limit.
 */
@Test(timeout = 60000)
public void testSpillFilesCountBreach() throws IOException, NoSuchMethodException,
    NoSuchFieldException, IllegalAccessException {
  OutputContext context = createTezOutputContext();

  conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
  conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 1);
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT, 2);
  MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
  context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
      context.getTotalMemoryAvailableToTask()), handler);
  SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, handler.getMemoryAssigned());
  DefaultSorter sorter = sorterWrapper.getSorter();

  // Pretend the sorter has already produced as many spills as the limit allows.
  Field numSpillsField = ExternalSorter.class.getDeclaredField("numSpills");
  numSpillsField.setAccessible(true);
  numSpillsField.set(sorter, 2);

  // incrementNumSpills() is private, so it is invoked reflectively.
  Method method = sorter.getClass().getDeclaredMethod("incrementNumSpills");
  method.setAccessible(true);
  try {
    method.invoke(sorter);
    Assert.fail("Expected an IOException once the spill count exceeds the configured limit");
  } catch (InvocationTargetException e) {
    // Reflection wraps the exception thrown by the target method.
    Throwable cause = e.getCause();
    Assert.assertTrue("Unexpected exception type: " + cause, cause instanceof IOException);
    Assert.assertEquals("Too many spill files got created, control it with "
        + "tez.runtime.sort.spill.files.count.limit, current value: 2, current spill count: 3",
        cause.getMessage());
  }
}

private void verifyOutputPermissions(String spillId) throws IOException {
String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId
+ "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
Expand Down