Skip to content

Commit

Permalink
[GOBBLIN-1859] Multi-active Unit Test for Multi-Participant state (#3721
Browse files Browse the repository at this point in the history
)

* new unit tests passing

* clean up

* change upsert to insert

* catch sql exception & update test

* Refactor to create better api for testing

---------

Co-authored-by: Urmi Mustafi <[email protected]>
  • Loading branch information
umustafi and Urmi Mustafi committed Aug 10, 2023
1 parent af48b31 commit 01c433c
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,19 @@
package org.apache.gobblin.runtime.api;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.Timestamp;

import com.google.inject.Inject;
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;

import javax.sql.DataSource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
Expand Down Expand Up @@ -89,6 +87,8 @@ protected interface CheckedFunction<T, R> {
private final int linger;
private String thisTableGetInfoStatement;
private String thisTableSelectAfterInsertStatement;
private String thisTableAcquireLeaseIfMatchingAllStatement;
private String thisTableAcquireLeaseIfFinishedStatement;

// TODO: define retention on this table
private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
Expand Down Expand Up @@ -120,9 +120,9 @@ protected interface CheckedFunction<T, R> {
+ "ELSE 3 END as lease_validity_status, linger, CURRENT_TIMESTAMP FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
// Insert or update row to acquire lease if values have not changed since the previous read
// Need to define three separate statements to handle cases where row does not exist or has null values to check
protected static final String CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group, "
+ "flow_name, flow_execution_id, flow_action, event_timestamp, lease_acquisition_timestamp) "
+ "VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)";
protected static final String ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group, flow_name, "
+ "flow_execution_id, flow_action, event_timestamp, lease_acquisition_timestamp) VALUES(?, ?, ?, ?, "
+ "CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)";
protected static final String CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
+ "SET event_timestamp=CURRENT_TIMESTAMP, lease_acquisition_timestamp=CURRENT_TIMESTAMP "
+ WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL";
Expand Down Expand Up @@ -154,6 +154,10 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
this.constantsTableName);
this.thisTableSelectAfterInsertStatement = String.format(SELECT_AFTER_INSERT_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName);
this.thisTableAcquireLeaseIfMatchingAllStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, this.leaseArbiterTableName);
this.thisTableAcquireLeaseIfFinishedStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, this.leaseArbiterTableName);
this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
String createArbiterStatement = String.format(
CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
Expand Down Expand Up @@ -186,38 +190,14 @@ private void initializeConstantsTable() throws IOException {
@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis)
throws IOException {
// Check table for an existing entry for this flow action and event time
Optional<GetEventInfoResult> getResult = withPreparedStatement(thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
getInfoStatement.setString(++i, flowAction.getFlowGroup());
getInfoStatement.setString(++i, flowAction.getFlowName());
getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
getInfoStatement.setString(++i, flowAction.getFlowActionType().toString());
ResultSet resultSet = getInfoStatement.executeQuery();
try {
if (!resultSet.next()) {
return Optional.<GetEventInfoResult>absent();
}
return Optional.of(createGetInfoResult(resultSet));
} finally {
if (resultSet != null) {
resultSet.close();
}
}
}, true);
// Query lease arbiter table about this flow action
Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction);

try {
if (!getResult.isPresent()) {
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no existing row for this flow action, then go"
+ " ahead and insert", flowAction, eventTimeMillis);
String formattedAcquireLeaseNewRowStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, this.leaseArbiterTableName);
int numRowsUpdated = withPreparedStatement(formattedAcquireLeaseNewRowStatement,
insertStatement -> {
completeInsertPreparedStatement(insertStatement, flowAction);
return insertStatement.executeUpdate();
}, true);
int numRowsUpdated = attemptLeaseIfNewRow(flowAction);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.absent());
}

Expand Down Expand Up @@ -257,14 +237,8 @@ else if (leaseValidityStatus == 2) {
dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);
}
// Use our event to acquire lease, check for previous db eventTimestamp and leaseAcquisitionTimestamp
String formattedAcquireLeaseIfMatchingAllStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, this.leaseArbiterTableName);
int numRowsUpdated = withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
insertStatement -> {
completeUpdatePreparedStatement(insertStatement, flowAction, true,
true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
return insertStatement.executeUpdate();
}, true);
int numRowsUpdated = attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement, flowAction,
true,true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.of(dbCurrentTimestamp));
} // No longer leasing this event
if (isWithinEpsilon) {
Expand All @@ -275,20 +249,39 @@ else if (leaseValidityStatus == 2) {
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6: Distinct event, no longer leasing event in "
+ "db", flowAction, dbCurrentTimestamp.getTime());
// Use our event to acquire lease, check for previous db eventTimestamp and NULL leaseAcquisitionTimestamp
String formattedAcquireLeaseIfFinishedStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, this.leaseArbiterTableName);
int numRowsUpdated = withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
insertStatement -> {
completeUpdatePreparedStatement(insertStatement, flowAction, true,
false, dbEventTimestamp, null);
return insertStatement.executeUpdate();
}, true);
int numRowsUpdated = attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, flowAction,
true, false, dbEventTimestamp, null);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.of(dbCurrentTimestamp));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

/**
* Checks leaseArbiterTable for an existing entry for this flow action and event time
*/
protected Optional<GetEventInfoResult> getExistingEventInfo(DagActionStore.DagAction flowAction) throws IOException {
return withPreparedStatement(thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
getInfoStatement.setString(++i, flowAction.getFlowGroup());
getInfoStatement.setString(++i, flowAction.getFlowName());
getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
getInfoStatement.setString(++i, flowAction.getFlowActionType().toString());
ResultSet resultSet = getInfoStatement.executeQuery();
try {
if (!resultSet.next()) {
return Optional.absent();
}
return Optional.of(createGetInfoResult(resultSet));
} finally {
if (resultSet != null) {
resultSet.close();
}
}
}, true);
}

protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOException {
try {
// Extract values from result set
Expand All @@ -313,7 +306,63 @@ protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOE
}
}

protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) throws IOException {
/**
* Called by participant to try to acquire lease for a flow action that does not have an attempt in progress or in
* near past for it.
* @return int corresponding to number of rows updated by INSERT statement to acquire lease
*/
protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction) throws IOException {
String formattedAcquireLeaseNewRowStatement =
String.format(ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, this.leaseArbiterTableName);
return withPreparedStatement(formattedAcquireLeaseNewRowStatement,
insertStatement -> {
completeInsertPreparedStatement(insertStatement, flowAction);
try {
return insertStatement.executeUpdate();
} catch (SQLIntegrityConstraintViolationException e) {
if (!e.getMessage().contains("Duplicate entry")) {
throw e;
}
return 0;
}
}, true);
}

/**
* Called by participant to try to acquire lease for a flow action that has an existing, completed, or expired lease
* attempt for the flow action in the table.
* @return int corresponding to number of rows updated by INSERT statement to acquire lease
*/
protected int attemptLeaseIfExistingRow(String acquireLeaseStatement, DagActionStore.DagAction flowAction,
boolean needEventTimeCheck, boolean needLeaseAcquisition, Timestamp dbEventTimestamp,
Timestamp dbLeaseAcquisitionTimestamp) throws IOException {
return withPreparedStatement(acquireLeaseStatement,
insertStatement -> {
completeUpdatePreparedStatement(insertStatement, flowAction, needEventTimeCheck,
needLeaseAcquisition, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
return insertStatement.executeUpdate();
}, true);
}

/**
* Checks leaseArbiter table for a row corresponding to this flow action to determine if the lease acquisition attempt
* was successful or not.
*/
protected SelectInfoResult getRowInfo(DagActionStore.DagAction flowAction) throws IOException {
return withPreparedStatement(thisTableSelectAfterInsertStatement,
selectStatement -> {
completeWhereClauseMatchingKeyPreparedStatement(selectStatement, flowAction);
ResultSet resultSet = selectStatement.executeQuery();
try {
return createSelectInfoResult(resultSet);
} finally {
if (resultSet != null) {
resultSet.close();
}
}
}, true);
}
protected static SelectInfoResult createSelectInfoResult(ResultSet resultSet) throws IOException {
try {
if (!resultSet.next()) {
throw new IOException("Expected num rows and lease_acquisition_timestamp returned from query but received nothing, so "
Expand Down Expand Up @@ -347,24 +396,15 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
DagActionStore.DagAction flowAction, Optional<Timestamp> dbCurrentTimestamp)
throws SQLException, IOException {
// Fetch values in row after attempted insert
SelectInfoResult selectInfoResult = withPreparedStatement(thisTableSelectAfterInsertStatement,
selectStatement -> {
completeWhereClauseMatchingKeyPreparedStatement(selectStatement, flowAction);
ResultSet resultSet = selectStatement.executeQuery();
try {
return createSelectInfoResult(resultSet);
} finally {
if (resultSet != null) {
resultSet.close();
}
}
}, true);
SelectInfoResult selectInfoResult = getRowInfo(flowAction);
if (numRowsUpdated == 1) {
log.debug("Obtained lease for [{}, eventTimestamp: {}] successfully!", flowAction,
selectInfoResult.eventTimeMillis);
return new LeaseObtainedStatus(flowAction, selectInfoResult.eventTimeMillis,
selectInfoResult.getLeaseAcquisitionTimeMillis());
}
log.debug("Another participant acquired lease in between for [{}, eventTimestamp: {}] - num rows updated: ",
flowAction, selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis() + selectInfoResult.getDbLinger()
Expand All @@ -377,8 +417,8 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
* @param flowAction
* @throws SQLException
*/
protected void completeInsertPreparedStatement(PreparedStatement statement, DagActionStore.DagAction flowAction)
throws SQLException {
protected static void completeInsertPreparedStatement(PreparedStatement statement,
DagActionStore.DagAction flowAction) throws SQLException {
int i = 0;
// Values to set in new row
statement.setString(++i, flowAction.getFlowGroup());
Expand All @@ -393,8 +433,8 @@ protected void completeInsertPreparedStatement(PreparedStatement statement, DagA
* @param flowAction
* @throws SQLException
*/
protected void completeWhereClauseMatchingKeyPreparedStatement(PreparedStatement statement, DagActionStore.DagAction flowAction)
throws SQLException {
protected static void completeWhereClauseMatchingKeyPreparedStatement(PreparedStatement statement,
DagActionStore.DagAction flowAction) throws SQLException {
int i = 0;
statement.setString(++i, flowAction.getFlowGroup());
statement.setString(++i, flowAction.getFlowName());
Expand All @@ -413,8 +453,8 @@ protected void completeWhereClauseMatchingKeyPreparedStatement(PreparedStatement
* @param originalLeaseAcquisitionTimestamp value to compare to db one, null if not needed
* @throws SQLException
*/
protected void completeUpdatePreparedStatement(PreparedStatement statement, DagActionStore.DagAction flowAction,
boolean needEventTimeCheck, boolean needLeaseAcquisitionTimeCheck,
protected static void completeUpdatePreparedStatement(PreparedStatement statement,
DagActionStore.DagAction flowAction, boolean needEventTimeCheck, boolean needLeaseAcquisitionTimeCheck,
Timestamp originalEventTimestamp, Timestamp originalLeaseAcquisitionTimestamp) throws SQLException {
int i = 0;
// Values to check if existing row matches previous read
Expand Down
Loading

0 comments on commit 01c433c

Please sign in to comment.