From 1848242bd96f37a67073f771a710fe82795812d9 Mon Sep 17 00:00:00 2001 From: Flavio Castelli Date: Thu, 3 Nov 2016 14:19:36 +0100 Subject: [PATCH 1/3] redis queue: improve debugging Provide more details on failures. Signed-off-by: Flavio Castelli --- libgearman-server/plugins/queue/redis/queue.cc | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/libgearman-server/plugins/queue/redis/queue.cc b/libgearman-server/plugins/queue/redis/queue.cc index 2771da44b..e7ef75e5c 100644 --- a/libgearman-server/plugins/queue/redis/queue.cc +++ b/libgearman-server/plugins/queue/redis/queue.cc @@ -118,13 +118,13 @@ gearmand_error_t Hiredis::initialize() int service_port= atoi(service.c_str()); if ((_redis= redisConnect("127.0.0.1", service_port)) == NULL) { - return gearmand_gerror("Could not connect to redis server", GEARMAND_QUEUE_ERROR); + return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "Could not connect to redis server: %s", _redis->errstr); } gearmand_info("Initializing hiredis module"); gearman_server_set_queue(Gearmand()->server, this, _hiredis_add, _hiredis_flush, _hiredis_done, _hiredis_replay); - + return GEARMAND_SUCCESS; } @@ -234,7 +234,10 @@ static gearmand_error_t _hiredis_done(gearman_server_st *, void *context, redisReply *reply= (redisReply*)redisCommand(queue->redis(), "DEL %s", &key[0]); if (reply == NULL) { - return GEARMAND_QUEUE_ERROR; + return gearmand_log_gerror( + GEARMAN_DEFAULT_LOG_PARAM, + GEARMAND_QUEUE_ERROR, + "Failed to call DEL for key %s: %s", &key[0], queue->redis()->errstr); } freeReplyObject(reply); @@ -255,7 +258,7 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context redisReply *reply= (redisReply*)redisCommand(queue->redis(), "KEYS %s*", GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX); if (reply == NULL) { - return gearmand_gerror("Failed to call KEYS during QUEUE replay", GEARMAND_QUEUE_ERROR); + return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "Failed to call KEYS during QUEUE replay: %s", queue->redis()->errstr); } for (size_t x= 0; x < reply->elements; x++) @@ -287,6 +290,9 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context redisReply *get_reply= (redisReply*)redisCommand(queue->redis(), "GET %s", reply->element[x]->str); if (get_reply == NULL) { + gearmand_log_debug( + GEARMAN_DEFAULT_LOG_PARAM, + "GET %s failed: %s", reply->element[x]->str, queue->redis()->errstr); continue; } From 1fd1955fb2f1d23328540af38de7486ee07a9e87 Mon Sep 17 00:00:00 2001 From: Flavio Castelli Date: Thu, 3 Nov 2016 14:20:19 +0100 Subject: [PATCH 2/3] Redis queue: take server address into account The old code didn't consider the Redis server address, the code always attempted to connect to a Redis server running on localhost. Signed-off-by: Flavio Castelli --- libgearman-server/plugins/queue/redis/queue.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libgearman-server/plugins/queue/redis/queue.cc b/libgearman-server/plugins/queue/redis/queue.cc index e7ef75e5c..fa1479e36 100644 --- a/libgearman-server/plugins/queue/redis/queue.cc +++ b/libgearman-server/plugins/queue/redis/queue.cc @@ -116,7 +116,7 @@ Hiredis::~Hiredis() gearmand_error_t Hiredis::initialize() { int service_port= atoi(service.c_str()); - if ((_redis= redisConnect("127.0.0.1", service_port)) == NULL) + if ((_redis= redisConnect(server.c_str(), service_port)) == NULL) { return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "Could not connect to redis server: %s", _redis->errstr); } From 10b3c776875a29d16fe7281b32c2a97839db51ef Mon Sep 17 00:00:00 2001 From: Flavio Castelli Date: Thu, 3 Nov 2016 18:52:10 +0100 Subject: [PATCH 3/3] Do not exceed 80 chars per line Signed-off-by: Flavio Castelli --- .../plugins/queue/redis/queue.cc | 31 ++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/libgearman-server/plugins/queue/redis/queue.cc b/libgearman-server/plugins/queue/redis/queue.cc index fa1479e36..53b12004d 100644 --- a/libgearman-server/plugins/queue/redis/queue.cc +++ b/libgearman-server/plugins/queue/redis/queue.cc @@ -118,7 +118,10 @@ gearmand_error_t Hiredis::initialize() int service_port= atoi(service.c_str()); if ((_redis= redisConnect(server.c_str(), service_port)) == NULL) { - return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "Could not connect to redis server: %s", _redis->errstr); + return gearmand_log_gerror( + GEARMAN_DEFAULT_LOG_PARAM, + GEARMAND_QUEUE_ERROR, + "Could not connect to redis server: %s", _redis->errstr); } gearmand_info("Initializing hiredis module"); @@ -196,17 +199,24 @@ static gearmand_error_t _hiredis_add(gearman_server_st *, void *context, return GEARMAND_QUEUE_ERROR; } - gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "hires add: %.*s", (uint32_t)unique_size, (char *)unique); + gearmand_log_debug( + GEARMAN_DEFAULT_LOG_PARAM, + "hires add: %.*s", (uint32_t)unique_size, (char *)unique); std::vector key; build_key(key, unique, unique_size, function_name, function_name_size); - gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "hires key: %u", (uint32_t)key.size()); + gearmand_log_debug( + GEARMAN_DEFAULT_LOG_PARAM, + "hires key: %u", (uint32_t)key.size()); redisReply *reply= (redisReply*)redisCommand(queue->redis(), "SET %s %b", &key[0], data, data_size); gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "got reply"); if (reply == NULL) { - return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "failed to insert '%.*s' into redis", key.size(), &key[0]); + return gearmand_log_gerror( + GEARMAN_DEFAULT_LOG_PARAM, + GEARMAND_QUEUE_ERROR, + "failed to insert '%.*s' into redis", key.size(), &key[0]); } freeReplyObject(reply); @@ -226,7 +236,9 @@ static gearmand_error_t _hiredis_done(gearman_server_st *, void *context, { gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context; - gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "hires done: %.*s", (uint32_t)unique_size, (char *)unique); + gearmand_log_debug( + GEARMAN_DEFAULT_LOG_PARAM, + "hires done: %.*s", (uint32_t)unique_size, (char *)unique); std::vector key; build_key(key, unique, unique_size, function_name, function_name_size); @@ -258,7 +270,10 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context redisReply *reply= (redisReply*)redisCommand(queue->redis(), "KEYS %s*", GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX); if (reply == NULL) { - return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "Failed to call KEYS during QUEUE replay: %s", queue->redis()->errstr); + return gearmand_log_gerror( + GEARMAN_DEFAULT_LOG_PARAM, + GEARMAND_QUEUE_ERROR, + "Failed to call KEYS during QUEUE replay: %s", queue->redis()->errstr); } for (size_t x= 0; x < reply->elements; x++) @@ -275,7 +290,9 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context if (fmt_str_length <= 0 or size_t(fmt_str_length) >= sizeof(fmt_str)) { assert(fmt_str_length != 1); - return gearmand_gerror("snprintf() failed to produce a valud fmt_str for redis key", GEARMAND_QUEUE_ERROR); + return gearmand_gerror( + "snprintf() failed to produce a valud fmt_str for redis key", + GEARMAND_QUEUE_ERROR); } int ret= sscanf(reply->element[x]->str, fmt_str,