Skip to content

Commit

Permalink
[FLINK-32335] Fix the ConcurrentModificationException in the unittest
Browse files Browse the repository at this point in the history
This closes apache#247.
  • Loading branch information
jiangxin369 authored Jul 14, 2023
1 parent fbd2a52 commit 892fb47
Showing 1 changed file with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -461,12 +461,14 @@ private List<AbstractEvent> parseInputChannelEvents(InputChannel inputChannel)
PrioritizedDeque<BufferConsumerWithPartialRecordLength> queue =
ReflectionUtils.getFieldValue(
pipelinedSubpartition, PipelinedSubpartition.class, "buffers");
for (BufferConsumerWithPartialRecordLength bufferConsumer : queue) {
if (!bufferConsumer.getBufferConsumer().isBuffer()) {
events.add(
EventSerializer.fromBuffer(
bufferConsumer.getBufferConsumer().copy().build(),
getClass().getClassLoader()));
synchronized (queue) {
for (BufferConsumerWithPartialRecordLength bufferConsumer : queue) {
if (!bufferConsumer.getBufferConsumer().isBuffer()) {
events.add(
EventSerializer.fromBuffer(
bufferConsumer.getBufferConsumer().copy().build(),
getClass().getClassLoader()));
}
}
}
} else {
Expand Down

0 comments on commit 892fb47

Please sign in to comment.