Skip to content

Commit

Permalink
Implement support for multiple DB tenants in SQL mode (#3444)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnsrnhld authored May 17, 2024
1 parent 3d4b59d commit 897ba54
Show file tree
Hide file tree
Showing 26 changed files with 310 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,30 @@
import com.bakdata.conquery.mode.ManagerProvider;
import com.bakdata.conquery.mode.NamespaceHandler;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.config.SqlConnectorConfig;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.models.worker.ShardNodeInformation;
import com.bakdata.conquery.sql.DslContextFactory;
import com.bakdata.conquery.sql.SqlContext;
import com.bakdata.conquery.sql.conversion.dialect.HanaSqlDialect;
import com.bakdata.conquery.sql.conversion.dialect.PostgreSqlDialect;
import com.bakdata.conquery.sql.conversion.dialect.SqlDialect;
import com.bakdata.conquery.sql.execution.ResultSetProcessorFactory;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import com.bakdata.conquery.sql.conversion.dialect.SqlDialectFactory;
import io.dropwizard.core.setup.Environment;
import org.jooq.DSLContext;

public class LocalManagerProvider implements ManagerProvider {

private static final Supplier<Collection<ShardNodeInformation>> EMPTY_NODE_PROVIDER = Collections::emptyList;

public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Environment environment) {
private final SqlDialectFactory dialectFactory;

InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator());
public LocalManagerProvider() {
this.dialectFactory = new SqlDialectFactory();
}

SqlConnectorConfig sqlConnectorConfig = config.getSqlConnectorConfig();
DSLContext dslContext = DslContextFactory.create(sqlConnectorConfig);
SqlDialect sqlDialect = createSqlDialect(sqlConnectorConfig, dslContext);
SqlContext sqlContext = new SqlContext(sqlConnectorConfig, sqlDialect);
public LocalManagerProvider(SqlDialectFactory dialectFactory) {
this.dialectFactory = dialectFactory;
}

SqlExecutionService sqlExecutionService = new SqlExecutionService(
sqlDialect.getDSLContext(),
ResultSetProcessorFactory.create(sqlDialect)
);
public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Environment environment) {

NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, creator, sqlContext, sqlExecutionService);
InternalObjectMapperCreator creator = ManagerProvider.newInternalObjectMapperCreator(config, environment.getValidator());
NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, creator, dialectFactory);
DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createLocalDatasetRegistry(namespaceHandler, config, creator);
creator.init(datasetRegistry);

Expand All @@ -59,10 +50,4 @@ public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Env
);
}

