diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SliceAttachedWindowingStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SliceAttachedWindowingStrategy.java index cc61d215fd4fb..1cf8e30b96139 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SliceAttachedWindowingStrategy.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SliceAttachedWindowingStrategy.java @@ -18,7 +18,7 @@ package org.apache.flink.table.planner.plan.logical; -import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowOperator; +import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -31,7 +31,7 @@ * A windowing strategy that gets windows from input columns as window slice have been assigned and * attached to the physical columns. The window slice is usually identified by slice end timestamp. * - * @see SlicingWindowOperator for more details about which windows can apply slicing. + * @see SlicingWindowProcessor for more details about which windows can apply slicing. */ @JsonTypeName("SliceAttached") public class SliceAttachedWindowingStrategy extends WindowingStrategy { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java index 3a29502f23d9b..24778e11f54f4 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java @@ -18,6 +18,8 @@ package org.apache.flink.table.planner.plan.logical; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator; + import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo; @@ -37,8 +39,7 @@ public interface WindowSpec { /** * Return true if the window is aligned. * - *
See more details about aligned window and unaligned window in {@link - * org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}. + *
See more details about aligned window and unaligned window in {@link WindowAggOperator}. */ @JsonIgnore boolean isAlignedWindow(); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java index f6939cffb4601..77494b721e0fc 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java @@ -29,16 +29,16 @@ import org.apache.flink.table.runtime.operators.aggregate.window.processors.UnsliceWindowAggProcessor; import org.apache.flink.table.runtime.operators.window.TimeWindow; import org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator; import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAssigner; -import org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigners.HoppingSliceAssigner; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceSharedAssigner; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceUnsharedAssigner; -import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowOperator; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor; import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigner; -import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowOperator; +import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowProcessor; import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer; import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; @@ -48,8 +48,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The {@link WindowAggOperatorBuilder} is used to build a {@link SlicingWindowOperator} with - * slicing window or a {@link UnslicingWindowOperator} for with unslicing window. + * The {@link WindowAggOperatorBuilder} is used to build a {@link WindowAggOperator} with {@link + * SlicingWindowProcessor} or a {@link UnslicingWindowProcessor}. * *
* WindowAggOperatorBuilder.builder() @@ -140,22 +140,24 @@ public WindowAggOperatorBuilder countStarIndex(int indexOfCountStart) { return this; } - public WindowOperatorBasebuild() { + public WindowAggOperator build() { checkNotNull(assigner); checkNotNull(inputSerializer); checkNotNull(keySerializer); checkNotNull(accSerializer); checkNotNull(generatedAggregateFunction); + final WindowProcessor> windowProcessor; if (assigner instanceof SliceAssigner) { - return buildSlicingWindowOperator(); + windowProcessor = buildSlicingWindowProcessor(); } else { - return buildUnslicingWindowOperator(); + windowProcessor = buildUnslicingWindowProcessor(); } + return new WindowAggOperator<>(windowProcessor); } @SuppressWarnings("unchecked") - protected WindowOperatorBase buildSlicingWindowOperator() { + private SlicingWindowProcessor buildSlicingWindowProcessor() { boolean isGlobalAgg = localGeneratedAggregateFunction != null && globalGeneratedAggregateFunction != null; @@ -200,19 +202,16 @@ public WindowAggOperatorBuilder countStarIndex(int indexOfCountStart) { throw new IllegalArgumentException( "assigner must be instance of SliceUnsharedAssigner or SliceSharedAssigner."); } - return new SlicingWindowOperator<>(windowProcessor); + return windowProcessor; } @SuppressWarnings("unchecked") - private WindowOperatorBase buildUnslicingWindowOperator() { - final UnsliceWindowAggProcessor windowProcessor = - new UnsliceWindowAggProcessor( - (GeneratedNamespaceAggsHandleFunction ) - generatedAggregateFunction, - (UnsliceAssigner ) assigner, - accSerializer, - indexOfCountStart, - shiftTimeZone); - return new UnslicingWindowOperator<>(windowProcessor); + private UnsliceWindowAggProcessor buildUnslicingWindowProcessor() { + return new UnsliceWindowAggProcessor( + (GeneratedNamespaceAggsHandleFunction ) generatedAggregateFunction, + (UnsliceAssigner ) assigner, + accSerializer, + indexOfCountStart, + shiftTimeZone); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java index 578abc14a9fa2..16f4228f3c816 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorBuilder.java @@ -24,7 +24,7 @@ import org.apache.flink.table.runtime.operators.deduplicate.window.combines.RowTimeDeduplicateRecordsCombiner; import org.apache.flink.table.runtime.operators.deduplicate.window.processors.RowTimeWindowDeduplicateProcessor; import org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner; -import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowOperator; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor; import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer; import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; @@ -35,8 +35,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The {@link RowTimeWindowDeduplicateOperatorBuilder} is used to build a {@link - * SlicingWindowOperator} for rowtime window deduplicate. + * The {@link RowTimeWindowDeduplicateOperatorBuilder} is used to build a {@link WindowAggOperator} + * for rowtime window deduplicate. * * * RowTimeWindowDeduplicateOperatorBuilder.builder() @@ -93,7 +93,7 @@ public RowTimeWindowDeduplicateOperatorBuilder windowEndIndex(int windowEndIndex return this; } - public SlicingWindowOperatorbuild() { + public WindowAggOperator build() { checkNotNull(inputSerializer); checkNotNull(keySerializer); checkArgument( @@ -108,6 +108,6 @@ public RowTimeWindowDeduplicateOperatorBuilder windowEndIndex(int windowEndIndex final SlicingWindowProcessor windowProcessor = new RowTimeWindowDeduplicateProcessor( inputSerializer, bufferFactory, windowEndIndex, shiftTimeZone); - return new SlicingWindowOperator<>(windowProcessor); + return new WindowAggOperator<>(windowProcessor); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java index 6ac39a0d50b3f..cf0b8f68d7603 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorBuilder.java @@ -26,7 +26,7 @@ import org.apache.flink.table.runtime.operators.rank.window.combines.TopNRecordsCombiner; import org.apache.flink.table.runtime.operators.rank.window.processors.WindowRankProcessor; import org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner; -import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowOperator; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor; import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer; import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; @@ -37,7 +37,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The {@link WindowRankOperatorBuilder} is used to build a {@link SlicingWindowOperator} for window + * The {@link WindowRankOperatorBuilder} is used to build a {@link WindowAggOperator} for window * rank. * * @@ -116,7 +116,7 @@ public WindowRankOperatorBuilder windowEndIndex(int windowEndIndex) { return this; } - public SlicingWindowOperator* - *build() { + public WindowAggOperator build() { checkNotNull(inputSerializer); checkNotNull(keySerializer); checkNotNull(sortKeySelector); @@ -152,6 +152,6 @@ public WindowRankOperatorBuilder windowEndIndex(int windowEndIndex) { outputRankNumber, windowEndIndex, shiftTimeZone); - return new SlicingWindowOperator<>(windowProcessor); + return new WindowAggOperator<>(windowProcessor); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowOperatorBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java similarity index 96% rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowOperatorBase.java rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java index e4303058312ec..de797ed934a09 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowOperatorBase.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java @@ -44,6 +44,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor; +import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowProcessor; import java.util.Collections; @@ -85,11 +87,13 @@ * * Note: currently, {@link WindowOperatorBase} doesn't support early-fire and late-arrival. Thus + *
Note: currently, {@link WindowAggOperator} doesn't support early-fire and late-arrival. Thus * late elements (elements belong to emitted windows) will be simply dropped. + * + *
See more in {@link SlicingWindowProcessor} and {@link UnslicingWindowProcessor}. */ @Internal -public abstract class WindowOperatorBase
extends TableStreamOperator +public final class WindowAggOperator extends TableStreamOperator implements OneInputStreamOperator , Triggerable , KeyContext { private static final long serialVersionUID = 1L; @@ -123,7 +127,7 @@ public abstract class WindowOperatorBase extends TableStreamOperator watermarkLatency; - public WindowOperatorBase(WindowProcessor windowProcessor) { + public WindowAggOperator(WindowProcessor windowProcessor) { this.windowProcessor = windowProcessor; setChainingStrategy(ChainingStrategy.ALWAYS); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAssigner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAssigner.java index e93d894384bb1..b89b3db93a664 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAssigner.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAssigner.java @@ -30,7 +30,7 @@ * window assigner is translated from the new window TVF syntax, but the other is from the legacy * GROUP WINDOW FUNCTION syntax. In the long future, {@link GroupWindowAssigner} will be dropped. * - * See more details in {@link WindowOperatorBase}. + *
See more details in {@link WindowAggOperator}. */ @Internal public interface WindowAssigner extends Serializable { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java index 17c5a4b512574..87867d5f4b212 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java @@ -22,6 +22,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.operators.window.TimeWindow; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator; import java.time.ZoneId; import java.util.Collection; @@ -31,8 +32,7 @@ /** * The operator for aligned window table function. * - *
See more details about aligned window and unaligned window in {@link - * org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}. + *
See more details about aligned window and unaligned window in {@link WindowAggOperator}. * *
Note: The operator only applies for Window TVF with row semantics (e.g TUMBLE/HOP/CUMULATE) * instead of set semantics (e.g SESSION). diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java index 6f93df0af7875..02b30e8f603c9 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java @@ -49,6 +49,7 @@ import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers; import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers; import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator; import org.apache.flink.table.runtime.util.TimeWindowUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.BiConsumerWithException; @@ -67,8 +68,7 @@ /** * The operator for unaligned window table function. * - *
See more details about aligned window and unaligned window in {@link - * org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}. + *
See more details about aligned window and unaligned window in {@link WindowAggOperator}. * *
Note: The operator only applies for Window TVF with set semantics (e.g SESSION) instead of row * semantics (e.g TUMBLE/HOP/CUMULATE). diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigner.java index 2aa4cde6bd337..00b62e1eb91ed 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigner.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SliceAssigner.java @@ -30,7 +30,7 @@ *
Note: {@link SliceAssigner} servers as a base interface. Concrete assigners should implement * interface {@link SliceSharedAssigner} or {@link SliceUnsharedAssigner}. * - * @see SlicingWindowOperator for more definition of slice. + * @see SlicingWindowProcessor for more definition of slice. */ @Internal public interface SliceAssigner extends WindowAssigner { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SlicingWindowOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SlicingWindowOperator.java deleted file mode 100644 index 5407955d240ce..0000000000000 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SlicingWindowOperator.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.operators.window.tvf.slicing; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor; -import org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase; - -/** - * The {@link SlicingWindowOperator} implements an optimized processing for aligned windows which - * can apply the slicing optimization. The core idea of slicing optimization is to divide all - * elements from a data stream into a finite number of non-overlapping chunks (a.k.a. slices). - * - *
Concept of Slice
- * - *Dividing a window of aligned windows into a finite number of non-overlapping chunks, where the - * chunks are slices. It has the following properties: - * - *
- *
- * - *- An element must only belong to a single slice. - *
- Slices are non-overlapping, i.e. S_i and S_j should not have any shared elements if i != j. - *
- A window is consist of a finite number of slices. - *
Abstraction of Slicing Window Operator
- * - *A slicing window operator is a simple wrap of {@link SlicingWindowProcessor}. It delegates all - * the important methods to the underlying processor, where the processor can have different - * implementation for aggregate and topk or others. - * - *
A {@link SlicingWindowProcessor} usually leverages the {@link SliceAssigner} to assign slices - * and calculate based on the slices. See {@link SliceSharedWindowAggProcessor} as an example. - * - *
Note: since {@link SlicingWindowProcessor} leverages slicing optimization for aligned windows, - * therefore, it doesn't support unaligned windows, e.g. session window. - * - *
See more details in {@link WindowOperatorBase}. - */ -@Internal -public final class SlicingWindowOperator
extends WindowOperatorBase { - - private static final long serialVersionUID = 1L; - - public SlicingWindowOperator(SlicingWindowProcessor windowProcessor) { - super(windowProcessor); - } -} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SlicingWindowProcessor.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SlicingWindowProcessor.java index 85b636f8119a9..b7a177389377c 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SlicingWindowProcessor.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/slicing/SlicingWindowProcessor.java @@ -19,8 +19,37 @@ package org.apache.flink.table.runtime.operators.window.tvf.slicing; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator; import org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor; -/** A processor that processes elements for slicing windows. */ +/** + * The {@link SlicingWindowProcessor} is an optimized processing for aligned windows which can apply + * the slicing optimization. The core idea of slicing optimization is to divide all elements from a + * data stream into a finite number of non-overlapping chunks (a.k.a. slices). + * + * Concept of Slice
+ * + *Dividing a window of aligned windows into a finite number of non-overlapping chunks, where the + * chunks are slices. It has the following properties: + * + *
+ *
+ * + *- An element must only belong to a single slice. + *
- Slices are non-overlapping, i.e. S_i and S_j should not have any shared elements if i != j. + *
- A window is consist of a finite number of slices. + *
The {@link SlicingWindowProcessor} have different implementation for aggregate and topk or + * others. + * + *
The {@link SlicingWindowProcessor} usually leverages the {@link SliceAssigner} to assign + * slices and calculate based on the slices. See {@link SliceSharedWindowAggProcessor} as an + * example. + * + *
Note: since {@link SlicingWindowProcessor} leverages slicing optimization for aligned windows, + * therefore, it doesn't support unaligned windows, e.g. session window. + * + *
See more details in {@link WindowAggOperator}. + */ @Internal public interface SlicingWindowProcessor
extends WindowProcessor {} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigner.java index 1abc5b3b18b61..7bddbdffe2db4 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigner.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigner.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.aggregate.window.processors.UnsliceWindowAggProcessor; import org.apache.flink.table.runtime.operators.window.Window; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner; import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction; @@ -40,7 +41,7 @@ * boundaries are determined based on the messages timestamps and their correlations, some windows * may be merged into one. * - * @see UnslicingWindowOperator for more definition of unslice window. + * @see UnsliceWindowAggProcessor for more definition of unslice window. */ @Internal public interface UnsliceAssigner extends WindowAssigner { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnslicingWindowOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnslicingWindowOperator.java deleted file mode 100644 index 2944af3e692d4..0000000000000 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnslicingWindowOperator.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime.operators.window.tvf.unslicing; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase; - -/** - * The {@link UnslicingWindowOperator} implements an optimized processing for unaligned windows. - * - * Abstraction of Unslicing Window Operator
- * - *An unslicing window operator is a simple wrap of {@link UnslicingWindowProcessor}. It - * delegates all the important methods to the underlying processor. - * - *
A {@link UnslicingWindowProcessor} usually leverages the {@link UnsliceAssigner} to assign - * slices and calculate based on the window. - * - *
Note: Currently, the {@link UnslicingWindowOperator} only support session time window. - */ -@Internal -public class UnslicingWindowOperator
extends WindowOperatorBase { - - private static final long serialVersionUID = 1L; - - public UnslicingWindowOperator(UnslicingWindowProcessor windowProcessor) { - super(windowProcessor); - } -} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnslicingWindowProcessor.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnslicingWindowProcessor.java index 2e4e094ff4e89..cc53821f15ad6 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnslicingWindowProcessor.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnslicingWindowProcessor.java @@ -19,8 +19,18 @@ package org.apache.flink.table.runtime.operators.window.tvf.unslicing; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator; import org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor; -/** A processor that processes elements for unslicing windows. */ +/** + * The {@link UnslicingWindowProcessor} is an optimized processing for unaligned windows. + * + * A {@link UnslicingWindowProcessor} usually leverages the {@link UnsliceAssigner} to assign + * slices and calculate based on the window. + * + *
Note: Currently, the {@link UnslicingWindowProcessor} only support session time window. + * + *
See more details in {@link WindowAggOperator}. + */ @Internal public interface UnslicingWindowProcessor
extends WindowProcessor {} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java index 7d199df60fb59..bb5054353de16 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java @@ -22,9 +22,9 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigners; -import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowOperator; import org.junit.Test; import org.junit.runner.RunWith; @@ -56,16 +56,15 @@ public void testEventTimeHoppingWindows() throws Exception { 2, shiftTimeZone, Duration.ofSeconds(3), Duration.ofSeconds(1)); final SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction(assigner); - SlicingWindowOperator operator = - (SlicingWindowOperator ) - WindowAggOperatorBuilder.builder() - .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(shiftTimeZone) - .keySerializer(KEY_SER) - .assigner(assigner) - .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) - .countStarIndex(1) - .build(); + WindowAggOperator operator = + WindowAggOperatorBuilder.builder() + .inputSerializer(INPUT_ROW_SER) + .shiftTimeZone(shiftTimeZone) + .keySerializer(KEY_SER) + .assigner(assigner) + .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) + .countStarIndex(1) + .build(); OneInputStreamOperatorTestHarness testHarness = createTestHarness(operator); @@ -165,16 +164,15 @@ public void testProcessingTimeHoppingWindows() throws Exception { SliceAssigners.hopping(-1, shiftTimeZone, Duration.ofHours(3), Duration.ofHours(1)); final SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction(assigner); - SlicingWindowOperator operator = - (SlicingWindowOperator ) - WindowAggOperatorBuilder.builder() - .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(shiftTimeZone) - .keySerializer(KEY_SER) - .assigner(assigner) - .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) - .countStarIndex(1) - .build(); + WindowAggOperator operator = + WindowAggOperatorBuilder.builder() + .inputSerializer(INPUT_ROW_SER) + .shiftTimeZone(shiftTimeZone) + .keySerializer(KEY_SER) + .assigner(assigner) + .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) + .countStarIndex(1) + .build(); OneInputStreamOperatorTestHarness testHarness = createTestHarness(operator); @@ -289,15 +287,14 @@ public void testEventTimeCumulativeWindows() throws Exception { 2, shiftTimeZone, Duration.ofSeconds(3), Duration.ofSeconds(1)); final SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction(assigner); - SlicingWindowOperator operator = - (SlicingWindowOperator ) - WindowAggOperatorBuilder.builder() - .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(shiftTimeZone) - .keySerializer(KEY_SER) - .assigner(assigner) - .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) - .build(); + WindowAggOperator operator = + WindowAggOperatorBuilder.builder() + .inputSerializer(INPUT_ROW_SER) + .shiftTimeZone(shiftTimeZone) + .keySerializer(KEY_SER) + .assigner(assigner) + .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) + .build(); OneInputStreamOperatorTestHarness testHarness = createTestHarness(operator); @@ -407,15 +404,14 @@ public void testProcessingTimeCumulativeWindows() throws Exception { -1, shiftTimeZone, Duration.ofDays(1), Duration.ofHours(8)); final SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction(assigner); - SlicingWindowOperator operator = - (SlicingWindowOperator ) - WindowAggOperatorBuilder.builder() - .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(shiftTimeZone) - .keySerializer(KEY_SER) - .assigner(assigner) - .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) - .build(); + WindowAggOperator operator = + WindowAggOperatorBuilder.builder() + .inputSerializer(INPUT_ROW_SER) + .shiftTimeZone(shiftTimeZone) + .keySerializer(KEY_SER) + .assigner(assigner) + .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) + .build(); OneInputStreamOperatorTestHarness testHarness = createTestHarness(operator); @@ -543,15 +539,14 @@ public void testEventTimeTumblingWindows() throws Exception { SliceAssigners.tumbling(2, shiftTimeZone, Duration.ofSeconds(3)); final SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction(assigner); - SlicingWindowOperator operator = - (SlicingWindowOperator ) - WindowAggOperatorBuilder.builder() - .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(shiftTimeZone) - .keySerializer(KEY_SER) - .assigner(assigner) - .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) - .build(); + WindowAggOperator operator = + WindowAggOperatorBuilder.builder() + .inputSerializer(INPUT_ROW_SER) + .shiftTimeZone(shiftTimeZone) + .keySerializer(KEY_SER) + .assigner(assigner) + .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) + .build(); OneInputStreamOperatorTestHarness testHarness = createTestHarness(operator); @@ -652,15 +647,14 @@ public void testProcessingTimeTumblingWindows() throws Exception { final SlicingSumAndCountAggsFunction aggsFunction = new SlicingSumAndCountAggsFunction(assigner); - SlicingWindowOperator operator = - (SlicingWindowOperator ) - WindowAggOperatorBuilder.builder() - .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(shiftTimeZone) - .keySerializer(KEY_SER) - .assigner(assigner) - .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) - .build(); + WindowAggOperator operator = + WindowAggOperatorBuilder.builder() + .inputSerializer(INPUT_ROW_SER) + .shiftTimeZone(shiftTimeZone) + .keySerializer(KEY_SER) + .assigner(assigner) + .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) + .build(); OneInputStreamOperatorTestHarness testHarness = createTestHarness(operator); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/UnslicingWindowAggOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/UnslicingWindowAggOperatorTest.java index 4fd499f20a570..05fb14bfd5b94 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/UnslicingWindowAggOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/UnslicingWindowAggOperatorTest.java @@ -25,9 +25,9 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector; import org.apache.flink.table.runtime.operators.window.TimeWindow; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator; import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigner; import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigners; -import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowOperator; import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator; @@ -67,15 +67,14 @@ public void testEventTimeSessionWindows() throws Exception { final UnslicingSumAndCountAggsFunction aggsFunction = new UnslicingSumAndCountAggsFunction(); - UnslicingWindowOperator operator = - (UnslicingWindowOperator ) - WindowAggOperatorBuilder.builder() - .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(shiftTimeZone) - .keySerializer(KEY_SER) - .assigner(assigner) - .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) - .build(); + WindowAggOperator operator = + WindowAggOperatorBuilder.builder() + .inputSerializer(INPUT_ROW_SER) + .shiftTimeZone(shiftTimeZone) + .keySerializer(KEY_SER) + .assigner(assigner) + .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) + .build(); OneInputStreamOperatorTestHarness testHarness = createTestHarness(operator); @@ -175,16 +174,15 @@ public void testEventTimeSessionWindowsWithChangelog() throws Exception { final UnslicingSumAndCountAggsFunction aggsFunction = new UnslicingSumAndCountAggsFunction(); - UnslicingWindowOperator operator = - (UnslicingWindowOperator ) - WindowAggOperatorBuilder.builder() - .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(shiftTimeZone) - .keySerializer(KEY_SER) - .assigner(assigner) - .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) - .countStarIndex(1) - .build(); + WindowAggOperator operator = + WindowAggOperatorBuilder.builder() + .inputSerializer(INPUT_ROW_SER) + .shiftTimeZone(shiftTimeZone) + .keySerializer(KEY_SER) + .assigner(assigner) + .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) + .countStarIndex(1) + .build(); OneInputStreamOperatorTestHarness testHarness = createTestHarness(operator); @@ -305,15 +303,14 @@ public void testProcessingTimeSessionWindows() throws Exception { final UnslicingSumAndCountAggsFunction aggsFunction = new UnslicingSumAndCountAggsFunction(); - UnslicingWindowOperator operator = - (UnslicingWindowOperator ) - WindowAggOperatorBuilder.builder() - .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(shiftTimeZone) - .keySerializer(KEY_SER) - .assigner(assigner) - .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) - .build(); + WindowAggOperator operator = + WindowAggOperatorBuilder.builder() + .inputSerializer(INPUT_ROW_SER) + .shiftTimeZone(shiftTimeZone) + .keySerializer(KEY_SER) + .assigner(assigner) + .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) + .build(); OneInputStreamOperatorTestHarness testHarness = createTestHarness(operator); @@ -398,16 +395,15 @@ public void testProcessingTimeSessionWindowsWithChangelog() throws Exception { final UnslicingSumAndCountAggsFunction aggsFunction = new UnslicingSumAndCountAggsFunction(); - UnslicingWindowOperator operator = - (UnslicingWindowOperator ) - WindowAggOperatorBuilder.builder() - .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(shiftTimeZone) - .keySerializer(KEY_SER) - .assigner(assigner) - .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) - .countStarIndex(1) - .build(); + WindowAggOperator operator = + WindowAggOperatorBuilder.builder() + .inputSerializer(INPUT_ROW_SER) + .shiftTimeZone(shiftTimeZone) + .keySerializer(KEY_SER) + .assigner(assigner) + .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) + .countStarIndex(1) + .build(); OneInputStreamOperatorTestHarness testHarness = createTestHarness(operator); @@ -543,17 +539,16 @@ public void testSessionWindowsWithoutPartitionKey() throws Exception { final EmptyRowDataKeySelector keySelector = EmptyRowDataKeySelector.INSTANCE; final UnslicingSumAndCountAggsFunction aggsFunction = new UnslicingSumAndCountAggsFunction(); - UnslicingWindowOperator operator = - (UnslicingWindowOperator ) - WindowAggOperatorBuilder.builder() - .inputSerializer(INPUT_ROW_SER) - .shiftTimeZone(shiftTimeZone) - .keySerializer( - (PagedTypeSerializer ) - keySelector.getProducedType().toSerializer()) - .assigner(assigner) - .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) - .build(); + WindowAggOperator operator = + WindowAggOperatorBuilder.builder() + .inputSerializer(INPUT_ROW_SER) + .shiftTimeZone(shiftTimeZone) + .keySerializer( + (PagedTypeSerializer ) + keySelector.getProducedType().toSerializer()) + .assigner(assigner) + .aggregate(createGeneratedAggsHandle(aggsFunction), ACC_SER) + .build(); OneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness<>( diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorTestBase.java index 88c1aabc0ec07..d1fd2e933ac53 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorTestBase.java @@ -28,7 +28,7 @@ import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; -import org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator; import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator; @@ -109,7 +109,7 @@ protected long localMills(long epochMills) { // ============================== Util Functions ============================== protected static OneInputStreamOperatorTestHarness createTestHarness( - WindowOperatorBase operator) throws Exception { + WindowAggOperator operator) throws Exception { return new KeyedOneInputStreamOperatorTestHarness<>( operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType()); } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java index 7109cadace781..af097f1816a85 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java @@ -25,7 +25,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; -import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowOperator; +import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator; import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator; @@ -100,14 +100,14 @@ public static Collection