Skip to content

Commit

Permalink
scan hints (#2351)
Browse files Browse the repository at this point in the history
* Updated connection factory to be able to configure scan hints
* Updated to allow query logics to selectively override base client configuration
  • Loading branch information
ivakegg authored Apr 22, 2024
1 parent bb8d353 commit 2441af7
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
<version.maven-install-plugin>2.5.2</version.maven-install-plugin>
<version.metrics-cdi>1.6.0</version.metrics-cdi>
<version.microservice.accumulo-api>3.0.0</version.microservice.accumulo-api>
<version.microservice.accumulo-utils>3.0.1</version.microservice.accumulo-utils>
<version.microservice.accumulo-utils>3.0.2</version.microservice.accumulo-utils>
<version.microservice.audit-api>3.0.0</version.microservice.audit-api>
<version.microservice.authorization-api>3.0.0</version.microservice.authorization-api>
<version.microservice.base-rest-responses>3.0.0</version.microservice.base-rest-responses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,9 @@ public AccumuloClient getClient(final String cpn, final Priority priority, final
AccumuloClient mock = new InMemoryAccumuloClient(pool.getFactory().getUsername(), cache.getInstance());
mock.securityOperations().changeLocalUserPassword(pool.getFactory().getUsername(), new PasswordToken(pool.getFactory().getPassword()));
WrappedAccumuloClient wrappedAccumuloClient = new WrappedAccumuloClient(c, mock);
if (connectionPoolsConfiguration.getClientConfiguration(poolName) != null) {
wrappedAccumuloClient.setClientConfig(connectionPoolsConfiguration.getClientConfiguration(poolName).getConfiguration());
}
String classLoaderContext = System.getProperty("dw.accumulo.classLoader.context");
if (classLoaderContext != null) {
wrappedAccumuloClient.setScannerClassLoaderContext(classLoaderContext);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package datawave.webservice.common.connection.config;

import java.util.Map;

import org.apache.accumulo.core.client.ScannerBase;
import org.apache.commons.lang3.StringUtils;
import org.apache.deltaspike.core.api.config.ConfigResolver;
import org.apache.log4j.Logger;

import datawave.webservice.common.connection.AccumuloClientConfiguration;

/**
* The configuration for the connection pool clients of the form derived from properties as follows:
*
* dw.{pool}.client.{tableName}.consistency = IMMEDIATE|EVENTUAL dw.{pool}.client.{tableName}.{hintName} = {hintValue}
*
*/
public class ConnectionPoolClientConfiguration {

private static final Logger log = Logger.getLogger(ConnectionPoolConfiguration.class);
private AccumuloClientConfiguration config = new AccumuloClientConfiguration();

public ConnectionPoolClientConfiguration(String poolName) {
String prefix = "dw." + poolName + ".client";
for (Map.Entry<String,String> property : ConfigResolver.getAllProperties().entrySet()) {
if (property.getKey().startsWith(prefix)) {
String[] tableAndHint = StringUtils.split(property.getKey().substring(prefix.length()), '.');
if (tableAndHint.length == 2) {
if (tableAndHint[1].equals("consistency")) {
config.setConsistency(tableAndHint[0], ScannerBase.ConsistencyLevel.valueOf(property.getValue()));
} else {
config.addHint(tableAndHint[0], tableAndHint[1], property.getValue());
}
} else {
log.error("Invalid client hint configuration property " + property.getKey());
}
}
}
}

public AccumuloClientConfiguration getConfiguration() {
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ public class ConnectionPoolsConfiguration {
private List<String> poolNames;

private Map<String,ConnectionPoolConfiguration> pools = new HashMap<>();
private Map<String,ConnectionPoolClientConfiguration> configs = new HashMap<>();

@PostConstruct
private void initializePools() {
for (String poolName : poolNames) {
pools.put(poolName, new ConnectionPoolConfiguration(poolName.toLowerCase()));
configs.put(poolName, new ConnectionPoolClientConfiguration(poolName.toLowerCase()));
}
}

Expand All @@ -38,4 +40,15 @@ public Map<String,ConnectionPoolConfiguration> getPools() {
return Collections.unmodifiableMap(pools);
}

public ConnectionPoolConfiguration getConfiguration(String pool) {
return pools.get(pool);
}

public Map<String,ConnectionPoolClientConfiguration> getClientConfiguration() {
return Collections.unmodifiableMap(configs);
}

public ConnectionPoolClientConfiguration getClientConfiguration(String pool) {
return configs.get(pool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import datawave.marking.MarkingFunctions;
import datawave.security.authorization.UserOperations;
import datawave.webservice.common.audit.Auditor.AuditType;
import datawave.webservice.common.connection.AccumuloClientConfiguration;
import datawave.webservice.query.Query;
import datawave.webservice.query.configuration.GenericQueryConfiguration;
import datawave.webservice.query.exception.QueryException;
Expand Down Expand Up @@ -47,6 +48,7 @@ public abstract class BaseQueryLogic<T> implements QueryLogic<T> {
protected ResponseObjectFactory responseObjectFactory;
protected SelectorExtractor selectorExtractor;
protected ResponseEnricherBuilder responseEnricherBuilder = null;
protected AccumuloClientConfiguration clientConfig = null;

public static final String BYPASS_ACCUMULO = "rfile.debug";

Expand Down Expand Up @@ -413,4 +415,14 @@ public UserOperations getUserOperations() {
// null implies that the local user operations/principal is to be used for auths.
return null;
}

@Override
public void setClientConfig(AccumuloClientConfiguration clientConfig) {
this.clientConfig = clientConfig;
}

@Override
public AccumuloClientConfiguration getClientConfig() {
return clientConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import datawave.marking.MarkingFunctions;
import datawave.security.authorization.UserOperations;
import datawave.webservice.common.audit.Auditor;
import datawave.webservice.common.connection.AccumuloClientConfiguration;
import datawave.webservice.common.connection.AccumuloConnectionFactory;
import datawave.webservice.query.Query;
import datawave.webservice.query.configuration.GenericQueryConfiguration;
Expand Down Expand Up @@ -361,4 +362,14 @@ public UserOperations getUserOperations() {
public void preInitialize(Query settings, Set<Authorizations> queryAuths) {
delegate.preInitialize(settings, queryAuths);
}

@Override
public void setClientConfig(AccumuloClientConfiguration config) {
delegate.setClientConfig(config);
}

@Override
public AccumuloClientConfiguration getClientConfig() {
return delegate.getClientConfig();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import datawave.security.authorization.UserOperations;
import datawave.validation.ParameterValidator;
import datawave.webservice.common.audit.Auditor.AuditType;
import datawave.webservice.common.connection.AccumuloClientConfiguration;
import datawave.webservice.common.connection.AccumuloConnectionFactory;
import datawave.webservice.query.Query;
import datawave.webservice.query.QueryImpl;
Expand Down Expand Up @@ -446,4 +447,18 @@ default long getResultLimit(Query settings) {
default void preInitialize(Query settings, Set<Authorizations> userAuthorizations) {
// noop
}

/**
* Set a client configuration for scanner hints and consistency.
*
* @param config
*/
void setClientConfig(AccumuloClientConfiguration config);

/**
* Get the client configuration
*
* @return client configuration
*/
AccumuloClientConfiguration getClientConfig();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import datawave.security.authorization.remote.RemoteUserOperationsImpl;
import datawave.security.util.WSAuthorizationsUtil;
import datawave.webservice.common.connection.AccumuloConnectionFactory;
import datawave.webservice.common.connection.WrappedAccumuloClient;
import datawave.webservice.query.Query;
import datawave.webservice.query.QueryImpl;
import datawave.webservice.query.cache.AbstractRunningQuery;
Expand Down Expand Up @@ -195,6 +196,9 @@ public void setClient(AccumuloClient client) throws Exception {
addNDC();
applyPrediction(null);
this.client = client;
if (this.client instanceof WrappedAccumuloClient && this.logic.getClientConfig() != null) {
((WrappedAccumuloClient) this.client).updateClientConfig(this.logic.getClientConfig());
}
long start = System.currentTimeMillis();
GenericQueryConfiguration configuration = this.logic.initialize(this.client, this.settings, this.calculatedAuths);
this.lastPageNumber = 0;
Expand Down

0 comments on commit 2441af7

Please sign in to comment.