Skip to content

Commit

Permalink
Bringing int in sync with master (#340)
Browse files Browse the repository at this point in the history
* Updated v1 pipeline rules (#309)

* Updated deployment group for E2E env

* V1 branch rules updated (#308)

* Updated deployment group for E2E env (#305)

* updated rules as per v1_* branches

* 1.1.2 fixing SNS publish batch-size issue (#339)

* Updated deployment group for E2E env
* V1 branch rules updated (#308)
* Updated deployment group for E2E env (#305)
* updated rules as per v1_* branches
* Fixed SNS publish batch-size issue
* debug pipeline failure
* include hidden file in artifact

* Fixed SNS publish batch-size issue
  • Loading branch information
hirenkp2000 authored Sep 10, 2024
1 parent 94a53cb commit dee6652
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,12 @@ public void run() {
return;
}
// Handover transactions to appropriate Publisher (e.g. DefaultSNSPublisher)
PublishEntryDTO lastTxn = null;
final PublishEntryDTO lastTxn = new PublishEntryDTO(lastTxnId, lastTxnRecId);
try {
lastTxn = PubUtil.getPubInstance(sub).publishTransactions(pubCfg, sub, txnList, lastTxnId, lastTxnRecId);
PubUtil.getPubInstance(sub).publishTransactions(pubCfg, sub, txnList, lastTxn);
} finally {
if (lastTxn != null) {
// Update last txn_id in AdminDB::xyz_config::xyz_txn_pub table
PubDatabaseHandler.saveLastTxnId(adminDBConnParams, subId, lastTxn);
}
// Update last txn_id in AdminDB::xyz_config::xyz_txn_pub table
PubDatabaseHandler.saveLastTxnId(adminDBConnParams, subId, lastTxn);
}
} catch (Exception ex) {
logger.error("{} - Exception in publisher job for subId={}, spaceId={}. ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ public class DefaultSNSBatchPublisher implements IPublisher {

// Convert and publish transactions to desired SNS Topic
@Override
public PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscription sub,
public void publishTransactions(final PubConfig pubCfg, final Subscription sub,
final List<PubTransactionData> txnList,
final long lastStoredTxnId, final long lastStoredTxnRecId) throws Exception {
final PublishEntryDTO pubDTO) throws Exception {
final String subId = sub.getId();
final String spaceId = sub.getSource();
final String snsTopic = PubUtil.getSnsTopicARN(sub);
final long lotStartTS = System.currentTimeMillis();
// local counters
final PublishEntryDTO pubDTO = new PublishEntryDTO(lastStoredTxnId, lastStoredTxnRecId);
int publishedRecCnt = 0;
// Variables for batch publish
final int TXN_LIST_SIZE = txnList.size();
Expand Down Expand Up @@ -106,8 +105,6 @@ public PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscri
logger.info("Transaction publish stats for SNS [{}] [format => eventType,subId,spaceId,msgCount,timeTakenMs,lastTxnId,lastTxnRecId] - {} {} {} {} {} {} {}",
snsTopic, PubLogConstants.LOG_EVT_TXN_PUBLISH_STATS, subId, spaceId, publishedRecCnt, lotTimeTaken, pubDTO.getLastTxnId(), pubDTO.getLastTxnRecId());
}

return pubDTO;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ public class DefaultSNSSinglePublisher implements IPublisher {

// Convert and publish transactions to desired SNS Topic
@Override
public PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscription sub,
public void publishTransactions(final PubConfig pubCfg, final Subscription sub,
final List<PubTransactionData> txnList,
final long lastStoredTxnId, final long lastStoredTxnRecId) throws Exception {
final PublishEntryDTO pubDTO) throws Exception {
final String subId = sub.getId();
final String spaceId = sub.getSource();
final String snsTopic = PubUtil.getSnsTopicARN(sub);
final long lotStartTS = System.currentTimeMillis();
// local counters
final PublishEntryDTO pubDTO = new PublishEntryDTO(lastStoredTxnId, lastStoredTxnRecId);
long publishedRecCnt = 0;

try {
Expand Down Expand Up @@ -82,8 +81,6 @@ public PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscri
logger.info("Transaction publish stats for SNS [{}] [format => eventType,subId,spaceId,msgCount,timeTakenMs,lastTxnId,lastTxnRecId] - {} {} {} {} {} {} {}",
snsTopic, PubLogConstants.LOG_EVT_TXN_PUBLISH_STATS, subId, spaceId, publishedRecCnt, lotTimeTaken, pubDTO.getLastTxnId(), pubDTO.getLastTxnRecId());
}

return pubDTO;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import java.util.List;

public interface IPublisher {
PublishEntryDTO publishTransactions(final PubConfig pubCfg, final Subscription sub,
void publishTransactions(final PubConfig pubCfg, final Subscription sub,
final List<PubTransactionData> txnList,
final long lastTxnId, final long lastTxnRecId) throws Exception;
final PublishEntryDTO pubDTO) throws Exception;

}

0 comments on commit dee6652

Please sign in to comment.