Skip to content

Commit

Permalink
Rename constants and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Dec 4, 2023
1 parent c282218 commit 0716874
Show file tree
Hide file tree
Showing 23 changed files with 284 additions and 168 deletions.
5 changes: 4 additions & 1 deletion src/main/java/com/aerospike/jdbc/AerospikeConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,10 @@ public void setNetworkTimeout(Executor executor, int milliseconds) {
client.getScanPolicyDefault(),
client.getQueryPolicyDefault(),
client.getBatchPolicyDefault()
}).forEach(p -> p.totalTimeout = milliseconds);
}).forEach(p -> {
p.totalTimeout = milliseconds;
p.connectTimeout = milliseconds;
});
client.getInfoPolicyDefault().timeout = milliseconds;
}

Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/aerospike/jdbc/AerospikeDatabaseMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import java.util.stream.Stream;

import static com.aerospike.jdbc.util.AerospikeUtils.getIndexBinValuesRatio;
import static com.aerospike.jdbc.util.Constants.defaultKeyName;
import static com.aerospike.jdbc.util.Constants.defaultSchemaName;
import static com.aerospike.jdbc.util.Constants.DEFAULT_SCHEMA_NAME;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;
import static com.aerospike.jdbc.util.Constants.schemaScanRecords;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
Expand Down Expand Up @@ -78,7 +78,7 @@ public AerospikeDatabaseMetadata(String url, IAerospikeClient client, AerospikeC
namespaces.addAll(asList(getOrDefault(r, "namespaces", "").split(";")));
streamOfSubProperties(r, "sets").forEach(p ->
tables.computeIfAbsent(p.getProperty("ns"), s -> new HashSet<>())
.addAll(Arrays.asList(p.getProperty("set"), defaultSchemaName))
.addAll(Arrays.asList(p.getProperty("set"), DEFAULT_SCHEMA_NAME))
);
streamOfSubProperties(r, "sindex")
.filter(AerospikeUtils::isSupportedIndexType)
Expand Down Expand Up @@ -859,12 +859,12 @@ public ResultSet getPrimaryKeys(String catalog, String schema, String table) {
if (catalog == null) {
tablesData = tables.entrySet().stream()
.flatMap(p -> p.getValue().stream().map(t ->
asList(p.getKey(), null, t, defaultKeyName, 1, defaultKeyName)))
asList(p.getKey(), null, t, PRIMARY_KEY_COLUMN_NAME, 1, PRIMARY_KEY_COLUMN_NAME)))
.collect(toList());
} else {
tablesData = tables.getOrDefault(catalog, Collections.emptyList()).stream()
.filter(t -> table == null || table.equals(t))
.map(t -> asList(catalog, null, t, defaultKeyName, 1, defaultKeyName))
.map(t -> asList(catalog, null, t, PRIMARY_KEY_COLUMN_NAME, 1, PRIMARY_KEY_COLUMN_NAME))
.collect(toList());
}

Expand Down
10 changes: 4 additions & 6 deletions src/main/java/com/aerospike/jdbc/async/ScanQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@

import java.util.Objects;

import static com.aerospike.jdbc.util.Constants.defaultKeyName;

