From 56f5b590394ee2fb5676a7241cdf7c8b2587be12 Mon Sep 17 00:00:00 2001 From: mherran Date: Tue, 10 Dec 2024 10:56:36 +0100 Subject: [PATCH] feat: manually manage the transaction for snapshots --- .../NrtmClientTransactionConfiguration.java | 6 ++-- .../importer/SnapshotMirrorImporter.java | 28 ++++++++++++++----- .../UpdateNotificationFileProcessor.java | 10 +------ 3 files changed, 25 insertions(+), 19 deletions(-) diff --git a/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/config/NrtmClientTransactionConfiguration.java b/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/config/NrtmClientTransactionConfiguration.java index f7242846f0..4859c9b2c3 100644 --- a/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/config/NrtmClientTransactionConfiguration.java +++ b/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/config/NrtmClientTransactionConfiguration.java @@ -7,7 +7,7 @@ import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.datasource.DataSourceTransactionManager; -import org.springframework.transaction.TransactionManager; +import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.annotation.EnableTransactionManagement; import javax.sql.DataSource; @@ -22,12 +22,12 @@ public class NrtmClientTransactionConfiguration { public static final String NRTM_CLIENT_INFO_TRANSACTION = "nrtm-client-info-transaction-manager"; @Bean(name = NRTM_CLIENT_UPDATE_TRANSACTION) - public TransactionManager transactionManagerNrtmClientUpdate(@Qualifier("nrtmClientMasterDataSource") final DataSource masterDataSource) { + public PlatformTransactionManager transactionManagerNrtmClientUpdate(@Qualifier("nrtmClientMasterDataSource") final DataSource masterDataSource) { return new DataSourceTransactionManager(masterDataSource); } @Bean(name = NRTM_CLIENT_INFO_TRANSACTION) - public TransactionManager transactionManagerNrtmClientInfo(@Qualifier("nrtmClientMasterInfoSource") final DataSource masterDataSource) { + public PlatformTransactionManager transactionManagerNrtmClientInfo(@Qualifier("nrtmClientMasterInfoSource") final DataSource masterDataSource) { return new DataSourceTransactionManager(masterDataSource); } } diff --git a/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/importer/SnapshotMirrorImporter.java b/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/importer/SnapshotMirrorImporter.java index 6eb45d503c..847a51ea21 100644 --- a/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/importer/SnapshotMirrorImporter.java +++ b/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/importer/SnapshotMirrorImporter.java @@ -15,8 +15,13 @@ import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Conditional; import org.springframework.stereotype.Service; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionManager; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.DefaultTransactionDefinition; import java.util.Arrays; import java.util.Map; @@ -24,6 +29,8 @@ import java.util.TimerTask; import java.util.concurrent.atomic.AtomicInteger; +import static net.ripe.db.nrtm4.client.config.NrtmClientTransactionConfiguration.NRTM_CLIENT_UPDATE_TRANSACTION; + @Service @Conditional(Nrtm4ClientCondition.class) @@ -33,13 +40,16 @@ public class SnapshotMirrorImporter extends AbstractMirrorImporter { private final NrtmRestClient nrtmRestClient; + private final PlatformTransactionManager nrtmClientUpdateTransaction; + public SnapshotMirrorImporter(final NrtmRestClient nrtmRestClient, final Nrtm4ClientInfoRepository nrtm4ClientInfoMirrorDao, - final Nrtm4ClientRepository nrtm4ClientRepository) { + final Nrtm4ClientRepository nrtm4ClientRepository, + @Qualifier(NRTM_CLIENT_UPDATE_TRANSACTION) final PlatformTransactionManager transactionManagerNrtmClientUpdate) { super(nrtm4ClientInfoMirrorDao, nrtm4ClientRepository); this.nrtmRestClient = nrtmRestClient; - + this.nrtmClientUpdateTransaction = transactionManagerNrtmClientUpdate; } public void doImport(final String source, @@ -61,7 +71,7 @@ public void doImport(final String source, return; } - AtomicInteger snapshotVersion = new AtomicInteger(0); + final AtomicInteger snapshotVersion = new AtomicInteger(0); final int amount = persisSnapshot(source, payload, sessionId, snapshotVersion); persistVersion(source, snapshotVersion.get(), sessionId); @@ -71,12 +81,14 @@ public void doImport(final String source, } private int persisSnapshot(final String source, final byte[] payload, final String sessionId, final AtomicInteger snapshotVersion){ - persistDummyObjectIfNotExist(source); final AtomicInteger processedCount = new AtomicInteger(0); final Timer timer = new Timer(); printProgress(timer, processedCount); + //Transaction annotation does not work with any threaded processing methods + final TransactionStatus transactionStatus = nrtmClientUpdateTransaction.getTransaction(new DefaultTransactionDefinition()); try { + persistDummyObjectIfNotExist(source); GzipDecompressor.decompressRecords( payload, firstRecord -> { @@ -85,11 +97,13 @@ private int persisSnapshot(final String source, final byte[] payload, final Stri }, recordBatches -> persistBatches(recordBatches, processedCount) ); - } catch (IllegalArgumentException ex){ + } catch (Exception ex){ LOGGER.error("Error persisting snapshot", ex); - throw ex; + nrtmClientUpdateTransaction.rollback(transactionStatus); + throw new IllegalStateException(ex); } + nrtmClientUpdateTransaction.commit(transactionStatus); timer.cancel(); return processedCount.get(); } @@ -130,7 +144,7 @@ private void persistBatches(final String[] remainingRecords, final AtomicInteger nrtm4ClientRepository.createIndexes(persistedRecord.getKey(), persistedRecord.getValue()); processedCount.incrementAndGet(); } catch (Exception e) { - LOGGER.error("Unable to parse record {}", record, e); + LOGGER.error("Unable to process record {}", record, e); throw new IllegalStateException(e); } }); diff --git a/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/processor/UpdateNotificationFileProcessor.java b/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/processor/UpdateNotificationFileProcessor.java index f079394ec5..1dad052579 100644 --- a/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/processor/UpdateNotificationFileProcessor.java +++ b/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/processor/UpdateNotificationFileProcessor.java @@ -125,18 +125,10 @@ public void processFile(){ } private void processSnapshot(final String source, final NrtmClientVersionInfo nrtmClientLastVersionInfo, final UpdateNotificationFileResponse updateNotificationFile) { - if (nrtmClientLastVersionInfo != null){ return; } - try { - LOGGER.info("There is no existing Snapshot for the source {}", source); - snapshotImporter.doImport(source, updateNotificationFile.getSessionID(), updateNotificationFile.getSnapshot()); - } catch (Exception ex) { - snapshotImporter.truncateTables(); - LOGGER.error("There was an issue importing the records", ex); - throw ex; - } + snapshotImporter.doImport(source, updateNotificationFile.getSessionID(), updateNotificationFile.getSnapshot()); } private void persistVersion(final String source, final UpdateNotificationFileResponse updateNotificationFile,