Skip to content

Commit

Permalink
Merge upstream changes from develop to feature
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddenalpha committed May 28, 2024
2 parents b1faef1 + d9ef852 commit d6d3d0a
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 76 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.swisspush</groupId>
<artifactId>redisques</artifactId>
<version>3.1.5-SNAPSHOT</version>
<version>3.1.7-SNAPSHOT</version>
<name>redisques</name>
<description>
A highly scalable redis-persistent queuing system for vertx
Expand Down
14 changes: 10 additions & 4 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -985,8 +985,11 @@ private void rescheduleSendMessageAfterFailure(final String queueName, int retry

vertx.setTimer(retryInSeconds * 1000L, timerId -> {
if (dequeueStatisticEnabled) {
long retryDelayInMills = retryInSeconds * 1000L;
dequeueStatistic.get(queueName).setNextDequeueDueTimestamp(System.currentTimeMillis() + retryDelayInMills);
dequeueStatistic.computeIfPresent(queueName, (s, dequeueStatistic) -> {
long retryDelayInMills = retryInSeconds * 1000L;
dequeueStatistic.setNextDequeueDueTimestamp(System.currentTimeMillis() + retryDelayInMills);
return dequeueStatistic;
});
}
if (log.isDebugEnabled()) {
log.debug("RedisQues re-notify the consumer of queue '{}' at {}", queueName, new Date(System.currentTimeMillis()));
Expand Down Expand Up @@ -1027,8 +1030,11 @@ private void processMessageWithTimeout(final String queue, final String payload,
if (reply.succeeded()) {
success = OK.equals(reply.result().body().getString(STATUS));
if (success && dequeueStatisticEnabled) {
dequeueStatistic.get(queue).setLastDequeueSuccessTimestamp(System.currentTimeMillis());
dequeueStatistic.get(queue).setNextDequeueDueTimestamp(null);
dequeueStatistic.computeIfPresent(queue, (s, dequeueStatistic) -> {
dequeueStatistic.setLastDequeueSuccessTimestamp(System.currentTimeMillis());
dequeueStatistic.setNextDequeueDueTimestamp(null);
return dequeueStatistic;
});
}
} else {
log.info("RedisQues QUEUE_ERROR: Consumer failed {} queue: {}",
Expand Down
179 changes: 108 additions & 71 deletions src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.swisspush.redisques.util;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
Expand All @@ -19,6 +18,7 @@
import org.swisspush.redisques.exception.NoStacktraceException;
import org.swisspush.redisques.performance.UpperBoundParallel;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand All @@ -29,7 +29,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static java.lang.System.currentTimeMillis;
import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME;
Expand Down Expand Up @@ -459,7 +458,6 @@ JsonObject getAsJsonObject() {
* registered). Therefore this method must be used with care and not be called too often!
*
* @param queues The queues for which we are interested in the statistics
*
* @return A Future
*/

Expand All @@ -482,7 +480,9 @@ public Future<JsonObject> getQueueStatistics(final List<String> queues) {
return promise.future();
}

/** <p>init redis connection.</p> */
/**
* <p>init redis connection.</p>
*/
Future<Void> step1(RequestCtx ctx) {
final Promise<Void> promise = Promise.promise();
redisProvider.connection()
Expand All @@ -497,54 +497,93 @@ Future<Void> step1(RequestCtx ctx) {
return promise.future();
}

class WorkerThreadTask {
private final String queueName;
private final RequestCtx ctx;

WorkerThreadTask(RequestCtx ctx, String queueName) {
this.queueName = queueName;
this.ctx = ctx;
}

Future<NumberType> execute() {
// switch to a worker thread
return vertx.executeBlocking(promise -> {
ctx.conn.send(Request.cmd(Command.LLEN, queuePrefix + queueName)).onComplete(event -> {
if (event.failed()) {
promise.fail(event.cause());
return;
}
promise.complete((NumberType) event.result());
});
});
}
}

/** <p>Query queue lengths.</p> */
Future<Void> step2(RequestCtx ctx) {
assert ctx.conn != null;
final Promise<Void> promise = Promise.promise();
int numQueues = ctx.queueNames.size();
String fmt1 = "About to perform {} requests to redis just for monitoring";
if (numQueues > 256) {
log.warn(fmt1, numQueues);
} else {
log.debug(fmt1, numQueues);
}
long begRedisRequestsEpochMs = currentTimeMillis();
List<Future> responses = ctx.queueNames.stream()
.map(queue -> ctx.conn.send(Request.cmd(Command.LLEN, queuePrefix + queue)))
.collect(Collectors.toList());
CompositeFuture.all(responses).onComplete( ev -> {
long durRedisRequestsMs = currentTimeMillis() - begRedisRequestsEpochMs;
String fmt2 = "All those {} redis requests took {}ms";
if (durRedisRequestsMs > 3000) {
log.warn(fmt2, numQueues, durRedisRequestsMs);

return vertx.executeBlocking(executeBlockingPromise -> {
String fmt1 = "About to perform {} requests to redis just for monitoring";
if (numQueues > 256) {
log.warn(fmt1, numQueues);
} else {
log.debug(fmt2, numQueues, durRedisRequestsMs);
log.debug(fmt1, numQueues);
}
if (ev.failed()) {
promise.fail(new Exception("Unexpected queue length result", ev.cause()));
return;
}
List<NumberType> queueLengthList = ev.result().list();
if (queueLengthList == null) {
promise.fail("Unexpected queue length result: null");
return;
}
if (queueLengthList.size() != ctx.queueNames.size()) {
String err = "Unexpected queue length result with unequal size " +
ctx.queueNames.size() + " : " + queueLengthList.size();
promise.fail(err);
return;
}
ctx.queueLengthList = queueLengthList;
promise.complete();
long begRedisRequestsEpochMs = currentTimeMillis();

List<WorkerThreadTask> workerThreadTaskList = new ArrayList<>();

ctx.queueNames.forEach(queueName -> workerThreadTaskList.add(new WorkerThreadTask(ctx, queueName)));

Future<List<NumberType>> startFuture = Future.succeededFuture(new ArrayList<>());
// chain the futures sequentially to execute tasks
Future<List<NumberType>> resultFuture = workerThreadTaskList.stream()
.reduce(startFuture, (future, workerThreadTask) -> future.compose(previousResults -> {
// perform asynchronous task
return workerThreadTask.execute().compose(taskResult -> {
// append task result to previous results
previousResults.add(taskResult);
return Future.succeededFuture(previousResults);
});
}), (a, b) -> Future.succeededFuture());


resultFuture.onComplete(ev -> {
long durRedisRequestsMs = currentTimeMillis() - begRedisRequestsEpochMs;
String fmt2 = "All those {} redis requests took {}ms";
if (durRedisRequestsMs > 3000) {
log.warn(fmt2, numQueues, durRedisRequestsMs);
} else {
log.debug(fmt2, numQueues, durRedisRequestsMs);
}
if (ev.failed()) {
executeBlockingPromise.fail(new Exception("Unexpected queue length result", ev.cause()));
return;
}
List<NumberType> queueLengthList = ev.result();
if (queueLengthList == null) {
executeBlockingPromise.fail("Unexpected queue length result: null");
return;
}
if (queueLengthList.size() != ctx.queueNames.size()) {
String err = "Unexpected queue length result with unequal size " +
ctx.queueNames.size() + " : " + queueLengthList.size();
executeBlockingPromise.fail(err);
return;
}
ctx.queueLengthList = queueLengthList;
executeBlockingPromise.complete();
});

});
return promise.future();
}

/** <p>init queue statistics.</p> */
Future<Void> step3(RequestCtx ctx) {
assert ctx.queueLengthList != null;
final Promise<Void> promise = Promise.promise();
// populate the list of queue statistics in a Hashmap for later fast merging
ctx.statistics = new HashMap<>(ctx.queueNames.size());
for (int i = 0; i < ctx.queueNames.size(); i++) {
Expand All @@ -553,8 +592,7 @@ Future<Void> step3(RequestCtx ctx) {
qs.setMessageSpeed(getQueueSpeed(qs.queueName));
ctx.statistics.put(qs.queueName, qs);
}
promise.complete();
return promise.future();
return Future.succeededFuture();
}

/** <p>init a resAPI instance we need to get more details.</p> */
Expand All @@ -579,48 +617,47 @@ Future<Void> step4(RequestCtx ctx){
Future<Void> step5(RequestCtx ctx) {
assert ctx.redisAPI != null;
assert ctx.statistics != null;
final Promise<Void> promise = Promise.promise();
ctx.redisAPI.hvals(STATSKEY, statisticsSet -> {
if( statisticsSet == null || statisticsSet.failed() ){
promise.fail(new RuntimeException("statistics queue evaluation failed",

return vertx.executeBlocking(executeBlockingPromise -> ctx.redisAPI.hvals(STATSKEY, statisticsSet -> {
if (statisticsSet == null || statisticsSet.failed()) {
executeBlockingPromise.fail(new RuntimeException("statistics queue evaluation failed",
statisticsSet == null ? null : statisticsSet.cause()));
return;
}
ctx.redisFailStats = statisticsSet.result();
assert ctx.redisFailStats != null;
promise.complete();
});
return promise.future();
executeBlockingPromise.complete();
}));
}

/** <p>put received statistics data to the former prepared statistics objects per
* queue.</p> */
Future<JsonObject> step6(RequestCtx ctx){
assert ctx.redisFailStats != null;
final Promise<JsonObject> promise = Promise.promise();
for (Response response : ctx.redisFailStats) {
JsonObject jObj = new JsonObject(response.toString());
String queueName = jObj.getString(QUEUENAME);
QueueStatistic queueStatistic = ctx.statistics.get(queueName);
if (queueStatistic != null) {
// if it isn't there, there is obviously no statistic needed
queueStatistic.setFailures(jObj.getLong(QUEUE_FAILURES, 0L));
queueStatistic.setBackpressureTime(jObj.getLong(QUEUE_BACKPRESSURE, 0L));
queueStatistic.setSlowdownTime(jObj.getLong(QUEUE_SLOWDOWNTIME, 0L));
return vertx.executeBlocking(executeBlockingPromise -> {
for (Response response : ctx.redisFailStats) {
JsonObject jObj = new JsonObject(response.toString());
String queueName = jObj.getString(QUEUENAME);
QueueStatistic queueStatistic = ctx.statistics.get(queueName);
if (queueStatistic != null) {
// if it isn't there, there is obviously no statistic needed
queueStatistic.setFailures(jObj.getLong(QUEUE_FAILURES, 0L));
queueStatistic.setBackpressureTime(jObj.getLong(QUEUE_BACKPRESSURE, 0L));
queueStatistic.setSlowdownTime(jObj.getLong(QUEUE_SLOWDOWNTIME, 0L));
}
}
}
// build the final resulting statistics list from the former merged queue
// values from various sources
JsonArray result = new JsonArray();
for (String queueName : ctx.queueNames) {
QueueStatistic stats = ctx.statistics.get(queueName);
if (stats != null) {
result.add(stats.getAsJsonObject());
// build the final resulting statistics list from the former merged queue
// values from various sources
JsonArray result = new JsonArray();
for (String queueName : ctx.queueNames) {
QueueStatistic stats = ctx.statistics.get(queueName);
if (stats != null) {
result.add(stats.getAsJsonObject());
}
}
}
promise.complete(new JsonObject().put(STATUS, OK)
.put(RedisquesAPI.QUEUES, result));
return promise.future();
executeBlockingPromise.complete(new JsonObject().put(STATUS, OK)
.put(RedisquesAPI.QUEUES, result));
});
}

/**
Expand Down

0 comments on commit d6d3d0a

Please sign in to comment.