Skip to content

Commit

Permalink
feat: support for ad-hoc extension of the client timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
novoj committed Apr 12, 2024
1 parent 4205ba4 commit a60dc06
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -83,9 +84,11 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -143,6 +146,10 @@ public class EvitaClient implements EvitaContract {
* and closes them along with their gRPC channels.
*/
private final Runnable terminationCallback;
/**
* Client call timeout.
*/
private final ThreadLocal<LinkedList<Timeout>> timeout;

public EvitaClient(@Nonnull EvitaClientConfiguration configuration) {
this(configuration, null);
Expand Down Expand Up @@ -192,6 +199,11 @@ public EvitaClient(
Thread.currentThread().interrupt();
}
};
this.timeout = ThreadLocal.withInitial(() -> {
final LinkedList<Timeout> timeouts = new LinkedList<>();
timeouts.add(new Timeout(configuration.timeout(), configuration.timeoutUnit()));
return timeouts;
});
this.active.set(true);

try {
Expand Down Expand Up @@ -281,45 +293,49 @@ public EvitaClientSession createSession(@Nonnull SessionTraits traits) {

if (traits.isReadWrite()) {
if (traits.isBinary()) {
grpcResponse = executeWithEvitaService(evitaService ->
evitaService.createBinaryReadWriteSession(
grpcResponse = executeWithEvitaService(evitaService -> {
final Timeout timeoutToUse = this.timeout.get().peek();
return evitaService.createBinaryReadWriteSession(
GrpcEvitaSessionRequest.newBuilder()
.setCatalogName(traits.catalogName())
.setCommitBehavior(EvitaEnumConverter.toGrpcCommitBehavior(traits.commitBehaviour()))
.setDryRun(traits.isDryRun())
.build()
).get(configuration.timeout(), configuration.timeoutUnit())
);
).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit());
});
} else {
grpcResponse = executeWithEvitaService(evitaService ->
evitaService.createReadWriteSession(
grpcResponse = executeWithEvitaService(evitaService -> {
final Timeout timeoutToUse = this.timeout.get().peek();
return evitaService.createReadWriteSession(
GrpcEvitaSessionRequest.newBuilder()
.setCatalogName(traits.catalogName())
.setCommitBehavior(EvitaEnumConverter.toGrpcCommitBehavior(traits.commitBehaviour()))
.setDryRun(traits.isDryRun())
.build()
).get(configuration.timeout(), configuration.timeoutUnit())
);
).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit());
});
}
} else {
if (traits.isBinary()) {
grpcResponse = executeWithEvitaService(evitaService ->
evitaService.createBinaryReadOnlySession(
grpcResponse = executeWithEvitaService(evitaService -> {
final Timeout timeoutToUse = this.timeout.get().peek();
return evitaService.createBinaryReadOnlySession(
GrpcEvitaSessionRequest.newBuilder()
.setCatalogName(traits.catalogName())
.setDryRun(traits.isDryRun())
.build()
).get(configuration.timeout(), configuration.timeoutUnit())
);
).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit());
});
} else {
grpcResponse = executeWithEvitaService(evitaService ->
evitaService.createReadOnlySession(
grpcResponse = executeWithEvitaService(evitaService -> {
final Timeout timeoutToUse = this.timeout.get().peek();
return evitaService.createReadOnlySession(
GrpcEvitaSessionRequest.newBuilder()
.setCatalogName(traits.catalogName())
.setDryRun(traits.isDryRun())
.build()
).get(configuration.timeout(), configuration.timeoutUnit())
);
).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit());
});
}
}
final EvitaClientSession evitaClientSession = new EvitaClientSession(
Expand All @@ -338,7 +354,8 @@ public EvitaClientSession createSession(@Nonnull SessionTraits traits) {
this.activeSessions.remove(evitaSession.getId());
ofNullable(traits.onTermination())
.ifPresent(it -> it.onTermination(evitaSession));
}
},
this.timeout.get().peek()
);

