Skip to content

Commit

Permalink
[FLINK-34063][runtime] Operator states need to be restored in an orde…
Browse files Browse the repository at this point in the history
…r they've been written for the compression to work properly on empty states.
  • Loading branch information
dmvk committed Jan 19, 2024
1 parent 6f76982 commit d536b5a
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Implementation of operator state restore operation. */
public class OperatorStateRestoreOperation implements RestoreOperation<Void> {
Expand Down Expand Up @@ -171,34 +169,18 @@ public Void restore() throws Exception {
}
}

List<Map.Entry<String, OperatorStateHandle.StateMetaInfo>> entries =
new ArrayList<>(stateHandle.getStateNameToPartitionOffsets().entrySet());

if (backendSerializationProxy.isUsingStateCompression()) {
// sort state handles by offsets to avoid building SnappyFramedInputStream with
// EOF stream.
entries =
entries.stream()
.sorted(
Comparator.comparingLong(
entry -> {
OperatorStateHandle.StateMetaInfo
stateMetaInfo = entry.getValue();
long[] offsets = stateMetaInfo.getOffsets();
if (offsets == null
|| offsets.length == 0) {
return Long.MIN_VALUE;
} else {
return offsets[0];
}
}))
.collect(Collectors.toList());
}
// Restore states in the order in which they were written. Operator states come
// before Broadcast states.
final List<String> toRestore = new ArrayList<>();
restoredOperatorMetaInfoSnapshots.forEach(
stateName -> toRestore.add(stateName.getName()));
restoredBroadcastMetaInfoSnapshots.forEach(
stateName -> toRestore.add(stateName.getName()));

// Restore all the states
for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets : entries) {
for (String stateName : toRestore) {

final String stateName = nameToOffsets.getKey();
final OperatorStateHandle.StateMetaInfo offsets =
stateHandle.getStateNameToPartitionOffsets().get(stateName);

PartitionableListState<?> listStateForName =
registeredOperatorStates.get(stateName);
Expand All @@ -222,10 +204,9 @@ public Void restore() throws Exception {
+ "corresponding meta info: "
+ stateName);
deserializeBroadcastStateValues(
broadcastStateForName, compressedIn, nameToOffsets.getValue());
broadcastStateForName, compressedIn, offsets);
} else {
deserializeOperatorStateValues(
listStateForName, compressedIn, nameToOffsets.getValue());
deserializeOperatorStateValues(listStateForName, compressedIn, offsets);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ private static OperatorStateHandle createOperatorStateHandle(
throws Exception {
try (OperatorStateBackend operatorStateBackend =
operatorStateBackendFactory.apply(Collections.emptyList())) {
final CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
for (String stateName : listStates.keySet()) {
final ListStateDescriptor<String> descriptor =
new ListStateDescriptor<>(stateName, String.class);
Expand All @@ -84,7 +83,7 @@ private static OperatorStateHandle createOperatorStateHandle(
.snapshot(
1,
1,
streamFactory,
new MemCheckpointStreamFactory(4096),
CheckpointOptions.forCheckpointWithDefaultLocation())
.get();
return Objects.requireNonNull(result.getJobManagerOwnedSnapshot());
Expand Down Expand Up @@ -133,7 +132,7 @@ private static void verifyOperatorStateHandle(
void testRestoringMixedOperatorState(boolean snapshotCompressionEnabled) throws Exception {
final ExecutionConfig cfg = new ExecutionConfig();
cfg.setUseSnapshotCompression(snapshotCompressionEnabled);
ThrowingFunction<Collection<OperatorStateHandle>, OperatorStateBackend>
final ThrowingFunction<Collection<OperatorStateHandle>, OperatorStateBackend>
operatorStateBackendFactory =
createOperatorStateBackendFactory(
cfg, new CloseableRegistry(), this.getClass().getClassLoader());
Expand Down Expand Up @@ -162,7 +161,7 @@ void testRestoreAndRescalePartitionedOperatorState(boolean snapshotCompressionEn
throws Exception {
final ExecutionConfig cfg = new ExecutionConfig();
cfg.setUseSnapshotCompression(snapshotCompressionEnabled);
ThrowingFunction<Collection<OperatorStateHandle>, OperatorStateBackend>
final ThrowingFunction<Collection<OperatorStateHandle>, OperatorStateBackend>
operatorStateBackendFactory =
createOperatorStateBackendFactory(
cfg, new CloseableRegistry(), this.getClass().getClassLoader());
Expand Down Expand Up @@ -199,4 +198,29 @@ void testRestoreAndRescalePartitionedOperatorState(boolean snapshotCompressionEn
mergedListStates,
Collections.emptyMap());
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testEmptyPartitionedOperatorState(boolean snapshotCompressionEnabled) throws Exception {
final ExecutionConfig cfg = new ExecutionConfig();
cfg.setUseSnapshotCompression(snapshotCompressionEnabled);
final ThrowingFunction<Collection<OperatorStateHandle>, OperatorStateBackend>
operatorStateBackendFactory =
createOperatorStateBackendFactory(
cfg, new CloseableRegistry(), this.getClass().getClassLoader());

final Map<String, List<String>> listStates = new HashMap<>();
listStates.put("bufferState", Collections.emptyList());
listStates.put("offsetState", Collections.singletonList("foo"));

final OperatorStateHandle stateHandle =
createOperatorStateHandle(
operatorStateBackendFactory, listStates, Collections.emptyMap());

verifyOperatorStateHandle(
operatorStateBackendFactory,
Collections.singletonList(stateHandle),
listStates,
Collections.emptyMap());
}
}

0 comments on commit d536b5a

Please sign in to comment.