diff --git a/xyz-txn-handler/src/main/java/com/here/xyz/pub/handlers/PubTxnListHandler.java b/xyz-txn-handler/src/main/java/com/here/xyz/pub/handlers/PubTxnListHandler.java index fd8155aea..4ec228c07 100644 --- a/xyz-txn-handler/src/main/java/com/here/xyz/pub/handlers/PubTxnListHandler.java +++ b/xyz-txn-handler/src/main/java/com/here/xyz/pub/handlers/PubTxnListHandler.java @@ -46,14 +46,12 @@ public void run() { return; } // Handover transactions to appropriate Publisher (e.g. DefaultSNSPublisher) - PublishEntryDTO lastTxn = null; + final PublishEntryDTO lastTxn = new PublishEntryDTO(lastTxnId, lastTxnRecId); try { - lastTxn = PubUtil.getPubInstance(sub).publishTransactions(pubCfg, sub, txnList, lastTxnId, lastTxnRecId); + PubUtil.getPubInstance(sub).publishTransactions(pubCfg, sub, txnList, lastTxn); } finally { - if (lastTxn != null) { - // Update last txn_id in AdminDB::xyz_config::xyz_txn_pub table - PubDatabaseHandler.saveLastTxnId(adminDBConnParams, subId, lastTxn); - } + // Update last txn_id in AdminDB::xyz_config::xyz_txn_pub table + PubDatabaseHandler.saveLastTxnId(adminDBConnParams, subId, lastTxn); } } catch (Exception ex) { logger.error("{} - Exception in publisher job for subId={}, spaceId={}. ", diff --git a/xyz-txn-handler/src/main/java/com/here/xyz/pub/impl/DefaultSNSBatchPublisher.java b/xyz-txn-handler/src/main/java/com/here/xyz/pub/impl/DefaultSNSBatchPublisher.java index 5154e0eb6..14f568703 100644 --- a/xyz-txn-handler/src/main/java/com/here/xyz/pub/impl/DefaultSNSBatchPublisher.java +++ b/xyz-txn-handler/src/main/java/com/here/xyz/pub/impl/DefaultSNSBatchPublisher.java @@ -26,15 +26,14 @@ public class DefaultSNSBatchPublisher implements IPublisher { // Convert and publish transactions to desired SNS Topic @Override - public PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscription sub, + public void publishTransactions(final PubConfig pubCfg, final Subscription sub, final List txnList, - final long lastStoredTxnId, final long lastStoredTxnRecId) throws Exception { + final PublishEntryDTO pubDTO) throws Exception { final String subId = sub.getId(); final String spaceId = sub.getSource(); final String snsTopic = PubUtil.getSnsTopicARN(sub); final long lotStartTS = System.currentTimeMillis(); // local counters - final PublishEntryDTO pubDTO = new PublishEntryDTO(lastStoredTxnId, lastStoredTxnRecId); int publishedRecCnt = 0; // Variables for batch publish final int TXN_LIST_SIZE = txnList.size(); @@ -106,8 +105,6 @@ public PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscri logger.info("Transaction publish stats for SNS [{}] [format => eventType,subId,spaceId,msgCount,timeTakenMs,lastTxnId,lastTxnRecId] - {} {} {} {} {} {} {}", snsTopic, PubLogConstants.LOG_EVT_TXN_PUBLISH_STATS, subId, spaceId, publishedRecCnt, lotTimeTaken, pubDTO.getLastTxnId(), pubDTO.getLastTxnRecId()); } - - return pubDTO; } diff --git a/xyz-txn-handler/src/main/java/com/here/xyz/pub/impl/DefaultSNSSinglePublisher.java b/xyz-txn-handler/src/main/java/com/here/xyz/pub/impl/DefaultSNSSinglePublisher.java index 21fe32cff..80e2c987e 100644 --- a/xyz-txn-handler/src/main/java/com/here/xyz/pub/impl/DefaultSNSSinglePublisher.java +++ b/xyz-txn-handler/src/main/java/com/here/xyz/pub/impl/DefaultSNSSinglePublisher.java @@ -25,15 +25,14 @@ public class DefaultSNSSinglePublisher implements IPublisher { // Convert and publish transactions to desired SNS Topic @Override - public PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscription sub, + public void publishTransactions(final PubConfig pubCfg, final Subscription sub, final List txnList, - final long lastStoredTxnId, final long lastStoredTxnRecId) throws Exception { + final PublishEntryDTO pubDTO) throws Exception { final String subId = sub.getId(); final String spaceId = sub.getSource(); final String snsTopic = PubUtil.getSnsTopicARN(sub); final long lotStartTS = System.currentTimeMillis(); // local counters - final PublishEntryDTO pubDTO = new PublishEntryDTO(lastStoredTxnId, lastStoredTxnRecId); long publishedRecCnt = 0; try { @@ -82,8 +81,6 @@ public PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscri logger.info("Transaction publish stats for SNS [{}] [format => eventType,subId,spaceId,msgCount,timeTakenMs,lastTxnId,lastTxnRecId] - {} {} {} {} {} {} {}", snsTopic, PubLogConstants.LOG_EVT_TXN_PUBLISH_STATS, subId, spaceId, publishedRecCnt, lotTimeTaken, pubDTO.getLastTxnId(), pubDTO.getLastTxnRecId()); } - - return pubDTO; } diff --git a/xyz-txn-handler/src/main/java/com/here/xyz/pub/impl/IPublisher.java b/xyz-txn-handler/src/main/java/com/here/xyz/pub/impl/IPublisher.java index 9e6175c61..5418513ee 100644 --- a/xyz-txn-handler/src/main/java/com/here/xyz/pub/impl/IPublisher.java +++ b/xyz-txn-handler/src/main/java/com/here/xyz/pub/impl/IPublisher.java @@ -8,8 +8,8 @@ import java.util.List; public interface IPublisher { - PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscription sub, + void publishTransactions(final PubConfig pubCfg, final Subscription sub, final List txnList, - final long lastTxnId, final long lastTxnRecId) throws Exception; + final PublishEntryDTO pubDTO) throws Exception; }