[flink] Remove per record RecordsWithSplitIds (apache#1370)
JingsongLi authored Jun 14, 2023
1 parent 500f86c commit 93e3dd5
Showing 16 changed files with 174 additions and 425 deletions.
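
In short: before this change the source could emit either one RowData per RecordsWithSplitIds element or one whole RecordIterator per element, switched by RecordsFunction and the 'streaming-read-atomic' option. The commit removes the per-record path, so every fetch hands a whole iterator to the source reader and the batch is drained inside a single RecordEmitter call, which is the guarantee the removed option used to opt into. The sketch below illustrates that emission pattern against Flink's RecordEmitter contract; the class name PerIteratorEmitter is illustrative only, and the commit itself wires up the static FlinkRecordsWithSplitIds::emitRecord shown further down.

// Rough sketch of per-iterator emission: the element handed to the emitter is a whole
// RecordIterator, so all of its rows are collected within one emitRecord() call and a
// checkpoint cannot land between the rows of one batch (e.g. between -U and +U rows).
// Assumes it sits next to Paimon's FileStoreSourceSplitState in the same package.
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.table.data.RowData;

class PerIteratorEmitter
        implements RecordEmitter<RecordIterator<RowData>, RowData, FileStoreSourceSplitState> {

    @Override
    public void emitRecord(
            RecordIterator<RowData> element,
            SourceOutput<RowData> output,
            FileStoreSourceSplitState state) {
        RecordAndPosition<RowData> record;
        while ((record = element.next()) != null) {
            output.collect(record.getRecord());
            // Track the last emitted position so a restored reader resumes after it.
            state.setPosition(record);
        }
    }
}

Because RecordEmitter has a single abstract method, a static method reference with this signature works just as well, which is why the reader below can simply pass FlinkRecordsWithSplitIds::emitRecord to SingleThreadMultiplexSourceReaderBase.
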
@@ -110,12 +110,6 @@
<td>Boolean</td>
<td>If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator.</td>
</tr>
<tr>
<td><h5>streaming-read-atomic</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>The option to return records per iterator instead of per record in streaming read. This ensures there is no checkpoint segmentation within iterator consumption.<br />By default, a streaming source checkpoint can be taken at any time, which means 'UPDATE_BEFORE' and 'UPDATE_AFTER' can be split across two checkpoints and downstream can see the intermediate state.</td>
</tr>
<tr>
<td><h5>unaware-bucket.compaction.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -33,7 +33,6 @@
import java.util.Comparator;
import java.util.List;

import static org.apache.paimon.flink.FlinkConnectorOptions.STREAMING_READ_ATOMIC;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@@ -53,20 +52,6 @@ protected List<String> ddl() {
+ options);
}

@Test
public void testStreamingAtomicChangelogFileTrue() throws Exception {
changelogFile = true;
sql("ALTER TABLE T2 SET ('%s' = 'true')", STREAMING_READ_ATOMIC.key());
testSimple("T2");
}

@Test
public void testStreamingAtomicChangelogFileFalse() throws Exception {
changelogFile = false;
sql("ALTER TABLE T2 SET ('%s' = 'true')", STREAMING_READ_ATOMIC.key());
testSimple("T2");
}

@Test
public void testWithoutPrimaryKeyChangelogFileTrue() throws Exception {
changelogFile = true;
@@ -33,7 +33,6 @@
import java.util.Comparator;
import java.util.List;

import static org.apache.paimon.flink.FlinkConnectorOptions.STREAMING_READ_ATOMIC;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@@ -53,20 +52,6 @@ protected List<String> ddl() {
+ options);
}

@Test
public void testStreamingAtomicChangelogFileTrue() throws Exception {
changelogFile = true;
sql("ALTER TABLE T2 SET ('%s' = 'true')", STREAMING_READ_ATOMIC.key());
testSimple("T2");
}

@Test
public void testStreamingAtomicChangelogFileFalse() throws Exception {
changelogFile = false;
sql("ALTER TABLE T2 SET ('%s' = 'true')", STREAMING_READ_ATOMIC.key());
testSimple("T2");
}

@Test
public void testWithoutPrimaryKeyChangelogFileTrue() throws Exception {
changelogFile = true;
@@ -105,23 +105,6 @@ public class FlinkConnectorOptions {
+ SCAN_PARALLELISM.key()
+ ". Otherwise, source parallelism is inferred from splits number (batch mode) or bucket number(streaming mode).");

public static final ConfigOption<Boolean> STREAMING_READ_ATOMIC =
ConfigOptions.key("streaming-read-atomic")
.booleanType()
.defaultValue(false)
.withDescription(
Description.builder()
.text(
"The option to enable return per iterator instead of per record in streaming read.")
.text(
"This can ensure that there will be no checkpoint segmentation in iterator consumption.")
.linebreak()
.text(
"By default, streaming source checkpoint will be performed in any time,"
+ " this means 'UPDATE_BEFORE' and 'UPDATE_AFTER' can be split into two checkpoint."
+ " Downstream can see intermediate state.")
.build());

@Deprecated
@ExcludeFromDocumentation("Deprecated")
public static final ConfigOption<Duration> CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL =
@@ -20,13 +20,10 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

@@ -36,12 +33,10 @@
import java.util.Collection;
import java.util.Map;

import static org.apache.paimon.flink.FlinkConnectorOptions.STREAMING_READ_ATOMIC;

/** Unbounded {@link FlinkSource} for reading records. It continuously monitors new snapshots. */
public class ContinuousFileStoreSource extends FlinkSource {

private static final long serialVersionUID = 3L;
private static final long serialVersionUID = 4L;

private final Map<String, String> options;

@@ -53,7 +48,8 @@ public ContinuousFileStoreSource(

@Override
public Boundedness getBoundedness() {
return isBounded() ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
Long boundedWatermark = CoreOptions.fromMap(options).scanBoundedWatermark();
return boundedWatermark != null ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
}

@Override
@@ -79,16 +75,4 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
scan);
}

@Override
public FileStoreSourceReader<?> createSourceReader(
SourceReaderContext context, TableRead read, @Nullable Long limit) {
return Options.fromMap(options).get(STREAMING_READ_ATOMIC)
? new FileStoreSourceReader<>(RecordsFunction.forSingle(), context, read, limit)
: new FileStoreSourceReader<>(RecordsFunction.forIterate(), context, read, limit);
}

private boolean isBounded() {
return CoreOptions.fromMap(options).scanBoundedWatermark() != null;
}
}
@@ -23,38 +23,24 @@
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator;
import org.apache.flink.table.data.RowData;

import javax.annotation.Nullable;

import java.util.Map;

/** A {@link SourceReader} that read records from {@link FileStoreSourceSplit}. */
public final class FileStoreSourceReader<T>
public final class FileStoreSourceReader
extends SingleThreadMultiplexSourceReaderBase<
T, RowData, FileStoreSourceSplit, FileStoreSourceSplitState> {
RecordIterator<RowData>, RowData, FileStoreSourceSplit, FileStoreSourceSplitState> {

public FileStoreSourceReader(
RecordsFunction<T> recordsFunction,
SourceReaderContext readerContext,
TableRead tableRead,
@Nullable Long limit) {
this(
recordsFunction,
readerContext,
tableRead,
limit == null ? null : new RecordLimiter(limit));
}

private FileStoreSourceReader(
RecordsFunction<T> recordsFunction,
SourceReaderContext readerContext,
TableRead tableRead,
@Nullable RecordLimiter limiter) {
SourceReaderContext readerContext, TableRead tableRead, @Nullable Long limit) {
// limiter is created in SourceReader, it can be shared in all split readers
super(
() -> new FileStoreSourceSplitReader<>(recordsFunction, tableRead, limiter),
recordsFunction,
() -> new FileStoreSourceSplitReader(tableRead, RecordLimiter.create(limit)),
FlinkRecordsWithSplitIds::emitRecord,
readerContext.getConfiguration(),
readerContext);
}
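
The reader now delegates limiter construction to a RecordLimiter.create(limit) factory. Judging only from the ternary it replaces, a null limit presumably yields a null limiter so unlimited reads carry no limiter at all; a sketch of that assumption follows (not necessarily the actual Paimon implementation, and the wrapper class name is made up).

import javax.annotation.Nullable;

// Assumed shape of the new factory, inferred from the expression it replaces in the old
// constructor: a null limit means "no limit", so no limiter object is created at all.
final class RecordLimiterFactorySketch {

    @Nullable
    static RecordLimiter create(@Nullable Long limit) {
        return limit == null ? null : new RecordLimiter(limit);
    }
}
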
@@ -39,13 +39,14 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;

/** The {@link SplitReader} implementation for the file store source. */
public class FileStoreSourceSplitReader<T> implements SplitReader<T, FileStoreSourceSplit> {

private final RecordsFunction<T> recordsFunction;
public class FileStoreSourceSplitReader
implements SplitReader<BulkFormat.RecordIterator<RowData>, FileStoreSourceSplit> {

private final TableRead tableRead;

@@ -62,11 +63,7 @@ public class FileStoreSourceSplitReader<T> implements SplitReader<T, FileStoreSo

private boolean paused;

public FileStoreSourceSplitReader(
RecordsFunction<T> recordsFunction,
TableRead tableRead,
@Nullable RecordLimiter limiter) {
this.recordsFunction = recordsFunction;
public FileStoreSourceSplitReader(TableRead tableRead, @Nullable RecordLimiter limiter) {
this.tableRead = tableRead;
this.limiter = limiter;
this.splits = new LinkedList<>();
@@ -76,9 +73,9 @@ public FileStoreSourceSplitReader(
}

@Override
public RecordsWithSplitIds<T> fetch() throws IOException {
public RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> fetch() throws IOException {
if (paused) {
return new RecordsFunction.RecordsWithPausedSplit<>();
return new RecordsWithPausedSplit<>();
}

checkSplitOrStartNext();
@@ -98,7 +95,7 @@ public RecordsWithSplitIds<T> fetch() throws IOException {
pool.recycler().recycle(iterator);
return finishSplit();
}
return recordsFunction.createRecords(currentSplitId, iterator.replace(nextBatch));
return FlinkRecordsWithSplitIds.forRecords(currentSplitId, iterator.replace(nextBatch));
}

private boolean reachLimit() {
@@ -199,16 +196,16 @@ private void seek(long toSkip) throws IOException {
}
}

private RecordsWithSplitIds<T> finishSplit() throws IOException {
private FlinkRecordsWithSplitIds finishSplit() throws IOException {
if (currentReader != null) {
if (currentReader.lazyRecordReader != null) {
currentReader.lazyRecordReader.close();
}
currentReader = null;
}

final RecordsWithSplitIds<T> finishRecords =
recordsFunction.createRecordsWithFinishedSplit(currentSplitId);
final FlinkRecordsWithSplitIds finishRecords =
FlinkRecordsWithSplitIds.finishedSplit(currentSplitId);
currentSplitId = null;
return finishRecords;
}
@@ -275,4 +272,25 @@ public RecordReader<InternalRow> recordReader() throws IOException {
return lazyRecordReader;
}
}

/** Indicates that the {@link FileStoreSourceSplitReader} is paused. */
private static class RecordsWithPausedSplit<T> implements RecordsWithSplitIds<T> {

@Nullable
@Override
public String nextSplit() {
return null;
}

@Nullable
@Override
public T nextRecordFromSplit() {
return null;
}

@Override
public Set<String> finishedSplits() {
return Collections.emptySet();
}
}
}
@@ -20,8 +20,11 @@

import org.apache.paimon.utils.Reference;

import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.table.data.RowData;

import javax.annotation.Nullable;

@@ -32,19 +35,19 @@
* A {@link RecordsWithSplitIds} which contains only one iterator record. This can ensure that there
* will be no checkpoint segmentation in iterator consumption.
*/
public class SingleIteratorRecords<T> implements RecordsWithSplitIds<BulkFormat.RecordIterator<T>> {
public class FlinkRecordsWithSplitIds implements RecordsWithSplitIds<RecordIterator<RowData>> {

@Nullable private String splitId;

@Nullable private Reference<BulkFormat.RecordIterator<T>> recordsForSplitCurrent;
@Nullable private Reference<RecordIterator<RowData>> recordsForSplitCurrent;

@Nullable private final BulkFormat.RecordIterator<T> recordsForSplit;
@Nullable private final RecordIterator<RowData> recordsForSplit;

private final Set<String> finishedSplits;

private SingleIteratorRecords(
private FlinkRecordsWithSplitIds(
@Nullable String splitId,
@Nullable BulkFormat.RecordIterator<T> recordsForSplit,
@Nullable RecordIterator<RowData> recordsForSplit,
Set<String> finishedSplits) {
this.splitId = splitId;
this.recordsForSplit = recordsForSplit;
@@ -67,12 +70,12 @@ public String nextSplit() {

@Nullable
@Override
public BulkFormat.RecordIterator<T> nextRecordFromSplit() {
public RecordIterator<RowData> nextRecordFromSplit() {
if (this.recordsForSplitCurrent == null) {
throw new IllegalStateException();
}

BulkFormat.RecordIterator<T> recordsForSplit = this.recordsForSplitCurrent.get();
RecordIterator<RowData> recordsForSplit = this.recordsForSplitCurrent.get();
this.recordsForSplitCurrent.set(null);
return recordsForSplit;
}
@@ -89,12 +92,23 @@ public void recycle() {
}
}

public static <T> SingleIteratorRecords<T> forRecords(
String splitId, BulkFormat.RecordIterator<T> recordsForSplit) {
return new SingleIteratorRecords<>(splitId, recordsForSplit, Collections.emptySet());
public static FlinkRecordsWithSplitIds forRecords(
String splitId, RecordIterator<RowData> recordsForSplit) {
return new FlinkRecordsWithSplitIds(splitId, recordsForSplit, Collections.emptySet());
}

public static <T> SingleIteratorRecords<T> finishedSplit(String splitId) {
return new SingleIteratorRecords<>(null, null, Collections.singleton(splitId));
public static FlinkRecordsWithSplitIds finishedSplit(String splitId) {
return new FlinkRecordsWithSplitIds(null, null, Collections.singleton(splitId));
}

public static void emitRecord(
RecordIterator<RowData> element,
SourceOutput<RowData> output,
FileStoreSourceSplitState state) {
RecordAndPosition<RowData> record;
while ((record = element.next()) != null) {
output.collect(record.getRecord());
state.setPosition(record);
}
}
}
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.source;

import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableRead;

import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
@@ -47,12 +46,7 @@ public FlinkSource(ReadBuilder readBuilder, @Nullable Long limit) {

@Override
public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderContext context) {
return createSourceReader(context, readBuilder.newRead(), limit);
}

public FileStoreSourceReader<?> createSourceReader(
SourceReaderContext context, TableRead read, @Nullable Long limit) {
return new FileStoreSourceReader<>(RecordsFunction.forSingle(), context, read, limit);
return new FileStoreSourceReader(context, readBuilder.newRead(), limit);
}

@Override