Skip to content

Commit

Permalink
Merge pull request #31 from flavio/fix-redis-queue
Browse files Browse the repository at this point in the history
More fixes for the redis queue
  • Loading branch information
SpamapS authored Nov 3, 2016
2 parents 5fa613c + 10b3c77 commit 9238048
Showing 1 changed file with 33 additions and 10 deletions.
43 changes: 33 additions & 10 deletions libgearman-server/plugins/queue/redis/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,18 @@ Hiredis::~Hiredis()
gearmand_error_t Hiredis::initialize()
{
int service_port= atoi(service.c_str());
if ((_redis= redisConnect("127.0.0.1", service_port)) == NULL)
if ((_redis= redisConnect(server.c_str(), service_port)) == NULL)
{
return gearmand_gerror("Could not connect to redis server", GEARMAND_QUEUE_ERROR);
return gearmand_log_gerror(
GEARMAN_DEFAULT_LOG_PARAM,
GEARMAND_QUEUE_ERROR,
"Could not connect to redis server: %s", _redis->errstr);
}

gearmand_info("Initializing hiredis module");

gearman_server_set_queue(Gearmand()->server, this, _hiredis_add, _hiredis_flush, _hiredis_done, _hiredis_replay);

return GEARMAND_SUCCESS;
}

Expand Down Expand Up @@ -196,17 +199,24 @@ static gearmand_error_t _hiredis_add(gearman_server_st *, void *context,
return GEARMAND_QUEUE_ERROR;
}

gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "hires add: %.*s", (uint32_t)unique_size, (char *)unique);
gearmand_log_debug(
GEARMAN_DEFAULT_LOG_PARAM,
"hires add: %.*s", (uint32_t)unique_size, (char *)unique);

std::vector<char> key;
build_key(key, unique, unique_size, function_name, function_name_size);
gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "hires key: %u", (uint32_t)key.size());
gearmand_log_debug(
GEARMAN_DEFAULT_LOG_PARAM,
"hires key: %u", (uint32_t)key.size());

redisReply *reply= (redisReply*)redisCommand(queue->redis(), "SET %s %b", &key[0], data, data_size);
gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "got reply");
if (reply == NULL)
{
return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "failed to insert '%.*s' into redis", key.size(), &key[0]);
return gearmand_log_gerror(
GEARMAN_DEFAULT_LOG_PARAM,
GEARMAND_QUEUE_ERROR,
"failed to insert '%.*s' into redis", key.size(), &key[0]);
}
freeReplyObject(reply);

Expand All @@ -226,15 +236,20 @@ static gearmand_error_t _hiredis_done(gearman_server_st *, void *context,
{
gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context;

gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "hires done: %.*s", (uint32_t)unique_size, (char *)unique);
gearmand_log_debug(
GEARMAN_DEFAULT_LOG_PARAM,
"hires done: %.*s", (uint32_t)unique_size, (char *)unique);

std::vector<char> key;
build_key(key, unique, unique_size, function_name, function_name_size);

redisReply *reply= (redisReply*)redisCommand(queue->redis(), "DEL %s", &key[0]);
if (reply == NULL)
{
return GEARMAND_QUEUE_ERROR;
return gearmand_log_gerror(
GEARMAN_DEFAULT_LOG_PARAM,
GEARMAND_QUEUE_ERROR,
"Failed to call DEL for key %s: %s", &key[0], queue->redis()->errstr);
}
freeReplyObject(reply);

Expand All @@ -255,7 +270,10 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context
redisReply *reply= (redisReply*)redisCommand(queue->redis(), "KEYS %s*", GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX);
if (reply == NULL)
{
return gearmand_gerror("Failed to call KEYS during QUEUE replay", GEARMAND_QUEUE_ERROR);
return gearmand_log_gerror(
GEARMAN_DEFAULT_LOG_PARAM,
GEARMAND_QUEUE_ERROR,
"Failed to call KEYS during QUEUE replay: %s", queue->redis()->errstr);
}

for (size_t x= 0; x < reply->elements; x++)
Expand All @@ -272,7 +290,9 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context
if (fmt_str_length <= 0 or size_t(fmt_str_length) >= sizeof(fmt_str))
{
assert(fmt_str_length != 1);
return gearmand_gerror("snprintf() failed to produce a valud fmt_str for redis key", GEARMAND_QUEUE_ERROR);
return gearmand_gerror(
"snprintf() failed to produce a valud fmt_str for redis key",
GEARMAND_QUEUE_ERROR);
}
int ret= sscanf(reply->element[x]->str,
fmt_str,
Expand All @@ -287,6 +307,9 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context
redisReply *get_reply= (redisReply*)redisCommand(queue->redis(), "GET %s", reply->element[x]->str);
if (get_reply == NULL)
{
gearmand_log_debug(
GEARMAN_DEFAULT_LOG_PARAM,
"GET %s failed: %s", reply->element[x]->str, queue->redis()->errstr);
continue;
}

Expand Down

0 comments on commit 9238048

Please sign in to comment.