diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
index b05c3af2b24..528ceaaf660 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -27,15 +27,6 @@
 import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
-import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -45,8 +36,6 @@
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.state.ConstructState;
 
-import static org.apache.gobblin.configuration.ConfigurationKeys.AVG_RECORD_SIZE;
-
 /**
  * A wrapper for ORC-core writer without dependency on Hive SerDe library.
  */
@@ -56,39 +45,6 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
   public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize";
   public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
 
-  private static final String CONTAINER_MEMORY_MBS = ORC_WRITER_PREFIX + "jvm.memory.mbs";
-  private static final int DEFAULT_CONTAINER_MEMORY_MBS = 4096;
-
-  private static final String CONTAINER_JVM_MEMORY_XMX_RATIO_KEY = ORC_WRITER_PREFIX + "container.jvmMemoryXmxRatio";
-  private static final double DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO_KEY = 1.0;
-
-  static final String CONTAINER_JVM_MEMORY_OVERHEAD_MBS = ORC_WRITER_PREFIX + "container.jvmMemoryOverheadMbs";
-  private static final int DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS = 0;
-
-  @VisibleForTesting
-  static final String ORC_WRITER_AUTO_TUNE_ENABLED = ORC_WRITER_PREFIX + "autoTuneEnabled";
-  private static final boolean ORC_WRITER_AUTO_TUNE_DEFAULT = false;
-  private static final long EXEMPLIFIED_RECORD_SIZE_IN_BYTES = 1024;
-
-  /**
-   * This value gives an estimation on how many writers are buffering records at the same time in a container.
-   * Since time-based partition scheme is a commonly used practice, plus the chances for late-arrival data,
-   * usually there would be 2-3 writers running during the hourly boundary. 3 is chosen here for being conservative.
-   */
-  private static final int ESTIMATED_PARALLELISM_WRITERS = 3;
-
-  // The serialized record size passed from AVG_RECORD_SIZE is smaller than the actual in-memory representation
-  // of a record. This is just the number represents how many times that the actual buffer storing record is larger
-  // than the serialized size passed down from upstream constructs.
-  @VisibleForTesting
-  static final String RECORD_SIZE_SCALE_FACTOR = "recordSize.scaleFactor";
-  static final int DEFAULT_RECORD_SIZE_SCALE_FACTOR = 6;
-
-  /**
-   * Check comment of {@link #deepCleanRowBatch} for the usage of this configuration.
-   */
-  private static final String ORC_WRITER_DEEP_CLEAN_EVERY_BATCH = ORC_WRITER_PREFIX + "deepCleanBatch";
-
   private final OrcValueWriter<D> valueWriter;
   @VisibleForTesting
   VectorizedRowBatch rowBatch;
@@ -99,61 +55,14 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
   // the close method may be invoked multiple times, but the underlying writer only supports close being called once
   private volatile boolean closed = false;
 
-  private final boolean deepCleanBatch;
   private final int batchSize;
   protected final S inputSchema;
 
