From e8a6b7eeabe52cc1fa6079a15bdc268552dca7b0 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Tue, 29 Oct 2024 21:11:01 +0100 Subject: [PATCH] Log slow processing stream messages Signed-off-by: Paolo Di Tommaso --- .../data/stream/impl/RedisMessageStream.groovy | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/groovy/io/seqera/wave/service/data/stream/impl/RedisMessageStream.groovy b/src/main/groovy/io/seqera/wave/service/data/stream/impl/RedisMessageStream.groovy index 609b71061..c417568b3 100644 --- a/src/main/groovy/io/seqera/wave/service/data/stream/impl/RedisMessageStream.groovy +++ b/src/main/groovy/io/seqera/wave/service/data/stream/impl/RedisMessageStream.groovy @@ -62,6 +62,9 @@ class RedisMessageStream implements MessageStream { @Value('${wave.message-stream.claim-timeout:5s}') private Duration claimTimeout + @Value('${wave.message-stream.consume-warn-timeout-millis:4000}') + private long consumeWarnTimeoutMillis + private String consumerName @PostConstruct @@ -102,11 +105,17 @@ class RedisMessageStream implements MessageStream { @Override boolean consume(String streamId, MessageConsumer consumer) { try (Jedis jedis = pool.getResource()) { + String msg + final long begin = System.currentTimeMillis() final entry = claimMessage(jedis,streamId) ?: readMessage(jedis, streamId) - if( entry && consumer.accept(entry.getFields().get(DATA_FIELD)) ) { + if( entry && consumer.accept(msg=entry.getFields().get(DATA_FIELD)) ) { final tx = jedis.multi() // acknowledge the entry has been processed so that it cannot be claimed anymore tx.xack(streamId, CONSUMER_GROUP_NAME, entry.getID()) + final delta = System.currentTimeMillis()-begin + if( delta>consumeWarnTimeoutMillis ) { + log.warn "Redis message stream - consume processing took ${Duration.ofMillis(delta)} - offending entry=${entry.getID()}; message=${msg}" + } // this remove permanently the entry from the stream tx.xdel(streamId, entry.getID()) tx.exec()