this.activeSessions.put(evitaClientSession.getId(), evitaClientSession);
Expand Down Expand Up @@ -369,8 +386,11 @@ public void terminateSession(@Nonnull EvitaSessionContract session) {
public Set<String> getCatalogNames() {
assertActive();
final GrpcCatalogNamesResponse grpcResponse = executeWithEvitaService(
evitaService -> evitaService.getCatalogNames(Empty.newBuilder().build())
.get(configuration.timeout(), configuration.timeoutUnit())
evitaService -> {
final Timeout timeoutToUse = this.timeout.get().peek();
return evitaService.getCatalogNames(Empty.newBuilder().build())
.get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit());
}
);
return new LinkedHashSet<>(
grpcResponse.getCatalogNamesList()
Expand Down Expand Up @@ -400,8 +420,11 @@ public void renameCatalog(@Nonnull String catalogName, @Nonnull String newCatalo
.setNewCatalogName(newCatalogName)
.build();
final GrpcRenameCatalogResponse grpcResponse = executeWithEvitaService(
evitaService -> evitaService.renameCatalog(request)
.get(configuration.timeout(), configuration.timeoutUnit())
evitaService -> {
final Timeout timeoutToUse = this.timeout.get().peek();
return evitaService.renameCatalog(request)
.get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit());
}
);
final boolean success = grpcResponse.getSuccess();
if (success) {
Expand All @@ -417,9 +440,13 @@ public void replaceCatalog(@Nonnull String catalogNameToBeReplacedWith, @Nonnull
.setCatalogNameToBeReplacedWith(catalogNameToBeReplacedWith)
.setCatalogNameToBeReplaced(catalogNameToBeReplaced)
.build();

final GrpcReplaceCatalogResponse grpcResponse = executeWithEvitaService(
evitaService -> evitaService.replaceCatalog(request)
.get(configuration.timeout(), configuration.timeoutUnit())
evitaService -> {
final Timeout timeoutToUse = this.timeout.get().peek();
return evitaService.replaceCatalog(request)
.get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit());
}
);
final boolean success = grpcResponse.getSuccess();
if (success) {
Expand All @@ -435,9 +462,13 @@ public boolean deleteCatalogIfExists(@Nonnull String catalogName) {
final GrpcDeleteCatalogIfExistsRequest request = GrpcDeleteCatalogIfExistsRequest.newBuilder()
.setCatalogName(catalogName)
.build();

final GrpcDeleteCatalogIfExistsResponse grpcResponse = executeWithEvitaService(
evitaService -> evitaService.deleteCatalogIfExists(request)
.get(configuration.timeout(), configuration.timeoutUnit())
evitaService -> {
final Timeout timeoutToUse = this.timeout.get().peek();
return evitaService.deleteCatalogIfExists(request)
.get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit());
}
);
final boolean success = grpcResponse.getSuccess();
if (success) {
Expand All @@ -457,9 +488,13 @@ public void update(@Nonnull TopLevelCatalogSchemaMutation... catalogMutations) {
final GrpcUpdateEvitaRequest request = GrpcUpdateEvitaRequest.newBuilder()
.addAllSchemaMutations(grpcSchemaMutations)
.build();

executeWithEvitaService(
evitaService -> evitaService.update(request)
.get(configuration.timeout(), configuration.timeoutUnit())
evitaService -> {
final Timeout timeoutToUse = this.timeout.get().peek();
return evitaService.update(request)
.get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit());
}
);
}

Expand Down Expand Up @@ -584,8 +619,9 @@ public SystemStatus getSystemStatus() {

return executeWithEvitaService(
evitaService -> {
final Timeout timeoutToUse = this.timeout.get().peek();
final GrpcEvitaServerStatusResponse response = evitaService.serverStatus(Empty.newBuilder().build())
.get(configuration.timeout(), configuration.timeoutUnit());
.get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit());
return new SystemStatus(
response.getVersion(),
EvitaDataTypesConverter.toOffsetDateTime(response.getStartedAt()),
Expand All @@ -608,6 +644,42 @@ public void close() {
}
}

/**
* Method executes lambda using specified timeout for the call ignoring the defaults specified
* in {@link EvitaClientConfiguration#timeout()}.
*
* @param lambda logic to be executed
* @param timeout timeout value
* @param unit time unit of the timeout
*/
public void executeWithExtendedTimeout(@Nonnull Runnable lambda, long timeout, @Nonnull TimeUnit unit) {
try {
this.timeout.get().push(new Timeout(timeout, unit));
lambda.run();
} finally {
this.timeout.get().pop();
}
}

/**
* Method executes lambda using specified timeout for the call ignoring the defaults specified
* in {@link EvitaClientConfiguration#timeout()}.
*
* @param lambda logic to be executed
* @param timeout timeout value
* @param unit time unit of the timeout
* @return result of the lambda
* @param <T> type of the result
*/
public <T> T executeWithExtendedTimeout(@Nonnull Supplier<T> lambda, long timeout, @Nonnull TimeUnit unit) {
try {
this.timeout.get().push(new Timeout(timeout, unit));
return lambda.get();
} finally {
this.timeout.get().pop();
}
}

/**
* Verifies this instance is still active.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,6 @@ public class EvitaClientSession implements EvitaSessionContract {
* Evita instance this session is connected to.
*/
@Getter private final EvitaClient evita;
/**
* Configuration of the evitaDB client.
*/
private final EvitaClientConfiguration configuration;
/**
* Reflection lookup is used to speed up reflection operation by memoizing the results for examined classes.
*/
Expand Down Expand Up @@ -268,10 +264,10 @@ public EvitaClientSession(
@Nonnull UUID sessionId,
@Nonnull CommitBehavior commitBehaviour,
@Nonnull SessionTraits sessionTraits,
@Nonnull Consumer<EvitaClientSession> onTerminationCallback
) {
@Nonnull Consumer<EvitaClientSession> onTerminationCallback,
@Nonnull Timeout timeout
) {
this.evita = evita;
this.configuration = evita.getConfiguration();
this.reflectionLookup = evita.getReflectionLookup();
this.proxyFactory = schemaCache.getProxyFactory();
this.schemaCache = schemaCache;
Expand All @@ -282,7 +278,7 @@ public EvitaClientSession(
this.sessionId = sessionId;
this.sessionTraits = sessionTraits;
this.onTerminationCallback = onTerminationCallback;
this.callTimeout.add(new Timeout(configuration.timeout(), configuration.timeoutUnit()));
this.callTimeout.add(timeout);
}

@Nonnull
Expand Down Expand Up @@ -1520,8 +1516,8 @@ private <S extends Serializable> List<S> queryListInternal(
private <T> T executeWithBlockingEvitaSessionService(
@Nonnull AsyncCallFunction<EvitaSessionServiceFutureStub, ListenableFuture<T>> lambda
) {
final Timeout timeout = callTimeout.peek();
try {
final Timeout timeout = callTimeout.getLast();
return executeWithEvitaSessionService(
lambda,
EvitaSessionServiceGrpc::newFutureStub
Expand All @@ -1541,7 +1537,7 @@ private <T> T executeWithBlockingEvitaSessionService(
throw new EvitaClientServerCallException("Server call interrupted.", e);
} catch (TimeoutException e) {
throw new EvitaClientTimedOutException(
configuration.timeout(), configuration.timeoutUnit()
timeout.timeout(), timeout.timeoutUnit()
);
}
}
Expand Down

0 comments on commit a60dc06

Please sign in to comment.