public class ScanQueryHandler {

private final IAerospikeClient client;
private RecordSetRecordSequenceListener listener;

private int currentPartition;
private int count;

private final ScanCallback callback = ((key, rec) -> {
listener.onRecord(key, rec);
count++;
Expand All @@ -36,23 +35,22 @@ public static ScanQueryHandler create(IAerospikeClient client, DriverPolicy driv
}

public RecordSet execute(ScanPolicy scanPolicy, AerospikeQuery query) {
if (query.getBinNames() != null && query.getBinNames().length == 1
&& query.getBinNames()[0].equals(defaultKeyName)) {
if (query.isPrimaryKeyOnly()) {
scanPolicy.includeBinData = false;
}
if (Objects.nonNull(query.getOffset())) {
long maxRecords = scanPolicy.maxRecords;
PartitionFilter filter = getPartitionFilter(query);
while (isScanRequired(maxRecords)) {
client.scanPartitions(scanPolicy, filter, query.getSchema(), query.getSetName(),
callback, query.getBinNames());
callback, query.columnBins());
scanPolicy.maxRecords = maxRecords > 0 ? maxRecords - count : maxRecords;
filter = PartitionFilter.id(++currentPartition);
}
listener.onSuccess();
} else {
client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getSchema(),
query.getSetName(), query.getBinNames());
query.getSetName(), query.columnBins());
}
return listener.getRecordSet();
}
Expand Down
31 changes: 24 additions & 7 deletions src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

import static com.aerospike.jdbc.util.Constants.defaultSchemaName;
import static com.aerospike.jdbc.util.Constants.DEFAULT_SCHEMA_NAME;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;

public class AerospikeQuery {

Expand All @@ -26,6 +28,8 @@ public class AerospikeQuery {
.withUnquotedCasing(Casing.UNCHANGED)
.withQuotedCasing(Casing.UNCHANGED);

private static final String ASTERISK = "*";

private String catalog;
private String schema;
private String table;
Expand Down Expand Up @@ -89,7 +93,7 @@ public void setTable(String table) {
}

public String getSetName() {
if (table.equals(defaultSchemaName)) {
if (table.equals(DEFAULT_SCHEMA_NAME)) {
return null;
}
return table;
Expand Down Expand Up @@ -147,11 +151,12 @@ public void setColumns(List<String> columns) {
this.columns = columns;
}

public String[] getBinNames() {
if (columns.size() == 1 && columns.get(0).equals("*")) {
return null;
}
return columns.toArray(new String[0]);
public String[] columnBins() {
String[] binNames = columns.stream()
.filter(c -> !Objects.equals(c, ASTERISK))
.filter(c -> !Objects.equals(c, PRIMARY_KEY_COLUMN_NAME))
.toArray(String[]::new);
return binNames.length == 0 ? null : binNames;
}

public Collection<Object> getPrimaryKeys() {
Expand All @@ -161,6 +166,18 @@ public Collection<Object> getPrimaryKeys() {
return Collections.emptyList();
}

public boolean isPrimaryKeyOnly() {
return columns.size() == 1 && columns.get(0).equals(PRIMARY_KEY_COLUMN_NAME);
}

public boolean isStar() {
return columns.stream().anyMatch(c -> c.equals(ASTERISK));
}

public boolean isCount() {
return columns.size() == 1 && columns.get(0).toLowerCase(Locale.ENGLISH).startsWith("count(");
}

public boolean isIndexable() {
return Objects.nonNull(predicate) && predicate.isIndexable() && Objects.isNull(offset);
}
Expand Down
32 changes: 20 additions & 12 deletions src/main/java/com/aerospike/jdbc/model/AerospikeSqlVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import java.math.BigDecimal;
import java.util.stream.Collectors;

import static com.aerospike.jdbc.util.Constants.unsupportedQueryType;
import static com.aerospike.jdbc.util.Constants.UNSUPPORTED_QUERY_TYPE_MESSAGE;
import static java.util.Objects.requireNonNull;

public class AerospikeSqlVisitor implements SqlVisitor<AerospikeQuery> {
Expand All @@ -36,7 +36,8 @@ public AerospikeQuery visit(SqlCall sqlCall) {
SqlSelect sql = (SqlSelect) sqlCall;
query.setQueryType(QueryType.SELECT);
query.setTable(requireNonNull(sql.getFrom()).toString());
query.setColumns(sql.getSelectList().stream().map(SqlNode::toString).collect(Collectors.toList()));
query.setColumns(sql.getSelectList().stream()
.map(SqlNode::toString).collect(Collectors.toList()));
if (sql.hasWhere()) {
query.setPredicate(parseWhere((SqlBasicCall) requireNonNull(sql.getWhere())));
}
Expand All @@ -47,8 +48,10 @@ public AerospikeQuery visit(SqlCall sqlCall) {
if (sql.getCondition() != null) {
query.setPredicate(parseWhere((SqlBasicCall) sql.getCondition()));
}
query.setColumns(sql.getTargetColumnList().stream().map(SqlNode::toString).collect(Collectors.toList()));
query.setValues(sql.getSourceExpressionList().stream().map(this::parseValue).collect(Collectors.toList()));
query.setColumns(sql.getTargetColumnList().stream()
.map(SqlNode::toString).collect(Collectors.toList()));
query.setValues(sql.getSourceExpressionList().stream()
.map(this::parseValue).collect(Collectors.toList()));
} else if (sqlCall instanceof SqlInsert) {
SqlInsert sql = (SqlInsert) sqlCall;
query.setQueryType(QueryType.INSERT);
Expand Down Expand Up @@ -79,19 +82,23 @@ public AerospikeQuery visit(SqlCall sqlCall) {
query.setQueryType(QueryType.DROP_SCHEMA);
} else if (sqlCall instanceof SqlOrderBy) {
SqlOrderBy sql = (SqlOrderBy) sqlCall;
if (!sql.orderList.isEmpty()) throw new UnsupportedOperationException(unsupportedQueryType);
if (!sql.orderList.isEmpty()) {
throw new UnsupportedOperationException(UNSUPPORTED_QUERY_TYPE_MESSAGE);
}
if (sql.fetch != null) {
query.setLimit(requireNonNull((BigDecimal) ((SqlNumericLiteral) sql.fetch).getValue()).intValue());
query.setLimit(requireNonNull((BigDecimal) ((SqlNumericLiteral) sql.fetch)
.getValue()).intValue());
}
if (sql.offset != null) {
query.setOffset(requireNonNull((BigDecimal) ((SqlNumericLiteral) sql.offset).getValue()).intValue());
query.setOffset(requireNonNull((BigDecimal) ((SqlNumericLiteral) sql.offset)
.getValue()).intValue());
}
visit((SqlCall) sql.query);
} else {
throw new UnsupportedOperationException(unsupportedQueryType);
throw new UnsupportedOperationException(UNSUPPORTED_QUERY_TYPE_MESSAGE);
}
} catch (Exception e) {
throw new UnsupportedOperationException(unsupportedQueryType);
throw new UnsupportedOperationException(UNSUPPORTED_QUERY_TYPE_MESSAGE);
}
return query;
}
Expand All @@ -109,7 +116,8 @@ private QueryPredicate parseWhere(SqlBasicCall where) {
return new QueryPredicateList(
where.getOperandList().get(0).toString(),
operator,
((SqlNodeList) where.getOperandList().get(1)).stream().map(this::parseValue).distinct().toArray()
((SqlNodeList) where.getOperandList().get(1)).stream()
.map(this::parseValue).distinct().toArray()
);
} else {
return new QueryPredicateBinary(
Expand Down Expand Up @@ -143,7 +151,7 @@ private QueryPredicate parseWhere(SqlBasicCall where) {
parseValue(where.getOperandList().get(2))
);
}
throw new UnsupportedOperationException(unsupportedQueryType);
throw new UnsupportedOperationException(UNSUPPORTED_QUERY_TYPE_MESSAGE);
}

private Object parseValue(SqlNode sqlNode) {
Expand All @@ -161,7 +169,7 @@ private Object parseValue(SqlNode sqlNode) {
} else if (sqlNode instanceof SqlIdentifier) {
return unwrapString(sqlNode.toString());
}
throw new UnsupportedOperationException(unsupportedQueryType);
throw new UnsupportedOperationException(UNSUPPORTED_QUERY_TYPE_MESSAGE);
}

private Object getNumeric(BigDecimal bd) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ protected Exp getValueExp(Object value) {
}

protected Exp buildLeftExp() {
return binName.equals(Constants.defaultKeyName) ? Exp.key(valueType) : Exp.bin(binName, valueType);
return binName.equals(Constants.PRIMARY_KEY_COLUMN_NAME)
? Exp.key(valueType)
: Exp.bin(binName, valueType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.util.Collections;
import java.util.Optional;

import static com.aerospike.jdbc.util.Constants.defaultKeyName;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;

public class QueryPredicateBinary extends QueryPredicateBase {

Expand Down Expand Up @@ -39,7 +39,7 @@ public Optional<Filter> toFilter(String binName) {

@Override
public Collection<Object> getPrimaryKeys() {
if (binName.equals(defaultKeyName) && operator == OperatorBinary.EQ) {
if (binName.equals(PRIMARY_KEY_COLUMN_NAME) && operator == OperatorBinary.EQ) {
return Collections.singletonList(value);
}
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.util.List;
import java.util.Optional;

import static com.aerospike.jdbc.util.Constants.defaultKeyName;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;

public class QueryPredicateIsNotNull implements QueryPredicate {

Expand All @@ -19,7 +19,9 @@ public QueryPredicateIsNotNull(String binName) {

@Override
public Exp toFilterExpression() {
return binName.equals(defaultKeyName) ? Exp.keyExists() : Exp.binExists(binName);
return binName.equals(PRIMARY_KEY_COLUMN_NAME)
? Exp.keyExists()
: Exp.binExists(binName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.util.List;
import java.util.Optional;

import static com.aerospike.jdbc.util.Constants.defaultKeyName;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;

public class QueryPredicateIsNull implements QueryPredicate {

Expand All @@ -20,7 +20,9 @@ public QueryPredicateIsNull(String binName) {
@Override
public Exp toFilterExpression() {
return Exp.not(
binName.equals(defaultKeyName) ? Exp.keyExists() : Exp.binExists(binName)
binName.equals(PRIMARY_KEY_COLUMN_NAME)
? Exp.keyExists()
: Exp.binExists(binName)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import java.util.Collections;
import java.util.Optional;

import static com.aerospike.jdbc.util.Constants.defaultKeyName;
import static com.aerospike.jdbc.util.Constants.PRIMARY_KEY_COLUMN_NAME;

public class QueryPredicateList extends QueryPredicateBase {

Expand All @@ -24,7 +24,9 @@ public QueryPredicateList(String binName, Operator operator, Object[] values) {
@Override
public Exp toFilterExpression() {
return operator.exp(
Arrays.stream(values).map(v -> Exp.eq(buildLeftExp(), getValueExp(v))).toArray(Exp[]::new)
Arrays.stream(values)
.map(v -> Exp.eq(buildLeftExp(), getValueExp(v)))
.toArray(Exp[]::new)
);
}

Expand All @@ -40,7 +42,7 @@ public boolean isIndexable() {

@Override
public Collection<Object> getPrimaryKeys() {
if (binName.equals(defaultKeyName)) {
if (binName.equals(PRIMARY_KEY_COLUMN_NAME)) {
return Arrays.asList(values);
}
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ public Pair<ResultSet, Integer> execute(AerospikeQuery query) {
RecordSetRecordSequenceListener listener = new RecordSetRecordSequenceListener(config.getDriverPolicy());
ScanPolicy scanPolicy = policyBuilder.buildScanPolicy(query);
scanPolicy.includeBinData = false;

client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getSchema(),
query.getSetName());

final WritePolicy deletePolicy = policyBuilder.buildDeleteWritePolicy();
final AtomicInteger count = new AtomicInteger();
listener.getRecordSet().forEach(r -> {
try {
if (client.delete(writePolicy, r.key))
if (client.delete(deletePolicy, r.key))
count.incrementAndGet();
} catch (Exception e) {
logger.warning("Failed to delete record: " + e.getMessage());
Expand Down
Loading

0 comments on commit 0716874

Please sign in to comment.