Skip to content

Commit

Permalink
feat: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
MiguelAHM committed Dec 10, 2024
1 parent 56f5b59 commit 9a4f8d8
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ private Metadata extractMetadata(final String firstRecord){
private void validateSession(final String metadataSessionId, final String sessionId) throws IllegalArgumentException{
if (!metadataSessionId.equals(sessionId)){
LOGGER.error("The session {} is not the same in the UNF and delta {}", metadataSessionId, sessionId);
truncateTables();
throw new IllegalArgumentException("The session is not the same in the UNF and delta");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import static net.ripe.db.nrtm4.client.config.NrtmClientTransactionConfiguration.NRTM_CLIENT_UPDATE_TRANSACTION;
Expand Down Expand Up @@ -74,6 +78,9 @@ public void doImport(final String source,
final AtomicInteger snapshotVersion = new AtomicInteger(0);

final int amount = persisSnapshot(source, payload, sessionId, snapshotVersion);
if (amount == 0){
return;
}
persistVersion(source, snapshotVersion.get(), sessionId);

stopwatch.stop();
Expand All @@ -87,6 +94,8 @@ private int persisSnapshot(final String source, final byte[] payload, final Stri

//Transaction annotation does not work with any threaded processing methods
final TransactionStatus transactionStatus = nrtmClientUpdateTransaction.getTransaction(new DefaultTransactionDefinition());
final int numThreads = Runtime.getRuntime().availableProcessors();
final ExecutorService executor = Executors.newFixedThreadPool((int)Math.ceil(numThreads/4.0));
try {
persistDummyObjectIfNotExist(source);
GzipDecompressor.decompressRecords(
Expand All @@ -95,12 +104,12 @@ private int persisSnapshot(final String source, final byte[] payload, final Stri
validateSession(sessionId, firstRecord);
snapshotVersion.set(extractVersion(firstRecord));
},
recordBatches -> persistBatches(recordBatches, processedCount)
recordBatches -> persistBatches(recordBatches, processedCount, executor)
);
} catch (Exception ex){
LOGGER.error("Error persisting snapshot", ex);
nrtmClientUpdateTransaction.rollback(transactionStatus);
throw new IllegalStateException(ex);
return 0;
}

nrtmClientUpdateTransaction.commit(transactionStatus);
Expand Down Expand Up @@ -136,18 +145,31 @@ public void run() {
}, 0, 10000);
}

private void persistBatches(final String[] remainingRecords, final AtomicInteger processedCount) {
Arrays.stream(remainingRecords).parallel().forEach(record -> {
private void persistBatches(final String[] remainingRecords, final AtomicInteger processedCount, final ExecutorService executor) {

final List<Future<Object>> futures = Arrays.stream(remainingRecords)
.map(record -> executor.submit(() -> {
try {
final MirrorSnapshotInfo mirrorRpslObject = new ObjectMapper().readValue(record, MirrorSnapshotInfo.class);
final Map.Entry<RpslObject, RpslObjectUpdateInfo> persistedRecord = nrtm4ClientRepository.processSnapshotRecord(mirrorRpslObject);
nrtm4ClientRepository.createIndexes(persistedRecord.getKey(), persistedRecord.getValue());
processedCount.incrementAndGet();
return null;
} catch (Exception e) {
LOGGER.error("Unable to process record {}", record, e);
throw new IllegalStateException(e);
}
})).toList();

for (Future<Object> future : futures) {
try {
final MirrorSnapshotInfo mirrorRpslObject = new ObjectMapper().readValue(record, MirrorSnapshotInfo.class);
final Map.Entry<RpslObject, RpslObjectUpdateInfo> persistedRecord = nrtm4ClientRepository.processSnapshotRecord(mirrorRpslObject);
nrtm4ClientRepository.createIndexes(persistedRecord.getKey(), persistedRecord.getValue());
processedCount.incrementAndGet();
future.get();
} catch (Exception e) {
LOGGER.error("Unable to process record {}", record, e);
throw new IllegalStateException(e);
throw new RuntimeException("Error processing records", e);
}
});
}

executor.shutdown();
}

public static RpslObject getPlaceholderPersonObject() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,21 @@ public void processFile(){
return;
}

processSnapshot(source, nrtmClientLastVersionInfo, updateNotificationFile);

final List<UpdateNotificationFileResponse.NrtmFileLink> newDeltas = getNewDeltasFromNotificationFile(source, updateNotificationFile);
deltaImporter.doImport(source, updateNotificationFile.getSessionID(), newDeltas);
persistVersion(source, updateNotificationFile, hostname);
try {
processSnapshot(source, nrtmClientLastVersionInfo, updateNotificationFile);

final List<UpdateNotificationFileResponse.NrtmFileLink> newDeltas = getNewDeltasFromNotificationFile(source, updateNotificationFile);
deltaImporter.doImport(source, updateNotificationFile.getSessionID(), newDeltas);
persistVersion(source, updateNotificationFile, hostname);
} catch (Exception ex){
LOGGER.error("Failed to mirror database", ex);
snapshotImporter.truncateTables(); //clean up in case of error
}
});
}

private void processSnapshot(final String source, final NrtmClientVersionInfo nrtmClientLastVersionInfo, final UpdateNotificationFileResponse updateNotificationFile) {
if (nrtmClientLastVersionInfo != null){
if (nrtmClientLastVersionInfo != null) {
return;
}
snapshotImporter.doImport(source, updateNotificationFile.getSessionID(), updateNotificationFile.getSnapshot());
Expand Down

0 comments on commit 9a4f8d8

Please sign in to comment.