Skip to content

Commit

Permalink
fixes #2406: scanner factory deadlock (#2407)
Browse files Browse the repository at this point in the history
* Removed synchronization in the scanner factory
* Updated to track all scanners created by the factory
  • Loading branch information
ivakegg authored and hgklohr committed Jun 10, 2024
1 parent a13d7d0 commit 3ffbad9
Showing 1 changed file with 113 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
Expand Down Expand Up @@ -33,33 +34,31 @@
public class ScannerFactory {

protected int maxQueue = 1000;
protected HashSet<ScannerBase> instances = new HashSet<>();
protected HashSet<ScannerSession> sessionInstances = new HashSet<>();
protected AccumuloClient cxn;
protected boolean open = true;
protected final Set<ScannerBase> instances = Collections.synchronizedSet(new HashSet<>());
protected final Set<ScannerSession> sessionInstances = Collections.synchronizedSet(new HashSet<>());
protected final AccumuloClient cxn;
// using an AtomicBoolean to give us a separate monitor for synchronization
protected final AtomicBoolean open = new AtomicBoolean(true);

protected boolean accrueStats = false;
protected Query settings;
protected Query settings = null;
protected ResourceQueue scanQueue = null;
ShardQueryConfiguration config = null;
protected ShardQueryConfiguration config = null;

private static final Logger log = Logger.getLogger(ScannerFactory.class);

public ScannerFactory(GenericQueryConfiguration queryConfiguration) {

this.cxn = queryConfiguration.getClient();

if (queryConfiguration instanceof ShardQueryConfiguration) {
this.settings = ((ShardQueryConfiguration) queryConfiguration).getQuery();
this.accrueStats = ((ShardQueryConfiguration) queryConfiguration).getAccrueStats();
}
log.debug("Created scanner factory " + System.identityHashCode(this) + " is wrapped ? " + (cxn instanceof WrappedConnector));

if (queryConfiguration instanceof ShardQueryConfiguration) {
config = ((ShardQueryConfiguration) queryConfiguration);
maxQueue = ((ShardQueryConfiguration) queryConfiguration).getMaxScannerBatchSize();
this.settings = ((ShardQueryConfiguration) queryConfiguration).getQuery();
this.config = ((ShardQueryConfiguration) queryConfiguration);
this.maxQueue = this.config.getMaxScannerBatchSize();
this.settings = this.config.getQuery();
this.accrueStats = this.config.getAccrueStats();
try {
scanQueue = new ResourceQueue(((ShardQueryConfiguration) queryConfiguration).getNumQueryThreads(), this.cxn);
scanQueue = new ResourceQueue(this.config.getNumQueryThreads(), this.cxn);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -74,50 +73,71 @@ public ScannerFactory(AccumuloClient client) {
public ScannerFactory(AccumuloClient client, int queueSize) {
try {
this.cxn = client;
scanQueue = new ResourceQueue(queueSize, client);
this.scanQueue = new ResourceQueue(queueSize, client);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public synchronized Scanner newSingleScanner(String tableName, Set<Authorizations> auths, Query query) throws TableNotFoundException {
if (open) {
public Scanner newSingleScanner(String tableName, Set<Authorizations> auths, Query query) throws TableNotFoundException {
if (open.get()) {
Scanner bs = QueryScannerHelper.createScannerWithoutInfo(cxn, tableName, auths, query);
log.debug("Created scanner " + System.identityHashCode(bs));
if (log.isTraceEnabled()) {
log.trace("Adding instance " + bs.hashCode());
}

return bs;
synchronized (open) {
if (open.get()) {
instances.add(bs);
return bs;
} else {
bs.close();
throw new IllegalStateException("Factory has been locked. No new scanners can be created.");
}
}
} else {
throw new IllegalStateException("Factory has been locked. No new scanners can be created.");
}
}

public synchronized BatchScanner newScanner(String tableName, Set<Authorizations> auths, int threads, Query query) throws TableNotFoundException {
if (open) {
public BatchScanner newScanner(String tableName, Set<Authorizations> auths, int threads, Query query) throws TableNotFoundException {
if (open.get()) {
BatchScanner bs = QueryScannerHelper.createBatchScanner(cxn, tableName, auths, threads, query);
log.debug("Created scanner " + System.identityHashCode(bs));
if (log.isTraceEnabled()) {
log.trace("Adding instance " + bs.hashCode());
}
instances.add(bs);
return bs;
synchronized (open) {
if (open.get()) {
instances.add(bs);
return bs;
} else {
bs.close();
throw new IllegalStateException("Factory has been locked. No new scanners can be created.");
}
}
} else {
throw new IllegalStateException("Factory has been locked. No new scanners can be created.");
}
}

public synchronized BatchScanner newScanner(String tableName, Set<Authorizations> auths, int threads, Query query, boolean reportErrors)
throws TableNotFoundException {
if (open) {
public BatchScanner newScanner(String tableName, Set<Authorizations> auths, int threads, Query query, boolean reportErrors) throws TableNotFoundException {
if (open.get()) {
BatchScanner bs = QueryScannerHelper.createBatchScanner(cxn, tableName, auths, threads, query, reportErrors);
log.debug("Created scanner " + System.identityHashCode(bs));
if (log.isTraceEnabled()) {
log.trace("Adding instance " + bs.hashCode());
}
instances.add(bs);
return bs;
synchronized (open) {
if (open.get()) {
instances.add(bs);
return bs;
} else {
bs.close();
throw new IllegalStateException("Factory has been locked. No new scanners can be created.");
}
}
} else {
throw new IllegalStateException("Factory has been locked. No new scanners can be created.");
}
Expand Down Expand Up @@ -145,7 +165,7 @@ public BatchScanner newScanner(String tableName, Query query) throws TableNotFou
* @throws Exception
* if there are issues
*/
public synchronized BatchScannerSession newQueryScanner(final String tableName, final Set<Authorizations> auths, Query settings) throws Exception {
public BatchScannerSession newQueryScanner(final String tableName, final Set<Authorizations> auths, Query settings) throws Exception {

return newLimitedScanner(BatchScannerSession.class, tableName, auths, settings).setThreads(scanQueue.getCapacity());
}
Expand All @@ -169,11 +189,11 @@ public synchronized BatchScannerSession newQueryScanner(final String tableName,
* if there are issues
*
*/
public synchronized <T extends ScannerSession> T newLimitedScanner(Class<T> wrapper, final String tableName, final Set<Authorizations> auths,
final Query settings) throws Exception {
public <T extends ScannerSession> T newLimitedScanner(Class<T> wrapper, final String tableName, final Set<Authorizations> auths, final Query settings)
throws Exception {
Preconditions.checkNotNull(scanQueue);
Preconditions.checkNotNull(wrapper);
Preconditions.checkArgument(open, "Factory has been locked. No New scanners can be created");
Preconditions.checkArgument(open.get(), "Factory has been locked. No New scanners can be created");

log.debug("Creating limited scanner whose max threads is is " + scanQueue.getCapacity() + " and max capacity is " + maxQueue);

Expand All @@ -194,9 +214,15 @@ public synchronized <T extends ScannerSession> T newLimitedScanner(Class<T> wrap
if (log.isTraceEnabled()) {
log.trace("Adding instance " + session.hashCode());
}
sessionInstances.add(session);

return session;
synchronized (open) {
if (open.get()) {
sessionInstances.add(session);
return session;
} else {
session.close();
throw new IllegalStateException("Factory has been locked. No new scanners can be created.");
}
}
}

/**
Expand All @@ -213,32 +239,37 @@ public synchronized <T extends ScannerSession> T newLimitedScanner(Class<T> wrap
* @throws Exception
* if there are issues
*/
public synchronized RangeStreamScanner newRangeScanner(final String tableName, final Set<Authorizations> auths, final Query settings) throws Exception {
public RangeStreamScanner newRangeScanner(final String tableName, final Set<Authorizations> auths, final Query settings) throws Exception {
return newRangeScanner(tableName, auths, settings, Integer.MAX_VALUE);
}

public RangeStreamScanner newRangeScanner(String tableName, Set<Authorizations> auths, Query query, int shardsPerDayThreshold) throws Exception {
return newLimitedScanner(RangeStreamScanner.class, tableName, auths, settings).setShardsPerDayThreshold(shardsPerDayThreshold).setScannerFactory(this);
}

public synchronized boolean close(ScannerBase bs) {
boolean removed = instances.remove(bs);
if (removed) {
public boolean close(ScannerBase bs) {
try {
log.debug("Closed scanner " + System.identityHashCode(bs));
if (log.isTraceEnabled()) {
log.trace("Closing instance " + bs.hashCode());
if (instances.remove(bs)) {
if (log.isTraceEnabled()) {
log.trace("Closing instance " + bs.hashCode());
}
bs.close();
return true;
}
bs.close();
} catch (Exception e) {
// ANY EXCEPTION HERE CAN SAFELY BE IGNORED
log.trace("Exception closing ScannerBase, can be safely ignored: {}", e);
}
return removed;
return false;
}

/**
* Returns a NEW collection of scanner instances to the caller.
*
* @return a NEW collection of scanners
*/
public synchronized Collection<ScannerBase> currentScanners() {
public Collection<ScannerBase> currentScanners() {
return new ArrayList<>(instances);
}

Expand All @@ -247,28 +278,30 @@ public synchronized Collection<ScannerBase> currentScanners() {
*
* @return a NEW collection of scanner session instances
*/
public synchronized Collection<ScannerSession> currentSessions() {
public Collection<ScannerSession> currentSessions() {
return new ArrayList<>(sessionInstances);
}

public synchronized boolean lockdown() {
public boolean lockdown() {
log.debug("Locked scanner factory " + System.identityHashCode(this));
if (log.isTraceEnabled()) {
log.trace("Locked down with following stacktrace", new Exception("stacktrace for debugging"));
}

open = false;
return open;
synchronized (open) {
return open.getAndSet(false);
}
}

public synchronized void close(ScannerSession bs) {
public void close(ScannerSession bs) {
try {
log.debug("Closed session " + System.identityHashCode(bs));
sessionInstances.remove(bs);
if (log.isTraceEnabled()) {
log.trace("Closing instance " + bs.hashCode());
if (sessionInstances.remove(bs)) {
if (log.isTraceEnabled()) {
log.trace("Closing instance " + bs.hashCode());
}
bs.close();
}
bs.close();
} catch (Exception e) {
// ANY EXCEPTION HERE CAN SAFELY BE IGNORED
log.trace("Exception closing ScannerSession, can be safely ignored: {}", e);
Expand All @@ -279,30 +312,40 @@ public void setMaxQueue(int size) {
this.maxQueue = size;
}

public synchronized ScannerBase newRfileScanner(String tableName, Set<Authorizations> auths, Query setting) {
Configuration conf = new Configuration();
public ScannerBase newRfileScanner(String tableName, Set<Authorizations> auths, Query setting) {
if (open.get()) {
Configuration conf = new Configuration();

AccumuloClient con = cxn;
AccumuloClient con = cxn;

Properties clientProps = con.properties();
final String instanceName = clientProps.getProperty(ClientProperty.INSTANCE_NAME.getKey());
final String zookeepers = clientProps.getProperty(ClientProperty.INSTANCE_ZOOKEEPERS.getKey());
Properties clientProps = con.properties();
final String instanceName = clientProps.getProperty(ClientProperty.INSTANCE_NAME.getKey());
final String zookeepers = clientProps.getProperty(ClientProperty.INSTANCE_ZOOKEEPERS.getKey());

AccumuloHelper.setInstanceName(conf, instanceName);
AccumuloHelper.setUsername(conf, con.whoami());
AccumuloHelper.setInstanceName(conf, instanceName);
AccumuloHelper.setUsername(conf, con.whoami());

AccumuloHelper.setZooKeepers(conf, zookeepers);
BulkInputFormat.setZooKeeperInstance(conf, instanceName, zookeepers);
AccumuloHelper.setZooKeepers(conf, zookeepers);
BulkInputFormat.setZooKeeperInstance(conf, instanceName, zookeepers);

AccumuloHelper.setPassword(conf, config.getAccumuloPassword().getBytes());
BulkInputFormat.setMemoryInput(conf, con.whoami(), config.getAccumuloPassword().getBytes(), tableName, auths.iterator().next());
AccumuloHelper.setPassword(conf, config.getAccumuloPassword().getBytes());
BulkInputFormat.setMemoryInput(conf, con.whoami(), config.getAccumuloPassword().getBytes(), tableName, auths.iterator().next());

conf.set(MultiRfileInputformat.CACHE_METADATA, "true");
conf.set(MultiRfileInputformat.CACHE_METADATA, "true");

ScannerBase baseScanner = new RfileScanner(con, conf, tableName, auths, 1);
ScannerBase baseScanner = new RfileScanner(con, conf, tableName, auths, 1);

instances.add(baseScanner);

return baseScanner;
synchronized (open) {
if (open.get()) {
instances.add(baseScanner);
return baseScanner;
} else {
baseScanner.close();
throw new IllegalStateException("Factory has been locked. No new scanners can be created.");
}
}
} else {
throw new IllegalStateException("Factory has been locked. No new scanners can be created.");
}
}
}

0 comments on commit 3ffbad9

Please sign in to comment.