diff --git a/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/dao/Nrtm4ClientRepository.java b/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/dao/Nrtm4ClientRepository.java index 1ba96d2992..bbe131b4a8 100644 --- a/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/dao/Nrtm4ClientRepository.java +++ b/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/dao/Nrtm4ClientRepository.java @@ -1,6 +1,5 @@ package net.ripe.db.nrtm4.client.dao; -import com.fasterxml.jackson.core.JsonProcessingException; import net.ripe.db.nrtm4.client.client.MirrorSnapshotInfo; import net.ripe.db.nrtm4.client.condition.Nrtm4ClientCondition; import net.ripe.db.nrtm4.client.config.NrtmClientTransactionConfiguration; @@ -91,7 +90,7 @@ public void createIndexes(final RpslObject rpslObject, final RpslObjectUpdateInf insertIntoTablesIgnoreMissing(jdbcMasterTemplate, rpslObjectUpdateInfo, rpslObject); } - public Map.Entry processSnapshotRecord(final MirrorSnapshotInfo mirrorSnapshotInfo) throws JsonProcessingException { + public Map.Entry processSnapshotRecord(final MirrorSnapshotInfo mirrorSnapshotInfo) { return Map.entry(mirrorSnapshotInfo.getRpslObject(), persistRpslObject(mirrorSnapshotInfo.getRpslObject())); } diff --git a/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/importer/SnapshotMirrorImporter.java b/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/importer/SnapshotMirrorImporter.java index f6286c9b88..0b165d4f9e 100644 --- a/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/importer/SnapshotMirrorImporter.java +++ b/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/importer/SnapshotMirrorImporter.java @@ -2,10 +2,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Stopwatch; +import com.hazelcast.org.codehaus.commons.nullanalysis.NotNull; import net.ripe.db.nrtm4.client.client.MirrorSnapshotInfo; import net.ripe.db.nrtm4.client.client.NrtmRestClient; import net.ripe.db.nrtm4.client.client.UpdateNotificationFileResponse; import net.ripe.db.nrtm4.client.condition.Nrtm4ClientCondition; +import net.ripe.db.nrtm4.client.config.NrtmClientTransactionConfiguration; import net.ripe.db.nrtm4.client.dao.Nrtm4ClientInfoRepository; import net.ripe.db.nrtm4.client.dao.Nrtm4ClientRepository; import net.ripe.db.whois.common.dao.RpslObjectUpdateInfo; @@ -19,10 +21,11 @@ import org.springframework.context.annotation.Conditional; import org.springframework.stereotype.Service; import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.TransactionManager; import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.DefaultTransactionDefinition; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -30,7 +33,7 @@ import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static net.ripe.db.nrtm4.client.config.NrtmClientTransactionConfiguration.NRTM_CLIENT_UPDATE_TRANSACTION; @@ -46,6 +49,8 @@ public class SnapshotMirrorImporter extends AbstractMirrorImporter { private final PlatformTransactionManager nrtmClientUpdateTransaction; + private final int numberOfThreads; + public SnapshotMirrorImporter(final NrtmRestClient nrtmRestClient, final Nrtm4ClientInfoRepository nrtm4ClientInfoMirrorDao, @@ -54,6 +59,9 @@ public SnapshotMirrorImporter(final NrtmRestClient nrtmRestClient, super(nrtm4ClientInfoMirrorDao, nrtm4ClientRepository); this.nrtmRestClient = nrtmRestClient; this.nrtmClientUpdateTransaction = transactionManagerNrtmClientUpdate; + + final int numThreads = Runtime.getRuntime().availableProcessors(); + this.numberOfThreads = (int)Math.ceil(numThreads/4.0); } public void doImport(final String source, @@ -77,42 +85,33 @@ public void doImport(final String source, final AtomicInteger snapshotVersion = new AtomicInteger(0); - final int amount = persisSnapshot(source, payload, sessionId, snapshotVersion); - if (amount == 0){ - return; - } + final int amount = persisSnapshotAndCount(source, payload, sessionId, snapshotVersion); persistVersion(source, snapshotVersion.get(), sessionId); stopwatch.stop(); LOGGER.info("Loading snapshot file took {} for source {} and added {} records", stopwatch.elapsed().toMillis(), source, amount); } - private int persisSnapshot(final String source, final byte[] payload, final String sessionId, final AtomicInteger snapshotVersion){ + private int persisSnapshotAndCount(final String source, final byte[] payload, final String sessionId, final AtomicInteger snapshotVersion){ final AtomicInteger processedCount = new AtomicInteger(0); final Timer timer = new Timer(); printProgress(timer, processedCount); - //Transaction annotation does not work with any threaded processing methods - final TransactionStatus transactionStatus = nrtmClientUpdateTransaction.getTransaction(new DefaultTransactionDefinition()); - final int numThreads = Runtime.getRuntime().availableProcessors(); - final ExecutorService executor = Executors.newFixedThreadPool((int)Math.ceil(numThreads/4.0)); try { - persistDummyObjectIfNotExist(source); GzipDecompressor.decompressRecords( payload, firstRecord -> { validateSession(sessionId, firstRecord); snapshotVersion.set(extractVersion(firstRecord)); }, - recordBatches -> persistBatches(recordBatches, processedCount, executor) + recordBatches -> persistBatches(recordBatches, processedCount) ); } catch (Exception ex){ LOGGER.error("Error persisting snapshot", ex); - nrtmClientUpdateTransaction.rollback(transactionStatus); - return 0; + throw new IllegalStateException(ex); } - nrtmClientUpdateTransaction.commit(transactionStatus); + persistDummyObjectIfNotExist(source); timer.cancel(); return processedCount.get(); } @@ -122,6 +121,7 @@ final void persistVersion(final String source, final int version, final String s nrtm4ClientInfoRepository.saveSnapshotFileVersion(source, version, sessionId); } + @Transactional(transactionManager = NrtmClientTransactionConfiguration.NRTM_CLIENT_UPDATE_TRANSACTION) private void persistDummyObjectIfNotExist(final String source){ final RpslObject dummyObject = getPlaceholderPersonObject(); if (!source.equals(dummyObject.getValueForAttribute(AttributeType.SOURCE).toString())){ @@ -145,31 +145,59 @@ public void run() { }, 0, 10000); } - private void persistBatches(final String[] remainingRecords, final AtomicInteger processedCount, final ExecutorService executor) { - - final List> futures = Arrays.stream(remainingRecords) - .map(record -> executor.submit(() -> { - try { - final MirrorSnapshotInfo mirrorRpslObject = new ObjectMapper().readValue(record, MirrorSnapshotInfo.class); - final Map.Entry persistedRecord = nrtm4ClientRepository.processSnapshotRecord(mirrorRpslObject); - nrtm4ClientRepository.createIndexes(persistedRecord.getKey(), persistedRecord.getValue()); - processedCount.incrementAndGet(); - return null; - } catch (Exception e) { - LOGGER.error("Unable to process record {}", record, e); - throw new IllegalStateException(e); - } - })).toList(); - - for (Future future : futures) { + private void persistBatches(final String[] remainingRecords, final AtomicInteger processedCount) { + final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); + final List batchTransactions = processBatchAndTransactions(remainingRecords, processedCount, executor); + batchTransactions.forEach(nrtmClientUpdateTransaction::commit); + } + + @NotNull + private List processBatchAndTransactions(final String[] remainingRecords, final AtomicInteger processedCount, final ExecutorService executor){ + + //Transaction annotation does not work with any threaded processing methods + final List transactionStatuses = new ArrayList<>(); + try { + Arrays.stream(remainingRecords) + .forEach(record -> { + final TransactionStatus transactionStatus = nrtmClientUpdateTransaction.getTransaction(new DefaultTransactionDefinition()); // create a transaction per record + processRecord(executor, processedCount, record, transactionStatus); // process record + transactionStatuses.add(transactionStatus); // keep the transaction in case there is an issue in another record in the batch + }); + } catch (Exception e){ + transactionStatuses.forEach(nrtmClientUpdateTransaction::rollback); + throw new IllegalStateException("Unable to persist snapshot", e); + } finally { + executor.shutdown(); try { - future.get(); - } catch (Exception e) { - throw new RuntimeException("Error processing records", e); + executor.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + executor.shutdownNow(); // Interrupted while waiting; force shutdown + Thread.currentThread().interrupt(); } } - executor.shutdown(); + return transactionStatuses; + } + + private void processRecord(final ExecutorService executor, final AtomicInteger processedCount, final String record, final TransactionStatus transactionStatus){ + try { + executor.submit(() -> { + try { + final MirrorSnapshotInfo mirrorRpslObject = new ObjectMapper().readValue(record, MirrorSnapshotInfo.class); + final Map.Entry persistedRecord = nrtm4ClientRepository.processSnapshotRecord(mirrorRpslObject); + nrtm4ClientRepository.createIndexes(persistedRecord.getKey(), persistedRecord.getValue()); + processedCount.incrementAndGet(); + } catch (Exception e) { + nrtmClientUpdateTransaction.rollback(transactionStatus); + LOGGER.error("Unable to process record {}", record, e); + throw new IllegalStateException(e); + } + }); + } catch (Exception e) { + nrtmClientUpdateTransaction.rollback(transactionStatus); + LOGGER.error("Unable to process record {}", record, e); + throw new IllegalStateException(e); + } } public static RpslObject getPlaceholderPersonObject() { diff --git a/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/processor/UpdateNotificationFileProcessor.java b/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/processor/UpdateNotificationFileProcessor.java index 3b841688ee..0e3f148ca4 100644 --- a/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/processor/UpdateNotificationFileProcessor.java +++ b/whois-nrtm4-client/src/main/java/net/ripe/db/nrtm4/client/processor/UpdateNotificationFileProcessor.java @@ -123,8 +123,8 @@ public void processFile(){ deltaImporter.doImport(source, updateNotificationFile.getSessionID(), newDeltas); persistVersion(source, updateNotificationFile, hostname); } catch (Exception ex){ - LOGGER.error("Failed to mirror database", ex); - snapshotImporter.truncateTables(); //clean up in case of error + LOGGER.error("Failed to mirror database, cleaning up the tables", ex); + snapshotImporter.truncateTables(); } }); }