Skip to content

Commit

Permalink
[FLINK-34100][table] Merge SlicingWindowOperator and UnslicingWindowO…
Browse files Browse the repository at this point in the history
…perator

This closes #24162
  • Loading branch information
xuyangzhong authored and lsyldliu committed Jan 25, 2024
1 parent bab14f9 commit 80f6e06
Show file tree
Hide file tree
Showing 20 changed files with 196 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.table.planner.plan.logical;

import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowOperator;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor;
import org.apache.flink.table.types.logical.LogicalType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -31,7 +31,7 @@
* A windowing strategy that gets windows from input columns as window slice have been assigned and
* attached to the physical columns. The window slice is usually identified by slice end timestamp.
*
* @see SlicingWindowOperator for more details about which windows can apply slicing.
* @see SlicingWindowProcessor for more details about which windows can apply slicing.
*/
@JsonTypeName("SliceAttached")
public class SliceAttachedWindowingStrategy extends WindowingStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.table.planner.plan.logical;

import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
Expand All @@ -37,8 +39,7 @@ public interface WindowSpec {
/**
* Return true if the window is aligned.
*
* <p>See more details about aligned window and unaligned window in {@link
* org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}.
* <p>See more details about aligned window and unaligned window in {@link WindowAggOperator}.
*/
@JsonIgnore
boolean isAlignedWindow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@
import org.apache.flink.table.runtime.operators.aggregate.window.processors.UnsliceWindowAggProcessor;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigners.HoppingSliceAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceSharedAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceUnsharedAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowOperator;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowOperator;
import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowProcessor;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;

Expand All @@ -48,8 +48,8 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* The {@link WindowAggOperatorBuilder} is used to build a {@link SlicingWindowOperator} with
* slicing window or a {@link UnslicingWindowOperator} for with unslicing window.
* The {@link WindowAggOperatorBuilder} is used to build a {@link WindowAggOperator} with {@link
* SlicingWindowProcessor} or a {@link UnslicingWindowProcessor}.
*
* <pre>
* WindowAggOperatorBuilder.builder()
Expand Down Expand Up @@ -140,22 +140,24 @@ public WindowAggOperatorBuilder countStarIndex(int indexOfCountStart) {
return this;
}

public WindowOperatorBase<RowData, ?> build() {
public WindowAggOperator<RowData, ?> build() {
checkNotNull(assigner);
checkNotNull(inputSerializer);
checkNotNull(keySerializer);
checkNotNull(accSerializer);
checkNotNull(generatedAggregateFunction);

final WindowProcessor<?> windowProcessor;
if (assigner instanceof SliceAssigner) {
return buildSlicingWindowOperator();
windowProcessor = buildSlicingWindowProcessor();
} else {
return buildUnslicingWindowOperator();
windowProcessor = buildUnslicingWindowProcessor();
}
return new WindowAggOperator<>(windowProcessor);
}

@SuppressWarnings("unchecked")
protected WindowOperatorBase<RowData, ?> buildSlicingWindowOperator() {
private SlicingWindowProcessor<Long> buildSlicingWindowProcessor() {

boolean isGlobalAgg =
localGeneratedAggregateFunction != null && globalGeneratedAggregateFunction != null;
Expand Down Expand Up @@ -200,19 +202,16 @@ public WindowAggOperatorBuilder countStarIndex(int indexOfCountStart) {
throw new IllegalArgumentException(
"assigner must be instance of SliceUnsharedAssigner or SliceSharedAssigner.");
}
return new SlicingWindowOperator<>(windowProcessor);
return windowProcessor;
}

@SuppressWarnings("unchecked")
private WindowOperatorBase<RowData, ?> buildUnslicingWindowOperator() {
final UnsliceWindowAggProcessor windowProcessor =
new UnsliceWindowAggProcessor(
(GeneratedNamespaceAggsHandleFunction<TimeWindow>)
generatedAggregateFunction,
(UnsliceAssigner<TimeWindow>) assigner,
accSerializer,
indexOfCountStart,
shiftTimeZone);
return new UnslicingWindowOperator<>(windowProcessor);
private UnsliceWindowAggProcessor buildUnslicingWindowProcessor() {
return new UnsliceWindowAggProcessor(
(GeneratedNamespaceAggsHandleFunction<TimeWindow>) generatedAggregateFunction,
(UnsliceAssigner<TimeWindow>) assigner,
accSerializer,
indexOfCountStart,
shiftTimeZone);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.table.runtime.operators.deduplicate.window.combines.RowTimeDeduplicateRecordsCombiner;
import org.apache.flink.table.runtime.operators.deduplicate.window.processors.RowTimeWindowDeduplicateProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowOperator;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
Expand All @@ -35,8 +35,8 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* The {@link RowTimeWindowDeduplicateOperatorBuilder} is used to build a {@link
* SlicingWindowOperator} for rowtime window deduplicate.
* The {@link RowTimeWindowDeduplicateOperatorBuilder} is used to build a {@link WindowAggOperator}
* for rowtime window deduplicate.
*
* <pre>
* RowTimeWindowDeduplicateOperatorBuilder.builder()
Expand Down Expand Up @@ -93,7 +93,7 @@ public RowTimeWindowDeduplicateOperatorBuilder windowEndIndex(int windowEndIndex
return this;
}

public SlicingWindowOperator<RowData, ?> build() {
public WindowAggOperator<RowData, ?> build() {
checkNotNull(inputSerializer);
checkNotNull(keySerializer);
checkArgument(
Expand All @@ -108,6 +108,6 @@ public RowTimeWindowDeduplicateOperatorBuilder windowEndIndex(int windowEndIndex
final SlicingWindowProcessor<Long> windowProcessor =
new RowTimeWindowDeduplicateProcessor(
inputSerializer, bufferFactory, windowEndIndex, shiftTimeZone);
return new SlicingWindowOperator<>(windowProcessor);
return new WindowAggOperator<>(windowProcessor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.table.runtime.operators.rank.window.combines.TopNRecordsCombiner;
import org.apache.flink.table.runtime.operators.rank.window.processors.WindowRankProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowOperator;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
Expand All @@ -37,7 +37,7 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* The {@link WindowRankOperatorBuilder} is used to build a {@link SlicingWindowOperator} for window
* The {@link WindowRankOperatorBuilder} is used to build a {@link WindowAggOperator} for window
* rank.
*
* <pre>
Expand Down Expand Up @@ -116,7 +116,7 @@ public WindowRankOperatorBuilder windowEndIndex(int windowEndIndex) {
return this;
}

public SlicingWindowOperator<RowData, ?> build() {
public WindowAggOperator<RowData, ?> build() {
checkNotNull(inputSerializer);
checkNotNull(keySerializer);
checkNotNull(sortKeySelector);
Expand Down Expand Up @@ -152,6 +152,6 @@ public WindowRankOperatorBuilder windowEndIndex(int windowEndIndex) {
outputRankNumber,
windowEndIndex,
shiftTimeZone);
return new SlicingWindowOperator<>(windowProcessor);
return new WindowAggOperator<>(windowProcessor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowProcessor;

import java.util.Collections;

Expand Down Expand Up @@ -85,11 +87,13 @@
*
* </pre>
*
* <p>Note: currently, {@link WindowOperatorBase} doesn't support early-fire and late-arrival. Thus
* <p>Note: currently, {@link WindowAggOperator} doesn't support early-fire and late-arrival. Thus
* late elements (elements belong to emitted windows) will be simply dropped.
*
* <p>See more in {@link SlicingWindowProcessor} and {@link UnslicingWindowProcessor}.
*/
@Internal
public abstract class WindowOperatorBase<K, W> extends TableStreamOperator<RowData>
public final class WindowAggOperator<K, W> extends TableStreamOperator<RowData>
implements OneInputStreamOperator<RowData, RowData>, Triggerable<K, W>, KeyContext {

private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -123,7 +127,7 @@ public abstract class WindowOperatorBase<K, W> extends TableStreamOperator<RowDa
protected transient Meter lateRecordsDroppedRate;
protected transient Gauge<Long> watermarkLatency;

public WindowOperatorBase(WindowProcessor<W> windowProcessor) {
public WindowAggOperator(WindowProcessor<W> windowProcessor) {
this.windowProcessor = windowProcessor;
setChainingStrategy(ChainingStrategy.ALWAYS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* window assigner is translated from the new window TVF syntax, but the other is from the legacy
* GROUP WINDOW FUNCTION syntax. In the long future, {@link GroupWindowAssigner} will be dropped.
*
* <p>See more details in {@link WindowOperatorBase}.
* <p>See more details in {@link WindowAggOperator}.
*/
@Internal
public interface WindowAssigner extends Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator;

import java.time.ZoneId;
import java.util.Collection;
Expand All @@ -31,8 +32,7 @@
/**
* The operator for aligned window table function.
*
* <p>See more details about aligned window and unaligned window in {@link
* org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}.
* <p>See more details about aligned window and unaligned window in {@link WindowAggOperator}.
*
* <p>Note: The operator only applies for Window TVF with row semantics (e.g TUMBLE/HOP/CUMULATE)
* instead of set semantics (e.g SESSION).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;
Expand All @@ -67,8 +68,7 @@
/**
* The operator for unaligned window table function.
*
* <p>See more details about aligned window and unaligned window in {@link
* org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}.
* <p>See more details about aligned window and unaligned window in {@link WindowAggOperator}.
*
* <p>Note: The operator only applies for Window TVF with set semantics (e.g SESSION) instead of row
* semantics (e.g TUMBLE/HOP/CUMULATE).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* <p>Note: {@link SliceAssigner} servers as a base interface. Concrete assigners should implement
* interface {@link SliceSharedAssigner} or {@link SliceUnsharedAssigner}.
*
* @see SlicingWindowOperator for more definition of slice.
* @see SlicingWindowProcessor for more definition of slice.
*/
@Internal
public interface SliceAssigner extends WindowAssigner {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,37 @@
package org.apache.flink.table.runtime.operators.window.tvf.slicing;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.runtime.operators.aggregate.window.processors.SliceSharedWindowAggProcessor;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAggOperator;
import org.apache.flink.table.runtime.operators.window.tvf.common.WindowProcessor;

/** A processor that processes elements for slicing windows. */
/**
* The {@link SlicingWindowProcessor} is an optimized processing for aligned windows which can apply
* the slicing optimization. The core idea of slicing optimization is to divide all elements from a
* data stream into a finite number of non-overlapping chunks (a.k.a. slices).
*
* <h3>Concept of Slice</h3>
*
* <p>Dividing a window of aligned windows into a finite number of non-overlapping chunks, where the
* chunks are slices. It has the following properties:
*
* <ul>
* <li>An element must only belong to a single slice.
* <li>Slices are non-overlapping, i.e. S_i and S_j should not have any shared elements if i != j.
* <li>A window is consist of a finite number of slices.
* </ul>
*
* <p>The {@link SlicingWindowProcessor} have different implementation for aggregate and topk or
* others.
*
* <p>The {@link SlicingWindowProcessor} usually leverages the {@link SliceAssigner} to assign
* slices and calculate based on the slices. See {@link SliceSharedWindowAggProcessor} as an
* example.
*
* <p>Note: since {@link SlicingWindowProcessor} leverages slicing optimization for aligned windows,
* therefore, it doesn't support unaligned windows, e.g. session window.
*
* <p>See more details in {@link WindowAggOperator}.
*/
@Internal
public interface SlicingWindowProcessor<W> extends WindowProcessor<W> {}
Loading

0 comments on commit 80f6e06

Please sign in to comment.