From fd673a2f46206ff65978f05fcb96b525696aead2 Mon Sep 17 00:00:00 2001 From: pvary Date: Fri, 19 Jan 2024 14:04:53 +0100 Subject: [PATCH] [FLINK-33974] Implement the Sink transformation depending on the new SinkV2 interfaces --- .../sink/CommitterOperatorFactory.java | 12 ++- .../operators/sink/SinkWriterOperator.java | 23 +++-- .../sink/StatefulSinkWriterStateHandler.java | 13 ++- .../SinkTransformationTranslator.java | 85 ++++++++++++------- .../SinkV2TransformationTranslatorITCase.java | 9 +- .../api/graph/StreamGraphGeneratorTest.java | 23 +++++ .../util/TestExpandingSinkWithMixin.java | 85 +++++++++++++++++++ 7 files changed, 186 insertions(+), 64 deletions(-) create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSinkWithMixin.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java index 1f013ca0580ae..162f8d041817e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java @@ -19,9 +19,9 @@ package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.api.connector.sink2.SupportsCommitter; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; -import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology; +import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; @@ -41,14 +41,12 @@ public final class CommitterOperatorFactory implements OneInputStreamOperatorFactory< CommittableMessage, CommittableMessage> { - private final TwoPhaseCommittingSink sink; + private final SupportsCommitter sink; private final boolean isBatchMode; private final boolean isCheckpointingEnabled; public CommitterOperatorFactory( - TwoPhaseCommittingSink sink, - boolean isBatchMode, - boolean isCheckpointingEnabled) { + SupportsCommitter sink, boolean isBatchMode, boolean isCheckpointingEnabled) { this.sink = checkNotNull(sink); this.isBatchMode = isBatchMode; this.isCheckpointingEnabled = isCheckpointingEnabled; @@ -65,7 +63,7 @@ public >> T createStreamOpera processingTimeService, sink.getCommittableSerializer(), context -> sink.createCommitter(context), - sink instanceof WithPostCommitTopology, + sink instanceof SupportsPostCommitTopology, isBatchMode, isCheckpointingEnabled); committerOperator.setup( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java index c0a9892d5ee16..a9d0f682a7864 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java @@ -24,12 +24,12 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.connector.sink2.CommittingSinkWriter; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.Sink.InitContext; import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.api.connector.sink2.StatefulSink; -import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; -import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter; +import org.apache.flink.api.connector.sink2.SupportsCommitter; +import org.apache.flink.api.connector.sink2.SupportsWriterState; import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; @@ -113,18 +113,17 @@ class SinkWriterOperator extends AbstractStreamOperator(); - this.emitDownstream = sink instanceof TwoPhaseCommittingSink; + this.emitDownstream = sink instanceof SupportsCommitter; - if (sink instanceof StatefulSink) { + if (sink instanceof SupportsWriterState) { writerStateHandler = - new StatefulSinkWriterStateHandler<>((StatefulSink) sink); + new StatefulSinkWriterStateHandler<>((SupportsWriterState) sink); } else { writerStateHandler = new StatelessSinkWriterStateHandler<>(sink); } - if (sink instanceof TwoPhaseCommittingSink) { - committableSerializer = - ((TwoPhaseCommittingSink) sink).getCommittableSerializer(); + if (sink instanceof SupportsCommitter) { + committableSerializer = ((SupportsCommitter) sink).getCommittableSerializer(); } else { committableSerializer = null; } @@ -188,13 +187,13 @@ private void emitCommittables(Long checkpointId) throws IOException, Interrupted if (!emitDownstream) { // To support SinkV1 topologies with only a writer we have to call prepareCommit // although no committables are forwarded - if (sinkWriter instanceof PrecommittingSinkWriter) { - ((PrecommittingSinkWriter) sinkWriter).prepareCommit(); + if (sinkWriter instanceof CommittingSinkWriter) { + ((CommittingSinkWriter) sinkWriter).prepareCommit(); } return; } Collection committables = - ((PrecommittingSinkWriter) sinkWriter).prepareCommit(); + ((CommittingSinkWriter) sinkWriter).prepareCommit(); StreamingRuntimeContext runtimeContext = getRuntimeContext(); final int indexOfThisSubtask = runtimeContext.getTaskInfo().getIndexOfThisSubtask(); final int numberOfParallelSubtasks = diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java index 2e83a3b8a6c2a..3c70627a50e83 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.api.connector.sink2.StatefulSink; import org.apache.flink.api.connector.sink2.StatefulSinkWriter; import org.apache.flink.api.connector.sink2.SupportsWriterState; import org.apache.flink.api.connector.sink2.SupportsWriterState.WithCompatibleState; @@ -79,10 +78,12 @@ final class StatefulSinkWriterStateHandler private StatefulSinkWriter sinkWriter; - public StatefulSinkWriterStateHandler(StatefulSink sink) { - this.sink = sink; + public StatefulSinkWriterStateHandler(SupportsWriterState sink) { + Preconditions.checkArgument( + sink instanceof Sink, "Should be an instance of " + Sink.class.getName()); + this.sink = (Sink) sink; Collection previousSinkStateNames = - sink instanceof StatefulSink.WithCompatibleState + sink instanceof SupportsWriterState.WithCompatibleState ? ((WithCompatibleState) sink).getCompatibleWriterStateNames() : Collections.emptyList(); this.writerStateSimpleVersionedSerializer = sink.getWriterStateSerializer(); @@ -116,10 +117,6 @@ public SinkWriter createWriter( Iterables.addAll(states, previousSinkState.get()); } - if (!(sink instanceof SupportsWriterState)) { - throw new IllegalArgumentException("Sink should implement SupportsWriterState"); - } - sinkWriter = ((SupportsWriterState) sink).restoreWriter(initContext, states); } else { sinkWriter = cast(sink.createWriter(initContext)); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java index ce009320659b7..e924086a1d824 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java @@ -24,15 +24,15 @@ import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.api.connector.sink2.SupportsCommitter; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies; -import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology; -import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology; -import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology; +import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology; +import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology; +import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology; import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -44,6 +44,7 @@ import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory; import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; @@ -135,16 +136,27 @@ private void expand() { DataStream prewritten = inputStream; - if (sink instanceof WithPreWriteTopology) { + if (sink instanceof SupportsPreWriteTopology) { prewritten = adjustTransformations( prewritten, - ((WithPreWriteTopology) sink)::addPreWriteTopology, + ((SupportsPreWriteTopology) sink)::addPreWriteTopology, true, sink instanceof SupportsConcurrentExecutionAttempts); } - if (sink instanceof TwoPhaseCommittingSink) { + if (sink instanceof SupportsPreCommitTopology) { + Preconditions.checkArgument( + sink instanceof SupportsCommitter, + "Sink with SupportsPreCommitTopology should implement SupportsCommitter"); + } + if (sink instanceof SupportsPostCommitTopology) { + Preconditions.checkArgument( + sink instanceof SupportsCommitter, + "Sink with SupportsPostCommitTopology should implement SupportsCommitter"); + } + + if (sink instanceof SupportsCommitter) { addCommittingTopology(sink, prewritten); } else { adjustTransformations( @@ -173,32 +185,27 @@ private void expand() { } } - private void addCommittingTopology(Sink sink, DataStream inputStream) { - TwoPhaseCommittingSink committingSink = - (TwoPhaseCommittingSink) sink; - TypeInformation> typeInformation = + private void addCommittingTopology( + Sink sink, DataStream inputStream) { + SupportsCommitter committingSink = (SupportsCommitter) sink; + TypeInformation> committableTypeInformation = CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer); - DataStream> written = - adjustTransformations( - inputStream, - input -> - input.transform( - WRITER_NAME, - typeInformation, - new SinkWriterOperatorFactory<>(sink)), - false, - sink instanceof SupportsConcurrentExecutionAttempts); + DataStream> precommitted; + if (sink instanceof SupportsPreCommitTopology) { + SupportsPreCommitTopology preCommittingSink = + (SupportsPreCommitTopology) sink; + TypeInformation> writeResultTypeInformation = + CommittableMessageTypeInfo.of(preCommittingSink::getWriteResultSerializer); - DataStream> precommitted = addFailOverRegion(written); + DataStream> writerResult = + addWriter(sink, inputStream, writeResultTypeInformation); - if (sink instanceof WithPreCommitTopology) { precommitted = adjustTransformations( - precommitted, - ((WithPreCommitTopology) sink)::addPreCommitTopology, - true, - false); + writerResult, preCommittingSink::addPreCommitTopology, true, false); + } else { + precommitted = addWriter(sink, inputStream, committableTypeInformation); } DataStream> committed = @@ -207,7 +214,7 @@ private void addCommittingTopology(Sink sink, DataStream inputStre pc -> pc.transform( COMMITTER_NAME, - typeInformation, + committableTypeInformation, new CommitterOperatorFactory<>( committingSink, isBatchMode, @@ -215,12 +222,12 @@ private void addCommittingTopology(Sink sink, DataStream inputStre false, false); - if (sink instanceof WithPostCommitTopology) { + if (sink instanceof SupportsPostCommitTopology) { DataStream> postcommitted = addFailOverRegion(committed); adjustTransformations( postcommitted, pc -> { - ((WithPostCommitTopology) sink).addPostCommitTopology(pc); + ((SupportsPostCommitTopology) sink).addPostCommitTopology(pc); return null; }, true, @@ -228,6 +235,24 @@ private void addCommittingTopology(Sink sink, DataStream inputStre } } + private DataStream> addWriter( + Sink sink, + DataStream inputStream, + TypeInformation> typeInformation) { + DataStream> written = + adjustTransformations( + inputStream, + input -> + input.transform( + WRITER_NAME, + typeInformation, + new SinkWriterOperatorFactory<>(sink)), + false, + sink instanceof SupportsConcurrentExecutionAttempts); + + return addFailOverRegion(written); + } + /** * Adds a batch exchange that materializes the output first. This is a no-op in STREAMING. */ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java index 612cb9d780816..97b23ababaca0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java @@ -67,10 +67,7 @@ public void testSettingOperatorUidHash() { .setWriterUidHash(writerHash) .setCommitterUidHash(committerHash) .build(); - src.sinkTo( - TestSinkV2.newBuilder().setDefaultCommitter().build(), - operatorsUidHashes) - .name(NAME); + src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME); final StreamGraph streamGraph = env.getStreamGraph(); @@ -87,9 +84,7 @@ public void testSettingOperatorUids() { final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead"; final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final DataStreamSource src = env.fromElements(1, 2); - src.sinkTo(TestSinkV2.newBuilder().setDefaultCommitter().build()) - .name(NAME) - .uid(sinkUid); + src.sinkTo(sinkWithCommitter()).name(NAME).uid(sinkUid); final StreamGraph streamGraph = env.getStreamGraph(); assertEquals(findWriter(streamGraph).getTransformationUID(), sinkUid); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index f2a47b2a712e3..c9e56cad31964 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -74,6 +74,7 @@ import org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator; import org.apache.flink.streaming.util.NoOpIntMap; import org.apache.flink.streaming.util.TestExpandingSink; +import org.apache.flink.streaming.util.TestExpandingSinkWithMixin; import org.apache.flink.util.AbstractID; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; @@ -908,6 +909,28 @@ public void testAutoParallelismForExpandedTransformations() { }); } + @Test + public void testAutoParallelismForExpandedTransformationsWithMixin() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.setParallelism(2); + + DataStream sourceDataStream = env.fromData(1, 2, 3); + // Parallelism is set to -1 (default parallelism identifier) to imitate the behavior of + // the table planner. Parallelism should be set automatically after translating. + sourceDataStream.sinkTo(new TestExpandingSinkWithMixin()).setParallelism(-1); + + StreamGraph graph = env.getStreamGraph(); + + graph.getStreamNodes() + .forEach( + node -> { + if (!node.getOperatorName().startsWith("Source")) { + Assertions.assertThat(node.getParallelism()).isEqualTo(2); + } + }); + } + @Test public void testCacheTransformation() { final TestingStreamExecutionEnvironment env = new TestingStreamExecutionEnvironment(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSinkWithMixin.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSinkWithMixin.java new file mode 100644 index 0000000000000..b3c7ba7bc922f --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSinkWithMixin.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.CommitterInitContext; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.SupportsCommitter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology; +import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology; +import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; + +import java.io.IOException; + +/** A test sink that expands into a simple subgraph. Do not use in runtime. */ +public class TestExpandingSinkWithMixin + implements Sink, + SupportsCommitter, + SupportsPreWriteTopology, + SupportsPreCommitTopology, + SupportsPostCommitTopology { + + @Override + public void addPostCommitTopology(DataStream> committables) { + committables.sinkTo(new DiscardingSink<>()); + } + + @Override + public DataStream> addPreCommitTopology( + DataStream> committables) { + return committables.map(value -> value).returns(committables.getType()); + } + + @Override + public DataStream addPreWriteTopology(DataStream inputDataStream) { + return inputDataStream.map(new NoOpIntMap()); + } + + @Override + public SinkWriter createWriter(WriterInitContext context) throws IOException { + return null; + } + + @Override + public SinkWriter createWriter(InitContext context) throws IOException { + return null; + } + + @Override + public Committer createCommitter(CommitterInitContext context) { + return null; + } + + @Override + public SimpleVersionedSerializer getCommittableSerializer() { + return null; + } + + @Override + public SimpleVersionedSerializer getWriteResultSerializer() { + return null; + } +}