diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneDefaultServicesExtension.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneDefaultServicesExtension.java index 1c5e41e45d4..dcba2d1570f 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneDefaultServicesExtension.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/DataPlaneDefaultServicesExtension.java @@ -16,8 +16,10 @@ import org.eclipse.edc.connector.dataplane.framework.pipeline.PipelineServiceImpl; import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy; +import org.eclipse.edc.connector.dataplane.framework.store.InMemoryAccessTokenDataStore; import org.eclipse.edc.connector.dataplane.framework.store.InMemoryDataPlaneStore; import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore; import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; @@ -32,18 +34,16 @@ public class DataPlaneDefaultServicesExtension implements ServiceExtension { public static final String NAME = "Data Plane Framework Default Services"; + @Inject + private Clock clock; + @Inject + private CriterionOperatorRegistry criterionOperatorRegistry; @Override public String name() { return NAME; } - @Inject - private Clock clock; - - @Inject - private CriterionOperatorRegistry criterionOperatorRegistry; - @Provider(isDefault = true) public TransferServiceSelectionStrategy transferServiceSelectionStrategy() { return TransferServiceSelectionStrategy.selectFirst(); @@ -54,6 +54,11 @@ public DataPlaneStore dataPlaneStore() { return new InMemoryDataPlaneStore(clock, criterionOperatorRegistry); } + @Provider(isDefault = true) + public AccessTokenDataStore defaultAccessTokenDataStore() { + return new InMemoryAccessTokenDataStore(criterionOperatorRegistry); + } + @Provider(isDefault = true) public PipelineService pipelineService(ServiceExtensionContext context) { return new PipelineServiceImpl(context.getMonitor()); diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/store/InMemoryAccessTokenDataStore.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/store/InMemoryAccessTokenDataStore.java new file mode 100644 index 00000000000..454c1917e0e --- /dev/null +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/store/InMemoryAccessTokenDataStore.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.framework.store; + +import org.eclipse.edc.connector.core.store.ReflectionBasedQueryResolver; +import org.eclipse.edc.connector.dataplane.spi.AccessTokenData; +import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore; +import org.eclipse.edc.spi.query.CriterionOperatorRegistry; +import org.eclipse.edc.spi.query.QueryResolver; +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.spi.result.StoreResult; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +/** + * In-mem implementation of the {@link AccessTokenDataStore} based on a {@link ConcurrentHashMap}. + */ +public class InMemoryAccessTokenDataStore implements AccessTokenDataStore { + private final Map store = new ConcurrentHashMap<>(); + private final QueryResolver queryResolver; + + public InMemoryAccessTokenDataStore(CriterionOperatorRegistry operatorRegistry) { + this.queryResolver = new ReflectionBasedQueryResolver<>(AccessTokenData.class, operatorRegistry); + } + + @Override + public AccessTokenData getById(String id) { + return store.get(id); + } + + @Override + public StoreResult store(AccessTokenData accessTokenData) { + + var prev = store.putIfAbsent(accessTokenData.id(), accessTokenData); + return Optional.ofNullable(prev) + .map(a -> StoreResult.alreadyExists(OBJECT_EXISTS.formatted(accessTokenData.id()))) + .orElse(StoreResult.success()); + } + + @Override + public StoreResult deleteById(String id) { + var prev = store.remove(id); + return Optional.ofNullable(prev) + .map(p -> StoreResult.success()) + .orElse(StoreResult.notFound(OBJECT_NOT_FOUND.formatted(id))); + } + + @Override + public Collection query(QuerySpec querySpec) { + return queryResolver.query(store.values().stream(), querySpec).toList(); + } + +} diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/store/InMemoryAccessTokenDataStoreTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/store/InMemoryAccessTokenDataStoreTest.java new file mode 100644 index 00000000000..098b672f2d2 --- /dev/null +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/store/InMemoryAccessTokenDataStoreTest.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.framework.store; + +import org.eclipse.edc.connector.core.store.CriterionOperatorRegistryImpl; +import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore; +import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataTestBase; + +class InMemoryAccessTokenDataStoreTest extends AccessTokenDataTestBase { + private final InMemoryAccessTokenDataStore store = new InMemoryAccessTokenDataStore(CriterionOperatorRegistryImpl.ofDefaults()); + + @Override + protected AccessTokenDataStore getStore() { + return store; + } +} \ No newline at end of file diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/build.gradle.kts b/extensions/data-plane/store/sql/accesstokendata-store-sql/build.gradle.kts new file mode 100644 index 00000000000..576fa0302fe --- /dev/null +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/build.gradle.kts @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +plugins { + `java-library` + `maven-publish` +} + +dependencies { + api(project(":spi:common:transaction-spi")) + api(project(":spi:data-plane:data-plane-spi")) + + implementation(project(":spi:common:transaction-datasource-spi")) + implementation(project(":extensions:common:sql:sql-core")) + + testImplementation(project(":core:common:junit")) + testImplementation(testFixtures(project(":spi:data-plane:data-plane-spi"))) + testImplementation(testFixtures(project(":extensions:common:sql:sql-lease"))) + testImplementation(testFixtures(project(":extensions:common:sql:sql-core"))) + +} + + diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/docs/schema.sql b/extensions/data-plane/store/sql/accesstokendata-store-sql/docs/schema.sql new file mode 100644 index 00000000000..707a0f42082 --- /dev/null +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/docs/schema.sql @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +-- Statements are designed for and tested with Postgres only! + +CREATE TABLE IF NOT EXISTS edc_accesstokendata +( + id VARCHAR NOT NULL PRIMARY KEY, + claim_token json NOT NULL, + data_address JSON NOT NULL +); + +COMMENT ON COLUMN edc_accesstokendata.claim_token IS 'ClaimToken serialized as JSON map'; +COMMENT ON COLUMN edc_accesstokendata.data_address IS 'DataAddress serialized as JSON map'; diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStore.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStore.java new file mode 100644 index 00000000000..030d348b340 --- /dev/null +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStore.java @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.store.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.edc.connector.dataplane.spi.AccessTokenData; +import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore; +import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; +import org.eclipse.edc.connector.dataplane.store.sql.schema.AccessTokenDataStatements; +import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.spi.result.StoreResult; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.sql.store.AbstractSqlStore; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; +import org.jetbrains.annotations.Nullable; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collection; + +import static org.eclipse.edc.spi.query.Criterion.criterion; + +/** + * SQL implementation of {@link DataPlaneStore} + */ +public class SqlAccessTokenDataStore extends AbstractSqlStore implements AccessTokenDataStore { + + private final AccessTokenDataStatements statements; + + public SqlAccessTokenDataStore(DataSourceRegistry dataSourceRegistry, + String dataSourceName, + TransactionContext transactionContext, + AccessTokenDataStatements statements, + ObjectMapper objectMapper, + QueryExecutor queryExecutor) { + super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor); + this.statements = statements; + } + + @Override + public AccessTokenData getById(String id) { + return transactionContext.execute(() -> { + try (var connection = getConnection()) { + return findByIdInternal(connection, id); + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + + @Override + public StoreResult store(AccessTokenData accessTokenData) { + return transactionContext.execute(() -> { + try (var connection = getConnection()) { + if (findByIdInternal(connection, accessTokenData.id()) != null) { + return StoreResult.alreadyExists(OBJECT_EXISTS.formatted(accessTokenData.id())); + } + insert(connection, accessTokenData); + return StoreResult.success(); + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + + @Override + public StoreResult deleteById(String id) { + return transactionContext.execute(() -> { + try (var connection = getConnection()) { + if (findByIdInternal(connection, id) == null) { + return StoreResult.notFound(OBJECT_NOT_FOUND.formatted(id)); + } + var sql = statements.getDeleteTemplate(); + queryExecutor.execute(connection, sql, id); + return StoreResult.success(); + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + + @Override + public Collection query(QuerySpec querySpec) { + return transactionContext.execute(() -> { + try (var conn = getConnection()) { + var sql = statements.createQuery(querySpec); + return queryExecutor.query(conn, true, this::mapAccessTokenData, sql.getQueryAsString(), sql.getParameters()).toList(); + } catch (SQLException e) { + throw new EdcPersistenceException(e); + } + }); + } + + private void insert(Connection connection, AccessTokenData dataFlow) { + var sql = statements.getInsertTemplate(); + queryExecutor.execute(connection, sql, + dataFlow.id(), + toJson(dataFlow.claimToken()), + toJson(dataFlow.dataAddress()) + ); + } + + + private AccessTokenData mapAccessTokenData(ResultSet resultSet) throws SQLException { + var claimToken = fromJson(resultSet.getString(statements.getClaimTokenColumn()), ClaimToken.class); + var dataAddress = fromJson(resultSet.getString(statements.getDataAddressColumn()), DataAddress.class); + var id = resultSet.getString(statements.getIdColumn()); + + return new AccessTokenData(id, claimToken, dataAddress); + } + + private @Nullable AccessTokenData findByIdInternal(Connection conn, String id) { + return transactionContext.execute(() -> { + var querySpec = QuerySpec.Builder.newInstance().filter(criterion("id", "=", id)).build(); + var statement = statements.createQuery(querySpec); + return queryExecutor.query(conn, true, this::mapAccessTokenData, statement.getQueryAsString(), statement.getParameters()) + .findFirst().orElse(null); + }); + } + +} diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStoreExtension.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStoreExtension.java new file mode 100644 index 00000000000..873879efd8c --- /dev/null +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStoreExtension.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.store.sql; + +import org.eclipse.edc.connector.dataplane.spi.AccessTokenData; +import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore; +import org.eclipse.edc.connector.dataplane.store.sql.schema.AccessTokenDataStatements; +import org.eclipse.edc.connector.dataplane.store.sql.schema.postgres.PostgresAccessTokenDataStatements; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Provider; +import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; + +import java.time.Clock; + +/** + * Provides Sql Store for {@link AccessTokenData} objects + */ +@Extension(value = SqlAccessTokenDataStoreExtension.NAME) +public class SqlAccessTokenDataStoreExtension implements ServiceExtension { + + public static final String NAME = "Sql AccessTokenData Store"; + + @Setting(value = "Name of the datasource to use for accessing data plane store") + private static final String DATASOURCE_SETTING_NAME = "edc.datasource.accesstokendata.name"; + + @Inject + private DataSourceRegistry dataSourceRegistry; + + @Inject + private TransactionContext transactionContext; + + @Inject(required = false) + private AccessTokenDataStatements statements; + + @Inject + private Clock clock; + + @Inject + private TypeManager typeManager; + + @Inject + private QueryExecutor queryExecutor; + + @Override + public String name() { + return NAME; + } + + @Provider + public AccessTokenDataStore dataPlaneStore(ServiceExtensionContext context) { + return new SqlAccessTokenDataStore(dataSourceRegistry, getDataSourceName(context), transactionContext, + getStatementImpl(), typeManager.getMapper(), queryExecutor); + } + + /** + * returns an externally-provided sql statement dialect, or postgres as a default + */ + private AccessTokenDataStatements getStatementImpl() { + return statements != null ? statements : new PostgresAccessTokenDataStatements(); + } + + private String getDataSourceName(ServiceExtensionContext context) { + return context.getConfig().getString(DATASOURCE_SETTING_NAME, DataSourceRegistry.DEFAULT_DATASOURCE); + } +} diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/AccessTokenDataStatements.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/AccessTokenDataStatements.java new file mode 100644 index 00000000000..2f7bfc6f2e6 --- /dev/null +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/AccessTokenDataStatements.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.store.sql.schema; + +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.sql.statement.SqlStatements; +import org.eclipse.edc.sql.translation.SqlQueryStatement; + +/** + * Sql Statements for DataPlane Store + */ +public interface AccessTokenDataStatements extends SqlStatements { + + default String getIdColumn() { + return "id"; + } + + default String getTableName() { + return "edc_accesstokendata"; + } + + default String getClaimTokenColumn() { + return "claim_token"; + } + + default String getDataAddressColumn() { + return "data_address"; + } + + String getInsertTemplate(); + + String getSelectTemplate(); + + String getDeleteTemplate(); + + SqlQueryStatement createQuery(QuerySpec querySpec); + +} + diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlAccessTokenStatements.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlAccessTokenStatements.java new file mode 100644 index 00000000000..88139d33799 --- /dev/null +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlAccessTokenStatements.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.store.sql.schema; + +import org.eclipse.edc.connector.dataplane.store.sql.schema.postgres.AccessTokenDataMapping; +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.sql.translation.SqlOperatorTranslator; +import org.eclipse.edc.sql.translation.SqlQueryStatement; + +public class BaseSqlAccessTokenStatements implements AccessTokenDataStatements { + + protected final SqlOperatorTranslator operatorTranslator; + + public BaseSqlAccessTokenStatements(SqlOperatorTranslator operatorTranslator) { + this.operatorTranslator = operatorTranslator; + } + + @Override + public String getInsertTemplate() { + return executeStatement() + .column(getIdColumn()) + .jsonColumn(getClaimTokenColumn()) + .jsonColumn(getDataAddressColumn()) + .insertInto(getTableName()); + } + + @Override + public String getSelectTemplate() { + return "SELECT * FROM %s".formatted(getTableName()); + } + + @Override + public String getDeleteTemplate() { + return executeStatement() + .delete(getTableName(), getIdColumn()); + } + + @Override + public SqlQueryStatement createQuery(QuerySpec querySpec) { + return new SqlQueryStatement(getSelectTemplate(), querySpec, new AccessTokenDataMapping(this), operatorTranslator); + } +} diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/AccessTokenDataMapping.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/AccessTokenDataMapping.java new file mode 100644 index 00000000000..3741953dce7 --- /dev/null +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/AccessTokenDataMapping.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.store.sql.schema.postgres; + +import org.eclipse.edc.connector.dataplane.spi.DataFlow; +import org.eclipse.edc.connector.dataplane.store.sql.schema.AccessTokenDataStatements; +import org.eclipse.edc.sql.translation.JsonFieldTranslator; +import org.eclipse.edc.sql.translation.TranslationMapping; + +/** + * Maps fields of a {@link DataFlow} onto the + * corresponding SQL schema (= column names) enabling access through Postgres JSON operators where applicable + */ +public class AccessTokenDataMapping extends TranslationMapping { + + public AccessTokenDataMapping(AccessTokenDataStatements statements) { + add("id", statements.getIdColumn()); + add("claimToken", new JsonFieldTranslator(statements.getClaimTokenColumn())); + add("dataAddress", new JsonFieldTranslator(statements.getDataAddressColumn())); + } +} diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/PostgresAccessTokenDataStatements.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/PostgresAccessTokenDataStatements.java new file mode 100644 index 00000000000..5d00f7473b2 --- /dev/null +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/PostgresAccessTokenDataStatements.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.store.sql.schema.postgres; + +import org.eclipse.edc.connector.dataplane.store.sql.schema.BaseSqlAccessTokenStatements; +import org.eclipse.edc.sql.dialect.PostgresDialect; +import org.eclipse.edc.sql.translation.PostgresqlOperatorTranslator; + +public class PostgresAccessTokenDataStatements extends BaseSqlAccessTokenStatements { + + public PostgresAccessTokenDataStatements() { + super(new PostgresqlOperatorTranslator()); + } + + @Override + public String getFormatAsJsonOperator() { + return PostgresDialect.getJsonCastOperator(); + } +} diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 00000000000..c87b2fa29d4 --- /dev/null +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1,16 @@ +# +# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +# +# This program and the accompanying materials are made available under the +# terms of the Apache License, Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# +# Contributors: +# Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation +# +# + +org.eclipse.edc.connector.dataplane.store.sql.SqlAccessTokenDataStoreExtension + diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/test/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStoreTest.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/test/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStoreTest.java new file mode 100644 index 00000000000..e3b82a7f71a --- /dev/null +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/test/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStoreTest.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.store.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore; +import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataTestBase; +import org.eclipse.edc.connector.dataplane.store.sql.schema.BaseSqlAccessTokenStatements; +import org.eclipse.edc.connector.dataplane.store.sql.schema.postgres.PostgresAccessTokenDataStatements; +import org.eclipse.edc.junit.annotations.ComponentTest; +import org.eclipse.edc.policy.model.PolicyRegistrationTypes; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.sql.QueryExecutor; +import org.eclipse.edc.sql.testfixtures.PostgresqlStoreSetupExtension; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +@ComponentTest +@ExtendWith(PostgresqlStoreSetupExtension.class) +class SqlAccessTokenDataStoreTest extends AccessTokenDataTestBase { + + private final BaseSqlAccessTokenStatements sqlStatements = new PostgresAccessTokenDataStatements(); + private SqlAccessTokenDataStore sqlStore; + + @BeforeEach + void setup(PostgresqlStoreSetupExtension setupExtension, QueryExecutor queryExecutor) throws IOException { + var typeManager = new TypeManager(); + typeManager.registerTypes(PolicyRegistrationTypes.TYPES.toArray(Class[]::new)); + + sqlStore = new SqlAccessTokenDataStore(setupExtension.getDataSourceRegistry(), setupExtension.getDatasourceName(), + setupExtension.getTransactionContext(), sqlStatements, new ObjectMapper(), queryExecutor); + + var schema = Files.readString(Paths.get("docs/schema.sql")); + setupExtension.runQuery(schema); + } + + @AfterEach + void tearDown(PostgresqlStoreSetupExtension setupExtension) { + setupExtension.runQuery("DROP TABLE " + sqlStatements.getTableName() + " CASCADE"); + } + + @Override + protected AccessTokenDataStore getStore() { + return sqlStore; + } + + +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index b361b31d9fe..1b1d25b9dba 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -176,6 +176,7 @@ include(":extensions:data-plane:data-plane-http-oauth2") include(":extensions:data-plane:data-plane-http-oauth2-core") include(":extensions:data-plane:data-plane-integration-tests") include(":extensions:data-plane:store:sql:data-plane-store-sql") +include(":extensions:data-plane:store:sql:accesstokendata-store-sql") include(":extensions:data-plane:data-plane-kafka") include(":extensions:data-plane-selector:data-plane-selector-api") diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/AccessTokenData.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/AccessTokenData.java new file mode 100644 index 00000000000..c1bc0f8b354 --- /dev/null +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/AccessTokenData.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.spi; + +import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.types.domain.DataAddress; + +/** + * Container object for a {@link ClaimToken} and a {@link DataAddress} that the data plane uses to keep track of currently + * all access tokens that are currently valid. + */ +public record AccessTokenData(String id, ClaimToken claimToken, DataAddress dataAddress) { + +} diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataStore.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataStore.java new file mode 100644 index 00000000000..ccc95611082 --- /dev/null +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataStore.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.spi.store; + +import org.eclipse.edc.connector.dataplane.spi.AccessTokenData; +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.spi.result.StoreResult; + +import java.util.Collection; + +/** + * Persistence layer for {@link AccessTokenData} objects, which the data plane uses to keep track of all access tokens that + * are currently valid. + * Implementations must be thread-safe. + */ +public interface AccessTokenDataStore { + String OBJECT_EXISTS = "AccessTokenData with ID '%s' already exists."; + String OBJECT_NOT_FOUND = "AccessTokenData with ID '%s' does not exist."; + + /** + * Returns an {@link AccessTokenData} object with the given ID. Returns null if not found. + * + * @param id the ID of the entity. + * @return The entity, or null if not found. + * @throws NullPointerException if the id parameter was null + */ + AccessTokenData getById(String id); + + /** + * Adds an {@link AccessTokenData} object to the persistence layer. Will return a failure if an object with the same already exists. + * + * @param accessTokenData the new object + * @return success if stored, a failure if an object with the same ID already exists. + */ + StoreResult store(AccessTokenData accessTokenData); + + /** + * Deletes an {@link AccessTokenData} entity with the given ID. + * + * @param id The ID of the {@link AccessTokenData} that is supposed to be deleted. + * @return success if deleted, a failure if an object with the given ID does not exist. + */ + StoreResult deleteById(String id); + + /** + * Returns all {@link AccessTokenData} objects in the store that are covered by a given {@link QuerySpec}. + *

+ * Note: supplying a sort field that does not exist on the {@link AccessTokenData} may cause some implementations + * to return an empty Stream, others will return an unsorted Stream, depending on the backing storage + * implementation. + */ + Collection query(QuerySpec querySpec); + +} diff --git a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java new file mode 100644 index 00000000000..a5b328486c8 --- /dev/null +++ b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.spi.store; + +import org.eclipse.edc.connector.dataplane.spi.AccessTokenData; +import org.eclipse.edc.junit.assertions.AbstractResultAssert; +import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.query.Criterion; +import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.spi.query.SortOrder; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.junit.jupiter.api.Test; + +import java.util.Comparator; +import java.util.List; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public abstract class AccessTokenDataTestBase { + + @Test + void storeAndGetById() { + var object = accessTokenData("1"); + getStore().store(object); + assertThat(getStore().getById("1")).usingRecursiveComparison().isEqualTo(object); + } + + @Test + void getById_notFound() { + assertThat(getStore().getById("not-exist")).isNull(); + } + + @Test + void store_alreadyExists() { + var object1 = accessTokenData("1"); + var objectDupl = accessTokenData("1"); + getStore().store(object1); + + AbstractResultAssert.assertThat(getStore().store(objectDupl)).isFailed() + .detail().isEqualTo("AccessTokenData with ID '1' already exists."); + } + + @Test + void deleteById() { + var object = accessTokenData("1"); + getStore().store(object); + + assertThat(getStore().deleteById("1").succeeded()).isTrue(); + } + + @Test + void deleteById_notFound() { + AbstractResultAssert.assertThat(getStore().deleteById("not-exist")).isFailed() + .detail().isEqualTo("AccessTokenData with ID 'not-exist' does not exist."); + } + + @Test + void query_byId() { + var atd = accessTokenData("test-id"); + getStore().store(atd); + + assertThat(getStore().query(QuerySpec.Builder.newInstance().filter(new Criterion("id", "=", "test-id")).build())) + .hasSize(1) + .usingRecursiveFieldByFieldElementComparator() + .containsExactly(atd); + } + + @Test + void query_byClaim() { + var ct = ClaimToken.Builder.newInstance().claim("foo", "bar").build(); + var atd = new AccessTokenData("test-id", ct, dataAddress()); + getStore().store(atd); + + assertThat(getStore().query(QuerySpec.Builder.newInstance().filter(new Criterion("claimToken.claims.foo", "=", "bar")).build())) + .hasSize(1) + .usingRecursiveFieldByFieldElementComparator() + .containsExactly(atd); + } + + @Test + void query_byDataAddressProperty() { + var ct = ClaimToken.Builder.newInstance().claim("foo", "bar").build(); + var atd = new AccessTokenData("test-id", ct, DataAddress.Builder.newInstance().type("foo-type").property("qux", "quz").build()); + getStore().store(atd); + + assertThat(getStore().query(QuerySpec.Builder.newInstance().filter(new Criterion("dataAddress.properties.qux", "=", "quz")).build())) + .hasSize(1) + .usingRecursiveFieldByFieldElementComparator() + .containsExactly(atd); + } + + @Test + void query_byMultipleCriteria() { + var ct = ClaimToken.Builder.newInstance().claim("foo", "bar").build(); + var atd = new AccessTokenData("test-id", ct, DataAddress.Builder.newInstance().type("foo-type").property("qux", "quz").build()); + getStore().store(atd); + + assertThat(getStore().query(QuerySpec.Builder.newInstance().filter(List.of( + new Criterion("dataAddress.properties.qux", "=", "quz"), + new Criterion("claimToken.claims.foo", "=", "bar"), + new Criterion("id", "like", "test-%"))).build())) + .hasSize(1) + .usingRecursiveFieldByFieldElementComparator() + .containsExactly(atd); + } + + @Test + void query_verifySorting() { + IntStream.range(0, 100).forEach(i -> getStore().store(accessTokenData("id" + i))); + + var q = QuerySpec.Builder.newInstance().sortField("id").sortOrder(SortOrder.DESC).build(); + assertThat(getStore().query(q)).extracting(AccessTokenData::id).isSortedAccordingTo(Comparator.reverseOrder()); + + var q2 = QuerySpec.Builder.newInstance().sortField("id").sortOrder(SortOrder.ASC).build(); + assertThat(getStore().query(q2)).extracting(AccessTokenData::id).isSortedAccordingTo(Comparator.naturalOrder()); + } + + @Test + void query_verifySorting_invalidSortField() { + IntStream.range(0, 100).forEach(i -> getStore().store(accessTokenData("id" + i))); + + + var q = QuerySpec.Builder.newInstance().sortField("not-exist").sortOrder(SortOrder.DESC).build(); + assertThatThrownBy(() -> getStore().query(q)).isInstanceOf(IllegalArgumentException.class); + } + + @Test + void verifyPaging() { + IntStream.range(0, 100).forEach(i -> getStore().store(accessTokenData("id" + i))); + + var q = QuerySpec.Builder.newInstance().offset(40).limit(25).build(); + assertThat(getStore().query(q)).hasSize(25); + } + + @Test + void query_defaultQuerySpec() { + IntStream.range(0, 100).forEach(i -> getStore().store(accessTokenData("id" + i))); + var q = QuerySpec.none(); + assertThat(getStore().query(q)).hasSize(50); + } + + + protected abstract AccessTokenDataStore getStore(); + + protected DataAddress dataAddress() { + return DataAddress.Builder.newInstance().type("foo-type").build(); + } + + protected AccessTokenData accessTokenData(String id) { + return new AccessTokenData(id, ClaimToken.Builder.newInstance().build(), dataAddress()); + } +}