-  /**
-   * There are a couple of parameters in ORC writer that requires manual tuning based on record size given that executor
-   * for running these ORC writers has limited heap space. This helper function wrap them and has side effect for the
-   * argument {@param properties}.
-   *
-   * Assumption for current implementation:
-   * The extractor or source class should set {@link org.apache.gobblin.configuration.ConfigurationKeys#AVG_RECORD_SIZE}
-   */
-  protected void autoTunedOrcWriterParams(State properties) {
-    double writerRatio = properties.getPropAsDouble(OrcConf.MEMORY_POOL.name(), (double) OrcConf.MEMORY_POOL.getDefaultValue());
-    long availableHeapPerWriter = Math.round(availableHeapSize(properties) * writerRatio / ESTIMATED_PARALLELISM_WRITERS);
-
-    // Upstream constructs will need to set this value properly
-    long estimatedRecordSize = getEstimatedRecordSize(properties);
-    long rowsBetweenCheck = availableHeapPerWriter * 1024 / estimatedRecordSize;
-    properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.name(),
-        Math.min(rowsBetweenCheck, (int) OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue()));
-    // Row batch size should be smaller than row_between_check, 4 is just a magic number picked here.
-    long batchSize = Math.min(rowsBetweenCheck / 4, DEFAULT_ORC_WRITER_BATCH_SIZE);
-    properties.setProp(ORC_WRITER_BATCH_SIZE, batchSize);
-    log.info("Tuned the parameter " + OrcConf.ROWS_BETWEEN_CHECKS.name() + " to be:" + rowsBetweenCheck + ","
-        + ORC_WRITER_BATCH_SIZE + " to be:" + batchSize);
-  }
-
-  /**
-   * Calculate the heap size in MB available for ORC writers.
-   */
-  protected long availableHeapSize(State properties) {
-    // Calculate the recommended size as the threshold for memory check
-    long physicalMem = Math.round(properties.getPropAsLong(CONTAINER_MEMORY_MBS, DEFAULT_CONTAINER_MEMORY_MBS)
-        * properties.getPropAsDouble(CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO_KEY));
-    long nonHeap = properties.getPropAsLong(CONTAINER_JVM_MEMORY_OVERHEAD_MBS, DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
-    return physicalMem - nonHeap;
-  }
-
-  /**
-   * Calculate the estimated record size in bytes.
-   */
-  protected long getEstimatedRecordSize(State properties) {
-    long estimatedRecordSizeScale = properties.getPropAsInt(RECORD_SIZE_SCALE_FACTOR, DEFAULT_RECORD_SIZE_SCALE_FACTOR);
-    return (properties.contains(AVG_RECORD_SIZE) ? properties.getPropAsLong(AVG_RECORD_SIZE)
-        : EXEMPLIFIED_RECORD_SIZE_IN_BYTES) * estimatedRecordSizeScale;
-  }
 
   public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
       throws IOException {
     super(builder, properties);
 
-    if (properties.getPropAsBoolean(ORC_WRITER_AUTO_TUNE_ENABLED, ORC_WRITER_AUTO_TUNE_DEFAULT)) {
-      autoTunedOrcWriterParams(properties);
-    }
 
     // Create value-writer which is essentially a record-by-record-converter with buffering in batch.
     this.inputSchema = builder.getSchema();
@@ -163,8 +72,6 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
     this.rowBatchPool = RowBatchPool.instance(properties);
     this.enableRowBatchPool = properties.getPropAsBoolean(RowBatchPool.ENABLE_ROW_BATCH_POOL, false);
     this.rowBatch = enableRowBatchPool ? rowBatchPool.getRowBatch(typeDescription, batchSize) : typeDescription.createRowBatch(batchSize);
-    this.deepCleanBatch = properties.getPropAsBoolean(ORC_WRITER_DEEP_CLEAN_EVERY_BATCH, false);
-
     log.info("Created ORC writer, batch size: {}, {}: {}", batchSize,
         OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
         properties.getProp(
@@ -192,7 +99,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
   protected abstract TypeDescription getOrcSchema();
 
   /**
-   * Get an {@OrcValueWriter} for the specified schema and configuration.
+   * Get an {@link OrcValueWriter} for the specified schema and configuration.
    */
   protected abstract OrcValueWriter<D> getOrcValueWriter(TypeDescription typeDescription, S inputSchema, State state);
 
@@ -231,9 +138,6 @@ public void flush()
     if (rowBatch.size > 0) {
       orcFileWriter.addRowBatch(rowBatch);
       rowBatch.reset();
-      if (deepCleanBatch) {
-        deepCleanRowBatch(rowBatch);
-      }
     }
   }
 
@@ -285,65 +189,6 @@ public void write(D record)
     if (rowBatch.size == this.batchSize) {
       orcFileWriter.addRowBatch(rowBatch);
       rowBatch.reset();
-      if (deepCleanBatch) {
-        log.info("A reset of rowBatch is triggered - releasing holding memory for large object");
-        deepCleanRowBatch(rowBatch);
-      }
-    }
-  }
-
-  /**
-   * The reset call of {@link VectorizedRowBatch} doesn't release the memory occupied by each {@link ColumnVector}'s child,
-   * which is usually an array of objects, while it only set those value to null.
-   * This method ensure the reference to the child array for {@link ColumnVector} are released and gives a hint of GC,
-   * so that each reset could release the memory pre-allocated by {@link ColumnVector#ensureSize(int, boolean)} method.
-   *
-   * This feature is configurable and should only be turned on if a dataset is:
-   * 1. Has large per-record size.
-   * 2. Has {@link org.apache.hadoop.hive.ql.exec.vector.MultiValuedColumnVector} as part of schema,
-   * like array, map and all nested structures containing these.
-   */
-  @VisibleForTesting
-  void deepCleanRowBatch(VectorizedRowBatch rowBatch) {
-    for(int i = 0; i < rowBatch.cols.length; ++i) {
-      ColumnVector cv = rowBatch.cols[i];
-      if (cv != null) {
-        removeRefOfColumnVectorChild(cv);
-      }
-    }
-  }
-
-  /**
-   * Set the child field of {@link ColumnVector} to null, assuming input {@link ColumnVector} is nonNull.
-   */
-  private void removeRefOfColumnVectorChild(ColumnVector cv) {
-    if (cv instanceof StructColumnVector) {
-      StructColumnVector structCv = (StructColumnVector) cv;
-      for (ColumnVector childCv: structCv.fields) {
-        removeRefOfColumnVectorChild(childCv);
-      }
-    } else if (cv instanceof ListColumnVector) {
-      ListColumnVector listCv = (ListColumnVector) cv;
-      removeRefOfColumnVectorChild(listCv.child);
-    } else if (cv instanceof MapColumnVector) {
-      MapColumnVector mapCv = (MapColumnVector) cv;
-      removeRefOfColumnVectorChild(mapCv.keys);
-      removeRefOfColumnVectorChild(mapCv.values);
-    } else if (cv instanceof UnionColumnVector) {
-      UnionColumnVector unionCv = (UnionColumnVector) cv;
-      for (ColumnVector unionChildCv : unionCv.fields) {
-        removeRefOfColumnVectorChild(unionChildCv);
-      }
-    } else if (cv instanceof LongColumnVector) {
-      ((LongColumnVector) cv).vector = null;
-    } else if (cv instanceof DoubleColumnVector) {
-      ((DoubleColumnVector) cv).vector = null;
-    } else if (cv instanceof BytesColumnVector) {
-      ((BytesColumnVector) cv).vector = null;
-      ((BytesColumnVector) cv).start = null;
-      ((BytesColumnVector) cv).length = null;
-    } else if (cv instanceof DecimalColumnVector) {
-      ((DecimalColumnVector) cv).vector = null;
     }
   }
 }
diff --git a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
index e6b5f3d0312..0b0912cf7a7 100644
--- a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
+++ b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java
@@ -31,9 +31,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
-import org.apache.orc.OrcConf;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -45,12 +42,7 @@
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.source.workunit.WorkUnit;
 
-import static org.apache.gobblin.configuration.ConfigurationKeys.AVG_RECORD_SIZE;
 import static org.apache.gobblin.writer.GenericRecordToOrcValueWriterTest.deserializeOrcRecords;
-import static org.apache.gobblin.writer.GobblinOrcWriter.CONTAINER_JVM_MEMORY_OVERHEAD_MBS;
-import static org.apache.gobblin.writer.GobblinOrcWriter.DEFAULT_RECORD_SIZE_SCALE_FACTOR;
-import static org.apache.gobblin.writer.GobblinOrcWriter.ORC_WRITER_AUTO_TUNE_ENABLED;
-import static org.apache.gobblin.writer.GobblinOrcWriter.RECORD_SIZE_SCALE_FACTOR;
 import static org.mockito.Mockito.*;
 
 
@@ -81,69 +73,6 @@ public static final List deserializeAvroRecords(Class clazz, Sche
     return records;
   }
 
-  @Test
-  public void testAutoTuned() throws Exception {
-    Closer closer = Closer.create();
-    Schema schema =
-        new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc"));
-
-    FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
-        (FsDataWriterBuilder<Schema, GenericRecord>) Mockito.mock(FsDataWriterBuilder.class);
-    when(mockBuilder.getSchema()).thenReturn(schema);
-    State properties = new WorkUnit();
-    String stagingDir = Files.createTempDir().getAbsolutePath();
-    String outputDir = Files.createTempDir().getAbsolutePath();
-    properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
-    properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, "simple");
-    properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
-    when(mockBuilder.getFileName(properties)).thenReturn("file");
-
-    properties.setProp(ORC_WRITER_AUTO_TUNE_ENABLED, true);
-    properties.setProp(CONTAINER_JVM_MEMORY_OVERHEAD_MBS, 2048);
-    closer.register(new GobblinOrcWriter(mockBuilder, properties));
-    // Verify the side effect within the properties object.
-    Assert.assertEquals(properties.getPropAsInt(OrcConf.ROWS_BETWEEN_CHECKS.name()),
-        Math.round((4096 - 2048) * 0.5 * 1024 / 3) / (1024 * properties.getPropAsInt(RECORD_SIZE_SCALE_FACTOR, DEFAULT_RECORD_SIZE_SCALE_FACTOR)));
-
-    // Will get to 5000
-    properties.setProp(AVG_RECORD_SIZE, 10);
-    closer.register(new GobblinOrcWriter(mockBuilder, properties));
-    Assert.assertEquals(properties.getPropAsInt(OrcConf.ROWS_BETWEEN_CHECKS.name()), 5000);
-
-    closer.close();
-  }
-
-  @Test
-  public void testRowBatchDeepClean() throws Exception {
-    Schema schema = new Schema.Parser().parse(
-        this.getClass().getClassLoader().getResourceAsStream("orc_writer_list_test/schema.avsc"));
-    List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), schema, "orc_writer_list_test/data.json");
-    // Mock WriterBuilder, bunch of mocking behaviors to work-around precondition checks in writer builder
-    FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
-        (FsDataWriterBuilder<Schema, GenericRecord>) Mockito.mock(FsDataWriterBuilder.class);
-    when(mockBuilder.getSchema()).thenReturn(schema);
-    State dummyState = new WorkUnit();
-    String stagingDir = Files.createTempDir().getAbsolutePath();
-    String outputDir = Files.createTempDir().getAbsolutePath();
-    dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
-    dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH, "simple");
-    dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
-    dummyState.setProp("orcWriter.deepCleanBatch", "true");
-    when(mockBuilder.getFileName(dummyState)).thenReturn("file");
-
-    Closer closer = Closer.create();
-
-    GobblinOrcWriter orcWriter = closer.register(new GobblinOrcWriter(mockBuilder, dummyState));
-    for (GenericRecord genericRecord : recordList) {
-      orcWriter.write(genericRecord);
-    }
-    // Manual trigger flush
-    orcWriter.flush();
-
-    Assert.assertNull(((BytesColumnVector) ((ListColumnVector) orcWriter.rowBatch.cols[0]).child).vector);
-    Assert.assertNull(((BytesColumnVector) orcWriter.rowBatch.cols[1]).vector);
-  }
-
   /**
    * A basic unit for trivial writer correctness.
    * TODO: A detailed test suite of ORC-writer for different sorts of schema: