Skip to content

Commit

Permalink
chore: remove retries
Browse files Browse the repository at this point in the history
  • Loading branch information
jcosentino11 committed Nov 13, 2023
1 parent 7a5f80e commit d7e8171
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 60 deletions.
13 changes: 5 additions & 8 deletions src/main/java/com/aws/greengrass/disk/spool/DiskSpool.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,9 @@ public DiskSpool(Topics topics, DiskSpoolDAO dao) {
* @return payload of the MQTT message stored with id
*/
@Override
public SpoolMessage getMessageById(long id) { // TODO support InterruptedException in interface
public SpoolMessage getMessageById(long id) {
try {
return dao.getSpoolMessageById(id);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (SQLException e) {
logger.atError()
.kv(KV_MESSAGE_ID, id)
Expand All @@ -62,7 +59,7 @@ public void removeMessageById(long id) {
try {
dao.removeSpoolMessageById(id);
logger.atTrace().kv(KV_MESSAGE_ID, id).log("Removed message from Disk Spooler");
} catch (SQLException | InterruptedException e) { // TODO support InterruptedException in interface
} catch (SQLException e) {
logger.atWarn()
.kv(KV_MESSAGE_ID, id)
.cause(e)
Expand All @@ -80,7 +77,7 @@ public void add(long id, SpoolMessage message) throws IOException {
try {
dao.insertSpoolMessage(message);
logger.atTrace().kv(KV_MESSAGE_ID, id).log("Added message to Disk Spooler");
} catch (SQLException | InterruptedException e) { // TODO support InterruptedException in interface
} catch (SQLException e) {
throw new IOException(e);
}
}
Expand All @@ -89,7 +86,7 @@ public void add(long id, SpoolMessage message) throws IOException {
public Iterable<Long> getAllMessageIds() throws IOException {
try {
return dao.getAllSpoolMessageIds();
} catch (SQLException | InterruptedException e) {
} catch (SQLException e) {
throw new IOException(e);
}
}
Expand All @@ -100,7 +97,7 @@ public void initializeSpooler() throws IOException {
dao.initialize();
dao.setUpDatabase();
logger.atInfo().log("Finished setting up Database");
} catch (SQLException | InterruptedException e) { // TODO support InterruptedException in interface
} catch (SQLException e) {
throw new IOException(e);
}
}
Expand Down
63 changes: 18 additions & 45 deletions src/main/java/com/aws/greengrass/disk/spool/DiskSpoolDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.aws.greengrass.util.CrashableFunction;
import com.aws.greengrass.util.LockScope;
import com.aws.greengrass.util.NucleusPaths;
import com.aws.greengrass.util.RetryUtils;
import com.aws.greengrass.util.SerializerFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -28,12 +27,9 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLTransientException;
import java.sql.Statement;
import java.sql.Types;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand All @@ -48,13 +44,6 @@ public class DiskSpoolDAO {

private static final Logger logger = LogManager.getLogger(DiskSpoolDAO.class);
private static final ObjectMapper MAPPER = SerializerFactory.getFailSafeJsonObjectMapper();
private static final RetryUtils.RetryConfig sqlStatementRetryConfig =
RetryUtils.RetryConfig.builder()
.initialRetryInterval(Duration.ofMillis(1L))
.maxRetryInterval(Duration.ofMillis(500L))
.maxAttempt(3)
.retryableExceptions(Collections.singletonList(SQLTransientException.class))
.build();
protected static final String DATABASE_CONNECTION_URL = "jdbc:sqlite:%s";
protected static final String DATABASE_FILE_NAME = "spooler.db";
private static final Set<Integer> CORRUPTION_ERROR_CODES = new HashSet<>();
Expand Down Expand Up @@ -121,16 +110,15 @@ public void close() {
*
* @return ordered list of the existing ids in the persistent queue
* @throws SQLException when fails to get SpoolMessage IDs
* @throws InterruptedException if interrupted during execution
*/
public Iterable<Long> getAllSpoolMessageIds() throws SQLException, InterruptedException {
public Iterable<Long> getAllSpoolMessageIds() throws SQLException {
// TODO don't recreate prepared statements every time
return performSqlOperation(conn -> {
try (PreparedStatement stmt = getAllSpoolMessageIdsStatement(conn);
ResultSet rs = stmt.executeQuery()) {
return getIdsFromRs(rs);
}
}, "get-all-spool-message-ids");
});
}

private PreparedStatement getAllSpoolMessageIdsStatement(Connection conn) throws SQLException {
Expand All @@ -144,9 +132,8 @@ private PreparedStatement getAllSpoolMessageIdsStatement(Connection conn) throws
* @param messageId the id of the SpoolMessage
* @return SpoolMessage
* @throws SQLException when fails to get a SpoolMessage by id
* @throws InterruptedException if interrupted during execution
*/
public SpoolMessage getSpoolMessageById(long messageId) throws SQLException, InterruptedException {
public SpoolMessage getSpoolMessageById(long messageId) throws SQLException {
return performSqlOperation(conn -> {
try (PreparedStatement pstmt = getSpoolMessageByIdStatement(conn, messageId);
ResultSet rs = pstmt.executeQuery()) {
Expand All @@ -156,7 +143,7 @@ public SpoolMessage getSpoolMessageById(long messageId) throws SQLException, Int
throw new SQLException(e);
}
}
}, "get-spool-message-by-id");
});
}

private PreparedStatement getSpoolMessageByIdStatement(Connection conn, long messageId) throws SQLException {
Expand All @@ -172,14 +159,13 @@ private PreparedStatement getSpoolMessageByIdStatement(Connection conn, long mes
*
* @param message instance of SpoolMessage
* @throws SQLException when fails to insert SpoolMessage in the database
* @throws InterruptedException if interrupted during execution
*/
public void insertSpoolMessage(SpoolMessage message) throws SQLException, InterruptedException {
public void insertSpoolMessage(SpoolMessage message) throws SQLException {
performSqlOperation(conn -> {
try (PreparedStatement pstmt = insertSpoolMessageStatement(conn, message)) {
return pstmt.executeUpdate();
}
}, "insert-spool-message");
});
}

private PreparedStatement insertSpoolMessageStatement(Connection conn, SpoolMessage message) throws SQLException {
Expand Down Expand Up @@ -241,14 +227,13 @@ private PreparedStatement insertSpoolMessageStatement(Connection conn, SpoolMess
*
* @param messageId the id of the SpoolMessage
* @throws SQLException when fails to remove a SpoolMessage by id
* @throws InterruptedException if interrupted during execution
*/
public void removeSpoolMessageById(Long messageId) throws SQLException, InterruptedException {
public void removeSpoolMessageById(Long messageId) throws SQLException {
performSqlOperation(conn -> {
try (PreparedStatement pstmt = removeSpoolMessageByIdStatement(conn, messageId)) {
return pstmt.executeUpdate();
}
}, "remove-spool-message-by-id");
});
}

private PreparedStatement removeSpoolMessageByIdStatement(Connection conn, long messageId) throws SQLException {
Expand All @@ -268,7 +253,7 @@ public Connection getDbInstance() throws SQLException {
return DriverManager.getConnection(url);
}

protected void setUpDatabase() throws SQLException, InterruptedException {
protected void setUpDatabase() throws SQLException {
String query = "CREATE TABLE IF NOT EXISTS spooler ("
+ "message_id INTEGER PRIMARY KEY, "
+ "retried INTEGER NOT NULL, "
Expand All @@ -290,38 +275,26 @@ protected void setUpDatabase() throws SQLException, InterruptedException {
st.executeUpdate(query);
return null;
}
}, "create-spooler-table");
});
}

@SuppressWarnings("PMD.AvoidCatchingGenericException")
private <T> T performSqlOperation(CrashableFunction<Connection, T, SQLException> operation,
String operationName) throws InterruptedException, SQLException {
private <T> T performSqlOperation(CrashableFunction<Connection, T, SQLException> operation) throws SQLException {
try {
return RetryUtils.runWithRetry(
sqlStatementRetryConfig,
() -> {
try (LockScope ls = LockScope.lock(connectionLock.readLock())) {
return operation.apply(connection);
}
},
operationName,
logger
);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
try (LockScope ls = LockScope.lock(connectionLock.readLock())) {
return operation.apply(connection);
}
} catch (SQLException e) {
checkAndHandleCorruption(e);
if (CORRUPTION_ERROR_CODES.contains(e.getErrorCode())) {
recoverFromCorruption();
}
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}

void checkAndHandleCorruption(SQLException e) throws SQLException, InterruptedException {
if (!CORRUPTION_ERROR_CODES.contains(e.getErrorCode())) {
return;
}
void recoverFromCorruption() throws SQLException {
if (!recoverDBLock.tryLock()) {
// corruption recovery in progress
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class DiskSpoolDAOTest {
DiskSpoolDAOFake dao;

@BeforeEach
void setUp() throws SQLException, InterruptedException {
void setUp() throws SQLException {
dao = spy(new DiskSpoolDAOFake(currDir.resolve("spooler.db")));
dao.initialize();
dao.setUpDatabase();
Expand All @@ -92,7 +92,7 @@ void tearDown() {
}

@Test
void GIVEN_empty_spooler_WHEN_messages_added_and_removed_from_spooler_THEN_success() throws SQLException, InterruptedException {
void GIVEN_empty_spooler_WHEN_messages_added_and_removed_from_spooler_THEN_success() throws SQLException {
List<Long> messageIds = LongStream.range(0, 100L).boxed().collect(Collectors.toList());

// fill db with messages
Expand Down Expand Up @@ -131,23 +131,23 @@ void GIVEN_empty_spooler_WHEN_messages_added_and_removed_from_spooler_THEN_succe

@ParameterizedTest
@MethodSource("allSpoolerOperations")
void GIVEN_spooler_WHEN_corruption_detected_during_operation_THEN_spooler_recovers(CrashableFunction<DiskSpoolDAO, Void, SQLException> operation) throws SQLException, InterruptedException {
void GIVEN_spooler_WHEN_corruption_detected_during_operation_THEN_spooler_recovers(CrashableFunction<DiskSpoolDAO, Void, SQLException> operation) throws SQLException {
SQLException corruptionException = new SQLException("DB is corrupt", "some state", 11);
dao.getConnection().addExceptionOnUpdate(corruptionException);
assertThrows(SQLException.class, () -> operation.apply(dao));
verify(dao).checkAndHandleCorruption(corruptionException);
verify(dao).recoverFromCorruption();
operation.apply(dao);
}

@ParameterizedTest
@MethodSource("allSpoolerOperations")
void GIVEN_spooler_WHEN_transient_error_during_operation_THEN_operation_retried(CrashableFunction<DiskSpoolDAO, Void, SQLException> operation, ExtensionContext context) throws SQLException, InterruptedException {
void GIVEN_spooler_WHEN_error_during_operation_THEN_exception_thrown(CrashableFunction<DiskSpoolDAO, Void, SQLException> operation, ExtensionContext context) throws SQLException {
ignoreExceptionOfType(context, SQLTransientException.class);
SQLException transientException = new SQLTransientException("Some Transient Error");
dao.getConnection().addExceptionOnUpdate(transientException);
dao.getConnection().addExceptionOnUpdate(transientException);
operation.apply(dao);
verify(dao, never()).checkAndHandleCorruption(transientException);
assertThrows(SQLException.class, () -> operation.apply(dao));
verify(dao, never()).recoverFromCorruption();
}

public static Stream<Arguments> allSpoolerOperations() {
Expand Down

0 comments on commit d7e8171

Please sign in to comment.