protected SqlDialect createSqlDialect(SqlConnectorConfig sqlConnectorConfig, DSLContext dslContext) {
return switch (sqlConnectorConfig.getDialect()) {
case POSTGRESQL -> new PostgreSqlDialect(dslContext);
case HANA -> new HanaSqlDialect(dslContext);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,57 @@
import com.bakdata.conquery.mode.NamespaceHandler;
import com.bakdata.conquery.mode.NamespaceSetupData;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.config.DatabaseConfig;
import com.bakdata.conquery.models.config.SqlConnectorConfig;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
import com.bakdata.conquery.models.index.IndexService;
import com.bakdata.conquery.models.query.ExecutionManager;
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.sql.SqlContext;
import com.bakdata.conquery.sql.DSLContextWrapper;
import com.bakdata.conquery.sql.DslContextFactory;
import com.bakdata.conquery.sql.conquery.SqlExecutionManager;
import com.bakdata.conquery.sql.conversion.SqlConverter;
import com.bakdata.conquery.sql.conversion.dialect.SqlDialect;
import com.bakdata.conquery.sql.conversion.dialect.SqlDialectFactory;
import com.bakdata.conquery.sql.execution.ResultSetProcessorFactory;
import com.bakdata.conquery.sql.execution.SqlExecutionResult;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jooq.DSLContext;

@RequiredArgsConstructor
@Slf4j
public class LocalNamespaceHandler implements NamespaceHandler<LocalNamespace> {

private final ConqueryConfig config;
private final InternalObjectMapperCreator mapperCreator;
private final SqlContext sqlContext;
private final SqlExecutionService sqlExecutionService;
private final SqlDialectFactory dialectFactory;

@Override
public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, IndexService indexService) {

NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, mapperCreator, indexService);
ExecutionManager executionManager = new SqlExecutionManager(sqlContext, sqlExecutionService, metaStorage);

SqlConnectorConfig sqlConnectorConfig = config.getSqlConnectorConfig();
DatabaseConfig databaseConfig = sqlConnectorConfig.getDatabaseConfig(namespaceStorage.getDataset());

DSLContextWrapper dslContextWrapper = DslContextFactory.create(databaseConfig, sqlConnectorConfig);
DSLContext dslContext = dslContextWrapper.getDslContext();
SqlDialect sqlDialect = dialectFactory.createSqlDialect(databaseConfig.getDialect());

SqlConverter sqlConverter = new SqlConverter(sqlDialect, dslContext, databaseConfig);
SqlExecutionService sqlExecutionService = new SqlExecutionService(dslContext, ResultSetProcessorFactory.create(sqlDialect));
ExecutionManager<SqlExecutionResult> executionManager = new SqlExecutionManager(sqlConverter, sqlExecutionService, metaStorage);
SqlStorageHandler sqlStorageHandler = new SqlStorageHandler(sqlExecutionService);

return new LocalNamespace(
namespaceData.getPreprocessMapper(),
namespaceData.getCommunicationMapper(),
namespaceStorage,
executionManager,
sqlExecutionService,
dslContextWrapper,
sqlStorageHandler,
namespaceData.getJobManager(),
namespaceData.getFilterSearch(),
namespaceData.getIndexService(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.bakdata.conquery.models.config;

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class DatabaseConfig {

private static final String DEFAULT_PRIMARY_COLUMN = "pid";

private Dialect dialect;

private String databaseUsername;

private String databasePassword;

private String jdbcConnectionUrl;

@Builder.Default
private String primaryColumn = DEFAULT_PRIMARY_COLUMN;

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package com.bakdata.conquery.models.config;

import java.util.Map;

import com.bakdata.conquery.models.datasets.Dataset;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Data
Expand All @@ -11,22 +16,21 @@
@AllArgsConstructor
public class SqlConnectorConfig {

public static final String DEFAULT_PRIMARY_COLUMN = "pid";

boolean enabled;

private Dialect dialect;

/**
* Determines if generated SQL should be formatted.
*/
private boolean withPrettyPrinting;

private String databaseUsername;

private String databasePassword;
/**
* Keys must match the name of existing {@link Dataset}s.
*/
@Getter(AccessLevel.PRIVATE)
private Map<String, DatabaseConfig> databaseConfigs;

private String jdbcConnectionUrl;
public DatabaseConfig getDatabaseConfig(Dataset dataset) {
return databaseConfigs.get(dataset.getName());
}

private String primaryColumn = DEFAULT_PRIMARY_COLUMN;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.bakdata.conquery.models.worker;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -13,31 +14,34 @@
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.query.ExecutionManager;
import com.bakdata.conquery.models.query.FilterSearch;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import com.bakdata.conquery.sql.DSLContextWrapper;
import com.bakdata.conquery.sql.execution.SqlExecutionResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Getter
@Slf4j
public class LocalNamespace extends Namespace {

private final SqlExecutionService sqlExecutionService;

private final DSLContextWrapper dslContextWrapper;
private final SqlStorageHandler storageHandler;

public LocalNamespace(
ObjectMapper preprocessMapper,
ObjectMapper communicationMapper,
NamespaceStorage storage,
ExecutionManager executionManager,
SqlExecutionService sqlExecutionService,
ExecutionManager<SqlExecutionResult> executionManager,
DSLContextWrapper dslContextWrapper,
SqlStorageHandler storageHandler,
JobManager jobManager,
FilterSearch filterSearch,
IndexService indexService,
List<Injectable> injectables
) {
super(preprocessMapper, communicationMapper, storage, executionManager, jobManager, filterSearch, indexService, injectables);
this.sqlExecutionService = sqlExecutionService;
this.storageHandler = new SqlStorageHandler(sqlExecutionService);
this.dslContextWrapper = dslContextWrapper;
this.storageHandler = storageHandler;
}

@Override
Expand All @@ -52,4 +56,26 @@ void registerColumnValuesInSearch(Set<Column> columns) {
getFilterSearch().registerValues(column, stringStream.collect(Collectors.toSet()));
}
}

@Override
public void close() {
closeDslContextWrapper();
super.close();
}

@Override
public void remove() {
closeDslContextWrapper();
super.remove();
}

private void closeDslContextWrapper() {
try {
dslContextWrapper.close();
}
catch (IOException e) {
log.warn("Could not close namespace's {} DSLContext/Datasource directly", getDataset().getId(), e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.bakdata.conquery.sql;

import java.io.Closeable;
import java.io.IOException;

import com.zaxxer.hikari.HikariDataSource;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;

/**
* Provides access to a configured {@link DSLContext} and enables closing the underlying connection pool.
*/
@RequiredArgsConstructor
public class DSLContextWrapper implements Closeable {

@Getter
private final DSLContext dslContext;

private final HikariDataSource dataSource;

@Override
public void close() throws IOException {
// Hikari opens a connection pool under the hood which we won't be able to close after passing it to the DSLContext.
// That's why we keep the HikariDataSource reference.
dataSource.close();
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.bakdata.conquery.sql;

import javax.sql.DataSource;

import com.bakdata.conquery.models.config.DatabaseConfig;
import com.bakdata.conquery.models.config.SqlConnectorConfig;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
Expand All @@ -12,25 +11,28 @@

public class DslContextFactory {

public static DSLContext create(SqlConnectorConfig config) {
public static DSLContextWrapper create(DatabaseConfig config, SqlConnectorConfig connectorConfig) {

HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setJdbcUrl(config.getJdbcConnectionUrl());
hikariConfig.setUsername(config.getDatabaseUsername());
hikariConfig.setPassword(config.getDatabasePassword());

DataSource dataSource = new HikariDataSource(hikariConfig);
HikariDataSource hikariDataSource = new HikariDataSource(hikariConfig);

Settings settings = new Settings()
.withRenderFormatted(config.isWithPrettyPrinting())
.withRenderFormatted(connectorConfig.isWithPrettyPrinting())
// enforces all identifiers to be quoted if not explicitly unquoted via DSL.unquotedName()
// to prevent any lowercase/uppercase SQL dialect specific identifier naming issues
.withRenderQuotedNames(RenderQuotedNames.EXPLICIT_DEFAULT_QUOTED);

return DSL.using(
dataSource,
DSLContext dslContext = DSL.using(
hikariDataSource,
config.getDialect().getJooqDialect(),
settings
);

return new DSLContextWrapper(dslContext, hikariDataSource);
}

}
11 changes: 0 additions & 11 deletions backend/src/main/java/com/bakdata/conquery/sql/SqlContext.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import com.bakdata.conquery.models.query.ExecutionManager;
import com.bakdata.conquery.models.query.ManagedQuery;
import com.bakdata.conquery.models.worker.Namespace;
import com.bakdata.conquery.sql.SqlContext;
import com.bakdata.conquery.sql.conversion.SqlConverter;
import com.bakdata.conquery.sql.conversion.model.SqlQuery;
import com.bakdata.conquery.sql.execution.SqlExecutionResult;
Expand All @@ -29,10 +28,10 @@ public class SqlExecutionManager extends ExecutionManager<SqlExecutionResult> {
private final SqlConverter converter;
private final Map<ManagedExecutionId, CompletableFuture<Void>> runningExecutions;

public SqlExecutionManager(final SqlContext context, SqlExecutionService sqlExecutionService, MetaStorage storage) {
public SqlExecutionManager(SqlConverter sqlConverter, SqlExecutionService sqlExecutionService, MetaStorage storage) {
super(storage);
this.converter = sqlConverter;
this.executionService = sqlExecutionService;
this.converter = new SqlConverter(context.getSqlDialect(), context.getConfig());
this.runningExecutions = new HashMap<>();
}

Expand Down
Loading

0 comments on commit 897ba54

Please sign in to comment.