diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index 85c53a5098..52d6ba977d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -121,6 +121,11 @@ public class TezRuntimeConfiguration { public static final int TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT = 1024 * 1024; + @ConfigurationProperty(type = "integer") + public static final String TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT = TEZ_RUNTIME_PREFIX + + "spill.files.count.limit"; + public static final int TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT_DEFAULT = -1; + // TODO Use the default value @ConfigurationProperty(type = "integer") diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 7915662bbe..69d8b37b3d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -70,6 +70,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS; +import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT_DEFAULT; @SuppressWarnings({"unchecked", "rawtypes"}) public class PipelinedSorter extends ExternalSorter { @@ -110,6 +111,8 @@ public class PipelinedSorter extends ExternalSorter { private final ArrayList indexCacheList = new ArrayList(); + private final int spillFilesCountLimit; + private final boolean pipelinedShuffle; private long currentAllocatableMemory; @@ -123,6 +126,8 @@ public class PipelinedSorter extends ExternalSorter { private final Deflater deflater; private final String auxiliaryService; + private static final int SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1; + // TODO Set additional countesr - total bytes written, spills etc. public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs, @@ -164,6 +169,14 @@ public PipelinedSorter(OutputContext outputContext, Configuration conf, int numO pipelinedShuffle = !isFinalMergeEnabled() && confPipelinedShuffle; auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + + spillFilesCountLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT, + TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT_DEFAULT); + Preconditions.checkArgument(spillFilesCountLimit == SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE + || spillFilesCountLimit > 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT + + " should be greater than 0 or unbounded"); + //sanity checks final long sortmb = this.availableMemoryMb; @@ -532,7 +545,7 @@ private void spillSingleRecord(final Object key, final Object value, spillRec.writeToFile(indexFilename, conf); //TODO: honor cache limits indexCacheList.add(spillRec); - ++numSpills; + incrementNumSpills(); if (!isFinalMergeEnabled()) { fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen()); //No final merge. Set the number of files offered via shuffle-handler @@ -622,7 +635,7 @@ public boolean spill(boolean ignoreEmptySpills) throws IOException { spillRec.writeToFile(indexFilename, conf); //TODO: honor cache limits indexCacheList.add(spillRec); - ++numSpills; + incrementNumSpills(); if (!isFinalMergeEnabled()) { fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen()); //No final merge. Set the number of files offered via shuffle-handler @@ -1179,7 +1192,7 @@ public int size() { public int compareTo(SpanIterator other) { return span.compareInternal(other.getKey(), other.getPartition(), kvindex); } - + @Override public String toString() { return String.format("SpanIterator<%d:%d> (span=%s)", kvindex, maxindex, span.toString()); @@ -1482,4 +1495,18 @@ public TezRawKeyValueIterator filter(int partition) { } } + + /** + * Increments numSpills local counter by taking into consideration + * the max limit on spill files being generated by the job. + * If limit is reached, this function throws an IOException + */ + private void incrementNumSpills() throws IOException { + ++numSpills; + if(spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE && numSpills > spillFilesCountLimit) { + throw new IOException("Too many spill files got created, control it with " + + "tez.runtime.spill.files.count.limit, current value: " + spillFilesCountLimit + + ", current spill count: " + numSpills); + } + } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 557a53855f..c0c67fd047 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -65,6 +65,7 @@ import com.google.common.base.Preconditions; import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.SPILL_FILE_PERMS; +import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT_DEFAULT; @SuppressWarnings({"unchecked", "rawtypes"}) public final class DefaultSorter extends ExternalSorter implements IndexedSortable { @@ -124,12 +125,14 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab final ArrayList indexCacheList = new ArrayList(); private final int indexCacheMemoryLimit; + private final int spillFilesCountLimit; private int totalIndexCacheMemory; private long totalKeys = 0; private long sameKey = 0; public static final int MAX_IO_SORT_MB = 1800; + private static final int SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1; public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs, @@ -149,6 +152,13 @@ public DefaultSorter(OutputContext outputContext, Configuration conf, int numOut indexCacheMemoryLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT); + spillFilesCountLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT, + TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT_DEFAULT); + Preconditions.checkArgument(spillFilesCountLimit == SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE + || spillFilesCountLimit > 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT + + " should be greater than 0 or unbounded"); + boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT); @@ -979,7 +989,7 @@ protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCoun } LOG.info(outputContext.getDestinationVertexName() + ": " + "Finished spill " + numSpills + " at " + filename.toString()); - ++numSpills; + incrementNumSpills(); if (!isFinalMergeEnabled()) { numShuffleChunks.setValue(numSpills); } else if (numSpills > 1) { @@ -1059,7 +1069,7 @@ private void spillSingleRecord(final Object key, final Object value, totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; } - ++numSpills; + incrementNumSpills(); if (!isFinalMergeEnabled()) { numShuffleChunks.setValue(numSpills); } else if (numSpills > 1) { @@ -1313,7 +1323,7 @@ private void mergeParts() throws IOException, InterruptedException { } finally { finalOut.close(); } - ++numSpills; + incrementNumSpills(); if (!isFinalMergeEnabled()) { List events = Lists.newLinkedList(); maybeSendEventForSpill(events, true, sr, 0, true); @@ -1399,4 +1409,18 @@ private void mergeParts() throws IOException, InterruptedException { } } } + + /** + * Increments numSpills local counter by taking into consideration + * the max limit on spill files being generated by the job. + * If limit is reached, this function throws an IOException + */ + private void incrementNumSpills() throws IOException { + ++numSpills; + if(spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE && numSpills > spillFilesCountLimit) { + throw new IOException("Too many spill files got created, control it with " + + "tez.runtime.spill.files.count.limit, current value: " + spillFilesCountLimit + + ", current spill count: " + numSpills); + } + } } diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index 727f8acab1..1de716621b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -48,6 +48,8 @@ import org.apache.tez.runtime.library.common.Constants; import org.apache.tez.runtime.library.common.combine.Combiner; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter; +import org.apache.tez.runtime.library.common.sort.impl.dflt.TestDefaultSorter; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl; import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.apache.tez.runtime.library.testutils.RandomTextGenerator; @@ -55,9 +57,14 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.Map; import java.util.TreeMap; @@ -115,6 +122,9 @@ public void setup() throws IOException { this.outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService); } + @Rule + public ExpectedException exception = ExpectedException.none(); + public static Configuration getConf() { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); @@ -827,6 +837,75 @@ public void testWithLargeKeyValueWithMinBlockSize() throws IOException { basicTest(1, 5, (2 << 20), (48 * 1024l * 1024l), 16 << 20); } + @Test(timeout = 60000) + @SuppressWarnings("unchecked") + public void testSpillFilesCountLimitInvalidValue() throws IOException, NoSuchMethodException, NoSuchFieldException, IllegalAccessException { + this.numOutputs = 10; + + //128 MB. Do not pre-allocate. + // Get 32 MB buffer first and the another buffer with 96 on filling up + // the 32 MB buffer. + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT, -2); + + exception.expect(IllegalArgumentException.class); + exception.expectMessage("tez.runtime.spill.files.count.limit should be greater than 0 or unbounded"); + + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, + numOutputs, (128l << 20)); + + closeSorter(sorter); + } + + @Test(timeout = 60000) + @SuppressWarnings("unchecked") + public void testSpillFilesCountBreach() throws IOException, NoSuchMethodException, NoSuchFieldException, IllegalAccessException { + this.numOutputs = 10; + + //128 MB. Do not pre-allocate. + // Get 32 MB buffer first and the another buffer with 96 on filling up + // the 32 MB buffer. + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT, 2); + + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, + numOutputs, (128l << 20)); + + Field numSpillsField = ExternalSorter.class.getDeclaredField("numSpills"); + numSpillsField.setAccessible(true); + numSpillsField.set(sorter, 2); + + Method method = sorter.getClass().getDeclaredMethod("incrementNumSpills"); + method.setAccessible(true); + boolean gotExceptionWithMessage = false; + try { + method.invoke(sorter); + } catch(InvocationTargetException e) { + Throwable targetException = e.getTargetException(); + if (targetException != null) { + String errorMessage = targetException.getMessage(); + if (errorMessage != null) { + if(errorMessage.equals("Too many spill files got created, control it with " + + "tez.runtime.spill.files.count.limit, current value: 2, current spill count: 3")) { + gotExceptionWithMessage = true; + } + } + } + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + + closeSorter(sorter); + + Assert.assertTrue(gotExceptionWithMessage); + } + private void verifyOutputPermissions(String spillId) throws IOException { String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId + "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING; diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index e0fb15320c..44a652c435 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -36,6 +36,9 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.BitSet; @@ -82,7 +85,9 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -130,6 +135,9 @@ public void reset() throws IOException { localFs.mkdirs(workingDir); } + @Rule + public ExpectedException exception = ExpectedException.none(); + @Test(timeout = 5000) public void testSortSpillPercent() throws Exception { OutputContext context = createTezOutputContext(); @@ -578,6 +586,71 @@ public void testWithMultipleSpillsWithFinalMergeDisabled() throws IOException { verifyCounters(sorter, context); } + @Test(timeout = 60000) + @SuppressWarnings("unchecked") + public void testSpillFilesCountLimitInvalidValue() throws IOException, NoSuchMethodException, NoSuchFieldException, IllegalAccessException { + OutputContext context = createTezOutputContext(); + + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); + conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 1); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT, -2); + MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler(); + context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, + context.getTotalMemoryAvailableToTask()), handler); + + exception.expect(IllegalArgumentException.class); + exception.expectMessage("tez.runtime.spill.files.count.limit should be greater than 0 or unbounded"); + + SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, handler.getMemoryAssigned()); + sorterWrapper.getSorter(); + sorterWrapper.close(); + } + + @Test(timeout = 60000) + @SuppressWarnings("unchecked") + public void testSpillFilesCountBreach() throws IOException, NoSuchMethodException, NoSuchFieldException, IllegalAccessException { + OutputContext context = createTezOutputContext(); + + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); + conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 1); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SPILL_FILES_COUNT_LIMIT, 2); + MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler(); + context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, + context.getTotalMemoryAvailableToTask()), handler); + SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, handler.getMemoryAssigned()); + DefaultSorter sorter = sorterWrapper.getSorter(); + + Field numSpillsField = ExternalSorter.class.getDeclaredField("numSpills"); + numSpillsField.setAccessible(true); + numSpillsField.set(sorter, 2); + + Method method = sorter.getClass().getDeclaredMethod("incrementNumSpills"); + method.setAccessible(true); + boolean gotExceptionWithMessage = false; + try { + method.invoke(sorter); + } catch(InvocationTargetException e) { + Throwable targetException = e.getTargetException(); + if (targetException != null) { + String errorMessage = targetException.getMessage(); + if (errorMessage != null) { + if(errorMessage.equals("Too many spill files got created, control it with " + + "tez.runtime.spill.files.count.limit, current value: 2, current spill count: 3")) { + gotExceptionWithMessage = true; + } + } + } + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + + sorterWrapper.close(); + + Assert.assertTrue(gotExceptionWithMessage); + } + private void verifyOutputPermissions(String spillId) throws IOException { String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId + "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;