Skip to content

Commit

Permalink
Replace ActiveJDBC with JDBI for QueryHistory
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Feb 1, 2024
1 parent 258e7ff commit a316292
Show file tree
Hide file tree
Showing 15 changed files with 208 additions and 107 deletions.
12 changes: 12 additions & 0 deletions gateway-ha/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,18 @@
<version>${dep.activejdbc.version}</version>
</dependency>

<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-core</artifactId>
<version>${dep.jdbi.version}</version>
</dependency>

<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-sqlobject</artifactId>
<version>${dep.jdbi.version}</version>
</dependency>

<dependency>
<groupId>org.jeasy</groupId>
<artifactId>easy-rules-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.trino.gateway.proxyserver.ProxyHandler;
import io.trino.gateway.proxyserver.ProxyServer;
import io.trino.gateway.proxyserver.ProxyServerConfiguration;
import org.jdbi.v3.core.Jdbi;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -79,10 +80,11 @@ public class HaGatewayProviderModule
public HaGatewayProviderModule(HaGatewayConfiguration configuration, Environment environment)
{
super(configuration, environment);
connectionManager = new JdbcConnectionManager(configuration.getDataStore());
Jdbi jdbi = Jdbi.create(configuration.getDataStore().getJdbcUrl(), configuration.getDataStore().getUser(), configuration.getDataStore().getPassword());
connectionManager = new JdbcConnectionManager(jdbi, configuration.getDataStore());
resourceGroupsManager = new HaResourceGroupsManager(connectionManager);
gatewayBackendManager = new HaGatewayManager(connectionManager);
queryHistoryManager = new HaQueryHistoryManager(connectionManager);
queryHistoryManager = new HaQueryHistoryManager(jdbi);
routingManager =
new HaRoutingManager(gatewayBackendManager, queryHistoryManager);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,43 @@
package io.trino.gateway.ha.persistence;

import io.trino.gateway.ha.config.DataStoreConfiguration;
import io.trino.gateway.ha.persistence.dao.QueryHistory;
import io.trino.gateway.ha.persistence.dao.QueryHistoryDao;
import jakarta.annotation.Nullable;
import org.javalite.activejdbc.Base;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;

public class JdbcConnectionManager
{
private static final Logger log = LoggerFactory.getLogger(JdbcConnectionManager.class);

private final Jdbi jdbi;
private final DataStoreConfiguration configuration;
private final ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();

public JdbcConnectionManager(DataStoreConfiguration configuration)
public JdbcConnectionManager(Jdbi jdbi, DataStoreConfiguration configuration)
{
this.jdbi = requireNonNull(jdbi, "jdbi is null")
.installPlugin(new SqlObjectPlugin())
.registerRowMapper(new RecordAndAnnotatedConstructorMapper());
this.configuration = configuration;
startCleanUps();
}

public Jdbi getJdbi()
{
return jdbi;
}

public void open()
{
this.open(null);
Expand Down Expand Up @@ -68,16 +82,8 @@ private void startCleanUps()
executorService.scheduleWithFixedDelay(
() -> {
log.info("Performing query history cleanup task");
try {
this.open();
QueryHistory.delete(
"created < ?",
System.currentTimeMillis() - TimeUnit.HOURS.toMillis(
this.configuration.getQueryHistoryHoursRetention()));
}
finally {
this.close();
}
long created = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(this.configuration.getQueryHistoryHoursRetention());
jdbi.onDemand(QueryHistoryDao.class).deleteOldHistory(created);
},
1,
120,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.persistence;

import org.jdbi.v3.core.config.ConfigRegistry;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.mapper.RowMapperFactory;
import org.jdbi.v3.core.mapper.reflect.ConstructorMapper;
import org.jdbi.v3.core.mapper.reflect.JdbiConstructor;

import java.lang.reflect.Type;
import java.util.Optional;
import java.util.stream.Stream;

public class RecordAndAnnotatedConstructorMapper
implements RowMapperFactory
{
@Override
public Optional<RowMapper<?>> build(Type type, ConfigRegistry config)
{
if ((type instanceof Class<?> clazz) && (clazz.isRecord() || hasJdbiConstructorMethod(clazz))) {
return ConstructorMapper.factory(clazz).build(type, config);
}
return Optional.empty();
}

private boolean hasJdbiConstructorMethod(Class<?> clazz)
{
return Stream.of(clazz.getConstructors()).anyMatch(constructor -> (constructor.getAnnotation(JdbiConstructor.class) != null))
|| Stream.of(clazz.getMethods()).anyMatch(method -> (method.getAnnotation(JdbiConstructor.class) != null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,60 +13,27 @@
*/
package io.trino.gateway.ha.persistence.dao;

import io.trino.gateway.ha.router.QueryHistoryManager;
import org.javalite.activejdbc.Model;
import org.javalite.activejdbc.annotations.Cached;
import org.javalite.activejdbc.annotations.IdName;
import org.javalite.activejdbc.annotations.Table;
import org.jdbi.v3.core.mapper.reflect.ColumnName;
import org.jdbi.v3.core.mapper.reflect.JdbiConstructor;

import java.util.ArrayList;
import java.util.List;
import static java.util.Objects.requireNonNull;

@IdName("query_id")
@Table("query_history")
@Cached
public class QueryHistory
extends Model
public record QueryHistory(String queryId, String queryText, String backendUrl, String userName, String source, long created)
{
private static final String queryId = "query_id";
private static final String queryText = "query_text";
private static final String backendUrl = "backend_url";
private static final String userName = "user_name";
private static final String source = "source";
private static final String created = "created";

public static List<QueryHistoryManager.QueryDetail> upcast(List<QueryHistory> queryHistoryList)
@JdbiConstructor
public QueryHistory(
@ColumnName("query_id") String queryId,
@ColumnName("query_text") String queryText,
@ColumnName("backend_url") String backendUrl,
@ColumnName("user_name") String userName,
@ColumnName("source") String source,
@ColumnName("created") long created)
{
List<QueryHistoryManager.QueryDetail> queryDetails = new ArrayList<>();
for (QueryHistory dao : queryHistoryList) {
QueryHistoryManager.QueryDetail queryDetail = new QueryHistoryManager.QueryDetail();
queryDetail.setQueryId(dao.getString(queryId));
queryDetail.setQueryText(dao.getString(queryText));
queryDetail.setCaptureTime(dao.getLong(created));
queryDetail.setBackendUrl(dao.getString(backendUrl));
queryDetail.setUser(dao.getString(userName));
queryDetail.setSource(dao.getString(source));
queryDetails.add(queryDetail);
}
return queryDetails;
}

public static void create(QueryHistory model, QueryHistoryManager.QueryDetail queryDetail)
{
//Checks
String id = queryDetail.getQueryId();
if (id == null || id.isEmpty()) {
return;
}

model.set(queryId, id);
model.set(queryText, queryDetail.getQueryText());
model.set(backendUrl, queryDetail.getBackendUrl());
model.set(userName, queryDetail.getUser());
model.set(source, queryDetail.getSource());
model.set(created, queryDetail.getCaptureTime());
if (!queryDetail.getQueryId().isEmpty()) {
model.insert();
}
this.queryId = requireNonNull(queryId, "queryId is null");
this.queryText = requireNonNull(queryText, "queryText is null");
this.backendUrl = requireNonNull(backendUrl, "backendUrl is null");
this.userName = requireNonNull(userName, "userName is null");
this.source = requireNonNull(source, "source is null");
this.created = created;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.persistence.dao;

import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

import java.util.List;

public interface QueryHistoryDao
{
@SqlQuery("""
SELECT * FROM query_history
ORDER BY created DESC
LIMIT 2000
""")
List<QueryHistory> findRecentQueries();

@SqlQuery("""
SELECT * FROM query_history
WHERE user_name = :userName
ORDER BY created DESC
LIMIT 2000
""")
List<QueryHistory> findRecentQueriesByUserName(String userName);

@SqlQuery("""
SELECT backend_url FROM query_history
WHERE query_id = :queryId
""")
String findBackendUrlByQueryId(String queryId);

@SqlUpdate("""
INSERT INTO query_history (query_id, query_text, backend_url, user_name, source, created)
VALUES (:queryId, :queryText, :backendUrl, :userName, :source, :created)
""")
void insertHistory(String queryId, String queryText, String backendUrl, String userName, String source, long created);

@SqlUpdate("""
DELETE FROM query_history
WHERE created < :created
""")
void deleteOldHistory(long created);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,68 +13,75 @@
*/
package io.trino.gateway.ha.router;

import io.trino.gateway.ha.persistence.JdbcConnectionManager;
import io.trino.gateway.ha.persistence.dao.QueryHistory;
import io.trino.gateway.ha.persistence.dao.QueryHistoryDao;
import org.jdbi.v3.core.Jdbi;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

public class HaQueryHistoryManager
implements QueryHistoryManager
{
private final JdbcConnectionManager connectionManager;
private final QueryHistoryDao dao;

public HaQueryHistoryManager(JdbcConnectionManager connectionManager)
public HaQueryHistoryManager(Jdbi jdbi)
{
this.connectionManager = connectionManager;
dao = requireNonNull(jdbi, "jdbi is null").onDemand(QueryHistoryDao.class);
}

@Override
public void submitQueryDetail(QueryDetail queryDetail)
{
try {
connectionManager.open();
QueryHistory dao = new QueryHistory();
QueryHistory.create(dao, queryDetail);
}
finally {
connectionManager.close();
String id = queryDetail.getQueryId();
if (id == null || id.isEmpty()) {
return;
}

dao.insertHistory(
queryDetail.getQueryId(),
queryDetail.getQueryText(),
queryDetail.getBackendUrl(),
queryDetail.getUser(),
queryDetail.getSource(),
queryDetail.getCaptureTime());
}

@Override
public List<QueryDetail> fetchQueryHistory(Optional<String> user)
{
try {
connectionManager.open();
String sql = "select * from query_history";
if (user.isPresent()) {
sql += " where user_name = '" + user.orElseThrow() + "'";
}
return QueryHistory.upcast(QueryHistory.findBySQL(String.join(" ",
sql,
"order by created desc",
"limit 2000")));
List<QueryHistory> histories;
if (user.isPresent()) {
histories = dao.findRecentQueriesByUserName(user.orElseThrow());
}
finally {
connectionManager.close();
else {
histories = dao.findRecentQueries();
}
return upcast(histories);
}

private static List<QueryHistoryManager.QueryDetail> upcast(List<QueryHistory> queryHistoryList)
{
List<QueryHistoryManager.QueryDetail> queryDetails = new ArrayList<>();
for (QueryHistory dao : queryHistoryList) {
QueryHistoryManager.QueryDetail queryDetail = new QueryHistoryManager.QueryDetail();
queryDetail.setQueryId(dao.queryId());
queryDetail.setQueryText(dao.queryText());
queryDetail.setCaptureTime(dao.created());
queryDetail.setBackendUrl(dao.backendUrl());
queryDetail.setUser(dao.userName());
queryDetail.setSource(dao.source());
queryDetails.add(queryDetail);
}
return queryDetails;
}

@Override
public String getBackendForQueryId(String queryId)
{
String backend = null;
try {
connectionManager.open();
QueryHistory queryHistory = QueryHistory.findById(queryId);
if (queryHistory != null) {
backend = queryHistory.get("backend_url").toString();
}
}
finally {
connectionManager.close();
}
return backend;
return dao.findBackendUrlByQueryId(queryId);
}
}
Loading

0 comments on commit a316292

Please sign in to comment.