Skip to content

Commit

Permalink
Fix review points
Browse files Browse the repository at this point in the history
  • Loading branch information
malakaganga committed Sep 20, 2024
1 parent f2cfc2e commit 35ff8b7
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public void becameUnresponsive(String nodeId) {
ScheduledExecutorService taskScheduler = dataHolder.getTaskScheduler();
if (taskScheduler != null) {
LOG.info("Shutting down coordinated task scheduler scheduler since the node became unresponsive.");
// Shutdown the task scheduler forcefully since node became unresponsive and need to avoid task duplication.
taskScheduler.shutdownNow();
dataHolder.setTaskScheduler(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,7 @@ public void storeMembershipEvent(String changedMember, String groupId, List<Stri
storeMembershipEventPreparedStatement.addBatch();
}
storeMembershipEventPreparedStatement.executeBatch();
// Before committing, check if the thread was interrupted
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread was interrupted, avoiding commit.");
}
connection.commit();
commitTransactionIfNotInterrupted(connection);
isTransactionSuccessful = true;
if (log.isDebugEnabled()) {
log.debug(StringUtil.removeCRLFCharacters(task) + " executed successfully");
Expand Down Expand Up @@ -218,11 +214,7 @@ public boolean createCoordinatorEntry(String nodeId, String groupId) throws Clus
preparedStatement.setString(2, nodeId);
preparedStatement.setLong(3, System.currentTimeMillis());
int updateCount = preparedStatement.executeUpdate();
// Before committing, check if the thread was interrupted
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread was interrupted, avoiding commit.");
}
connection.commit();
commitTransactionIfNotInterrupted(connection);
isTransactionSuccessful = true;
if (log.isDebugEnabled()) {
log.debug(RDBMSConstantUtils.TASK_ADD_MESSAGE_ID + " " + nodeId + " executed successfully");
Expand Down Expand Up @@ -288,11 +280,7 @@ public boolean updateCoordinatorHeartbeat(String nodeId, String groupId, long cu
preparedStatementForCoordinatorUpdate.setString(2, nodeId);
preparedStatementForCoordinatorUpdate.setString(3, groupId);
int updateCount = preparedStatementForCoordinatorUpdate.executeUpdate();
// Before committing, check if the thread was interrupted
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread was interrupted, avoiding commit.");
}
connection.commit();
commitTransactionIfNotInterrupted(connection);
isTransactionSuccessful = true;
if (log.isDebugEnabled()) {
log.debug(RDBMSConstantUtils.TASK_UPDATE_COORDINATOR_HEARTBEAT + "node id " + nodeId
Expand Down Expand Up @@ -368,11 +356,7 @@ public void removeCoordinator(String groupId, int heartbeatMaxAge, long currentH
preparedStatement.setString(1, groupId);
preparedStatement.setLong(2, thresholdTimeLimit);
preparedStatement.executeUpdate();
// Before committing, check if the thread was interrupted
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread was interrupted, avoiding commit.");
}
connection.commit();
commitTransactionIfNotInterrupted(connection);
isTransactionSuccessful = true;
if (log.isDebugEnabled()) {
log.debug(RDBMSConstantUtils.TASK_REMOVE_COORDINATOR + " of group " + groupId + " executed successfully");
Expand Down Expand Up @@ -409,11 +393,7 @@ public boolean updateNodeHeartbeat(String nodeId, String groupId, long currentHe
preparedStatementForNodeUpdate.setString(2, nodeId);
preparedStatementForNodeUpdate.setString(3, groupId);
int updateCount = preparedStatementForNodeUpdate.executeUpdate();
// Before committing, check if the thread was interrupted
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread was interrupted, avoiding commit.");
}
connection.commit();
commitTransactionIfNotInterrupted(connection);
isTransactionSuccessful = true;
if (log.isDebugEnabled()) {
log.debug(RDBMSConstantUtils.TASK_UPDATE_NODE_HEARTBEAT + " of node " + nodeId + " executed successfully");
Expand All @@ -436,6 +416,21 @@ public boolean updateNodeHeartbeat(String nodeId, String groupId, long currentHe
}
}

/**
* Method to commit the transaction.
*
* @param connection Connection object
*
*/
private void commitTransactionIfNotInterrupted(Connection connection) throws InterruptedException, SQLException {
// Check if the thread has been interrupted before committing
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread was interrupted, avoiding commit.");
}
// Commit the transaction
connection.commit();
}

@Override
public void createNodeHeartbeatEntry(String nodeId, String groupId) throws ClusterCoordinationException {
Connection connection = null;
Expand All @@ -450,11 +445,7 @@ public void createNodeHeartbeatEntry(String nodeId, String groupId) throws Clust
preparedStatement.setLong(2, System.currentTimeMillis());
preparedStatement.setString(3, groupId);
preparedStatement.executeUpdate();
// Before committing, check if the thread was interrupted
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread was interrupted, avoiding commit.");
}
connection.commit();
commitTransactionIfNotInterrupted(connection);
isTransactionSuccessful = true;
if (log.isDebugEnabled()) {
log.debug(RDBMSConstantUtils.TASK_CREATE_NODE_HEARTBEAT + " of node " + nodeId + " executed successfully");
Expand Down Expand Up @@ -614,11 +605,7 @@ public void removeNode(String nodeId, String groupId) throws ClusterCoordination
preparedStatement.setString(1, nodeId);
preparedStatement.setString(2, groupId);
preparedStatement.executeUpdate();
// Before committing, check if the thread was interrupted
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread was interrupted, avoiding commit.");
}
connection.commit();
commitTransactionIfNotInterrupted(connection);
isTransactionSuccessful = true;
if (log.isDebugEnabled()) {
log.debug(RDBMSConstantUtils.TASK_REMOVE_NODE_HEARTBEAT + " of node "
Expand Down Expand Up @@ -660,11 +647,7 @@ public void insertRemovedNodeDetails(String removedMember, String groupId, List<
storeRemovedMembersPreparedStatement.addBatch();
}
storeRemovedMembersPreparedStatement.executeBatch();
// Before committing, check if the thread was interrupted
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread was interrupted, avoiding commit.");
}
connection.commit();
commitTransactionIfNotInterrupted(connection);
isTransactionSuccessful = true;
if (log.isDebugEnabled()) {
log.debug(StringUtil.removeCRLFCharacters(task) + " executed successfully");
Expand Down Expand Up @@ -700,11 +683,7 @@ public void markNodeAsNotNew(String nodeId, String groupId) throws ClusterCoordi
if (updateCount == 0) {
log.warn("No record was updated while marking node as not new");
}
// Before committing, check if the thread was interrupted
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread was interrupted, avoiding commit.");
}
connection.commit();
commitTransactionIfNotInterrupted(connection);
isTransactionSuccessful = true;
if (log.isDebugEnabled()) {
log.debug(RDBMSConstantUtils.TASK_MARK_NODE_NOT_NEW + " of node "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ private RDBMSCoordinationStrategy(RDBMSCommunicationBusContextImpl communication
}
this.heartbeatMaxRetryInterval = heartBeatInterval * heartbeatMaxRetry;
this.heartbeatWarningMargin = heartbeatMaxRetryInterval * 0.75;
this.maxDBReadTime = (long) (heartbeatWarningMargin / 30);
// maxDBReadTime is set to 1/10th of the heartbeatWarningMargin as for the max possible number of db calls
this.maxDBReadTime = (long) (heartbeatWarningMargin / 10);
this.inactiveIntervalAfterUnresponsive = heartbeatMaxRetryInterval * 2L;

ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setPriority(7)
Expand Down

0 comments on commit 35ff8b7

Please sign in to comment.