Skip to content

Commit

Permalink
Add support for setting scan consistency level on a query logic (#2386)
Browse files Browse the repository at this point in the history
* Add support for setting scan consistency level on a query logic

Additional updates

Update a few instances that still relied on old constructors

Only update scanner factory hints and consistency if set on logic config

* merge request feedback

* clarify names are per-table
  • Loading branch information
apmoriarty authored Jun 12, 2024
1 parent 3ffbad9 commit 0f34220
Show file tree
Hide file tree
Showing 20 changed files with 614 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.security.Authorizations;

import com.google.common.collect.Iterators;
Expand Down Expand Up @@ -70,6 +73,10 @@ public class GenericQueryConfiguration implements Serializable {
// Whether or not this query emits every result or performs some kind of result reduction
protected boolean reduceResults = false;

// either IMMEDIATE or EVENTUAL
private Map<String,ScannerBase.ConsistencyLevel> tableConsistencyLevels = new HashMap<>();
private Map<String,Map<String,String>> tableHints = new HashMap<>();

/**
* Empty default constructor
*/
Expand Down Expand Up @@ -103,6 +110,8 @@ public GenericQueryConfiguration(GenericQueryConfiguration genericConfig) {
this.setQueryString(genericConfig.getQueryString());
this.setTableName(genericConfig.getTableName());
this.setReduceResults(genericConfig.isReduceResults());
this.setTableConsistencyLevels(genericConfig.getTableConsistencyLevels());
this.setTableHints(genericConfig.getTableHints());
}

public Collection<QueryData> getQueries() {
Expand Down Expand Up @@ -269,6 +278,22 @@ public void setAccumuloPassword(String password) {
this.accumuloPassword = EnvProvider.resolve(password);
}

public Map<String,ScannerBase.ConsistencyLevel> getTableConsistencyLevels() {
return tableConsistencyLevels;
}

public void setTableConsistencyLevels(Map<String,ScannerBase.ConsistencyLevel> tableConsistencyLevels) {
this.tableConsistencyLevels = tableConsistencyLevels;
}

public Map<String,Map<String,String>> getTableHints() {
return tableHints;
}

public void setTableHints(Map<String,Map<String,String>> tableHints) {
this.tableHints = tableHints;
}

/**
* Checks for non-null, sane values for the configured values
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,4 +445,20 @@ public void setClientConfig(AccumuloClientConfiguration clientConfig) {
public AccumuloClientConfiguration getClientConfig() {
return clientConfig;
}

public Map<String,ScannerBase.ConsistencyLevel> getTableConsistencyLevels() {
return getConfig().getTableConsistencyLevels();
}

public void setTableConsistencyLevels(Map<String,ScannerBase.ConsistencyLevel> consistencyLevels) {
getConfig().setTableConsistencyLevels(consistencyLevels);
}

public Map<String,Map<String,String>> getTableHints() {
return getConfig().getTableHints();
}

public void setTableHints(Map<String,Map<String,String>> hints) {
getConfig().setTableHints(hints);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.clientImpl.ScannerOptions;
import org.apache.accumulo.core.clientImpl.TabletLocator;
import org.apache.accumulo.core.data.Key;
Expand Down Expand Up @@ -524,7 +525,10 @@ protected void submitScan(Scan scan, boolean increment) {
*/
public BatchScannerSession setOptions(SessionOptions options) {
return this;
}

public void setConsistencyLevel(ScannerBase.ConsistencyLevel consistencyLevel) {
this.options.setConsistencyLevel(consistencyLevel);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -33,10 +35,12 @@

public class ScannerFactory {

private static final int DEFAULT_MAX_THREADS = 100;
protected int maxQueue = 1000;

protected final Set<ScannerBase> instances = Collections.synchronizedSet(new HashSet<>());
protected final Set<ScannerSession> sessionInstances = Collections.synchronizedSet(new HashSet<>());
protected final AccumuloClient cxn;
protected AccumuloClient client;
// using an AtomicBoolean to give us a separate monitor for synchronization
protected final AtomicBoolean open = new AtomicBoolean(true);

Expand All @@ -45,43 +49,97 @@ public class ScannerFactory {
protected ResourceQueue scanQueue = null;
protected ShardQueryConfiguration config = null;

private static final Logger log = Logger.getLogger(ScannerFactory.class);

public ScannerFactory(GenericQueryConfiguration queryConfiguration) {
protected Map<String,ScannerBase.ConsistencyLevel> consistencyByTable = new HashMap<>();
protected Map<String,Map<String,String>> hintsByTable = new HashMap<>();

this.cxn = queryConfiguration.getClient();
log.debug("Created scanner factory " + System.identityHashCode(this) + " is wrapped ? " + (cxn instanceof WrappedConnector));
private static final Logger log = Logger.getLogger(ScannerFactory.class);

if (queryConfiguration instanceof ShardQueryConfiguration) {
this.config = ((ShardQueryConfiguration) queryConfiguration);
this.maxQueue = this.config.getMaxScannerBatchSize();
this.settings = this.config.getQuery();
this.accrueStats = this.config.getAccrueStats();
try {
scanQueue = new ResourceQueue(this.config.getNumQueryThreads(), this.cxn);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Preferred constructor, builds scanner factory from configs
*
* @param config
* a {@link GenericQueryConfiguration}
*/
public ScannerFactory(GenericQueryConfiguration config) {
updateConfigs(config);
}

/**
* Constructor that accepts a prebuilt AccumuloClient
*
* @param client
* an {@link AccumuloClient}
*/
public ScannerFactory(AccumuloClient client) {
this(client, 100);
this(client, DEFAULT_MAX_THREADS);

}

/**
* Constructor that accepts a prebuild AccumuloClient and limits the internal result queue to the provided value
*
* @param client
* an {@link AccumuloClient}
* @param queueSize
* the internal result queue size
*/
public ScannerFactory(AccumuloClient client, int queueSize) {
try {
this.cxn = client;
this.client = client;
this.scanQueue = new ResourceQueue(queueSize, client);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Method that allows a ScannerFactory to be updated by a config after initialization
*
* @param genericConfig
* a {@link GenericQueryConfiguration}
*/
public void updateConfigs(GenericQueryConfiguration genericConfig) {

this.client = genericConfig.getClient();

Map<String,ScannerBase.ConsistencyLevel> consistencyLevels = genericConfig.getTableConsistencyLevels();
if (consistencyLevels != null && !consistencyLevels.isEmpty()) {
this.consistencyByTable = genericConfig.getTableConsistencyLevels();
}

Map<String,Map<String,String>> hints = genericConfig.getTableHints();
if (hints != null && !hints.isEmpty()) {
this.hintsByTable = genericConfig.getTableHints();
}

int numThreads = DEFAULT_MAX_THREADS;
if (genericConfig instanceof ShardQueryConfiguration) {
ShardQueryConfiguration config = (ShardQueryConfiguration) genericConfig;

this.settings = config.getQuery();
this.accrueStats = config.getAccrueStats();
this.maxQueue = config.getMaxScannerBatchSize();
this.config = config;

numThreads = config.getNumQueryThreads();
}

try {
this.scanQueue = new ResourceQueue(numThreads, this.client);
} catch (Exception e) {
throw new RuntimeException(e);
}

if (log.isDebugEnabled()) {
log.debug("Created ScannerFactory " + System.identityHashCode(this) + " is wrapped ? " + (client instanceof WrappedConnector));
}
}

public Scanner newSingleScanner(String tableName, Set<Authorizations> auths, Query query) throws TableNotFoundException {
if (open.get()) {
Scanner bs = QueryScannerHelper.createScannerWithoutInfo(cxn, tableName, auths, query);
Scanner bs = QueryScannerHelper.createScannerWithoutInfo(client, tableName, auths, query);
applyConfigs(bs, tableName);

log.debug("Created scanner " + System.identityHashCode(bs));
if (log.isTraceEnabled()) {
log.trace("Adding instance " + bs.hashCode());
Expand All @@ -103,7 +161,9 @@ public Scanner newSingleScanner(String tableName, Set<Authorizations> auths, Que

public BatchScanner newScanner(String tableName, Set<Authorizations> auths, int threads, Query query) throws TableNotFoundException {
if (open.get()) {
BatchScanner bs = QueryScannerHelper.createBatchScanner(cxn, tableName, auths, threads, query);
BatchScanner bs = QueryScannerHelper.createBatchScanner(client, tableName, auths, threads, query);
applyConfigs(bs, tableName);

log.debug("Created scanner " + System.identityHashCode(bs));
if (log.isTraceEnabled()) {
log.trace("Adding instance " + bs.hashCode());
Expand All @@ -124,7 +184,9 @@ public BatchScanner newScanner(String tableName, Set<Authorizations> auths, int

public BatchScanner newScanner(String tableName, Set<Authorizations> auths, int threads, Query query, boolean reportErrors) throws TableNotFoundException {
if (open.get()) {
BatchScanner bs = QueryScannerHelper.createBatchScanner(cxn, tableName, auths, threads, query, reportErrors);
BatchScanner bs = QueryScannerHelper.createBatchScanner(client, tableName, auths, threads, query, reportErrors);
applyConfigs(bs, tableName);

log.debug("Created scanner " + System.identityHashCode(bs));
if (log.isTraceEnabled()) {
log.trace("Adding instance " + bs.hashCode());
Expand Down Expand Up @@ -166,7 +228,6 @@ public BatchScanner newScanner(String tableName, Query query) throws TableNotFou
* if there are issues
*/
public BatchScannerSession newQueryScanner(final String tableName, final Set<Authorizations> auths, Query settings) throws Exception {

return newLimitedScanner(BatchScannerSession.class, tableName, auths, settings).setThreads(scanQueue.getCapacity());
}

Expand Down Expand Up @@ -202,14 +263,16 @@ public <T extends ScannerSession> T newLimitedScanner(Class<T> wrapper, final St
stats = new ScanSessionStats();
}

T session = null;
T session;
if (wrapper == ScannerSession.class) {
session = (T) new ScannerSession(tableName, auths, scanQueue, maxQueue, settings).applyStats(stats);
} else {
session = wrapper.getConstructor(ScannerSession.class)
.newInstance(new ScannerSession(tableName, auths, scanQueue, maxQueue, settings).applyStats(stats));
}

applyConfigs(session, tableName);

log.debug("Created session " + System.identityHashCode(session));
if (log.isTraceEnabled()) {
log.trace("Adding instance " + session.hashCode());
Expand Down Expand Up @@ -316,24 +379,24 @@ public ScannerBase newRfileScanner(String tableName, Set<Authorizations> auths,
if (open.get()) {
Configuration conf = new Configuration();

AccumuloClient con = cxn;

Properties clientProps = con.properties();
Properties clientProps = client.properties();
final String instanceName = clientProps.getProperty(ClientProperty.INSTANCE_NAME.getKey());
final String zookeepers = clientProps.getProperty(ClientProperty.INSTANCE_ZOOKEEPERS.getKey());

AccumuloHelper.setInstanceName(conf, instanceName);
AccumuloHelper.setUsername(conf, con.whoami());
AccumuloHelper.setUsername(conf, client.whoami());

AccumuloHelper.setZooKeepers(conf, zookeepers);
BulkInputFormat.setZooKeeperInstance(conf, instanceName, zookeepers);

AccumuloHelper.setPassword(conf, config.getAccumuloPassword().getBytes());
BulkInputFormat.setMemoryInput(conf, con.whoami(), config.getAccumuloPassword().getBytes(), tableName, auths.iterator().next());
BulkInputFormat.setMemoryInput(conf, client.whoami(), config.getAccumuloPassword().getBytes(), tableName, auths.iterator().next());

conf.set(MultiRfileInputformat.CACHE_METADATA, "true");

ScannerBase baseScanner = new RfileScanner(con, conf, tableName, auths, 1);
ScannerBase baseScanner = new RfileScanner(client, conf, tableName, auths, 1);

applyConfigs(baseScanner, tableName);

synchronized (open) {
if (open.get()) {
Expand All @@ -348,4 +411,44 @@ public ScannerBase newRfileScanner(String tableName, Set<Authorizations> auths,
throw new IllegalStateException("Factory has been locked. No new scanners can be created.");
}
}

/**
* Apply table-specific scanner configs to the provided scanner base object
*
* @param scannerBase
* a {@link ScannerBase}
* @param tableName
* the table
*/
protected void applyConfigs(ScannerBase scannerBase, String tableName) {
if (consistencyByTable != null && consistencyByTable.containsKey(tableName)) {
scannerBase.setConsistencyLevel(consistencyByTable.get(tableName));
}

if (hintsByTable != null && hintsByTable.containsKey(tableName)) {
scannerBase.setExecutionHints(hintsByTable.get(tableName));
}
}

/**
* Apply table-specific scanner configs to the provided scanner session
*
* @param scannerSession
* the {@link ScannerSession}
* @param tableName
* the table
*/
protected void applyConfigs(ScannerSession scannerSession, String tableName) {
SessionOptions options = scannerSession.getOptions();

if (consistencyByTable != null && consistencyByTable.containsKey(tableName)) {
options.setConsistencyLevel(consistencyByTable.get(tableName));
}

if (hintsByTable != null && hintsByTable.containsKey(tableName)) {
options.setExecutionHints(hintsByTable.get(tableName));
}

scannerSession.setOptions(options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,11 @@ public void setReverseIndexTableName(String reverseIndexTableName) {

@Override
public GenericQueryConfiguration initialize(AccumuloClient client, Query settings, Set<Authorizations> auths) throws Exception {
this.config = new ShardIndexQueryConfiguration(this, settings);
this.scannerFactory = new ScannerFactory(client);
MetadataHelper metadataHelper = initializeMetadataHelper(client, getConfig().getMetadataTableName(), auths);
ShardIndexQueryConfiguration config = new ShardIndexQueryConfiguration(this, settings);
config.setClient(client);
this.scannerFactory = new ScannerFactory(config);

MetadataHelper metadataHelper = initializeMetadataHelper(client, config.getMetadataTableName(), auths);

if (StringUtils.isEmpty(settings.getQuery())) {
throw new IllegalArgumentException("Query cannot be null");
Expand Down Expand Up @@ -424,7 +426,6 @@ public void setupQuery(AccumuloClient client, GenericQueryConfiguration baseConf
baseConfig.setQueries(checkpoint.getQueries());
config.setClient(client);

scannerFactory = new ScannerFactory(client);
MetadataHelper metadataHelper = initializeMetadataHelper(client, config.getMetadataTableName(), config.getAuthorizations());
config.setQueryModel(metadataHelper.getQueryModel(config.getModelTableName(), config.getModelName(), null));

Expand Down
Loading

0 comments on commit 0f34220

Please sign in to comment.