Skip to content

Commit

Permalink
[hotfix] peekNextBufferSubpartitionId shouldn't throw UnsupportedData…
Browse files Browse the repository at this point in the history
…TypeException
  • Loading branch information
reswqa authored and dawidwys committed Jan 19, 2024
1 parent b7c314b commit 8dd28a2
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.activation.UnsupportedDataTypeException;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

Expand Down Expand Up @@ -193,7 +192,7 @@ private boolean isEndOfChannelStateEvent(Buffer buffer) throws IOException {

@Override
protected int peekNextBufferSubpartitionIdInternal() throws IOException {
throw new UnsupportedDataTypeException();
throw new UnsupportedOperationException();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,28 @@

import org.apache.flink.runtime.io.network.buffer.Buffer;

import javax.activation.UnsupportedDataTypeException;

import java.io.IOException;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

/** Test implementation for {@link NettyConnectionReader}. */
public class TestingNettyConnectionReader implements NettyConnectionReader {

private final Function<Integer, Buffer> readBufferFunction;

private TestingNettyConnectionReader(Function<Integer, Buffer> readBufferFunction) {
private final Supplier<Integer> peekNextBufferSubpartitionIdSupplier;

private TestingNettyConnectionReader(
Function<Integer, Buffer> readBufferFunction,
Supplier<Integer> peekNextBufferSubpartitionIdSupplier) {
this.readBufferFunction = readBufferFunction;
this.peekNextBufferSubpartitionIdSupplier = peekNextBufferSubpartitionIdSupplier;
}

@Override
public int peekNextBufferSubpartitionId() throws IOException {
throw new UnsupportedDataTypeException();
return peekNextBufferSubpartitionIdSupplier.get();
}

@Override
Expand All @@ -50,15 +54,24 @@ public static class Builder {

private Function<Integer, Buffer> readBufferFunction = segmentId -> null;

private Supplier<Integer> peekNextBufferSubpartitionIdSupplier = () -> -1;

public Builder() {}

public Builder setReadBufferFunction(Function<Integer, Buffer> readBufferFunction) {
this.readBufferFunction = readBufferFunction;
return this;
}

public Builder setPeekNextBufferSubpartitionIdSupplier(
Supplier<Integer> peekNextBufferSubpartitionIdSupplier) {
this.peekNextBufferSubpartitionIdSupplier = peekNextBufferSubpartitionIdSupplier;
return this;
}

public TestingNettyConnectionReader build() {
return new TestingNettyConnectionReader(readBufferFunction);
return new TestingNettyConnectionReader(
readBufferFunction, peekNextBufferSubpartitionIdSupplier);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;

import javax.activation.UnsupportedDataTypeException;

import java.io.IOException;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Test implementation for {@link TierConsumerAgent}. */
Expand All @@ -42,15 +41,21 @@ public class TestingTierConsumerAgent implements TierConsumerAgent {

private final Runnable closeNotifier;

private final BiFunction<TieredStoragePartitionId, ResultSubpartitionIndexSet, Integer>
peekNextBufferSubpartitionIdFunction;

private TestingTierConsumerAgent(
Runnable startNotifier,
Supplier<Buffer> bufferSupplier,
Runnable availabilityNotifierRegistrationRunnable,
Runnable closeNotifier) {
Runnable closeNotifier,
BiFunction<TieredStoragePartitionId, ResultSubpartitionIndexSet, Integer>
peekNextBufferSubpartitionIdFunction) {
this.startNotifier = startNotifier;
this.bufferSupplier = bufferSupplier;
this.availabilityNotifierRegistrationRunnable = availabilityNotifierRegistrationRunnable;
this.closeNotifier = closeNotifier;
this.peekNextBufferSubpartitionIdFunction = peekNextBufferSubpartitionIdFunction;
}

@Override
Expand All @@ -62,7 +67,7 @@ public void start() {
public int peekNextBufferSubpartitionId(
TieredStoragePartitionId partitionId, ResultSubpartitionIndexSet indexSet)
throws IOException {
throw new UnsupportedDataTypeException();
return peekNextBufferSubpartitionIdFunction.apply(partitionId, indexSet);
}

@Override
Expand Down Expand Up @@ -95,6 +100,9 @@ public static class Builder {

private Runnable closeNotifier = () -> {};

private BiFunction<TieredStoragePartitionId, ResultSubpartitionIndexSet, Integer>
peekNextBufferSubpartitionIdFunction = (ignore1, ignore2) -> -1;

public Builder() {}

public Builder setStartNotifier(Runnable startNotifier) {
Expand All @@ -119,12 +127,20 @@ public Builder setCloseNotifier(Runnable closeNotifier) {
return this;
}

public Builder setPeekNextBufferSubpartitionIdFunction(
BiFunction<TieredStoragePartitionId, ResultSubpartitionIndexSet, Integer>
peekNextBufferSubpartitionIdFunction) {
this.peekNextBufferSubpartitionIdFunction = peekNextBufferSubpartitionIdFunction;
return this;
}

public TestingTierConsumerAgent build() {
return new TestingTierConsumerAgent(
startNotifier,
bufferSupplier,
availabilityNotifierRegistrationRunnable,
closeNotifier);
closeNotifier,
peekNextBufferSubpartitionIdFunction);
}
}
}

0 comments on commit 8dd28a2

Please sign in to comment.