Skip to content

Commit

Permalink
feat: manually manage the transaction for snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
MiguelAHM committed Dec 10, 2024
1 parent b36b341 commit 56f5b59
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;
Expand All @@ -22,12 +22,12 @@ public class NrtmClientTransactionConfiguration {
public static final String NRTM_CLIENT_INFO_TRANSACTION = "nrtm-client-info-transaction-manager";

@Bean(name = NRTM_CLIENT_UPDATE_TRANSACTION)
public TransactionManager transactionManagerNrtmClientUpdate(@Qualifier("nrtmClientMasterDataSource") final DataSource masterDataSource) {
public PlatformTransactionManager transactionManagerNrtmClientUpdate(@Qualifier("nrtmClientMasterDataSource") final DataSource masterDataSource) {
return new DataSourceTransactionManager(masterDataSource);
}

@Bean(name = NRTM_CLIENT_INFO_TRANSACTION)
public TransactionManager transactionManagerNrtmClientInfo(@Qualifier("nrtmClientMasterInfoSource") final DataSource masterDataSource) {
public PlatformTransactionManager transactionManagerNrtmClientInfo(@Qualifier("nrtmClientMasterInfoSource") final DataSource masterDataSource) {
return new DataSourceTransactionManager(masterDataSource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,22 @@
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Conditional;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.Arrays;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

import static net.ripe.db.nrtm4.client.config.NrtmClientTransactionConfiguration.NRTM_CLIENT_UPDATE_TRANSACTION;


@Service
@Conditional(Nrtm4ClientCondition.class)
Expand All @@ -33,13 +40,16 @@ public class SnapshotMirrorImporter extends AbstractMirrorImporter {

private final NrtmRestClient nrtmRestClient;

private final PlatformTransactionManager nrtmClientUpdateTransaction;


public SnapshotMirrorImporter(final NrtmRestClient nrtmRestClient,
final Nrtm4ClientInfoRepository nrtm4ClientInfoMirrorDao,
final Nrtm4ClientRepository nrtm4ClientRepository) {
final Nrtm4ClientRepository nrtm4ClientRepository,
@Qualifier(NRTM_CLIENT_UPDATE_TRANSACTION) final PlatformTransactionManager transactionManagerNrtmClientUpdate) {
super(nrtm4ClientInfoMirrorDao, nrtm4ClientRepository);
this.nrtmRestClient = nrtmRestClient;

this.nrtmClientUpdateTransaction = transactionManagerNrtmClientUpdate;
}

public void doImport(final String source,
Expand All @@ -61,7 +71,7 @@ public void doImport(final String source,
return;
}

AtomicInteger snapshotVersion = new AtomicInteger(0);
final AtomicInteger snapshotVersion = new AtomicInteger(0);

final int amount = persisSnapshot(source, payload, sessionId, snapshotVersion);
persistVersion(source, snapshotVersion.get(), sessionId);
Expand All @@ -71,12 +81,14 @@ public void doImport(final String source,
}

private int persisSnapshot(final String source, final byte[] payload, final String sessionId, final AtomicInteger snapshotVersion){
persistDummyObjectIfNotExist(source);
final AtomicInteger processedCount = new AtomicInteger(0);
final Timer timer = new Timer();
printProgress(timer, processedCount);

//Transaction annotation does not work with any threaded processing methods
final TransactionStatus transactionStatus = nrtmClientUpdateTransaction.getTransaction(new DefaultTransactionDefinition());
try {
persistDummyObjectIfNotExist(source);
GzipDecompressor.decompressRecords(
payload,
firstRecord -> {
Expand All @@ -85,11 +97,13 @@ private int persisSnapshot(final String source, final byte[] payload, final Stri
},
recordBatches -> persistBatches(recordBatches, processedCount)
);
} catch (IllegalArgumentException ex){
} catch (Exception ex){
LOGGER.error("Error persisting snapshot", ex);
throw ex;
nrtmClientUpdateTransaction.rollback(transactionStatus);
throw new IllegalStateException(ex);
}

nrtmClientUpdateTransaction.commit(transactionStatus);
timer.cancel();
return processedCount.get();
}
Expand Down Expand Up @@ -130,7 +144,7 @@ private void persistBatches(final String[] remainingRecords, final AtomicInteger
nrtm4ClientRepository.createIndexes(persistedRecord.getKey(), persistedRecord.getValue());
processedCount.incrementAndGet();
} catch (Exception e) {
LOGGER.error("Unable to parse record {}", record, e);
LOGGER.error("Unable to process record {}", record, e);
throw new IllegalStateException(e);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,10 @@ public void processFile(){
}

private void processSnapshot(final String source, final NrtmClientVersionInfo nrtmClientLastVersionInfo, final UpdateNotificationFileResponse updateNotificationFile) {

if (nrtmClientLastVersionInfo != null){
return;
}
try {
LOGGER.info("There is no existing Snapshot for the source {}", source);
snapshotImporter.doImport(source, updateNotificationFile.getSessionID(), updateNotificationFile.getSnapshot());
} catch (Exception ex) {
snapshotImporter.truncateTables();
LOGGER.error("There was an issue importing the records", ex);
throw ex;
}
snapshotImporter.doImport(source, updateNotificationFile.getSessionID(), updateNotificationFile.getSnapshot());
}

private void persistVersion(final String source, final UpdateNotificationFileResponse updateNotificationFile,
Expand Down

0 comments on commit 56f5b59

Please sign in to comment.