From 01c433c6dffcfd8bf699313542f2e386b87d4627 Mon Sep 17 00:00:00 2001 From: umustafi Date: Thu, 10 Aug 2023 15:04:53 -0700 Subject: [PATCH] [GOBBLIN-1859] Multi-active Unit Test for Multi-Participant state (#3721) * new unit tests passing * clean up * change upsert to insert * catch sql exception & update test * Refactor to create better api for testing --------- Co-authored-by: Urmi Mustafi --- .../api/MysqlMultiActiveLeaseArbiter.java | 182 +++++++++++------- .../api/MysqlMultiActiveLeaseArbiterTest.java | 95 ++++++++- 2 files changed, 202 insertions(+), 75 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index 2cdcf71ce11..964a29851e9 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ -18,21 +18,19 @@ package org.apache.gobblin.runtime.api; import com.google.common.base.Optional; +import com.google.inject.Inject; +import com.typesafe.config.Config; +import com.zaxxer.hikari.HikariDataSource; import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; import java.sql.Timestamp; - -import com.google.inject.Inject; -import com.typesafe.config.Config; -import com.zaxxer.hikari.HikariDataSource; - import javax.sql.DataSource; import lombok.Data; import lombok.extern.slf4j.Slf4j; - import org.apache.gobblin.broker.SharedResourcesBrokerFactory; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metastore.MysqlDataSourceFactory; @@ -89,6 +87,8 @@ protected interface CheckedFunction { private final int linger; private String thisTableGetInfoStatement; private String thisTableSelectAfterInsertStatement; + private String thisTableAcquireLeaseIfMatchingAllStatement; + private String thisTableAcquireLeaseIfFinishedStatement; // TODO: define retention on this table private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" @@ -120,9 +120,9 @@ protected interface CheckedFunction { + "ELSE 3 END as lease_validity_status, linger, CURRENT_TIMESTAMP FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY; // Insert or update row to acquire lease if values have not changed since the previous read // Need to define three separate statements to handle cases where row does not exist or has null values to check - protected static final String CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group, " - + "flow_name, flow_execution_id, flow_action, event_timestamp, lease_acquisition_timestamp) " - + "VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)"; + protected static final String ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group, flow_name, " + + "flow_execution_id, flow_action, event_timestamp, lease_acquisition_timestamp) VALUES(?, ?, ?, ?, " + + "CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)"; protected static final String CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s " + "SET event_timestamp=CURRENT_TIMESTAMP, lease_acquisition_timestamp=CURRENT_TIMESTAMP " + WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL"; @@ -154,6 +154,10 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException { this.constantsTableName); this.thisTableSelectAfterInsertStatement = String.format(SELECT_AFTER_INSERT_STATEMENT, this.leaseArbiterTableName, this.constantsTableName); + this.thisTableAcquireLeaseIfMatchingAllStatement = + String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, this.leaseArbiterTableName); + this.thisTableAcquireLeaseIfFinishedStatement = + String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, this.leaseArbiterTableName); this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); String createArbiterStatement = String.format( CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName); @@ -186,38 +190,14 @@ private void initializeConstantsTable() throws IOException { @Override public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis) throws IOException { - // Check table for an existing entry for this flow action and event time - Optional getResult = withPreparedStatement(thisTableGetInfoStatement, - getInfoStatement -> { - int i = 0; - getInfoStatement.setString(++i, flowAction.getFlowGroup()); - getInfoStatement.setString(++i, flowAction.getFlowName()); - getInfoStatement.setString(++i, flowAction.getFlowExecutionId()); - getInfoStatement.setString(++i, flowAction.getFlowActionType().toString()); - ResultSet resultSet = getInfoStatement.executeQuery(); - try { - if (!resultSet.next()) { - return Optional.absent(); - } - return Optional.of(createGetInfoResult(resultSet)); - } finally { - if (resultSet != null) { - resultSet.close(); - } - } - }, true); + // Query lease arbiter table about this flow action + Optional getResult = getExistingEventInfo(flowAction); try { if (!getResult.isPresent()) { log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no existing row for this flow action, then go" + " ahead and insert", flowAction, eventTimeMillis); - String formattedAcquireLeaseNewRowStatement = - String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, this.leaseArbiterTableName); - int numRowsUpdated = withPreparedStatement(formattedAcquireLeaseNewRowStatement, - insertStatement -> { - completeInsertPreparedStatement(insertStatement, flowAction); - return insertStatement.executeUpdate(); - }, true); + int numRowsUpdated = attemptLeaseIfNewRow(flowAction); return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.absent()); } @@ -257,14 +237,8 @@ else if (leaseValidityStatus == 2) { dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger); } // Use our event to acquire lease, check for previous db eventTimestamp and leaseAcquisitionTimestamp - String formattedAcquireLeaseIfMatchingAllStatement = - String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, this.leaseArbiterTableName); - int numRowsUpdated = withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement, - insertStatement -> { - completeUpdatePreparedStatement(insertStatement, flowAction, true, - true, dbEventTimestamp, dbLeaseAcquisitionTimestamp); - return insertStatement.executeUpdate(); - }, true); + int numRowsUpdated = attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement, flowAction, + true,true, dbEventTimestamp, dbLeaseAcquisitionTimestamp); return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.of(dbCurrentTimestamp)); } // No longer leasing this event if (isWithinEpsilon) { @@ -275,20 +249,39 @@ else if (leaseValidityStatus == 2) { log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6: Distinct event, no longer leasing event in " + "db", flowAction, dbCurrentTimestamp.getTime()); // Use our event to acquire lease, check for previous db eventTimestamp and NULL leaseAcquisitionTimestamp - String formattedAcquireLeaseIfFinishedStatement = - String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, this.leaseArbiterTableName); - int numRowsUpdated = withPreparedStatement(formattedAcquireLeaseIfFinishedStatement, - insertStatement -> { - completeUpdatePreparedStatement(insertStatement, flowAction, true, - false, dbEventTimestamp, null); - return insertStatement.executeUpdate(); - }, true); + int numRowsUpdated = attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, flowAction, + true, false, dbEventTimestamp, null); return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.of(dbCurrentTimestamp)); } catch (SQLException e) { throw new RuntimeException(e); } } + /** + * Checks leaseArbiterTable for an existing entry for this flow action and event time + */ + protected Optional getExistingEventInfo(DagActionStore.DagAction flowAction) throws IOException { + return withPreparedStatement(thisTableGetInfoStatement, + getInfoStatement -> { + int i = 0; + getInfoStatement.setString(++i, flowAction.getFlowGroup()); + getInfoStatement.setString(++i, flowAction.getFlowName()); + getInfoStatement.setString(++i, flowAction.getFlowExecutionId()); + getInfoStatement.setString(++i, flowAction.getFlowActionType().toString()); + ResultSet resultSet = getInfoStatement.executeQuery(); + try { + if (!resultSet.next()) { + return Optional.absent(); + } + return Optional.of(createGetInfoResult(resultSet)); + } finally { + if (resultSet != null) { + resultSet.close(); + } + } + }, true); + } + protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOException { try { // Extract values from result set @@ -313,7 +306,63 @@ protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOE } } - protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) throws IOException { + /** + * Called by participant to try to acquire lease for a flow action that does not have an attempt in progress or in + * near past for it. + * @return int corresponding to number of rows updated by INSERT statement to acquire lease + */ + protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction) throws IOException { + String formattedAcquireLeaseNewRowStatement = + String.format(ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, this.leaseArbiterTableName); + return withPreparedStatement(formattedAcquireLeaseNewRowStatement, + insertStatement -> { + completeInsertPreparedStatement(insertStatement, flowAction); + try { + return insertStatement.executeUpdate(); + } catch (SQLIntegrityConstraintViolationException e) { + if (!e.getMessage().contains("Duplicate entry")) { + throw e; + } + return 0; + } + }, true); + } + + /** + * Called by participant to try to acquire lease for a flow action that has an existing, completed, or expired lease + * attempt for the flow action in the table. + * @return int corresponding to number of rows updated by INSERT statement to acquire lease + */ + protected int attemptLeaseIfExistingRow(String acquireLeaseStatement, DagActionStore.DagAction flowAction, + boolean needEventTimeCheck, boolean needLeaseAcquisition, Timestamp dbEventTimestamp, + Timestamp dbLeaseAcquisitionTimestamp) throws IOException { + return withPreparedStatement(acquireLeaseStatement, + insertStatement -> { + completeUpdatePreparedStatement(insertStatement, flowAction, needEventTimeCheck, + needLeaseAcquisition, dbEventTimestamp, dbLeaseAcquisitionTimestamp); + return insertStatement.executeUpdate(); + }, true); + } + + /** + * Checks leaseArbiter table for a row corresponding to this flow action to determine if the lease acquisition attempt + * was successful or not. + */ + protected SelectInfoResult getRowInfo(DagActionStore.DagAction flowAction) throws IOException { + return withPreparedStatement(thisTableSelectAfterInsertStatement, + selectStatement -> { + completeWhereClauseMatchingKeyPreparedStatement(selectStatement, flowAction); + ResultSet resultSet = selectStatement.executeQuery(); + try { + return createSelectInfoResult(resultSet); + } finally { + if (resultSet != null) { + resultSet.close(); + } + } + }, true); + } + protected static SelectInfoResult createSelectInfoResult(ResultSet resultSet) throws IOException { try { if (!resultSet.next()) { throw new IOException("Expected num rows and lease_acquisition_timestamp returned from query but received nothing, so " @@ -347,24 +396,15 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated, DagActionStore.DagAction flowAction, Optional dbCurrentTimestamp) throws SQLException, IOException { // Fetch values in row after attempted insert - SelectInfoResult selectInfoResult = withPreparedStatement(thisTableSelectAfterInsertStatement, - selectStatement -> { - completeWhereClauseMatchingKeyPreparedStatement(selectStatement, flowAction); - ResultSet resultSet = selectStatement.executeQuery(); - try { - return createSelectInfoResult(resultSet); - } finally { - if (resultSet != null) { - resultSet.close(); - } - } - }, true); + SelectInfoResult selectInfoResult = getRowInfo(flowAction); if (numRowsUpdated == 1) { log.debug("Obtained lease for [{}, eventTimestamp: {}] successfully!", flowAction, selectInfoResult.eventTimeMillis); return new LeaseObtainedStatus(flowAction, selectInfoResult.eventTimeMillis, selectInfoResult.getLeaseAcquisitionTimeMillis()); } + log.debug("Another participant acquired lease in between for [{}, eventTimestamp: {}] - num rows updated: ", + flowAction, selectInfoResult.eventTimeMillis, numRowsUpdated); // Another participant acquired lease in between return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis() + selectInfoResult.getDbLinger() @@ -377,8 +417,8 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated, * @param flowAction * @throws SQLException */ - protected void completeInsertPreparedStatement(PreparedStatement statement, DagActionStore.DagAction flowAction) - throws SQLException { + protected static void completeInsertPreparedStatement(PreparedStatement statement, + DagActionStore.DagAction flowAction) throws SQLException { int i = 0; // Values to set in new row statement.setString(++i, flowAction.getFlowGroup()); @@ -393,8 +433,8 @@ protected void completeInsertPreparedStatement(PreparedStatement statement, DagA * @param flowAction * @throws SQLException */ - protected void completeWhereClauseMatchingKeyPreparedStatement(PreparedStatement statement, DagActionStore.DagAction flowAction) - throws SQLException { + protected static void completeWhereClauseMatchingKeyPreparedStatement(PreparedStatement statement, + DagActionStore.DagAction flowAction) throws SQLException { int i = 0; statement.setString(++i, flowAction.getFlowGroup()); statement.setString(++i, flowAction.getFlowName()); @@ -413,8 +453,8 @@ protected void completeWhereClauseMatchingKeyPreparedStatement(PreparedStatement * @param originalLeaseAcquisitionTimestamp value to compare to db one, null if not needed * @throws SQLException */ - protected void completeUpdatePreparedStatement(PreparedStatement statement, DagActionStore.DagAction flowAction, - boolean needEventTimeCheck, boolean needLeaseAcquisitionTimeCheck, + protected static void completeUpdatePreparedStatement(PreparedStatement statement, + DagActionStore.DagAction flowAction, boolean needEventTimeCheck, boolean needLeaseAcquisitionTimeCheck, Timestamp originalEventTimestamp, Timestamp originalLeaseAcquisitionTimestamp) throws SQLException { int i = 0; // Values to check if existing row matches previous read diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java index 3ede1ce835c..1eb872586d0 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java @@ -18,6 +18,8 @@ package org.apache.gobblin.runtime.api; import com.typesafe.config.Config; +import java.io.IOException; +import java.sql.Timestamp; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.config.ConfigBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; @@ -27,6 +29,8 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter.*; + @Slf4j public class MysqlMultiActiveLeaseArbiterTest { private static final int EPSILON = 30000; @@ -37,11 +41,17 @@ public class MysqlMultiActiveLeaseArbiterTest { private static final String flowGroup = "testFlowGroup"; private static final String flowName = "testFlowName"; private static final String flowExecutionId = "12345677"; + // The following are considered unique because they correspond to different flow action types private static DagActionStore.DagAction launchDagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH); - + private static DagActionStore.DagAction resumeDagAction = + new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.RESUME); private static final long eventTimeMillis = System.currentTimeMillis(); private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter; + private String formattedAcquireLeaseIfMatchingAllStatement = + String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, TABLE); + private String formattedAcquireLeaseIfFinishedStatement = + String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, TABLE); // The setup functionality verifies that the initialization of the tables is done correctly and verifies any SQL // syntax errors. @@ -50,12 +60,12 @@ public void setUp() throws Exception { ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get(); Config config = ConfigBuilder.create() - .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY, EPSILON) - .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY, LINGER) + .addPrimitive(ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY, EPSILON) + .addPrimitive(ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY, LINGER) .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl()) .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER) .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD) - .addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE) + .addPrimitive(ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY, TABLE) .build(); this.mysqlMultiActiveLeaseArbiter = new MysqlMultiActiveLeaseArbiter(config); @@ -143,4 +153,81 @@ public void testAcquireLeaseSingleParticipant() throws Exception { Assert.assertTrue(sixthObtainedStatus.getEventTimestamp() <= sixthObtainedStatus.getLeaseAcquisitionTimestamp()); } + + /* + Tests attemptLeaseIfNewRow() method to ensure a new row is inserted if no row matches the primary key in the table. + If such a row does exist, the method should disregard the resulting SQL error and return 0 rows updated, indicating + the lease was not acquired. + Note: this isolates and tests CASE 1 in which another participant could have acquired the lease between the time + the read was done and subsequent write was carried out + */ + @Test (dependsOnMethods = "testAcquireLeaseSingleParticipant") + public void testAcquireLeaseIfNewRow() throws IOException { + // Inserting the first time should update 1 row + Assert.assertEquals(this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction), 1); + // Inserting the second time should not update any rows + Assert.assertEquals(this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction), 0); + } + + /* + Tests CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT to ensure insertion is not completed if another + participant updated the table between the prior reed and attempted insertion. + Note: this isolates and tests CASE 4 in which a flow action event has an out of date lease, so a participant + attempts a new one given the table the eventTimestamp and leaseAcquisitionTimestamp values are unchanged. + */ + @Test (dependsOnMethods = "testAcquireLeaseIfNewRow") + public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement() throws IOException { + MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = + this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction); + + // The following insert will fail since the eventTimestamp does not match + int numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow( + formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, true, + new Timestamp(99999), new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis())); + Assert.assertEquals(numRowsUpdated, 0); + + // The following insert will fail since the leaseAcquisitionTimestamp does not match + numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow( + formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, true, + new Timestamp(selectInfoResult.getEventTimeMillis()), new Timestamp(99999)); + Assert.assertEquals(numRowsUpdated, 0); + + // This insert should work since the values match all the columns + numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow( + formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, true, + new Timestamp(selectInfoResult.getEventTimeMillis()), + new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis())); + Assert.assertEquals(numRowsUpdated, 1); + } + + /* + Tests CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT to ensure the insertion will only succeed if another + participant has not updated the eventTimestamp state since the prior read. + Note: This isolates and tests CASE 6 during which current participant saw a distinct flow action event had completed + its prior lease, encouraging the current participant to acquire a lease for its event. + */ + @Test (dependsOnMethods = "testConditionallyAcquireLeaseIfFMatchingAllColsStatement") + public void testConditionallyAcquireLeaseIfFinishedLeasingStatement() throws IOException, InterruptedException { + // Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus + MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult = + this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction); + boolean markedSuccess = this.mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus( + resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis())); + Assert.assertTrue(markedSuccess); + + // Sleep enough time for event to be considered distinct + Thread.sleep(LINGER); + + // The following insert will fail since eventTimestamp does not match the expected + int numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow( + formattedAcquireLeaseIfFinishedStatement, resumeDagAction, true, false, + new Timestamp(99999), null); + Assert.assertEquals(numRowsUpdated, 0); + + // This insert does match since we utilize the right eventTimestamp + numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow( + formattedAcquireLeaseIfFinishedStatement, resumeDagAction, true, false, + new Timestamp(selectInfoResult.getEventTimeMillis()), null); + Assert.assertEquals(numRowsUpdated, 1); + } }