Skip to content

Commit

Permalink
FMWK-268 Store secondary indexes in metadata and make it cacheable
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Nov 30, 2023
1 parent 6f5ae1f commit c23250b
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 15 deletions.
7 changes: 5 additions & 2 deletions src/main/java/com/aerospike/jdbc/AerospikeConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.aerospike.jdbc.sql.SimpleWrapper;
import com.aerospike.jdbc.sql.type.ByteArrayBlob;
import com.aerospike.jdbc.sql.type.StringClob;
import com.aerospike.jdbc.util.MetadataBuilder;

import java.sql.*;
import java.util.Map;
Expand All @@ -32,6 +33,7 @@ public class AerospikeConnection implements Connection, SimpleWrapper {
private final String url;
private final DriverConfiguration config;
private final IAerospikeClient client;
private final MetadataBuilder metadataBuilder;
private final AtomicReference<String> schema = new AtomicReference<>(null); // namespace
private volatile boolean readOnly = false;
private volatile Map<String, Class<?>> typeMap = emptyMap();
Expand All @@ -43,6 +45,7 @@ public AerospikeConnection(String url, Properties props) {
config = new DriverConfiguration(props);
config.parse(url);
client = new AerospikeClient(config.getClientPolicy(), config.getHosts());
metadataBuilder = new MetadataBuilder(config.getDriverPolicy());
schema.set(config.getSchema()); // namespace
}

Expand Down Expand Up @@ -98,9 +101,9 @@ public boolean isClosed() {
}

@Override
public DatabaseMetaData getMetaData() {
public DatabaseMetaData getMetaData() throws SQLException {
logger.info("getMetaData request");
return new AerospikeDatabaseMetadata(url, client, this);
return metadataBuilder.build(url, client, this);
}

@Override
Expand Down
20 changes: 13 additions & 7 deletions src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.aerospike.client.Info;
import com.aerospike.client.policy.InfoPolicy;
import com.aerospike.client.query.IndexType;
import com.aerospike.jdbc.model.AerospikeQuery;
import com.aerospike.jdbc.model.AerospikeSecondaryIndex;
import com.aerospike.jdbc.model.DataColumn;
import com.aerospike.jdbc.schema.AerospikeSchemaBuilder;
Expand Down Expand Up @@ -56,8 +55,9 @@ public class AerospikeDatabaseMetadata implements DatabaseMetaData, SimpleWrappe
private final String dbBuild;
private final String dbEdition;
private final List<String> catalogs;
private final Map<String, Collection<String>> tables = new ConcurrentHashMap<>();
private final Map<String, Collection<AerospikeSecondaryIndex>> indices = new ConcurrentHashMap<>();
private final Map<String, Collection<String>> tables;
private final Map<String, Collection<AerospikeSecondaryIndex>> catalogIndexes;
private final Map<String, AerospikeSecondaryIndex> secondaryIndexes;

public AerospikeDatabaseMetadata(String url, IAerospikeClient client, Connection connection) {
logger.info("Init AerospikeDatabaseMetadata");
Expand All @@ -69,6 +69,8 @@ public AerospikeDatabaseMetadata(String url, IAerospikeClient client, Connection
Collection<String> editions = synchronizedSet(new HashSet<>());
Collection<String> namespaces = synchronizedSet(new HashSet<>());
final InfoPolicy infoPolicy = client.getInfoPolicyDefault();
catalogIndexes = new ConcurrentHashMap<>();
tables = new ConcurrentHashMap<>();
Arrays.stream(client.getNodes()).parallel()
.map(node -> Info.request(infoPolicy, node, "namespaces", "sets", "sindex", "build", "edition"))
.forEach(r -> {
Expand All @@ -82,7 +84,7 @@ public AerospikeDatabaseMetadata(String url, IAerospikeClient client, Connection
streamOfSubProperties(r, "sindex")
.filter(AerospikeUtils::isSupportedIndexType)
.forEach(p ->
indices.computeIfAbsent(p.getProperty("ns"), s -> new HashSet<>())
catalogIndexes.computeIfAbsent(p.getProperty("ns"), s -> new HashSet<>())
.add(new AerospikeSecondaryIndex(
p.getProperty("ns"),
p.getProperty("set"),
Expand All @@ -93,7 +95,7 @@ public AerospikeDatabaseMetadata(String url, IAerospikeClient client, Connection
)
);
});
AerospikeQuery.secondaryIndexes = indices.values().stream()
secondaryIndexes = catalogIndexes.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toMap(AerospikeSecondaryIndex::toKey, Function.identity()));

Expand Down Expand Up @@ -950,9 +952,9 @@ public ResultSet getTypeInfo() {
public ResultSet getIndexInfo(String catalog, String schema, String table, boolean unique, boolean approximate) {
Stream<AerospikeSecondaryIndex> secondaryIndexStream;
if (catalog == null) {
secondaryIndexStream = indices.entrySet().stream().flatMap(p -> p.getValue().stream());
secondaryIndexStream = catalogIndexes.entrySet().stream().flatMap(p -> p.getValue().stream());
} else {
secondaryIndexStream = getOrDefault(indices, catalog, Collections.emptyList()).stream();
secondaryIndexStream = getOrDefault(catalogIndexes, catalog, Collections.emptyList()).stream();
}
final Iterable<List<?>> indicesData = secondaryIndexStream
.filter(i -> i.getNamespace().equals(schema) && i.getSet().equals(table))
Expand All @@ -970,6 +972,10 @@ public ResultSet getIndexInfo(String catalog, String schema, String table, boole
systemColumns(columns, sqlTypes), indicesData);
}

public Map<String, AerospikeSecondaryIndex> getSecondaryIndexes() {
return secondaryIndexes;
}

@Override
public boolean supportsResultSetType(int type) {
return false;
Expand Down
3 changes: 0 additions & 3 deletions src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static com.aerospike.jdbc.util.Constants.defaultSchemaName;

Expand All @@ -26,8 +25,6 @@ public class AerospikeQuery {
.withUnquotedCasing(Casing.UNCHANGED)
.withQuotedCasing(Casing.UNCHANGED);

public static volatile Map<String, AerospikeSecondaryIndex> secondaryIndexes;

private String catalog;
private String schema;
private String table;
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/com/aerospike/jdbc/model/DriverPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
public class DriverPolicy {

private static final int DEFAULT_CAPACITY = 256;
private static final int DEFAULT_TIMEOUT = 1000;
private static final int DEFAULT_TIMEOUT_MS = 1000;
private static final int DEFAULT_METADATA_CACHE_TTL_SECONDS = 3600;

private final int recordSetQueueCapacity;
private final int recordSetTimeoutMs;
private final int metadataCacheTtlSeconds;

public DriverPolicy(Properties properties) {
recordSetQueueCapacity = parseInt(properties.getProperty("recordSetQueueCapacity"), DEFAULT_CAPACITY);
recordSetTimeoutMs = parseInt(properties.getProperty("recordSetTimeoutMs"), DEFAULT_TIMEOUT);
recordSetTimeoutMs = parseInt(properties.getProperty("recordSetTimeoutMs"), DEFAULT_TIMEOUT_MS);
metadataCacheTtlSeconds = parseInt(properties.getProperty("metadataCacheTtlSeconds"),
DEFAULT_METADATA_CACHE_TTL_SECONDS);
}

public int getRecordSetQueueCapacity() {
Expand All @@ -23,6 +27,10 @@ public int getRecordSetTimeoutMs() {
return recordSetTimeoutMs;
}

public int getMetadataCacheTtlSeconds() {
return metadataCacheTtlSeconds;
}

private int parseInt(String value, int defaultValue) {
if (value != null) {
return Integer.parseInt(value);
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.query.KeyRecord;
import com.aerospike.jdbc.AerospikeDatabaseMetadata;
import com.aerospike.jdbc.async.EventLoopProvider;
import com.aerospike.jdbc.async.RecordSet;
import com.aerospike.jdbc.async.RecordSetBatchSequenceListener;
Expand All @@ -22,6 +23,7 @@
import com.aerospike.jdbc.util.VersionUtils;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.*;
Expand All @@ -35,10 +37,17 @@ public class SelectQueryHandler extends BaseQueryHandler {

private static final Logger logger = Logger.getLogger(SelectQueryHandler.class.getName());

protected final Map<String, AerospikeSecondaryIndex> secondaryIndexes;
protected List<DataColumn> columns;

public SelectQueryHandler(IAerospikeClient client, Statement statement) {
super(client, statement);
try {
secondaryIndexes = ((AerospikeDatabaseMetadata) statement.getConnection().getMetaData())
.getSecondaryIndexes();
} catch (SQLException e) {
throw new IllegalStateException("Failed to get secondary indexes", e);
}
}

@Override
Expand Down Expand Up @@ -125,7 +134,7 @@ private Pair<ResultSet, Integer> executeQuery(AerospikeQuery query,
private Optional<AerospikeSecondaryIndex> secondaryIndex(AerospikeQuery query) {
if (VersionUtils.isSIndexSupported(client) && Objects.nonNull(query.getPredicate())
&& query.getPredicate().isIndexable() && Objects.isNull(query.getOffset())) {
Map<String, AerospikeSecondaryIndex> indexMap = AerospikeQuery.secondaryIndexes;
Map<String, AerospikeSecondaryIndex> indexMap = secondaryIndexes;
List<String> binNames = query.getPredicate().getBinNames();
if (!binNames.isEmpty() && indexMap != null && !indexMap.isEmpty()) {
if (binNames.size() == 1) {
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/com/aerospike/jdbc/util/MetadataBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.aerospike.jdbc.util;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.jdbc.AerospikeDatabaseMetadata;
import com.aerospike.jdbc.model.DriverPolicy;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.sql.Connection;
import java.sql.SQLException;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class MetadataBuilder {

private final Cache<String, AerospikeDatabaseMetadata> metadataCache;

public MetadataBuilder(DriverPolicy driverPolicy) {
metadataCache = CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofSeconds(driverPolicy.getMetadataCacheTtlSeconds()))
.build();
}

public AerospikeDatabaseMetadata build(String url, IAerospikeClient client, Connection connection)
throws SQLException {
try {
return metadataCache.get(url, () -> new AerospikeDatabaseMetadata(url, client, connection));
} catch (ExecutionException e) {
throw new SQLException(e);
}
}
}
4 changes: 4 additions & 0 deletions src/test/java/com/aerospike/jdbc/ParseJdbcUrlTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ public void testParseUrlParameters() throws Exception {
Properties update = new Properties();
update.setProperty("totalTimeout", "3000");
update.setProperty("sendKey", "true");
update.setProperty("recordSetQueueCapacity", "1024");
update.setProperty("metadataCacheTtlSeconds", "7200");
connection.setClientInfo(update);
assertEquals(config.getQueryPolicy().totalTimeout, 3000);
assertEquals(config.getWritePolicy().totalTimeout, 3000);
assertEquals(config.getScanPolicy().totalTimeout, 3000);
assertTrue(config.getQueryPolicy().sendKey);
assertTrue(config.getWritePolicy().sendKey);
assertTrue(config.getScanPolicy().sendKey);
assertEquals(config.getDriverPolicy().getRecordSetQueueCapacity(), 1024);
assertEquals(config.getDriverPolicy().getMetadataCacheTtlSeconds(), 7200);

connection.setClientInfo("recordSetTimeoutMs", "7000");
assertEquals(config.getDriverPolicy().getRecordSetTimeoutMs(), 7000);
Expand Down

0 comments on commit c23250b

Please sign in to comment.