Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement AccessTokenStore #3907

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

import org.eclipse.edc.connector.dataplane.framework.pipeline.PipelineServiceImpl;
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy;
import org.eclipse.edc.connector.dataplane.framework.store.InMemoryAccessTokenDataStore;
import org.eclipse.edc.connector.dataplane.framework.store.InMemoryDataPlaneStore;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
Expand All @@ -32,18 +34,16 @@
public class DataPlaneDefaultServicesExtension implements ServiceExtension {

public static final String NAME = "Data Plane Framework Default Services";
@Inject
private Clock clock;
@Inject
private CriterionOperatorRegistry criterionOperatorRegistry;

@Override
public String name() {
return NAME;
}

@Inject
private Clock clock;

@Inject
private CriterionOperatorRegistry criterionOperatorRegistry;

@Provider(isDefault = true)
public TransferServiceSelectionStrategy transferServiceSelectionStrategy() {
return TransferServiceSelectionStrategy.selectFirst();
Expand All @@ -54,6 +54,11 @@ public DataPlaneStore dataPlaneStore() {
return new InMemoryDataPlaneStore(clock, criterionOperatorRegistry);
}

@Provider(isDefault = true)
public AccessTokenDataStore defaultAccessTokenDataStore() {
return new InMemoryAccessTokenDataStore(criterionOperatorRegistry);
}

@Provider(isDefault = true)
public PipelineService pipelineService(ServiceExtensionContext context) {
return new PipelineServiceImpl(context.getMonitor());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.framework.store;

import org.eclipse.edc.connector.core.store.ReflectionBasedQueryResolver;
import org.eclipse.edc.connector.dataplane.spi.AccessTokenData;
import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore;
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
import org.eclipse.edc.spi.query.QueryResolver;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
* In-mem implementation of the {@link AccessTokenDataStore} based on a {@link ConcurrentHashMap}.
*/
public class InMemoryAccessTokenDataStore implements AccessTokenDataStore {
private final Map<String, AccessTokenData> store = new ConcurrentHashMap<>();
private final QueryResolver<AccessTokenData> queryResolver;

public InMemoryAccessTokenDataStore(CriterionOperatorRegistry operatorRegistry) {
this.queryResolver = new ReflectionBasedQueryResolver<>(AccessTokenData.class, operatorRegistry);
}

@Override
public AccessTokenData getById(String id) {
return store.get(id);
}

@Override
public StoreResult<Void> store(AccessTokenData accessTokenData) {

var prev = store.putIfAbsent(accessTokenData.id(), accessTokenData);
return Optional.ofNullable(prev)
.map(a -> StoreResult.<Void>alreadyExists(OBJECT_EXISTS.formatted(accessTokenData.id())))
.orElse(StoreResult.success());
}

@Override
public StoreResult<Void> deleteById(String id) {
var prev = store.remove(id);
return Optional.ofNullable(prev)
.map(p -> StoreResult.<Void>success())
.orElse(StoreResult.notFound(OBJECT_NOT_FOUND.formatted(id)));
}

@Override
public Collection<AccessTokenData> query(QuerySpec querySpec) {
return queryResolver.query(store.values().stream(), querySpec).toList();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.framework.store;

import org.eclipse.edc.connector.core.store.CriterionOperatorRegistryImpl;
import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore;
import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataTestBase;

class InMemoryAccessTokenDataStoreTest extends AccessTokenDataTestBase {
private final InMemoryAccessTokenDataStore store = new InMemoryAccessTokenDataStore(CriterionOperatorRegistryImpl.ofDefaults());

@Override
protected AccessTokenDataStore getStore() {
return store;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

plugins {
`java-library`
`maven-publish`
}

dependencies {
api(project(":spi:common:transaction-spi"))
api(project(":spi:data-plane:data-plane-spi"))

implementation(project(":spi:common:transaction-datasource-spi"))
implementation(project(":extensions:common:sql:sql-core"))

testImplementation(project(":core:common:junit"))
testImplementation(testFixtures(project(":spi:data-plane:data-plane-spi")))
testImplementation(testFixtures(project(":extensions:common:sql:sql-lease")))
testImplementation(testFixtures(project(":extensions:common:sql:sql-core")))

}


Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

-- Statements are designed for and tested with Postgres only!

CREATE TABLE IF NOT EXISTS edc_accesstokendata
(
id VARCHAR NOT NULL PRIMARY KEY,
claim_token json NOT NULL,
data_address JSON NOT NULL
);

COMMENT ON COLUMN edc_accesstokendata.claim_token IS 'ClaimToken serialized as JSON map';
COMMENT ON COLUMN edc_accesstokendata.data_address IS 'DataAddress serialized as JSON map';
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.store.sql;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.edc.connector.dataplane.spi.AccessTokenData;
import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
import org.eclipse.edc.connector.dataplane.store.sql.schema.AccessTokenDataStatements;
import org.eclipse.edc.spi.iam.ClaimToken;
import org.eclipse.edc.spi.persistence.EdcPersistenceException;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.sql.store.AbstractSqlStore;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.jetbrains.annotations.Nullable;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;

import static org.eclipse.edc.spi.query.Criterion.criterion;

/**
* SQL implementation of {@link DataPlaneStore}
*/
public class SqlAccessTokenDataStore extends AbstractSqlStore implements AccessTokenDataStore {

private final AccessTokenDataStatements statements;

public SqlAccessTokenDataStore(DataSourceRegistry dataSourceRegistry, String dataSourceName, TransactionContext transactionContext,
paullatzelsperger marked this conversation as resolved.
Show resolved Hide resolved
AccessTokenDataStatements statements, ObjectMapper objectMapper, QueryExecutor queryExecutor) {
super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper, queryExecutor);
this.statements = statements;
}

@Override
public AccessTokenData getById(String id) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
return findByIdInternal(connection, id);
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

@Override
public StoreResult<Void> store(AccessTokenData accessTokenData) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
if (findByIdInternal(connection, accessTokenData.id()) != null) {
return StoreResult.alreadyExists(OBJECT_EXISTS.formatted(accessTokenData.id()));
}
insert(connection, accessTokenData);
return StoreResult.success();
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

@Override
public StoreResult<Void> deleteById(String id) {
return transactionContext.execute(() -> {
try (var connection = getConnection()) {
if (findByIdInternal(connection, id) == null) {
return StoreResult.notFound(OBJECT_NOT_FOUND.formatted(id));
}
var sql = statements.getDeleteTemplate();
queryExecutor.execute(connection, sql, id);
return StoreResult.success();
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

@Override
public Collection<AccessTokenData> query(QuerySpec querySpec) {
return transactionContext.execute(() -> {
try (var conn = getConnection()) {
var sql = statements.createQuery(querySpec);
return queryExecutor.query(conn, true, this::mapAccessTokenData, sql.getQueryAsString(), sql.getParameters()).toList();
} catch (SQLException e) {
throw new EdcPersistenceException(e);
}
});
}

private void insert(Connection connection, AccessTokenData dataFlow) {
var sql = statements.getInsertTemplate();
queryExecutor.execute(connection, sql,
dataFlow.id(),
toJson(dataFlow.claimToken()),
toJson(dataFlow.dataAddress())
);
}


private AccessTokenData mapAccessTokenData(ResultSet resultSet) throws SQLException {
var claimToken = fromJson(resultSet.getString(statements.getClaimTokenColumn()), ClaimToken.class);
var dataAddress = fromJson(resultSet.getString(statements.getDataAddressColumn()), DataAddress.class);
var id = resultSet.getString(statements.getIdColumn());

return new AccessTokenData(id, claimToken, dataAddress);
}

private @Nullable AccessTokenData findByIdInternal(Connection conn, String id) {
return transactionContext.execute(() -> {
var querySpec = QuerySpec.Builder.newInstance().filter(criterion("id", "=", id)).build();
var statement = statements.createQuery(querySpec);
return queryExecutor.query(conn, true, this::mapAccessTokenData, statement.getQueryAsString(), statement.getParameters())
.findFirst().orElse(null);
});
}

}
Loading
Loading