[GOBBLIN-1854] Remove unused ORC writer code #3710

Merged · 1 commit · Jul 19, 2023
@@ -27,15 +27,6 @@
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import com.google.common.annotations.VisibleForTesting;
@@ -45,8 +36,6 @@
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.state.ConstructState;

import static org.apache.gobblin.configuration.ConfigurationKeys.AVG_RECORD_SIZE;

/**
* A wrapper for ORC-core writer without dependency on Hive SerDe library.
*/
@@ -56,39 +45,6 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize";
public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;

private static final String CONTAINER_MEMORY_MBS = ORC_WRITER_PREFIX + "jvm.memory.mbs";
private static final int DEFAULT_CONTAINER_MEMORY_MBS = 4096;

private static final String CONTAINER_JVM_MEMORY_XMX_RATIO_KEY = ORC_WRITER_PREFIX + "container.jvmMemoryXmxRatio";
private static final double DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO_KEY = 1.0;

static final String CONTAINER_JVM_MEMORY_OVERHEAD_MBS = ORC_WRITER_PREFIX + "container.jvmMemoryOverheadMbs";
private static final int DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS = 0;

@VisibleForTesting
static final String ORC_WRITER_AUTO_TUNE_ENABLED = ORC_WRITER_PREFIX + "autoTuneEnabled";
private static final boolean ORC_WRITER_AUTO_TUNE_DEFAULT = false;
private static final long EXEMPLIFIED_RECORD_SIZE_IN_BYTES = 1024;

/**
* This value estimates how many writers are buffering records at the same time in a container.
* Since time-based partitioning is a common practice, and late-arriving data is always possible,
* there are usually 2-3 writers running around an hourly boundary; 3 is chosen here to be conservative.
*/
private static final int ESTIMATED_PARALLELISM_WRITERS = 3;

// The serialized record size passed from AVG_RECORD_SIZE is smaller than a record's actual in-memory representation.
// This number represents how many times larger the actual buffer storing a record is than the serialized size
// passed down from upstream constructs.
@VisibleForTesting
static final String RECORD_SIZE_SCALE_FACTOR = "recordSize.scaleFactor";
static final int DEFAULT_RECORD_SIZE_SCALE_FACTOR = 6;

/**
* Check comment of {@link #deepCleanRowBatch} for the usage of this configuration.
*/
private static final String ORC_WRITER_DEEP_CLEAN_EVERY_BATCH = ORC_WRITER_PREFIX + "deepCleanBatch";

private final OrcValueWriter<D> valueWriter;
@VisibleForTesting
VectorizedRowBatch rowBatch;
@@ -99,61 +55,14 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {

// the close method may be invoked multiple times, but the underlying writer only supports close being called once
private volatile boolean closed = false;
private final boolean deepCleanBatch;

private final int batchSize;
protected final S inputSchema;

/**
* A couple of parameters in the ORC writer require manual tuning based on record size, given that the executor
* running these ORC writers has limited heap space. This helper function wraps them and has the side effect of
* mutating the argument {@code properties}.
*
* Assumption for current implementation:
* The extractor or source class should set {@link org.apache.gobblin.configuration.ConfigurationKeys#AVG_RECORD_SIZE}
*/
protected void autoTunedOrcWriterParams(State properties) {
double writerRatio = properties.getPropAsDouble(OrcConf.MEMORY_POOL.name(), (double) OrcConf.MEMORY_POOL.getDefaultValue());
long availableHeapPerWriter = Math.round(availableHeapSize(properties) * writerRatio / ESTIMATED_PARALLELISM_WRITERS);

// Upstream constructs will need to set this value properly
long estimatedRecordSize = getEstimatedRecordSize(properties);
long rowsBetweenCheck = availableHeapPerWriter * 1024 / estimatedRecordSize;
properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.name(),
Math.min(rowsBetweenCheck, (int) OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue()));
// Row batch size should be smaller than rowsBetweenCheck; 4 is just a magic number picked here.
long batchSize = Math.min(rowsBetweenCheck / 4, DEFAULT_ORC_WRITER_BATCH_SIZE);
properties.setProp(ORC_WRITER_BATCH_SIZE, batchSize);
log.info("Tuned the parameter " + OrcConf.ROWS_BETWEEN_CHECKS.name() + " to be:" + rowsBetweenCheck + ","
+ ORC_WRITER_BATCH_SIZE + " to be:" + batchSize);
}

/**
* Calculate the heap size in MB available for ORC writers.
*/
protected long availableHeapSize(State properties) {
// Calculate the recommended size as the threshold for memory check
long physicalMem = Math.round(properties.getPropAsLong(CONTAINER_MEMORY_MBS, DEFAULT_CONTAINER_MEMORY_MBS)
* properties.getPropAsDouble(CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO_KEY));
long nonHeap = properties.getPropAsLong(CONTAINER_JVM_MEMORY_OVERHEAD_MBS, DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
return physicalMem - nonHeap;
}

/**
* Calculate the estimated record size in bytes.
*/
protected long getEstimatedRecordSize(State properties) {
long estimatedRecordSizeScale = properties.getPropAsInt(RECORD_SIZE_SCALE_FACTOR, DEFAULT_RECORD_SIZE_SCALE_FACTOR);
return (properties.contains(AVG_RECORD_SIZE) ? properties.getPropAsLong(AVG_RECORD_SIZE)
: EXEMPLIFIED_RECORD_SIZE_IN_BYTES) * estimatedRecordSizeScale;
}
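
For context, the removed auto-tuning above boils down to a small piece of arithmetic. The following is a minimal standalone sketch, not part of this patch, using only the default values defined in the removed constants (container memory 4096 MB, Xmx ratio 1.0, zero overhead, ORC memory pool 0.5, 3 parallel writers, 1 KB exemplified record size times a scale factor of 6):

```java
// Standalone sketch of the removed auto-tuning arithmetic; all values are the removed defaults.
public class OrcWriterAutoTuneSketch {
  public static void main(String[] args) {
    long containerMemoryMbs = 4096;   // DEFAULT_CONTAINER_MEMORY_MBS
    double xmxRatio = 1.0;            // DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO_KEY
    long overheadMbs = 0;             // DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS
    double memoryPoolRatio = 0.5;     // OrcConf.MEMORY_POOL default
    int parallelWriters = 3;          // ESTIMATED_PARALLELISM_WRITERS
    long recordSizeBytes = 1024 * 6;  // EXEMPLIFIED_RECORD_SIZE_IN_BYTES * DEFAULT_RECORD_SIZE_SCALE_FACTOR

    // availableHeapSize(): physical memory scaled by the Xmx ratio, minus non-heap overhead.
    long heapMbs = Math.round(containerMemoryMbs * xmxRatio) - overheadMbs;           // 4096
    // autoTunedOrcWriterParams(): each concurrent writer's share of the ORC memory pool.
    long heapPerWriterMbs = Math.round(heapMbs * memoryPoolRatio / parallelWriters);  // 683
    long rowsBetweenCheck = heapPerWriterMbs * 1024 / recordSizeBytes;                // 113
    long batchSize = Math.min(rowsBetweenCheck / 4, 1000);                            // 28

    // The removed code wrote these back into the State via setProp(...).
    System.out.println(Math.min(rowsBetweenCheck, 5000));  // OrcConf.ROWS_BETWEEN_CHECKS -> 113
    System.out.println(batchSize);                         // ORC_WRITER_BATCH_SIZE -> 28
  }
}
```
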

public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
throws IOException {
super(builder, properties);
if (properties.getPropAsBoolean(ORC_WRITER_AUTO_TUNE_ENABLED, ORC_WRITER_AUTO_TUNE_DEFAULT)) {
autoTunedOrcWriterParams(properties);
}

// Create value-writer which is essentially a record-by-record-converter with buffering in batch.
this.inputSchema = builder.getSchema();
@@ -163,8 +72,6 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
this.rowBatchPool = RowBatchPool.instance(properties);
this.enableRowBatchPool = properties.getPropAsBoolean(RowBatchPool.ENABLE_ROW_BATCH_POOL, false);
this.rowBatch = enableRowBatchPool ? rowBatchPool.getRowBatch(typeDescription, batchSize) : typeDescription.createRowBatch(batchSize);
this.deepCleanBatch = properties.getPropAsBoolean(ORC_WRITER_DEEP_CLEAN_EVERY_BATCH, false);

log.info("Created ORC writer, batch size: {}, {}: {}",
batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(),
properties.getProp(
@@ -192,7 +99,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
protected abstract TypeDescription getOrcSchema();

/**
* Get an {@OrcValueWriter} for the specified schema and configuration.
* Get an {@link OrcValueWriter} for the specified schema and configuration.
*/
protected abstract OrcValueWriter<D> getOrcValueWriter(TypeDescription typeDescription, S inputSchema, State state);

Expand Down Expand Up @@ -231,9 +138,6 @@ public void flush()
if (rowBatch.size > 0) {
orcFileWriter.addRowBatch(rowBatch);
rowBatch.reset();
if (deepCleanBatch) {
deepCleanRowBatch(rowBatch);
}
}
}

@@ -285,65 +189,6 @@ public void write(D record)
if (rowBatch.size == this.batchSize) {
orcFileWriter.addRowBatch(rowBatch);
rowBatch.reset();
if (deepCleanBatch) {
log.info("A reset of rowBatch is triggered - releasing holding memory for large object");
deepCleanRowBatch(rowBatch);
}
}
}
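
For reference, the buffering that write() and flush() perform sits on top of the plain ORC-core API. Below is a minimal standalone sketch of that underlying pattern, assuming the nohive orc-core artifact that matches the org.apache.orc.storage.* imports above; the file path and schema are made up for illustration:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

public class RawOrcBatchingSketch {
  public static void main(String[] args) throws Exception {
    int batchSize = 1000;  // DEFAULT_ORC_WRITER_BATCH_SIZE
    TypeDescription schema = TypeDescription.fromString("struct<id:bigint>");  // made-up schema
    Writer orcFileWriter = OrcFile.createWriter(new Path("/tmp/demo.orc"),     // made-up path
        OrcFile.writerOptions(new Configuration()).setSchema(schema));
    VectorizedRowBatch rowBatch = schema.createRowBatch(batchSize);
    LongColumnVector idColumn = (LongColumnVector) rowBatch.cols[0];

    for (long row = 0; row < 10_000; row++) {
      // write(record): the value writer converts one record into the current batch slot.
      idColumn.vector[rowBatch.size++] = row;
      if (rowBatch.size == batchSize) {        // batch is full: hand it to the ORC writer
        orcFileWriter.addRowBatch(rowBatch);
        rowBatch.reset();
      }
    }
    if (rowBatch.size > 0) {                   // flush(): push out the partially filled tail batch
      orcFileWriter.addRowBatch(rowBatch);
      rowBatch.reset();
    }
    orcFileWriter.close();
  }
}
```
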

/**
* The reset call of {@link VectorizedRowBatch} doesn't release the memory occupied by each {@link ColumnVector}'s child,
* which is usually an array of objects; it only sets those values to null.
* This method ensures the references to each {@link ColumnVector}'s child arrays are released and gives the GC a hint,
* so that each reset can release the memory pre-allocated by the {@link ColumnVector#ensureSize(int, boolean)} method.
*
* This feature is configurable and should only be turned on for a dataset that:
* 1. Has a large per-record size.
* 2. Has {@link org.apache.hadoop.hive.ql.exec.vector.MultiValuedColumnVector} as part of its schema,
* such as array, map, and any nested structure containing these.
*/
@VisibleForTesting
void deepCleanRowBatch(VectorizedRowBatch rowBatch) {
for(int i = 0; i < rowBatch.cols.length; ++i) {
ColumnVector cv = rowBatch.cols[i];
if (cv != null) {
removeRefOfColumnVectorChild(cv);
}
}
}

/**
* Set the child field of {@link ColumnVector} to null, assuming the input {@link ColumnVector} is non-null.
*/
private void removeRefOfColumnVectorChild(ColumnVector cv) {
if (cv instanceof StructColumnVector) {
StructColumnVector structCv = (StructColumnVector) cv;
for (ColumnVector childCv: structCv.fields) {
removeRefOfColumnVectorChild(childCv);
}
} else if (cv instanceof ListColumnVector) {
ListColumnVector listCv = (ListColumnVector) cv;
removeRefOfColumnVectorChild(listCv.child);
} else if (cv instanceof MapColumnVector) {
MapColumnVector mapCv = (MapColumnVector) cv;
removeRefOfColumnVectorChild(mapCv.keys);
removeRefOfColumnVectorChild(mapCv.values);
} else if (cv instanceof UnionColumnVector) {
UnionColumnVector unionCv = (UnionColumnVector) cv;
for (ColumnVector unionChildCv : unionCv.fields) {
removeRefOfColumnVectorChild(unionChildCv);
}
} else if (cv instanceof LongColumnVector) {
((LongColumnVector) cv).vector = null;
} else if (cv instanceof DoubleColumnVector) {
((DoubleColumnVector) cv).vector = null;
} else if (cv instanceof BytesColumnVector) {
((BytesColumnVector) cv).vector = null;
((BytesColumnVector) cv).start = null;
((BytesColumnVector) cv).length = null;
} else if (cv instanceof DecimalColumnVector) {
((DecimalColumnVector) cv).vector = null;
}
}
}
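
To illustrate why the removed deep clean existed: VectorizedRowBatch.reset() keeps the enlarged child arrays referenced, whereas deepCleanRowBatch() dropped those references so the GC could reclaim them. A minimal standalone sketch, not part of this patch, assuming the nohive orc-core artifact used by the imports above and a made-up schema:

```java
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

public class RowBatchResetSketch {
  public static void main(String[] args) {
    TypeDescription schema = TypeDescription.fromString("struct<tags:array<string>>");  // made-up schema
    VectorizedRowBatch rowBatch = schema.createRowBatch(1024);
    ListColumnVector tags = (ListColumnVector) rowBatch.cols[0];
    BytesColumnVector elements = (BytesColumnVector) tags.child;

    // A record with a very large array forces the child vector to grow its backing arrays.
    elements.ensureSize(1_000_000, false);

    rowBatch.reset();
    // After reset() the enlarged arrays are still referenced, so the memory stays allocated.
    System.out.println(elements.vector.length);  // still >= 1,000,000

    // The removed deepCleanRowBatch() dropped these references instead, letting the GC reclaim them.
    elements.vector = null;
    elements.start = null;
    elements.length = null;
  }
}
```

This mirrors what the removed testRowBatchDeepClean below asserted: after a flush with orcWriter.deepCleanBatch set to true, the child BytesColumnVector's backing arrays are null.
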
@@ -31,9 +31,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.orc.OrcConf;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -45,12 +42,7 @@
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.workunit.WorkUnit;

import static org.apache.gobblin.configuration.ConfigurationKeys.AVG_RECORD_SIZE;
import static org.apache.gobblin.writer.GenericRecordToOrcValueWriterTest.deserializeOrcRecords;
import static org.apache.gobblin.writer.GobblinOrcWriter.CONTAINER_JVM_MEMORY_OVERHEAD_MBS;
import static org.apache.gobblin.writer.GobblinOrcWriter.DEFAULT_RECORD_SIZE_SCALE_FACTOR;
import static org.apache.gobblin.writer.GobblinOrcWriter.ORC_WRITER_AUTO_TUNE_ENABLED;
import static org.apache.gobblin.writer.GobblinOrcWriter.RECORD_SIZE_SCALE_FACTOR;
import static org.mockito.Mockito.*;


@@ -81,69 +73,6 @@ public static final List<GenericRecord> deserializeAvroRecords(Class clazz, Sche
return records;
}

@Test
public void testAutoTuned() throws Exception {
Closer closer = Closer.create();
Schema schema =
new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc"));

FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
(FsDataWriterBuilder<Schema, GenericRecord>) Mockito.mock(FsDataWriterBuilder.class);
when(mockBuilder.getSchema()).thenReturn(schema);
State properties = new WorkUnit();
String stagingDir = Files.createTempDir().getAbsolutePath();
String outputDir = Files.createTempDir().getAbsolutePath();
properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, "simple");
properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
when(mockBuilder.getFileName(properties)).thenReturn("file");

properties.setProp(ORC_WRITER_AUTO_TUNE_ENABLED, true);
properties.setProp(CONTAINER_JVM_MEMORY_OVERHEAD_MBS, 2048);
closer.register(new GobblinOrcWriter(mockBuilder, properties));
// Verify the side effect within the properties object.
Assert.assertEquals(properties.getPropAsInt(OrcConf.ROWS_BETWEEN_CHECKS.name()),
Math.round((4096 - 2048) * 0.5 * 1024 / 3) / (1024 * properties.getPropAsInt(RECORD_SIZE_SCALE_FACTOR, DEFAULT_RECORD_SIZE_SCALE_FACTOR)));

// Will get to 5000
properties.setProp(AVG_RECORD_SIZE, 10);
closer.register(new GobblinOrcWriter(mockBuilder, properties));
Assert.assertEquals(properties.getPropAsInt(OrcConf.ROWS_BETWEEN_CHECKS.name()), 5000);

closer.close();
}

@Test
public void testRowBatchDeepClean() throws Exception {
Schema schema = new Schema.Parser().parse(
this.getClass().getClassLoader().getResourceAsStream("orc_writer_list_test/schema.avsc"));
List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), schema, "orc_writer_list_test/data.json");
// Mock the WriterBuilder; a bunch of mocking behaviors to work around precondition checks in the writer builder
FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
(FsDataWriterBuilder<Schema, GenericRecord>) Mockito.mock(FsDataWriterBuilder.class);
when(mockBuilder.getSchema()).thenReturn(schema);
State dummyState = new WorkUnit();
String stagingDir = Files.createTempDir().getAbsolutePath();
String outputDir = Files.createTempDir().getAbsolutePath();
dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH, "simple");
dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
dummyState.setProp("orcWriter.deepCleanBatch", "true");
when(mockBuilder.getFileName(dummyState)).thenReturn("file");

Closer closer = Closer.create();

GobblinOrcWriter orcWriter = closer.register(new GobblinOrcWriter(mockBuilder, dummyState));
for (GenericRecord genericRecord : recordList) {
orcWriter.write(genericRecord);
}
// Manually trigger a flush
orcWriter.flush();

Assert.assertNull(((BytesColumnVector) ((ListColumnVector) orcWriter.rowBatch.cols[0]).child).vector);
Assert.assertNull(((BytesColumnVector) orcWriter.rowBatch.cols[1]).vector);
}

/**
* A basic unit for trivial writer correctness.
* TODO: A detailed test suite of ORC-writer for different sorts of schema: