Skip to content

Commit

Permalink
[Flink/Rust] Adjust rolling file logic to reduce memory usage during …
Browse files Browse the repository at this point in the history
…write (#426)

* adjust rolling file logic to reduce memory usage during write

Signed-off-by: chenxu <[email protected]>

* fix parameters not used from flink-conf.yaml

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>
  • Loading branch information
xuchen-plus and dmetasoul01 authored Jan 19, 2024
1 parent 8ff6f19 commit 58600b6
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.table.data.RowData;

import java.io.IOException;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DEFAULT_BUCKET_ROLLING_SIZE;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.DEFAULT_BUCKET_ROLLING_TIME;

Expand Down Expand Up @@ -37,8 +39,8 @@ public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
}

@Override
public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) {
return false;
public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) throws IOException {
return partFileState.getSize() >= this.rollingSize;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public String toString() {
", bucketId='" + bucketId + '\'' +
", identity=" + identity +
", commitId='" + commitId + '\'' +
", pendingFiles='" + pendingFiles + '\'' +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public long getPartCounter() {
}

public boolean isActive() {
return inProgressPartWriter != null || pendingFiles.size() > 0;
return inProgressPartWriter != null || !pendingFiles.isEmpty();
}

void merge(final LakeSoulWriterBucket bucket) throws IOException {
Expand All @@ -133,19 +133,15 @@ void merge(final LakeSoulWriterBucket bucket) throws IOException {
bucket.closePartFile();
pendingFiles.addAll(bucket.pendingFiles);

if (LOG.isDebugEnabled()) {
LOG.debug("Merging buckets for bucket id={}", bucketId);
}
LOG.info("Merging buckets for bucket id={}", bucketId);
}

void write(RowData element, long currentTime, long tsMs) throws IOException {
if (inProgressPartWriter == null || rollingPolicy.shouldRollOnEvent(inProgressPartWriter, element)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Opening new part file for bucket id={} due to element {}.",
bucketId,
element);
}
LOG.info(
"Opening new part file for bucket id={} at {}.",
bucketId,
tsMs);
inProgressPartWriter = rollPartFile(currentTime);
this.tsMs = tsMs;
}
Expand All @@ -157,10 +153,8 @@ List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush,String dmlTy
// we always close part file and do not keep in-progress file
// since the native parquet writer doesn't support resume
if (inProgressPartWriter != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Closing in-progress part file for bucket id={} on checkpoint.", bucketId);
}
LOG.info(
"Closing in-progress part file for bucket id={} on checkpoint.", bucketId);
closePartFile();
}

Expand All @@ -173,7 +167,7 @@ List<LakeSoulMultiTableSinkCommittable> prepareCommit(boolean flush,String dmlTy
committables.add(new LakeSoulMultiTableSinkCommittable(
bucketId,
tmpPending,
time, tableId, tsMs,dmlType));
time, tableId, tsMs, dmlType));
pendingFiles.clear();

return committables;
Expand All @@ -196,17 +190,15 @@ LakeSoulWriterBucketState snapshotState() throws IOException {
void onProcessingTime(long timestamp) throws IOException {
if (inProgressPartWriter != null
&& rollingPolicy.shouldRollOnProcessingTime(inProgressPartWriter, timestamp)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Bucket {} closing in-progress part file for part file id={} due to processing time rolling " +
"policy "
+ "(in-progress file created @ {}, last updated @ {} and current time is {}).",
bucketId,
uniqueId,
inProgressPartWriter.getCreationTime(),
inProgressPartWriter.getLastUpdateTime(),
timestamp);
}
LOG.info(
"Bucket {} closing in-progress part file for part file id={} due to processing time rolling " +
"policy "
+ "(in-progress file created @ {}, last updated @ {} and current time is {}).",
bucketId,
uniqueId,
inProgressPartWriter.getCreationTime(),
inProgressPartWriter.getLastUpdateTime(),
timestamp);

closePartFile();
}
Expand All @@ -217,12 +209,10 @@ private InProgressFileWriter<RowData, String> rollPartFile(long currentTime) thr

final Path partFilePath = assembleNewPartPath();

if (LOG.isDebugEnabled()) {
LOG.debug(
"Opening new part file \"{}\" for bucket id={}.",
partFilePath.getName(),
bucketId);
}
LOG.info(
"Opening new part file \"{}\" for bucket id={}.",
partFilePath.getName(),
bucketId);

return bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);
}
Expand Down Expand Up @@ -250,10 +240,13 @@ private Path assembleNewPartPath() {

private void closePartFile() throws IOException {
if (inProgressPartWriter != null) {
long start = System.currentTimeMillis();
InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable =
inProgressPartWriter.closeForCommit();
pendingFiles.add(pendingFileRecoverable);
inProgressPartWriter = null;
LOG.info("Closed part file {} for {}ms", pendingFileRecoverable.getPath(),
(System.currentTimeMillis() - start));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
package org.apache.flink.lakesoul.sink.writer;

import com.dmetasoul.lakesoul.lakesoul.io.NativeIOWriter;
import com.dmetasoul.lakesoul.lakesoul.memory.ArrowMemoryUtils;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -48,7 +46,7 @@ public class NativeParquetWriter implements InProgressFileWriter<RowData, String

String path;

protected BufferAllocator allocator;
private long totalRows = 0;

public NativeParquetWriter(RowType rowType,
List<String> primaryKeys,
Expand All @@ -60,8 +58,6 @@ public NativeParquetWriter(RowType rowType,
this.creationTime = creationTime;
this.bucketID = bucketID;
this.rowsInBatch = 0;
this.allocator = ArrowMemoryUtils.rootAllocator.newChildAllocator("NativeParquetWriter", 0, Long.MAX_VALUE);


ArrowUtils.setLocalTimeZone(FlinkUtil.getLocalTimeZone(conf));
Schema arrowSchema = ArrowUtils.toArrowSchema(rowType);
Expand All @@ -71,7 +67,7 @@ public NativeParquetWriter(RowType rowType,
nativeWriter.setAuxSortColumns(Collections.singletonList(SORT_FIELD));
}
nativeWriter.setRowGroupRowNumber(this.batchSize);
batch = VectorSchemaRoot.create(arrowSchema, this.allocator);
batch = VectorSchemaRoot.create(arrowSchema, nativeWriter.getAllocator());
arrowWriter = ArrowUtils.createRowDataArrowWriter(batch, rowType);
this.path = path.makeQualified(path.getFileSystem()).toString();
nativeWriter.addFile(this.path);
Expand All @@ -85,6 +81,7 @@ public void write(RowData element, long currentTime) throws IOException {
this.lastUpdateTime = currentTime;
this.arrowWriter.write(element);
this.rowsInBatch++;
this.totalRows++;
if (this.rowsInBatch >= this.batchSize) {
this.arrowWriter.finish();
this.nativeWriter.write(this.batch);
Expand Down Expand Up @@ -144,7 +141,6 @@ public PendingFileRecoverable closeForCommit() throws IOException {
this.rowsInBatch = 0;
this.batch.clear();
this.batch.close();
this.allocator.close();
try {
this.nativeWriter.close();
this.nativeWriter = null;
Expand All @@ -157,11 +153,10 @@ public PendingFileRecoverable closeForCommit() throws IOException {
@Override
public void dispose() {
try {
this.nativeWriter.close();
this.nativeWriter = null;
this.arrowWriter.finish();
this.batch.close();
this.allocator.close();
this.nativeWriter.close();
this.nativeWriter = null;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -179,7 +174,7 @@ public long getCreationTime() {

@Override
public long getSize() throws IOException {
return 0;
return totalRows;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.lakesoul.source.LakeSoulLookupTableSource;
import org.apache.flink.lakesoul.tool.FlinkUtil;
import org.apache.flink.lakesoul.types.TableId;
Expand All @@ -32,7 +33,8 @@ public class LakeSoulDynamicTableFactory implements DynamicTableSinkFactory, Dyn

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
Configuration options = (Configuration) FactoryUtil.createTableFactoryHelper(this, context).getOptions();
Configuration options = GlobalConfiguration.loadConfiguration();
options.addAll((Configuration) FactoryUtil.createTableFactoryHelper(this, context).getOptions());
FlinkUtil.setLocalTimeZone(options,
FlinkUtil.getLocalTimeZone(((TableConfig) context.getConfiguration()).getConfiguration()));

Expand Down Expand Up @@ -71,7 +73,8 @@ public Set<ConfigOption<?>> optionalOptions() {

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
Configuration options = (Configuration) FactoryUtil.createTableFactoryHelper(this, context).getOptions();
Configuration options = GlobalConfiguration.loadConfiguration();
options.addAll((Configuration) FactoryUtil.createTableFactoryHelper(this, context).getOptions());
FlinkUtil.setLocalTimeZone(options,
FlinkUtil.getLocalTimeZone(((TableConfig) context.getConfiguration()).getConfiguration()));
ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
Expand All @@ -97,7 +100,11 @@ public DynamicTableSource createDynamicTableSource(Context context) {

return new LakeSoulLookupTableSource(
new TableId(io.debezium.relational.TableId.parse(objectIdentifier.asSummaryString())),
(RowType) catalogTable.getResolvedSchema().toSourceRowDataType().notNull().getLogicalType(), isStreaming, pkColumns, catalogTable, options.toMap()
(RowType) catalogTable.getResolvedSchema().toSourceRowDataType().notNull().getLogicalType(),
isStreaming,
pkColumns,
catalogTable,
options.toMap()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
import java.util.stream.IntStream;

public class LakeSoulTableSource
implements SupportsFilterPushDown, SupportsPartitionPushDown, SupportsProjectionPushDown, ScanTableSource, SupportsRowLevelModificationScan {
implements SupportsFilterPushDown, SupportsPartitionPushDown, SupportsProjectionPushDown, ScanTableSource,
SupportsRowLevelModificationScan {

private static final Logger LOG = LoggerFactory.getLogger(LakeSoulTableSource.class);

Expand Down Expand Up @@ -232,7 +233,10 @@ public String toString() {
}

@Override
public RowLevelModificationScanContext applyRowLevelModificationScan(RowLevelModificationType rowLevelModificationType, @Nullable RowLevelModificationScanContext previousContext) {
public RowLevelModificationScanContext applyRowLevelModificationScan(
RowLevelModificationType rowLevelModificationType,
@Nullable
RowLevelModificationScanContext previousContext) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ public static CatalogBaseTable toFlinkCatalog(TableInfo tableInfo) {
JSONObject properties = JSON.parseObject(tableInfo.getProperties());

org.apache.arrow.vector.types.pojo.Schema arrowSchema = null;
System.out.println(tableSchema);
if (TableInfoDao.isArrowKindSchema(tableSchema)) {
try {
arrowSchema = org.apache.arrow.vector.types.pojo.Schema.fromJSON(tableSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ public class LakeSoulSinkOptions {

public static final String SORT_FIELD = "__sort_filed__";

public static final Long DEFAULT_BUCKET_ROLLING_SIZE = 20000L;
public static final Long DEFAULT_BUCKET_ROLLING_SIZE = 5000000L;

public static final Long DEFAULT_BUCKET_ROLLING_TIME = 2000000L;
public static final Long DEFAULT_BUCKET_ROLLING_TIME = 5 * 60 * 1000L;

public static final String DELETE = "delete";

Expand Down Expand Up @@ -66,7 +66,7 @@ public class LakeSoulSinkOptions {
.key("warehouse_path")
.stringType()
.defaultValue(new Path(System.getProperty("java.io.tmpdir"), "lakesoul").toString())
.withDescription("warehouse path for LakeSoul");
.withDescription("warehouse path for LakeSoul for command line tools");

public static final ConfigOption<Integer> BUCKET_PARALLELISM = ConfigOptions
.key("sink.parallelism")
Expand All @@ -81,22 +81,22 @@ public class LakeSoulSinkOptions {
.withDescription("bucket number for table");

public static final ConfigOption<Long> FILE_ROLLING_SIZE = ConfigOptions
.key("file_rolling_size")
.key("lakesoul.file.rolling.rows")
.longType()
.defaultValue(20000L)
.withDescription("file rolling size ");
.defaultValue(DEFAULT_BUCKET_ROLLING_SIZE)
.withDescription("file rolling max rows");

public static final ConfigOption<Long> FILE_ROLLING_TIME = ConfigOptions
.key("file_rolling_time")
.key("lakesoul.file.rolling.time.ms")
.longType()
.defaultValue(Duration.ofMinutes(10).toMillis())
.withDescription("file rolling time ");
.defaultValue(DEFAULT_BUCKET_ROLLING_TIME)
.withDescription("file rolling time in milliseconds");

public static final ConfigOption<Long> BUCKET_CHECK_INTERVAL = ConfigOptions
.key("bucket_check_interval")
.key("lakesoul.rolling.check.interval")
.longType()
.defaultValue(Duration.ofMinutes(1).toMillis())
.withDescription("file rolling time ");
.withDescription("file rolling check interval in milliseconds");

public static final ConfigOption<Boolean> USE_CDC = ConfigOptions
.key("use_cdc")
Expand Down
5 changes: 2 additions & 3 deletions rust/lakesoul-io/src/lakesoul_io_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,10 +419,9 @@ pub fn create_session_context_with_planner(
sess_conf.options_mut().optimizer.enable_round_robin_repartition = false; // if true, the record_batches poll from stream become unordered
sess_conf.options_mut().optimizer.prefer_hash_join = false; //if true, panicked at 'range end out of bounds'
sess_conf.options_mut().execution.parquet.pushdown_filters = config.parquet_filter_pushdown;
// sess_conf.options_mut().execution.parquet.enable_page_index = true;
sess_conf.options_mut().execution.target_partitions = 1;

// limit memory for sort writer
let runtime = RuntimeEnv::new(RuntimeConfig::new().with_memory_limit(128 * 1024 * 1024, 1.0))?;
let runtime = RuntimeEnv::new(RuntimeConfig::new())?;

// firstly parse default fs if exist
let default_fs = config
Expand Down
2 changes: 0 additions & 2 deletions rust/lakesoul-io/src/lakesoul_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ impl LakeSoulReader {
"LakeSoulReader has wrong number of file".to_string(),
))
} else {
// let source = LakeSoulParquetProvider::from_config(self.config.clone());
// let source = source.build_with_context(&self.sess_ctx).await.unwrap();
let file_format = Arc::new(LakeSoulParquetFormat::new(
Arc::new(ParquetFormat::new()),
self.config.clone(),
Expand Down
Loading

0 comments on commit 58600b6

Please sign in to comment.