From 8dd28a2e8bf10ec278ff90c99563468e294a135a Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Fri, 19 Jan 2024 13:32:10 +0800 Subject: [PATCH] [hotfix] peekNextBufferSubpartitionId shouldn't throw UnsupportedDataTypeException --- .../consumer/RecoveredInputChannel.java | 3 +-- .../netty/TestingNettyConnectionReader.java | 23 ++++++++++++---- .../netty/TestingTierConsumerAgent.java | 26 +++++++++++++++---- 3 files changed, 40 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java index 06ae4258da4fd..1f41a099931e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java @@ -36,7 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.activation.UnsupportedDataTypeException; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -193,7 +192,7 @@ private boolean isEndOfChannelStateEvent(Buffer buffer) throws IOException { @Override protected int peekNextBufferSubpartitionIdInternal() throws IOException { - throw new UnsupportedDataTypeException(); + throw new UnsupportedOperationException(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyConnectionReader.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyConnectionReader.java index 4b9358eb31974..5fc1855ca0460 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyConnectionReader.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingNettyConnectionReader.java @@ -20,24 +20,28 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; -import javax.activation.UnsupportedDataTypeException; - import java.io.IOException; import java.util.Optional; import java.util.function.Function; +import java.util.function.Supplier; /** Test implementation for {@link NettyConnectionReader}. */ public class TestingNettyConnectionReader implements NettyConnectionReader { private final Function readBufferFunction; - private TestingNettyConnectionReader(Function readBufferFunction) { + private final Supplier peekNextBufferSubpartitionIdSupplier; + + private TestingNettyConnectionReader( + Function readBufferFunction, + Supplier peekNextBufferSubpartitionIdSupplier) { this.readBufferFunction = readBufferFunction; + this.peekNextBufferSubpartitionIdSupplier = peekNextBufferSubpartitionIdSupplier; } @Override public int peekNextBufferSubpartitionId() throws IOException { - throw new UnsupportedDataTypeException(); + return peekNextBufferSubpartitionIdSupplier.get(); } @Override @@ -50,6 +54,8 @@ public static class Builder { private Function readBufferFunction = segmentId -> null; + private Supplier peekNextBufferSubpartitionIdSupplier = () -> -1; + public Builder() {} public Builder setReadBufferFunction(Function readBufferFunction) { @@ -57,8 +63,15 @@ public Builder setReadBufferFunction(Function readBufferFunctio return this; } + public Builder setPeekNextBufferSubpartitionIdSupplier( + Supplier peekNextBufferSubpartitionIdSupplier) { + this.peekNextBufferSubpartitionIdSupplier = peekNextBufferSubpartitionIdSupplier; + return this; + } + public TestingNettyConnectionReader build() { - return new TestingNettyConnectionReader(readBufferFunction); + return new TestingNettyConnectionReader( + readBufferFunction, peekNextBufferSubpartitionIdSupplier); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java index cecc81f1baef8..f0f2f69168c78 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java @@ -25,10 +25,9 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent; -import javax.activation.UnsupportedDataTypeException; - import java.io.IOException; import java.util.Optional; +import java.util.function.BiFunction; import java.util.function.Supplier; /** Test implementation for {@link TierConsumerAgent}. */ @@ -42,15 +41,21 @@ public class TestingTierConsumerAgent implements TierConsumerAgent { private final Runnable closeNotifier; + private final BiFunction + peekNextBufferSubpartitionIdFunction; + private TestingTierConsumerAgent( Runnable startNotifier, Supplier bufferSupplier, Runnable availabilityNotifierRegistrationRunnable, - Runnable closeNotifier) { + Runnable closeNotifier, + BiFunction + peekNextBufferSubpartitionIdFunction) { this.startNotifier = startNotifier; this.bufferSupplier = bufferSupplier; this.availabilityNotifierRegistrationRunnable = availabilityNotifierRegistrationRunnable; this.closeNotifier = closeNotifier; + this.peekNextBufferSubpartitionIdFunction = peekNextBufferSubpartitionIdFunction; } @Override @@ -62,7 +67,7 @@ public void start() { public int peekNextBufferSubpartitionId( TieredStoragePartitionId partitionId, ResultSubpartitionIndexSet indexSet) throws IOException { - throw new UnsupportedDataTypeException(); + return peekNextBufferSubpartitionIdFunction.apply(partitionId, indexSet); } @Override @@ -95,6 +100,9 @@ public static class Builder { private Runnable closeNotifier = () -> {}; + private BiFunction + peekNextBufferSubpartitionIdFunction = (ignore1, ignore2) -> -1; + public Builder() {} public Builder setStartNotifier(Runnable startNotifier) { @@ -119,12 +127,20 @@ public Builder setCloseNotifier(Runnable closeNotifier) { return this; } + public Builder setPeekNextBufferSubpartitionIdFunction( + BiFunction + peekNextBufferSubpartitionIdFunction) { + this.peekNextBufferSubpartitionIdFunction = peekNextBufferSubpartitionIdFunction; + return this; + } + public TestingTierConsumerAgent build() { return new TestingTierConsumerAgent( startNotifier, bufferSupplier, availabilityNotifierRegistrationRunnable, - closeNotifier); + closeNotifier, + peekNextBufferSubpartitionIdFunction); } } }