diff --git a/libgearman-server/plugins/queue/redis/queue.cc b/libgearman-server/plugins/queue/redis/queue.cc index 2771da44b..53b12004d 100644 --- a/libgearman-server/plugins/queue/redis/queue.cc +++ b/libgearman-server/plugins/queue/redis/queue.cc @@ -116,15 +116,18 @@ Hiredis::~Hiredis() gearmand_error_t Hiredis::initialize() { int service_port= atoi(service.c_str()); - if ((_redis= redisConnect("127.0.0.1", service_port)) == NULL) + if ((_redis= redisConnect(server.c_str(), service_port)) == NULL) { - return gearmand_gerror("Could not connect to redis server", GEARMAND_QUEUE_ERROR); + return gearmand_log_gerror( + GEARMAN_DEFAULT_LOG_PARAM, + GEARMAND_QUEUE_ERROR, + "Could not connect to redis server: %s", _redis->errstr); } gearmand_info("Initializing hiredis module"); gearman_server_set_queue(Gearmand()->server, this, _hiredis_add, _hiredis_flush, _hiredis_done, _hiredis_replay); - + return GEARMAND_SUCCESS; } @@ -196,17 +199,24 @@ static gearmand_error_t _hiredis_add(gearman_server_st *, void *context, return GEARMAND_QUEUE_ERROR; } - gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "hires add: %.*s", (uint32_t)unique_size, (char *)unique); + gearmand_log_debug( + GEARMAN_DEFAULT_LOG_PARAM, + "hires add: %.*s", (uint32_t)unique_size, (char *)unique); std::vector key; build_key(key, unique, unique_size, function_name, function_name_size); - gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "hires key: %u", (uint32_t)key.size()); + gearmand_log_debug( + GEARMAN_DEFAULT_LOG_PARAM, + "hires key: %u", (uint32_t)key.size()); redisReply *reply= (redisReply*)redisCommand(queue->redis(), "SET %s %b", &key[0], data, data_size); gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "got reply"); if (reply == NULL) { - return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "failed to insert '%.*s' into redis", key.size(), &key[0]); + return gearmand_log_gerror( + GEARMAN_DEFAULT_LOG_PARAM, + GEARMAND_QUEUE_ERROR, + "failed to insert '%.*s' into redis", key.size(), &key[0]); } freeReplyObject(reply); @@ -226,7 +236,9 @@ static gearmand_error_t _hiredis_done(gearman_server_st *, void *context, { gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context; - gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "hires done: %.*s", (uint32_t)unique_size, (char *)unique); + gearmand_log_debug( + GEARMAN_DEFAULT_LOG_PARAM, + "hires done: %.*s", (uint32_t)unique_size, (char *)unique); std::vector key; build_key(key, unique, unique_size, function_name, function_name_size); @@ -234,7 +246,10 @@ static gearmand_error_t _hiredis_done(gearman_server_st *, void *context, redisReply *reply= (redisReply*)redisCommand(queue->redis(), "DEL %s", &key[0]); if (reply == NULL) { - return GEARMAND_QUEUE_ERROR; + return gearmand_log_gerror( + GEARMAN_DEFAULT_LOG_PARAM, + GEARMAND_QUEUE_ERROR, + "Failed to call DEL for key %s: %s", &key[0], queue->redis()->errstr); } freeReplyObject(reply); @@ -255,7 +270,10 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context redisReply *reply= (redisReply*)redisCommand(queue->redis(), "KEYS %s*", GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX); if (reply == NULL) { - return gearmand_gerror("Failed to call KEYS during QUEUE replay", GEARMAND_QUEUE_ERROR); + return gearmand_log_gerror( + GEARMAN_DEFAULT_LOG_PARAM, + GEARMAND_QUEUE_ERROR, + "Failed to call KEYS during QUEUE replay: %s", queue->redis()->errstr); } for (size_t x= 0; x < reply->elements; x++) @@ -272,7 +290,9 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context if (fmt_str_length <= 0 or size_t(fmt_str_length) >= sizeof(fmt_str)) { assert(fmt_str_length != 1); - return gearmand_gerror("snprintf() failed to produce a valud fmt_str for redis key", GEARMAND_QUEUE_ERROR); + return gearmand_gerror( + "snprintf() failed to produce a valud fmt_str for redis key", + GEARMAND_QUEUE_ERROR); } int ret= sscanf(reply->element[x]->str, fmt_str, @@ -287,6 +307,9 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context redisReply *get_reply= (redisReply*)redisCommand(queue->redis(), "GET %s", reply->element[x]->str); if (get_reply == NULL) { + gearmand_log_debug( + GEARMAN_DEFAULT_LOG_PARAM, + "GET %s failed: %s", reply->element[x]->str, queue->redis()->errstr); continue; }