Skip to content

Commit

Permalink
Merge pull request #3171 from ingef/hotifx/proper-timeout-for-futures
Browse files Browse the repository at this point in the history
FIX proper timeout for futures
  • Loading branch information
awildturtok authored Sep 6, 2023
2 parents ff1d8dd + 2286fe8 commit 70a1065
Showing 1 changed file with 41 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -47,7 +50,6 @@
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -293,6 +295,7 @@ private static void dumpToFile(byte[] gzippedObj, @NonNull String keyOfDump, Exc
}

if (!dumpfile.getParentFile().exists() && !dumpfile.getParentFile().mkdirs()) {
//TODO this seems to occur sometimes, is it maybe just a race condition?
throw new IllegalStateException("Could not create `%s`.".formatted(dumpfile.getParentFile()));
}

Expand Down Expand Up @@ -353,7 +356,6 @@ private static byte[] debugUnGzip(byte[] bytes) throws IOException {
*
* @implNote This method is concurrent!
*/
@SneakyThrows
@Override
public IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer) {
final IterationStatistic result = new IterationStatistic();
Expand All @@ -368,11 +370,24 @@ public IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer) {
final ListenableFuture<List<ByteIterable>> allJobs = Futures.allAsList(jobs);


List<ByteIterable> maybeFailed;
List<ByteIterable> maybeFailed = Collections.emptyList();

do {
try {
maybeFailed = allJobs.get(30, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.interrupted();
log.debug("Thread was interrupted.");
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
catch (TimeoutException e) {
log.debug("Still waiting for {} jobs.", jobs.stream().filter(Predicate.not(Future::isDone)).count());
}
} while (!allJobs.isDone());

while ((maybeFailed = allJobs.get(30, TimeUnit.SECONDS)) == null) {
log.debug("Still waiting for {} jobs.", jobs.stream().filter(Predicate.not(Future::isDone)).count());
}

final List<ByteIterable> unreadables = maybeFailed.stream().filter(Objects::nonNull).toList();

Expand All @@ -398,24 +413,28 @@ public IterationStatistic forEach(StoreEntryConsumer<KEY, VALUE> consumer) {
}

private ByteIterable handle(StoreEntryConsumer<KEY, VALUE> consumer, IterationStatistic result, ByteIterable keyRaw, ByteIterable valueRaw) {
result.incrTotalProcessed();

// Try to read the key first
final KEY
key =
getDeserializedAndDumpFailed(keyRaw, SerializingStore.this::readKey, () -> new String(keyRaw.getBytesUnsafe()), valueRaw, "Could not parse key [{}]");
if (key == null) {
result.incrFailedKeys();
return keyRaw;
}
final KEY key;
final VALUE value;

// Try to read the value
final VALUE
value =
getDeserializedAndDumpFailed(valueRaw, SerializingStore.this::readValue, key::toString, valueRaw, "Could not parse value for key [{}]");
try {
result.incrTotalProcessed();

if (value == null) {
result.incrFailedValues();
// Try to read the key first
key = getDeserializedAndDumpFailed(keyRaw, SerializingStore.this::readKey, () -> new String(keyRaw.getBytesUnsafe()), valueRaw, "Could not parse key [{}]");
if (key == null) {
result.incrFailedKeys();
return keyRaw;
}

// Try to read the value
value = getDeserializedAndDumpFailed(valueRaw, SerializingStore.this::readValue, key::toString, valueRaw, "Could not parse value for key [{}]");

if (value == null) {
result.incrFailedValues();
return keyRaw;
}
}catch(Exception e){
log.error("Failed processing key/value", e);
return keyRaw;
}

Expand Down

0 comments on commit 70a1065

Please sign in to comment.