Skip to content

Commit

Permalink
started adjusting jooq enums to target the correct fields, created he…
Browse files Browse the repository at this point in the history
…lper to resolve endpoint to connectorId
  • Loading branch information
kamilczaja committed Jun 27, 2024
1 parent d24bb6d commit cef88f0
Show file tree
Hide file tree
Showing 16 changed files with 100 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public record BrokerServerExtensionContext(

// Required for Integration Tests
ConnectorUpdater connectorUpdater,
ConnectorCreator connectorCreator,
PolicyMapper policyMapper,
FetchedCatalogBuilder fetchedCatalogBuilder,
DataOfferRecordUpdater dataOfferRecordUpdater
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,15 @@
import de.sovity.edc.ext.brokerserver.dao.ConnectorQueries;
import de.sovity.edc.ext.brokerserver.dao.ContractOfferQueries;
import de.sovity.edc.ext.brokerserver.dao.DataOfferQueries;
import de.sovity.edc.ext.brokerserver.dao.pages.catalog.CatalogQueryAvailableFilterFetcher;
import de.sovity.edc.ext.brokerserver.dao.pages.catalog.CatalogQueryContractOfferFetcher;
import de.sovity.edc.ext.brokerserver.dao.pages.catalog.CatalogQueryDataOfferFetcher;
import de.sovity.edc.ext.brokerserver.dao.pages.catalog.CatalogQueryFilterService;
import de.sovity.edc.ext.brokerserver.dao.pages.catalog.CatalogQueryService;
import de.sovity.edc.ext.brokerserver.dao.pages.catalog.CatalogQuerySortingService;
import de.sovity.edc.ext.brokerserver.dao.pages.connector.ConnectorDetailQueryService;
import de.sovity.edc.ext.brokerserver.dao.pages.connector.ConnectorListQueryService;
import de.sovity.edc.ext.brokerserver.dao.pages.dataoffer.DataOfferDetailPageQueryService;
import de.sovity.edc.ext.brokerserver.dao.pages.dataoffer.ViewCountLogger;
import de.sovity.edc.ext.brokerserver.db.DataSourceFactory;
import de.sovity.edc.ext.brokerserver.db.DslContextFactory;
import de.sovity.edc.ext.brokerserver.services.BrokerServerInitializer;
import de.sovity.edc.ext.brokerserver.services.ConnectorCleaner;
import de.sovity.edc.ext.brokerserver.services.ConnectorCreator;
import de.sovity.edc.ext.brokerserver.services.ConnectorKiller;
import de.sovity.edc.ext.brokerserver.services.KnownConnectorsInitializer;
import de.sovity.edc.ext.brokerserver.services.OfflineConnectorKiller;
import de.sovity.edc.ext.brokerserver.services.api.ConnectorOnlineStatusMapper;
import de.sovity.edc.ext.brokerserver.services.api.ConnectorService;
import de.sovity.edc.ext.brokerserver.services.api.DataOfferMappingUtils;
import de.sovity.edc.ext.brokerserver.services.api.PaginationMetadataUtils;
import de.sovity.edc.ext.brokerserver.services.api.filtering.CatalogFilterAttributeDefinitionService;
import de.sovity.edc.ext.brokerserver.services.api.filtering.CatalogFilterService;
import de.sovity.edc.ext.brokerserver.services.api.filtering.CatalogSearchService;
import de.sovity.edc.ext.brokerserver.services.config.AdminApiKeyValidator;
import de.sovity.edc.ext.brokerserver.services.config.BrokerServerSettingsFactory;
import de.sovity.edc.ext.brokerserver.services.logging.BrokerEventLogger;
import de.sovity.edc.ext.brokerserver.services.logging.BrokerExecutionTimeLogger;
Expand Down Expand Up @@ -119,41 +102,20 @@ public static BrokerServerExtensionContext buildContext(
) {
var brokerServerSettingsFactory = new BrokerServerSettingsFactory(config, monitor);
var brokerServerSettings = brokerServerSettingsFactory.buildBrokerServerSettings();
var adminApiKeyValidator = new AdminApiKeyValidator(brokerServerSettings);

// Dao
var dataOfferQueries = new DataOfferQueries();
var dataSourceFactory = new DataSourceFactory(config);
var dataSource = dataSourceFactory.newDataSource();
var dslContextFactory = new DslContextFactory(dataSource);
var connectorQueries = new ConnectorQueries();
var catalogQuerySortingService = new CatalogQuerySortingService();
var catalogSearchService = new CatalogSearchService();
var catalogQueryFilterService = new CatalogQueryFilterService(brokerServerSettings, catalogSearchService);
var catalogQueryContractOfferFetcher = new CatalogQueryContractOfferFetcher();
var catalogQueryDataOfferFetcher = new CatalogQueryDataOfferFetcher(
catalogQuerySortingService,
catalogQueryFilterService,
catalogQueryContractOfferFetcher
);
var catalogQueryAvailableFilterFetcher = new CatalogQueryAvailableFilterFetcher(catalogQueryFilterService);
var catalogQueryService = new CatalogQueryService(
catalogQueryDataOfferFetcher,
catalogQueryAvailableFilterFetcher,
brokerServerSettings
);
var connectorListQueryService = new ConnectorListQueryService();
var connectorDetailQueryService = new ConnectorDetailQueryService();
var dataOfferDetailPageQueryService = new DataOfferDetailPageQueryService(
catalogQueryContractOfferFetcher, brokerServerSettings);


// Services
var objectMapperJsonLd = getJsonLdObjectMapper(typeManager);
var brokerEventLogger = new BrokerEventLogger();
var brokerExecutionTimeLogger = new BrokerExecutionTimeLogger();
var contractOfferRecordUpdater = new ContractOfferRecordUpdater();
var dataOfferRecordUpdater = new DataOfferRecordUpdater();
var contractOfferRecordUpdater = new ContractOfferRecordUpdater(dataOfferMappingUtils);
var dataOfferRecordUpdater = new DataOfferRecordUpdater(connectorQueries);
var contractOfferQueries = new ContractOfferQueries();
var dataOfferLimitsEnforcer = new DataOfferLimitsEnforcer(brokerServerSettings, brokerEventLogger);
var dataOfferPatchBuilder = new DataOfferPatchBuilder(
Expand Down Expand Up @@ -202,23 +164,17 @@ public static BrokerServerExtensionContext buildContext(
monitor,
brokerExecutionTimeLogger
);
var paginationMetadataUtils = new PaginationMetadataUtils();

var threadPoolTaskQueue = new ThreadPoolTaskQueue();
var threadPool = new ThreadPool(threadPoolTaskQueue, brokerServerSettings, monitor);
var connectorQueue = new ConnectorQueue(connectorUpdater, threadPool);
var connectorQueueFiller = new ConnectorQueueFiller(connectorQueue, connectorQueries);
var connectorCreator = new ConnectorCreator(connectorQueries);
var knownConnectorsInitializer = new KnownConnectorsInitializer(
config,
connectorQueue,
connectorCreator
connectorQueue
);
var catalogFilterAttributeDefinitionService = new CatalogFilterAttributeDefinitionService();
var catalogFilterService = new CatalogFilterService(catalogFilterAttributeDefinitionService);
var viewCountLogger = new ViewCountLogger();
var connectorService = new ConnectorService(connectorCreator, connectorQueue);
var connectorKiller = new ConnectorKiller();
var connectorClearer = new ConnectorCleaner();
var connectorClearer = new ConnectorCleaner(connectorQueries);
var offlineConnectorKiller = new OfflineConnectorKiller(
brokerServerSettings,
connectorQueries,
Expand Down Expand Up @@ -269,7 +225,6 @@ public static BrokerServerExtensionContext buildContext(
return new BrokerServerExtensionContext(
brokerServerInitializer,
connectorUpdater,
connectorCreator,
policyMapper,
fetchedDataOfferBuilder,
dataOfferRecordUpdater
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ConnectorQueries {
Expand Down Expand Up @@ -51,4 +52,10 @@ public List<String> findAllConnectorsForKilling(DSLContext dsl, Duration deleteO
.where(c.LAST_SUCCESSFUL_REFRESH_AT.lt(OffsetDateTime.now().minus(deleteOfflineConnectorsAfter)))
.fetch(c.ENDPOINT);
}

public Map<String, String> getConnectorIdsByEndpointUrl(DSLContext dsl, Collection<String> endpoints) {
return dsl.selectFrom(Tables.CONNECTOR)
.where(PostgresqlUtils.in(Tables.CONNECTOR.ENDPOINT_URL, endpoints))
.fetchMap(Tables.CONNECTOR.ENDPOINT_URL, Tables.CONNECTOR.CONNECTOR_ID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,27 @@

package de.sovity.edc.ext.brokerserver.services;

import de.sovity.edc.ext.brokerserver.dao.ConnectorQueries;
import de.sovity.edc.ext.brokerserver.dao.utils.PostgresqlUtils;
import de.sovity.edc.ext.brokerserver.db.jooq.Tables;
import lombok.RequiredArgsConstructor;
import org.jooq.DSLContext;

import java.util.Collection;
import java.util.Map;

@RequiredArgsConstructor
public class ConnectorCleaner {

private final ConnectorQueries connectorQueries;

public void removeDataForDeadConnectors(DSLContext dsl, Collection<String> endpoints) {
var doco = Tables.CONTRACT_OFFER;
var dof = Tables.DATA_OFFER;
dsl.deleteFrom(doco).where(PostgresqlUtils.in(doco.CONNECTOR_ENDPOINT, endpoints)).execute();
dsl.deleteFrom(dof).where(PostgresqlUtils.in(dof.CONNECTOR_ENDPOINT, endpoints)).execute();

var connectorIdsByEndpointUrl = connectorQueries.getConnectorIdsByEndpointUrl(dsl, endpoints);

dsl.deleteFrom(doco).where(PostgresqlUtils.in(doco.CONNECTOR_ID, connectorIdsByEndpointUrl.values())).execute();
dsl.deleteFrom(dof).where(PostgresqlUtils.in(dof.CONNECTOR_ID, connectorIdsByEndpointUrl.values())).execute();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@
public class ConnectorKiller {
public void killConnectors(DSLContext dsl, Collection<String> endpoints) {
var c = Tables.CONNECTOR;
dsl.update(c).set(c.ONLINE_STATUS, ConnectorOnlineStatus.DEAD).where(PostgresqlUtils.in(c.ENDPOINT, endpoints)).execute();
dsl.update(c).set(c.ONLINE_STATUS, ConnectorOnlineStatus.DEAD).where(PostgresqlUtils.in(c.ENDPOINT_URL, endpoints)).execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@
public class KnownConnectorsInitializer {
private final Config config;
private final ConnectorQueue connectorQueue;
private final ConnectorCreator connectorCreator;

public void addKnownConnectorsOnStartup(DSLContext dsl) {
var connectorEndpoints = getKnownConnectorsConfigValue();
connectorCreator.addConnectors(dsl, connectorEndpoints);
connectorQueue.addAll(connectorEndpoints, ConnectorRefreshPriority.ADDED_ON_STARTUP);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ public class OfflineConnectorKiller {
private final ConnectorQueries connectorQueries;
private final BrokerEventLogger brokerEventLogger;
private final ConnectorKiller connectorKiller;
private final ConnectorCleaner connectorClearer;
private final ConnectorCleaner connectorCleaner;

public void killIfOfflineTooLong(DSLContext dsl) {
var killOfflineConnectorsAfter = brokerServerSettings.getKillOfflineConnectorsAfter();
var toKill = connectorQueries.findAllConnectorsForKilling(dsl, killOfflineConnectorsAfter);

connectorClearer.removeDataForDeadConnectors(dsl, toKill);
connectorCleaner.removeDataForDeadConnectors(dsl, toKill);
connectorKiller.killConnectors(dsl, toKill);

brokerEventLogger.addKilledDueToOfflineTooLongMessages(dsl, toKill);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public class ConnectorUpdateFailureWriter {
public void handleConnectorOffline(DSLContext dsl, ConnectorRecord connector, Throwable e) {
// Log Status Change and set status to offline if necessary
if (connector.getOnlineStatus() == ConnectorOnlineStatus.ONLINE || connector.getLastRefreshAttemptAt() == null) {
monitor.info("Connector is offline: " + connector.getEndpoint(), e);
brokerEventLogger.logConnectorOffline(dsl, connector.getEndpoint(), getFailureMessage(e));
monitor.info("Connector is offline: " + connector.getEndpointUrl(), e);
brokerEventLogger.logConnectorOffline(dsl, connector.getEndpointUrl(), getFailureMessage(e));
connector.setOnlineStatus(ConnectorOnlineStatus.OFFLINE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void handleConnectorOnline(

// Log Status Change and set status to online if necessary
if (connector.getOnlineStatus() != ConnectorOnlineStatus.ONLINE || connector.getLastRefreshAttemptAt() == null) {
brokerEventLogger.logConnectorOnline(dsl, connector.getEndpoint());
brokerEventLogger.logConnectorOnline(dsl, connector.getEndpointUrl());
connector.setOnlineStatus(ConnectorOnlineStatus.ONLINE);
}

Expand All @@ -54,11 +54,11 @@ public void handleConnectorOnline(
updateConnector(connector, catalog, changes);

// Update data offers
dataOfferWriter.updateDataOffers(dsl, connector.getEndpoint(), limitedDataOffers.abbreviatedDataOffers(), changes);
dataOfferWriter.updateDataOffers(dsl, connector.getEndpointUrl(), limitedDataOffers.abbreviatedDataOffers(), changes);

// Log event if changes are present
if (!changes.isEmpty()) {
brokerEventLogger.logConnectorUpdated(dsl, connector.getEndpoint(), changes);
brokerEventLogger.logConnectorUpdated(dsl, connector.getEndpointUrl(), changes);
}
}

Expand All @@ -68,8 +68,8 @@ private static void updateConnector(ConnectorRecord connector, FetchedCatalog ca

connector.setLastSuccessfulRefreshAt(now);
connector.setLastRefreshAttemptAt(now);
if (!Objects.equals(connector.getParticipantId(), participantId)) {
connector.setParticipantId(participantId);
if (!Objects.equals(connector.getConnectorId(), participantId)) {
connector.setConnectorId(participantId);
connector.setMdsId(MdsIdUtils.getMdsIdFromParticipantId(participantId));
changes.setParticipantIdChanged(participantId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import de.sovity.edc.ext.brokerserver.dao.utils.JsonbUtils;
import de.sovity.edc.ext.brokerserver.db.jooq.tables.records.ContractOfferRecord;
import de.sovity.edc.ext.brokerserver.db.jooq.tables.records.DataOfferRecord;
import de.sovity.edc.ext.brokerserver.services.api.DataOfferMappingUtils;
import de.sovity.edc.ext.brokerserver.services.refreshing.offers.model.FetchedContractOffer;
import de.sovity.edc.ext.wrapper.api.common.mappers.PolicyMapper;
import lombok.RequiredArgsConstructor;
import org.jooq.JSONB;

Expand All @@ -32,6 +34,8 @@
@RequiredArgsConstructor
public class ContractOfferRecordUpdater {

private final DataOfferMappingUtils dataOfferMappingUtils;

/**
* Create new {@link ContractOfferRecord} from {@link FetchedContractOffer}.
*
Expand All @@ -41,7 +45,8 @@ public class ContractOfferRecordUpdater {
*/
public ContractOfferRecord newContractOffer(DataOfferRecord dataOffer, FetchedContractOffer fetchedContractOffer) {
var contractOffer = new ContractOfferRecord();
contractOffer.setConnectorEndpoint(dataOffer.getConnectorEndpoint());

contractOffer.setConnectorId(dataOffer.getConnectorId());
contractOffer.setContractOfferId(fetchedContractOffer.getContractOfferId());
contractOffer.setAssetId(dataOffer.getAssetId());
contractOffer.setCreatedAt(OffsetDateTime.now());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public DataOfferLimitsEnforced enforceLimits(Collection<FetchedDataOffer> dataOf
}

public void logEnforcedLimitsIfChanged(DSLContext dsl, ConnectorRecord connector, DataOfferLimitsEnforced enforcedLimits) {
String endpoint = connector.getEndpoint();
String endpoint = connector.getEndpointUrl();

// DataOffer
if (enforcedLimits.dataOfferLimitsExceeded() && connector.getDataOffersExceeded() == ConnectorDataOffersExceeded.OK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public DataOfferPatch buildDataOfferPatch(
);

diff.added().forEach(fetched -> {
var newRecord = dataOfferRecordUpdater.newDataOffer(connectorEndpoint, fetched);
var newRecord = dataOfferRecordUpdater.newDataOffer(dsl, connectorEndpoint, fetched);
patch.insertDataOffer(newRecord);
patchContractOffers(patch, newRecord, List.of(), fetched.getContractOffers());
});
Expand Down
Loading

0 comments on commit cef88f0

Please sign in to comment.