Skip to content

Commit

Permalink
feat: EDR (data address) and metadata storage/cache (#3978)
Browse files Browse the repository at this point in the history
feat: EDR (dataaddress) and metadata storage/cache
  • Loading branch information
wolf4ood authored Mar 8, 2024
1 parent 3c4c192 commit 95d71be
Show file tree
Hide file tree
Showing 24 changed files with 1,329 additions and 1 deletion.
26 changes: 26 additions & 0 deletions core/common/edr-store-core/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

plugins {
`java-library`
`maven-publish`
}

dependencies {
api(project(":spi:common:edr-store-spi"))
implementation(project(":core:common:connector-core"))

testImplementation(project(":core:common:junit"))
testImplementation(testFixtures(project(":spi:common:edr-store-spi")))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.core.edr;

import org.eclipse.edc.core.edr.defaults.InMemoryEndpointDataReferenceEntryIndex;
import org.eclipse.edc.core.edr.defaults.VaultEndpointDataReferenceCache;
import org.eclipse.edc.edr.spi.store.EndpointDataReferenceCache;
import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;

import static org.eclipse.edc.core.edr.EndpointDataReferenceStoreExtension.NAME;

@Extension(NAME)
public class EndpointDataReferenceStoreDefaultServicesExtension implements ServiceExtension {

public static final String DEFAULT_EDR_VAULT_PATH = "";
@Setting(value = "Directory/Path where to store EDRs in the vault for vaults that supports hierarchical structuring.", defaultValue = DEFAULT_EDR_VAULT_PATH)
public static final String EDC_EDR_VAULT_PATH = "edc.edr.vault.path";
protected static final String NAME = "Endpoint Data Reference Core Default Services Extension";
@Inject
private EndpointDataReferenceEntryIndex edrEntryStore;

@Inject
private EndpointDataReferenceCache edrCache;

@Inject
private CriterionOperatorRegistry criterionOperatorRegistry;

@Inject
private Vault vault;

@Inject
private TypeManager typeManager;


@Provider(isDefault = true)
public EndpointDataReferenceCache endpointDataReferenceCache(ServiceExtensionContext context) {
var vaultDirectory = context.getConfig().getString(EDC_EDR_VAULT_PATH, DEFAULT_EDR_VAULT_PATH);
return new VaultEndpointDataReferenceCache(vault, vaultDirectory, typeManager.getMapper());
}

@Provider(isDefault = true)
public EndpointDataReferenceEntryIndex endpointDataReferenceEntryStore() {
return new InMemoryEndpointDataReferenceEntryIndex(criterionOperatorRegistry);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.core.edr;

import org.eclipse.edc.core.edr.store.EndpointDataReferenceStoreImpl;
import org.eclipse.edc.edr.spi.store.EndpointDataReferenceCache;
import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex;
import org.eclipse.edc.edr.spi.store.EndpointDataReferenceStore;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.transaction.spi.TransactionContext;

import static org.eclipse.edc.core.edr.EndpointDataReferenceStoreExtension.NAME;

@Extension(NAME)
public class EndpointDataReferenceStoreExtension implements ServiceExtension {

protected static final String NAME = "Endpoint Data Reference Core Extension";

@Inject
private EndpointDataReferenceEntryIndex edrIndex;

@Inject
private EndpointDataReferenceCache edrCache;

@Inject
private TransactionContext transactionContext;

@Provider
public EndpointDataReferenceStore endpointDataReferenceService() {
return new EndpointDataReferenceStoreImpl(edrIndex, edrCache, transactionContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.core.edr.defaults;

import org.eclipse.edc.connector.core.store.ReflectionBasedQueryResolver;
import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex;
import org.eclipse.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
import org.eclipse.edc.spi.query.QueryResolver;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* In memory implementation of {@link EndpointDataReferenceEntryIndex}
*/
public class InMemoryEndpointDataReferenceEntryIndex implements EndpointDataReferenceEntryIndex {

private final QueryResolver<EndpointDataReferenceEntry> queryResolver;
private final Map<String, EndpointDataReferenceEntry> cache = new ConcurrentHashMap<>();

public InMemoryEndpointDataReferenceEntryIndex(CriterionOperatorRegistry criterionOperatorRegistry) {
queryResolver = new ReflectionBasedQueryResolver<>(EndpointDataReferenceEntry.class, criterionOperatorRegistry);
}

@Override
public StoreResult<List<EndpointDataReferenceEntry>> query(QuerySpec spec) {
return StoreResult.success(queryResolver.query(cache.values().stream(), spec).collect(Collectors.toList()));
}

@Override
public StoreResult<Void> save(EndpointDataReferenceEntry entry) {
cache.put(entry.getTransferProcessId(), entry);
return StoreResult.success();
}

@Override
public StoreResult<EndpointDataReferenceEntry> delete(String transferProcessId) {
return Optional.ofNullable(cache.remove(transferProcessId))
.map(StoreResult::success)
.orElse(StoreResult.notFound("EDR entry not found for transfer process %s".formatted(transferProcessId)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.core.edr.defaults;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.edc.edr.spi.store.EndpointDataReferenceCache;
import org.eclipse.edc.spi.persistence.EdcPersistenceException;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.types.domain.DataAddress;

import java.util.Optional;

/**
* Vault implementation of {@link EndpointDataReferenceCache}
*/
public class VaultEndpointDataReferenceCache implements EndpointDataReferenceCache {

public static final String SEPARATOR = "--";
public static final String VAULT_PREFIX = "edr" + SEPARATOR;

private final Vault vault;
private final String vaultPath;

private final ObjectMapper objectMapper;

public VaultEndpointDataReferenceCache(Vault vault, String vaultPath, ObjectMapper objectMapper) {
this.vault = vault;
this.vaultPath = vaultPath;
this.objectMapper = objectMapper;
}

@Override
public StoreResult<DataAddress> get(String transferProcessId) {
var key = toKey(transferProcessId);
return Optional.ofNullable(vault.resolveSecret(key))
.map(this::fromJson)
.map(StoreResult::success)
.orElse(StoreResult.notFound("EDR not found in the vault for transfer process: %s".formatted(transferProcessId)));
}

@Override
public StoreResult<Void> put(String transferProcessId, DataAddress edr) {
var result = vault.storeSecret(toKey(transferProcessId), toJson(edr));
if (result.failed()) {
return StoreResult.generalError(result.getFailureDetail());
}
return StoreResult.success();
}

@Override
public StoreResult<Void> delete(String transferProcessId) {
var result = vault.deleteSecret(toKey(transferProcessId));
if (result.failed()) {
return StoreResult.generalError(result.getFailureDetail());
}
return StoreResult.success();
}

private String toJson(DataAddress dataAddress) {
try {
return objectMapper.writeValueAsString(dataAddress);
} catch (JsonProcessingException e) {
throw new EdcPersistenceException(e);
}
}

private DataAddress fromJson(String dataAddress) {
try {
return objectMapper.readValue(dataAddress, DataAddress.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

private String toKey(String transferProcessId) {
return vaultPath + VAULT_PREFIX + transferProcessId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.core.edr.store;

import org.eclipse.edc.edr.spi.store.EndpointDataReferenceCache;
import org.eclipse.edc.edr.spi.store.EndpointDataReferenceEntryIndex;
import org.eclipse.edc.edr.spi.store.EndpointDataReferenceStore;
import org.eclipse.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.transaction.spi.TransactionContext;

import java.util.List;

/**
* Default implementation of {@link EndpointDataReferenceStore}. It makes usage of two subcomponents
* The metadata storage {@link EndpointDataReferenceEntryIndex} and the EDR cache {@link EndpointDataReferenceCache}
* and coordinate the interaction between then for fulfilling the {@link EndpointDataReferenceStore} interface
*/
public class EndpointDataReferenceStoreImpl implements EndpointDataReferenceStore {

private final EndpointDataReferenceEntryIndex dataReferenceEntryIndex;
private final EndpointDataReferenceCache dataReferenceCache;
private final TransactionContext transactionalContext;

public EndpointDataReferenceStoreImpl(EndpointDataReferenceEntryIndex dataReferenceEntryIndex, EndpointDataReferenceCache dataReferenceCache, TransactionContext transactionalContext) {
this.dataReferenceEntryIndex = dataReferenceEntryIndex;
this.dataReferenceCache = dataReferenceCache;
this.transactionalContext = transactionalContext;
}

@Override
public StoreResult<DataAddress> resolveByTransferProcess(String transferProcessId) {
return transactionalContext.execute(() -> dataReferenceCache.get(transferProcessId));
}

@Override
public StoreResult<List<EndpointDataReferenceEntry>> query(QuerySpec querySpec) {
return transactionalContext.execute(() -> dataReferenceEntryIndex.query(querySpec));
}

@Override
public StoreResult<EndpointDataReferenceEntry> delete(String transferProcessId) {
return transactionalContext.execute(() -> dataReferenceCache.delete(transferProcessId)
.compose((i) -> dataReferenceEntryIndex.delete(transferProcessId)));
}

@Override
public StoreResult<Void> save(EndpointDataReferenceEntry entry, DataAddress dataAddress) {
return transactionalContext.execute(() -> dataReferenceCache.put(entry.getTransferProcessId(), dataAddress)
.compose(i -> dataReferenceEntryIndex.save(entry)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0
#
# SPDX-License-Identifier: Apache-2.0
#
# Contributors:
# Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
#
#


org.eclipse.edc.core.edr.EndpointDataReferenceStoreExtension
Loading

0 comments on commit 95d71be

Please sign in to comment.