[FLINK-33974] Implement the Sink transformation depending on the new SinkV2 interfaces
pvary authored Jan 19, 2024
1 parent b309ceb commit fd673a2
Showing 7 changed files with 186 additions and 64 deletions.
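
The commit swaps the monolithic SinkV2 hierarchy (TwoPhaseCommittingSink, StatefulSink, WithPreWriteTopology, WithPreCommitTopology, WithPostCommitTopology) for mixin interfaces (SupportsCommitter, SupportsWriterState, SupportsPreWriteTopology, SupportsPreCommitTopology, SupportsPostCommitTopology) that the runtime probes with instanceof. A minimal sketch of that probing pattern as the diffs below apply it; the SinkCapabilities class and describe method are illustrative names, not part of the commit:

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.SupportsWriterState;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;

final class SinkCapabilities {
    // Every Sink has a writer; each optional capability is an extra interface on the same object.
    static String describe(Sink<?> sink) {
        StringBuilder caps = new StringBuilder("writer");
        if (sink instanceof SupportsPreWriteTopology) {
            caps.append(" + pre-write topology");
        }
        if (sink instanceof SupportsCommitter) {
            caps.append(" + committer");
        }
        if (sink instanceof SupportsPreCommitTopology) {
            caps.append(" + pre-commit topology");
        }
        if (sink instanceof SupportsPostCommitTopology) {
            caps.append(" + post-commit topology");
        }
        if (sink instanceof SupportsWriterState) {
            caps.append(" (stateful writer)");
        }
        return caps.toString();
    }
}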
File 1 of 7: CommitterOperatorFactory.java
@@ -19,9 +19,9 @@
package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.annotation.Internal;
- import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+ import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
- import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+ import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -41,14 +41,12 @@ public final class CommitterOperatorFactory<CommT>
implements OneInputStreamOperatorFactory<
CommittableMessage<CommT>, CommittableMessage<CommT>> {

- private final TwoPhaseCommittingSink<?, CommT> sink;
+ private final SupportsCommitter<CommT> sink;
private final boolean isBatchMode;
private final boolean isCheckpointingEnabled;

public CommitterOperatorFactory(
- TwoPhaseCommittingSink<?, CommT> sink,
- boolean isBatchMode,
- boolean isCheckpointingEnabled) {
+ SupportsCommitter<CommT> sink, boolean isBatchMode, boolean isCheckpointingEnabled) {
this.sink = checkNotNull(sink);
this.isBatchMode = isBatchMode;
this.isCheckpointingEnabled = isCheckpointingEnabled;
@@ -65,7 +63,7 @@ public <T extends StreamOperator<CommittableMessage<CommT>>> T createStreamOpera
processingTimeService,
sink.getCommittableSerializer(),
context -> sink.createCommitter(context),
- sink instanceof WithPostCommitTopology,
+ sink instanceof SupportsPostCommitTopology,
isBatchMode,
isCheckpointingEnabled);
committerOperator.setup(
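The factory now only depends on the committing capability: it pulls the committable serializer and a Committer from the SupportsCommitter mixin. A sketch of what such a committer can look like; LoggingCommitter is an illustrative name, while the Committer/CommitRequest contract is the existing SinkV2 API, unchanged by this commit:

import java.io.IOException;
import java.util.Collection;

import org.apache.flink.api.connector.sink2.Committer;

class LoggingCommitter implements Committer<String> {
    @Override
    public void commit(Collection<CommitRequest<String>> requests)
            throws IOException, InterruptedException {
        for (CommitRequest<String> request : requests) {
            // A real committer would publish the committable transactionally here.
            System.out.println("committing: " + request.getCommittable());
        }
    }

    @Override
    public void close() {}
}

A sink hands such an instance back from SupportsCommitter#createCommitter and pairs it with getCommittableSerializer, exactly the two methods the factory calls above.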
File 2 of 7: SinkWriterOperator.java
@@ -24,12 +24,12 @@
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+ import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.SinkWriter;
- import org.apache.flink.api.connector.sink2.StatefulSink;
- import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
- import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+ import org.apache.flink.api.connector.sink2.SupportsCommitter;
+ import org.apache.flink.api.connector.sink2.SupportsWriterState;
+ import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -113,18 +113,17 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
this.processingTimeService = checkNotNull(processingTimeService);
this.mailboxExecutor = checkNotNull(mailboxExecutor);
this.context = new Context<>();
- this.emitDownstream = sink instanceof TwoPhaseCommittingSink;
+ this.emitDownstream = sink instanceof SupportsCommitter;

- if (sink instanceof StatefulSink) {
+ if (sink instanceof SupportsWriterState) {
writerStateHandler =
- new StatefulSinkWriterStateHandler<>((StatefulSink<InputT, ?>) sink);
+ new StatefulSinkWriterStateHandler<>((SupportsWriterState<InputT, ?>) sink);
} else {
writerStateHandler = new StatelessSinkWriterStateHandler<>(sink);
}

- if (sink instanceof TwoPhaseCommittingSink) {
- committableSerializer =
- ((TwoPhaseCommittingSink<InputT, CommT>) sink).getCommittableSerializer();
+ if (sink instanceof SupportsCommitter) {
+ committableSerializer = ((SupportsCommitter<CommT>) sink).getCommittableSerializer();
} else {
committableSerializer = null;
}
@@ -188,13 +187,13 @@ private void emitCommittables(Long checkpointId) throws IOException, Interrupted
if (!emitDownstream) {
// To support SinkV1 topologies with only a writer we have to call prepareCommit
// although no committables are forwarded
- if (sinkWriter instanceof PrecommittingSinkWriter) {
- ((PrecommittingSinkWriter<?, ?>) sinkWriter).prepareCommit();
+ if (sinkWriter instanceof CommittingSinkWriter) {
+ ((CommittingSinkWriter<?, ?>) sinkWriter).prepareCommit();
}
return;
}
Collection<CommT> committables =
- ((PrecommittingSinkWriter<?, CommT>) sinkWriter).prepareCommit();
+ ((CommittingSinkWriter<?, CommT>) sinkWriter).prepareCommit();
StreamingRuntimeContext runtimeContext = getRuntimeContext();
final int indexOfThisSubtask = runtimeContext.getTaskInfo().getIndexOfThisSubtask();
final int numberOfParallelSubtasks =
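SinkWriterOperator now keys off CommittingSinkWriter, the successor of TwoPhaseCommittingSink.PrecommittingSinkWriter: at checkpoint or end of input it calls prepareCommit() and forwards the returned committables downstream. A sketch of a writer implementing that contract; BufferingWriter is an illustrative name, write/flush/close come from SinkWriter, and prepareCommit's Collection return type is visible in the diff above:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.flink.api.connector.sink2.CommittingSinkWriter;

class BufferingWriter implements CommittingSinkWriter<String, String> {
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void write(String element, Context context) {
        buffer.add(element);
    }

    @Override
    public void flush(boolean endOfInput) {
        // Nothing to flush here; records are handed over as committables in prepareCommit().
    }

    @Override
    public Collection<String> prepareCommit() {
        List<String> committables = new ArrayList<>(buffer);
        buffer.clear();
        return committables;
    }

    @Override
    public void close() {}
}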
File 3 of 7: StatefulSinkWriterStateHandler.java
@@ -24,7 +24,6 @@
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
- import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.SupportsWriterState;
import org.apache.flink.api.connector.sink2.SupportsWriterState.WithCompatibleState;
@@ -79,10 +78,12 @@ final class StatefulSinkWriterStateHandler<InputT, WriterStateT>

private StatefulSinkWriter<InputT, WriterStateT> sinkWriter;

- public StatefulSinkWriterStateHandler(StatefulSink<InputT, WriterStateT> sink) {
- this.sink = sink;
+ public StatefulSinkWriterStateHandler(SupportsWriterState<InputT, WriterStateT> sink) {
+ Preconditions.checkArgument(
+ sink instanceof Sink, "Should be an instance of " + Sink.class.getName());
+ this.sink = (Sink<InputT>) sink;
Collection<String> previousSinkStateNames =
- sink instanceof StatefulSink.WithCompatibleState
+ sink instanceof SupportsWriterState.WithCompatibleState
? ((WithCompatibleState) sink).getCompatibleWriterStateNames()
: Collections.emptyList();
this.writerStateSimpleVersionedSerializer = sink.getWriterStateSerializer();
@@ -116,10 +117,6 @@ public SinkWriter<InputT> createWriter(
Iterables.addAll(states, previousSinkState.get());
}

- if (!(sink instanceof SupportsWriterState)) {
- throw new IllegalArgumentException("Sink should implement SupportsWriterState");
- }

sinkWriter = ((SupportsWriterState) sink).restoreWriter(initContext, states);
} else {
sinkWriter = cast(sink.createWriter(initContext));
File 4 of 7: SinkTransformationTranslator.java
@@ -24,15 +24,15 @@
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
- import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+ import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
- import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
- import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
- import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+ import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
+ import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
+ import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -44,6 +44,7 @@
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+ import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

@@ -135,16 +136,27 @@ private void expand() {

DataStream<T> prewritten = inputStream;

- if (sink instanceof WithPreWriteTopology) {
+ if (sink instanceof SupportsPreWriteTopology) {
prewritten =
adjustTransformations(
prewritten,
- ((WithPreWriteTopology<T>) sink)::addPreWriteTopology,
+ ((SupportsPreWriteTopology<T>) sink)::addPreWriteTopology,
true,
sink instanceof SupportsConcurrentExecutionAttempts);
}

- if (sink instanceof TwoPhaseCommittingSink) {
+ if (sink instanceof SupportsPreCommitTopology) {
+ Preconditions.checkArgument(
+ sink instanceof SupportsCommitter,
+ "Sink with SupportsPreCommitTopology should implement SupportsCommitter");
+ }
+ if (sink instanceof SupportsPostCommitTopology) {
+ Preconditions.checkArgument(
+ sink instanceof SupportsCommitter,
+ "Sink with SupportsPostCommitTopology should implement SupportsCommitter");
+ }

+ if (sink instanceof SupportsCommitter) {
addCommittingTopology(sink, prewritten);
} else {
adjustTransformations(
@@ -173,32 +185,27 @@ private void expand() {
}
}

- private <CommT> void addCommittingTopology(Sink<T> sink, DataStream<T> inputStream) {
- TwoPhaseCommittingSink<T, CommT> committingSink =
- (TwoPhaseCommittingSink<T, CommT>) sink;
- TypeInformation<CommittableMessage<CommT>> typeInformation =
+ private <CommT, WriteResultT> void addCommittingTopology(
+ Sink<T> sink, DataStream<T> inputStream) {
+ SupportsCommitter<CommT> committingSink = (SupportsCommitter<CommT>) sink;
+ TypeInformation<CommittableMessage<CommT>> committableTypeInformation =
CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);

- DataStream<CommittableMessage<CommT>> written =
- adjustTransformations(
- inputStream,
- input ->
- input.transform(
- WRITER_NAME,
- typeInformation,
- new SinkWriterOperatorFactory<>(sink)),
- false,
- sink instanceof SupportsConcurrentExecutionAttempts);
+ DataStream<CommittableMessage<CommT>> precommitted;
+ if (sink instanceof SupportsPreCommitTopology) {
+ SupportsPreCommitTopology<WriteResultT, CommT> preCommittingSink =
+ (SupportsPreCommitTopology<WriteResultT, CommT>) sink;
+ TypeInformation<CommittableMessage<WriteResultT>> writeResultTypeInformation =
+ CommittableMessageTypeInfo.of(preCommittingSink::getWriteResultSerializer);

- DataStream<CommittableMessage<CommT>> precommitted = addFailOverRegion(written);
+ DataStream<CommittableMessage<WriteResultT>> writerResult =
+ addWriter(sink, inputStream, writeResultTypeInformation);

- if (sink instanceof WithPreCommitTopology) {
precommitted =
adjustTransformations(
- precommitted,
- ((WithPreCommitTopology<T, CommT>) sink)::addPreCommitTopology,
- true,
- false);
+ writerResult, preCommittingSink::addPreCommitTopology, true, false);
+ } else {
+ precommitted = addWriter(sink, inputStream, committableTypeInformation);
}

DataStream<CommittableMessage<CommT>> committed =
@@ -207,27 +214,45 @@ private <CommT> void addCommittingTopology(Sink<T> sink, DataStream<T> inputStre
pc ->
pc.transform(
COMMITTER_NAME,
- typeInformation,
+ committableTypeInformation,
new CommitterOperatorFactory<>(
committingSink,
isBatchMode,
isCheckpointingEnabled)),
false,
false);

- if (sink instanceof WithPostCommitTopology) {
+ if (sink instanceof SupportsPostCommitTopology) {
DataStream<CommittableMessage<CommT>> postcommitted = addFailOverRegion(committed);
adjustTransformations(
postcommitted,
pc -> {
- ((WithPostCommitTopology<T, CommT>) sink).addPostCommitTopology(pc);
+ ((SupportsPostCommitTopology<CommT>) sink).addPostCommitTopology(pc);
return null;
},
true,
false);
}
}

+ private <WriteResultT> DataStream<CommittableMessage<WriteResultT>> addWriter(
+ Sink<T> sink,
+ DataStream<T> inputStream,
+ TypeInformation<CommittableMessage<WriteResultT>> typeInformation) {
+ DataStream<CommittableMessage<WriteResultT>> written =
+ adjustTransformations(
+ inputStream,
+ input ->
+ input.transform(
+ WRITER_NAME,
+ typeInformation,
+ new SinkWriterOperatorFactory<>(sink)),
+ false,
+ sink instanceof SupportsConcurrentExecutionAttempts);
+
+ return addFailOverRegion(written);
+ }

/**
* Adds a batch exchange that materializes the output first. This is a no-op in STREAMING.
*/
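With the capability checks above, the expanded graph is: optional pre-write topology, then writer, then a fail-over region, then the optional pre-commit topology, then committer, then another fail-over region ahead of the optional post-commit topology. Nothing changes on the user side; any SinkV2 still goes through sinkTo. A sketch using PrintSink, a writer-only SinkV2 bundled with flink-streaming-java; since it implements none of the mixins, only the writer path is expanded, whereas a SupportsCommitter sink would additionally get the committer chain:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSink;

public class SinkV2Demo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The translator in this file expands this single sinkTo call into the operator chain.
        env.fromData(1, 2, 3).sinkTo(new PrintSink<>()).name("demo-sink");
        env.execute("sink-v2-demo");
    }
}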
File 5 of 7 (sink transformation translator test)
@@ -67,10 +67,7 @@ public void testSettingOperatorUidHash() {
.setWriterUidHash(writerHash)
.setCommitterUidHash(committerHash)
.build();
- src.sinkTo(
- TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build(),
- operatorsUidHashes)
- .name(NAME);
+ src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);

final StreamGraph streamGraph = env.getStreamGraph();

@@ -87,9 +84,7 @@ public void testSettingOperatorUids() {
final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStreamSource<Integer> src = env.fromElements(1, 2);
- src.sinkTo(TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build())
- .name(NAME)
- .uid(sinkUid);
+ src.sinkTo(sinkWithCommitter()).name(NAME).uid(sinkUid);

final StreamGraph streamGraph = env.getStreamGraph();
assertEquals(findWriter(streamGraph).getTransformationUID(), sinkUid);
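Both tests now build their sink through a sinkWithCommitter() helper. The helper itself is added elsewhere in this commit and is not visible in this view; judging purely from the builder calls it replaces, it presumably wraps TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build().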
File 6 of 7 (StreamGraph generation test)
@@ -74,6 +74,7 @@
import org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator;
import org.apache.flink.streaming.util.NoOpIntMap;
import org.apache.flink.streaming.util.TestExpandingSink;
+ import org.apache.flink.streaming.util.TestExpandingSinkWithMixin;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
@@ -908,6 +909,28 @@ public void testAutoParallelismForExpandedTransformations() {
});
}

+ @Test
+ public void testAutoParallelismForExpandedTransformationsWithMixin() {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.setParallelism(2);
+
+ DataStream<Integer> sourceDataStream = env.fromData(1, 2, 3);
+ // Parallelism is set to -1 (default parallelism identifier) to imitate the behavior of
+ // the table planner. Parallelism should be set automatically after translating.
+ sourceDataStream.sinkTo(new TestExpandingSinkWithMixin()).setParallelism(-1);
+
+ StreamGraph graph = env.getStreamGraph();
+
+ graph.getStreamNodes()
+ .forEach(
+ node -> {
+ if (!node.getOperatorName().startsWith("Source")) {
+ Assertions.assertThat(node.getParallelism()).isEqualTo(2);
+ }
+ });
+ }

@Test
public void testCacheTransformation() {
final TestingStreamExecutionEnvironment env = new TestingStreamExecutionEnvironment();
File 7 of 7 (diff not loaded in this view)
