Skip to content

Commit

Permalink
Log slow processing stream messages
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso committed Oct 29, 2024
1 parent d42bcae commit e8a6b7e
Showing 1 changed file with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class RedisMessageStream implements MessageStream<String> {
@Value('${wave.message-stream.claim-timeout:5s}')
private Duration claimTimeout

@Value('${wave.message-stream.consume-warn-timeout-millis:4000}')
private long consumeWarnTimeoutMillis

private String consumerName

@PostConstruct
Expand Down Expand Up @@ -102,11 +105,17 @@ class RedisMessageStream implements MessageStream<String> {
@Override
boolean consume(String streamId, MessageConsumer<String> consumer) {
try (Jedis jedis = pool.getResource()) {
String msg
final long begin = System.currentTimeMillis()
final entry = claimMessage(jedis,streamId) ?: readMessage(jedis, streamId)
if( entry && consumer.accept(entry.getFields().get(DATA_FIELD)) ) {
if( entry && consumer.accept(msg=entry.getFields().get(DATA_FIELD)) ) {
final tx = jedis.multi()
// acknowledge the entry has been processed so that it cannot be claimed anymore
tx.xack(streamId, CONSUMER_GROUP_NAME, entry.getID())
final delta = System.currentTimeMillis()-begin
if( delta>consumeWarnTimeoutMillis ) {
log.warn "Redis message stream - consume processing took ${Duration.ofMillis(delta)} - offending entry=${entry.getID()}; message=${msg}"
}
// this remove permanently the entry from the stream
tx.xdel(streamId, entry.getID())
tx.exec()
Expand Down

0 comments on commit e8a6b7e

Please sign in to comment.