Skip to content

Commit

Permalink
feat: refactor using ExecutorService and a transaction per batch
Browse files Browse the repository at this point in the history
  • Loading branch information
MiguelAHM committed Dec 10, 2024
1 parent 9a4f8d8 commit 1b17729
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package net.ripe.db.nrtm4.client.dao;

import com.fasterxml.jackson.core.JsonProcessingException;
import net.ripe.db.nrtm4.client.client.MirrorSnapshotInfo;
import net.ripe.db.nrtm4.client.condition.Nrtm4ClientCondition;
import net.ripe.db.nrtm4.client.config.NrtmClientTransactionConfiguration;
Expand Down Expand Up @@ -91,7 +90,7 @@ public void createIndexes(final RpslObject rpslObject, final RpslObjectUpdateInf
insertIntoTablesIgnoreMissing(jdbcMasterTemplate, rpslObjectUpdateInfo, rpslObject);
}

public Map.Entry<RpslObject, RpslObjectUpdateInfo> processSnapshotRecord(final MirrorSnapshotInfo mirrorSnapshotInfo) throws JsonProcessingException {
public Map.Entry<RpslObject, RpslObjectUpdateInfo> processSnapshotRecord(final MirrorSnapshotInfo mirrorSnapshotInfo) {
return Map.entry(mirrorSnapshotInfo.getRpslObject(), persistRpslObject(mirrorSnapshotInfo.getRpslObject()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Stopwatch;
import com.hazelcast.org.codehaus.commons.nullanalysis.NotNull;
import net.ripe.db.nrtm4.client.client.MirrorSnapshotInfo;
import net.ripe.db.nrtm4.client.client.NrtmRestClient;
import net.ripe.db.nrtm4.client.client.UpdateNotificationFileResponse;
import net.ripe.db.nrtm4.client.condition.Nrtm4ClientCondition;
import net.ripe.db.nrtm4.client.config.NrtmClientTransactionConfiguration;
import net.ripe.db.nrtm4.client.dao.Nrtm4ClientInfoRepository;
import net.ripe.db.nrtm4.client.dao.Nrtm4ClientRepository;
import net.ripe.db.whois.common.dao.RpslObjectUpdateInfo;
Expand All @@ -19,18 +21,19 @@
import org.springframework.context.annotation.Conditional;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static net.ripe.db.nrtm4.client.config.NrtmClientTransactionConfiguration.NRTM_CLIENT_UPDATE_TRANSACTION;
Expand All @@ -46,6 +49,8 @@ public class SnapshotMirrorImporter extends AbstractMirrorImporter {

private final PlatformTransactionManager nrtmClientUpdateTransaction;

private final int numberOfThreads;


public SnapshotMirrorImporter(final NrtmRestClient nrtmRestClient,
final Nrtm4ClientInfoRepository nrtm4ClientInfoMirrorDao,
Expand All @@ -54,6 +59,9 @@ public SnapshotMirrorImporter(final NrtmRestClient nrtmRestClient,
super(nrtm4ClientInfoMirrorDao, nrtm4ClientRepository);
this.nrtmRestClient = nrtmRestClient;
this.nrtmClientUpdateTransaction = transactionManagerNrtmClientUpdate;

final int numThreads = Runtime.getRuntime().availableProcessors();
this.numberOfThreads = (int)Math.ceil(numThreads/4.0);
}

public void doImport(final String source,
Expand All @@ -77,42 +85,33 @@ public void doImport(final String source,

final AtomicInteger snapshotVersion = new AtomicInteger(0);

final int amount = persisSnapshot(source, payload, sessionId, snapshotVersion);
if (amount == 0){
return;
}
final int amount = persisSnapshotAndCount(source, payload, sessionId, snapshotVersion);
persistVersion(source, snapshotVersion.get(), sessionId);

stopwatch.stop();
LOGGER.info("Loading snapshot file took {} for source {} and added {} records", stopwatch.elapsed().toMillis(), source, amount);
}

private int persisSnapshot(final String source, final byte[] payload, final String sessionId, final AtomicInteger snapshotVersion){
private int persisSnapshotAndCount(final String source, final byte[] payload, final String sessionId, final AtomicInteger snapshotVersion){
final AtomicInteger processedCount = new AtomicInteger(0);
final Timer timer = new Timer();
printProgress(timer, processedCount);

//Transaction annotation does not work with any threaded processing methods
final TransactionStatus transactionStatus = nrtmClientUpdateTransaction.getTransaction(new DefaultTransactionDefinition());
final int numThreads = Runtime.getRuntime().availableProcessors();
final ExecutorService executor = Executors.newFixedThreadPool((int)Math.ceil(numThreads/4.0));
try {
persistDummyObjectIfNotExist(source);
GzipDecompressor.decompressRecords(
payload,
firstRecord -> {
validateSession(sessionId, firstRecord);
snapshotVersion.set(extractVersion(firstRecord));
},
recordBatches -> persistBatches(recordBatches, processedCount, executor)
recordBatches -> persistBatches(recordBatches, processedCount)
);
} catch (Exception ex){
LOGGER.error("Error persisting snapshot", ex);
nrtmClientUpdateTransaction.rollback(transactionStatus);
return 0;
throw new IllegalStateException(ex);
}

nrtmClientUpdateTransaction.commit(transactionStatus);
persistDummyObjectIfNotExist(source);
timer.cancel();
return processedCount.get();
}
Expand All @@ -122,6 +121,7 @@ final void persistVersion(final String source, final int version, final String s
nrtm4ClientInfoRepository.saveSnapshotFileVersion(source, version, sessionId);
}

@Transactional(transactionManager = NrtmClientTransactionConfiguration.NRTM_CLIENT_UPDATE_TRANSACTION)
private void persistDummyObjectIfNotExist(final String source){
final RpslObject dummyObject = getPlaceholderPersonObject();
if (!source.equals(dummyObject.getValueForAttribute(AttributeType.SOURCE).toString())){
Expand All @@ -145,31 +145,59 @@ public void run() {
}, 0, 10000);
}

private void persistBatches(final String[] remainingRecords, final AtomicInteger processedCount, final ExecutorService executor) {

final List<Future<Object>> futures = Arrays.stream(remainingRecords)
.map(record -> executor.submit(() -> {
try {
final MirrorSnapshotInfo mirrorRpslObject = new ObjectMapper().readValue(record, MirrorSnapshotInfo.class);
final Map.Entry<RpslObject, RpslObjectUpdateInfo> persistedRecord = nrtm4ClientRepository.processSnapshotRecord(mirrorRpslObject);
nrtm4ClientRepository.createIndexes(persistedRecord.getKey(), persistedRecord.getValue());
processedCount.incrementAndGet();
return null;
} catch (Exception e) {
LOGGER.error("Unable to process record {}", record, e);
throw new IllegalStateException(e);
}
})).toList();

for (Future<Object> future : futures) {
private void persistBatches(final String[] remainingRecords, final AtomicInteger processedCount) {
final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
final List<TransactionStatus> batchTransactions = processBatchAndTransactions(remainingRecords, processedCount, executor);
batchTransactions.forEach(nrtmClientUpdateTransaction::commit);
}

@NotNull
private List<TransactionStatus> processBatchAndTransactions(final String[] remainingRecords, final AtomicInteger processedCount, final ExecutorService executor){

//Transaction annotation does not work with any threaded processing methods
final List<TransactionStatus> transactionStatuses = new ArrayList<>();
try {
Arrays.stream(remainingRecords)
.forEach(record -> {
final TransactionStatus transactionStatus = nrtmClientUpdateTransaction.getTransaction(new DefaultTransactionDefinition()); // create a transaction per record
processRecord(executor, processedCount, record, transactionStatus); // process record
transactionStatuses.add(transactionStatus); // keep the transaction in case there is an issue in another record in the batch
});
} catch (Exception e){
transactionStatuses.forEach(nrtmClientUpdateTransaction::rollback);
throw new IllegalStateException("Unable to persist snapshot", e);
} finally {
executor.shutdown();
try {
future.get();
} catch (Exception e) {
throw new RuntimeException("Error processing records", e);
executor.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
executor.shutdownNow(); // Interrupted while waiting; force shutdown
Thread.currentThread().interrupt();
}
}

executor.shutdown();
return transactionStatuses;
}

private void processRecord(final ExecutorService executor, final AtomicInteger processedCount, final String record, final TransactionStatus transactionStatus){
try {
executor.submit(() -> {
try {
final MirrorSnapshotInfo mirrorRpslObject = new ObjectMapper().readValue(record, MirrorSnapshotInfo.class);
final Map.Entry<RpslObject, RpslObjectUpdateInfo> persistedRecord = nrtm4ClientRepository.processSnapshotRecord(mirrorRpslObject);
nrtm4ClientRepository.createIndexes(persistedRecord.getKey(), persistedRecord.getValue());
processedCount.incrementAndGet();
} catch (Exception e) {
nrtmClientUpdateTransaction.rollback(transactionStatus);
LOGGER.error("Unable to process record {}", record, e);
throw new IllegalStateException(e);
}
});
} catch (Exception e) {
nrtmClientUpdateTransaction.rollback(transactionStatus);
LOGGER.error("Unable to process record {}", record, e);
throw new IllegalStateException(e);
}
}

public static RpslObject getPlaceholderPersonObject() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ public void processFile(){
deltaImporter.doImport(source, updateNotificationFile.getSessionID(), newDeltas);
persistVersion(source, updateNotificationFile, hostname);
} catch (Exception ex){
LOGGER.error("Failed to mirror database", ex);
snapshotImporter.truncateTables(); //clean up in case of error
LOGGER.error("Failed to mirror database, cleaning up the tables", ex);
snapshotImporter.truncateTables();
}
});
}
Expand Down

0 comments on commit 1b17729

Please sign in to comment.