Skip to content

Commit

Permalink
Merge pull request #71 from p-alik/redis
Browse files Browse the repository at this point in the history
Redis

Reviewed-by: Clint Byrum <[email protected]>
             https://github.com/SpamapS
  • Loading branch information
bonnyci[bot] authored May 18, 2017
2 parents fd06abb + fda1c63 commit 4ee46f8
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 82 deletions.
1 change: 1 addition & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ echo " * LDFLAGS Flags: $LDFLAGS"
echo " * Assertions enabled: $ax_enable_assert"
echo " * Debug enabled: $ax_enable_debug"
echo " * Warnings as failure: $ac_cv_warnings_as_errors"
echo " * Building with hiredis $ac_enable_hiredis"
echo " * Building with libsqlite3 $WANT_SQLITE3"
echo " * Building with libdrizzle $ac_enable_libdrizzle"
echo " * Building with libmemcached $ax_enable_libmemcached"
Expand Down
239 changes: 158 additions & 81 deletions libgearman-server/plugins/queue/redis/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,9 @@
*/

#include <gear_config.h>
#include <libgearman-server/common.h>

#include <libgearman-server/plugins/queue/redis/queue.h>
#include <libgearman-server/plugins/queue/base.h>

#if defined(HAVE_HIREDIS) && HAVE_HIREDIS

#include <hiredis/hiredis.h>
#if defined(GEARMAND_PLUGINS_QUEUE_REDIS_H)

/* Queue callback functions. */
static gearmand_error_t _hiredis_add(gearman_server_st *server, void *context,
Expand All @@ -72,53 +67,135 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context
gearman_queue_add_fn *add_fn,
void *add_context);

/**
* gearmand::plugins::queue::Hiredis::redis()
*
* returns _redis
*/
redisContext* gearmand::plugins::queue::Hiredis::redis()
{
return this->_redis;
}

namespace gearmand { namespace plugins { namespace queue { class Hiredis; }}}
/*
* gearmand::plugins::queue::Hiredis::hmset(vchar_t key, const void *data, size_t data_size, uint32_t priority)
*
* returns true if hiredis HMSET succeeded
*/
bool gearmand::plugins::queue::Hiredis::hmset(vchar_t key, const void *data, size_t data_size, uint32_t priority) {
redisContext* context = this->redis();
int argc = 6;
std::string _priority = std::to_string((uint32_t)priority);

const size_t argvlen[argc] = {
(const size_t)5,
(const size_t)key.size(),
(const size_t)4,
(const size_t)data_size,
(const size_t)8,
_priority.size()
};

std::vector<const char*> argv {"HMSET"};
argv.push_back( &key[0] );
argv.push_back( "data" );
argv.push_back( static_cast<const char*>(data) );
argv.push_back( "priority" );
argv.push_back( _priority.c_str() );

redisReply *reply = (redisReply *)redisCommandArgv(context, argv.size(), &(argv[0]), &(argvlen[0]) );
if (reply == nullptr)
return false;

bool res = (reply->type == REDIS_REPLY_STATUS);

namespace gearmand {
namespace plugins {
namespace queue {
freeReplyObject(reply);

class Hiredis : public Queue {
public:
Hiredis();
~Hiredis();
return res;
}

gearmand_error_t initialize();
/*
* bool gearmand::plugins::queue::Hiredis::fetch(char *key, gearmand::plugins::queue::redis_record_t &req)
*
* fetch redis result for the key by HGETALL command and put it into the redis_record_t
*
* returns true on success
*/
bool gearmand::plugins::queue::Hiredis::fetch(char *key, gearmand::plugins::queue::redis_record_t &req)
{
redisContext * context = this->redis();
redisReply * reply = (redisReply*)redisCommand(context, "HGETALL %s", key);
if (reply == nullptr)
return false;

//FIXME remove workaround
if(reply->type == REDIS_REPLY_ERROR) {
// workaround to ensure gearmand upgrade.
// gearmand <=1.1.15 stores data in string, not in hash.
gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "redis replies for HGETALL: %s", reply->str);

reply = (redisReply*)redisCommand(context, "TYPE %s", key);
if (reply == nullptr)
return false;

if(strcmp(reply->str, "string") != 0) {
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "unexpected type of the value stored in key: %s", reply->str);
return false;
}

redisContext* redis()
{
return _redis;
reply = (redisReply*)redisCommand(context, "GET %s", key);
if (reply == nullptr)
return false;

std::string s{reply->str};
req.data = s;
req.priority = GEARMAN_JOB_PRIORITY_NORMAL;
} else {
// 2 x (key + value)
assert(reply->elements == 4);
std::string s{reply->element[1]->str};
req.data = s;
req.priority = (uint32_t)std::stoi(reply->element[3]->str);
}

std::string server;
std::string service;
std::string password;
freeReplyObject(reply);

private:
redisContext *_redis;
};
return true;
}

Hiredis::Hiredis() :
/**
* gearmand::plugins::queue::Hiredis::Hiredis()
*
* setup server, service and password properties
*
*/
gearmand::plugins::queue::Hiredis::Hiredis() :
Queue("redis"),
_redis(nullptr),
server("127.0.0.1"),
service("6379"),
_redis(NULL)
service("6379")
{
command_line_options().add_options()
("redis-server", boost::program_options::value(&server), "Redis server")
("redis-port", boost::program_options::value(&service), "Redis server port/service")
("redis-password", boost::program_options::value(&password), "Redis server password/service");
}

Hiredis::~Hiredis()
/**
* gearmand::plugins::queue::Hiredis::~Hiredis()
*
* free _redis context
*/
gearmand::plugins::queue::Hiredis::~Hiredis()
{
if(this->_redis)
redisFree(this->_redis);
}

gearmand_error_t Hiredis::initialize()
gearmand_error_t gearmand::plugins::queue::Hiredis::initialize()
{
int service_port= atoi(service.c_str());
if ((_redis= redisConnect(server.c_str(), service_port)) == NULL)
if ((_redis= redisConnect(server.c_str(), service_port)) == nullptr)
{
return gearmand_log_gerror(
GEARMAN_DEFAULT_LOG_PARAM,
Expand All @@ -129,7 +206,7 @@ gearmand_error_t Hiredis::initialize()
if (password.size())
{
redisReply *reply = (redisReply*)redisCommand(_redis, "AUTH %s", password.c_str());
if(reply == NULL)
if(reply == nullptr)
{
return gearmand_log_gerror(
GEARMAN_DEFAULT_LOG_PARAM,
Expand All @@ -154,42 +231,49 @@ gearmand_error_t Hiredis::initialize()

gearmand_info("Initializing hiredis module");

gearman_server_set_queue(Gearmand()->server, this, _hiredis_add, _hiredis_flush, _hiredis_done, _hiredis_replay);
gearman_server_set_queue(Gearmand()->server, this, _hiredis_add, _hiredis_flush, _hiredis_done, _hiredis_replay);

return GEARMAND_SUCCESS;
}

void initialize_redis()
/**
* define static gearmand::plugins::queue::Hiredis
*/
void gearmand::plugins::queue::initialize_redis()
{
static Hiredis local_instance;
static gearmand::plugins::queue::Hiredis local_instance;
}

} // namespace queue
} // namespace plugins
} // namespace gearmand

typedef std::vector<char> vchar_t;
#define GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX "_gear_"
#define GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE sizeof(GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX)
#define GEARMAND_KEY_LITERAL "%s-%.*s-%*s"

static size_t build_key(vchar_t &key,
const char *unique,
size_t unique_size,
size_t unique_size,
const char *function_name,
size_t function_name_size)
{
key.resize(function_name_size +unique_size +GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE +4);
int key_size= snprintf(&key[0], key.size(), GEARMAND_KEY_LITERAL,
size_t buf_size = function_name_size + unique_size + GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE + 4;
char buf[buf_size];
// buf size is overestimated
// so buf contains some \0 at the end
int key_size= snprintf(buf, buf_size, GEARMAND_KEY_LITERAL,
GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX,
(int)function_name_size, function_name,
(int)unique_size, unique);
if (size_t(key_size) >= key.size() or key_size <= 0)
if (size_t(key_size) >= buf_size or key_size <= 0)
{
assert(0);
return -1;
}

// std::string removes all \0 at the end of buf
std::string s{buf};

key.resize(0);
std::copy(s.begin(), s.end(), std::back_inserter(key));

return key.size();
}

Expand Down Expand Up @@ -217,38 +301,35 @@ static gearmand_error_t _hiredis_add(gearman_server_st *, void *context,
const char *function_name,
size_t function_name_size,
const void *data, size_t data_size,
gearman_job_priority_t,
gearman_job_priority_t priority,
int64_t when)
{
gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context;

if (when) // No support for EPOCH jobs
{
return GEARMAND_QUEUE_ERROR;
return gearmand_gerror("hiredis queue does not support epoch jobs", GEARMAND_QUEUE_ERROR);
}

gearmand_log_debug(
GEARMAN_DEFAULT_LOG_PARAM,
"hires add: %.*s", (uint32_t)unique_size, (char *)unique);
"hires add func: %.*s, unique: %.*s",
(uint32_t)function_name_size, function_name,
(uint32_t)unique_size, (char *)unique);

std::vector<char> key;
vchar_t key;
build_key(key, unique, unique_size, function_name, function_name_size);

gearmand_log_debug(
GEARMAN_DEFAULT_LOG_PARAM,
"hires key: %u", (uint32_t)key.size());

redisReply *reply= (redisReply*)redisCommand(queue->redis(), "SET %s %b", &key[0], data, data_size);
gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "got reply");
if (reply == NULL)
{
return gearmand_log_gerror(
GEARMAN_DEFAULT_LOG_PARAM,
GEARMAND_QUEUE_ERROR,
"failed to insert '%.*s' into redis", key.size(), &key[0]);
}
freeReplyObject(reply);
gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context;
if (queue->hmset(key, data, data_size, (uint32_t)priority))
return GEARMAND_SUCCESS;

return GEARMAND_SUCCESS;
return gearmand_log_gerror(
GEARMAN_DEFAULT_LOG_PARAM,
GEARMAND_QUEUE_ERROR,
"failed to insert '%.*s' into redis", key.size(), &key[0]);
}

static gearmand_error_t _hiredis_flush(gearman_server_st *, void *)
Expand All @@ -258,7 +339,7 @@ static gearmand_error_t _hiredis_flush(gearman_server_st *, void *)

static gearmand_error_t _hiredis_done(gearman_server_st *, void *context,
const char *unique,
size_t unique_size,
size_t unique_size,
const char *function_name,
size_t function_name_size)
{
Expand All @@ -268,11 +349,11 @@ static gearmand_error_t _hiredis_done(gearman_server_st *, void *context,
GEARMAN_DEFAULT_LOG_PARAM,
"hires done: %.*s", (uint32_t)unique_size, (char *)unique);

std::vector<char> key;
vchar_t key;
build_key(key, unique, unique_size, function_name, function_name_size);

redisReply *reply= (redisReply*)redisCommand(queue->redis(), "DEL %s", &key[0]);
if (reply == NULL)
redisReply *reply= (redisReply*)redisCommand(queue->redis(), "DEL %b", &key[0], key.size());
if (reply == nullptr)
{
return gearmand_log_gerror(
GEARMAN_DEFAULT_LOG_PARAM,
Expand All @@ -296,7 +377,7 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context
gearmand_info("hiredis replay start");

redisReply *reply= (redisReply*)redisCommand(queue->redis(), "KEYS %s*", GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX);
if (reply == NULL)
if (reply == nullptr)
{
return gearmand_log_gerror(
GEARMAN_DEFAULT_LOG_PARAM,
Expand Down Expand Up @@ -332,35 +413,31 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context
continue;
}

redisReply *get_reply= (redisReply*)redisCommand(queue->redis(), "GET %s", reply->element[x]->str);
if (get_reply == NULL)
gearmand::plugins::queue::redis_record_t record;
if(!queue->fetch(reply->element[x]->str, record))
{
gearmand_log_debug(
return gearmand_log_gerror(
GEARMAN_DEFAULT_LOG_PARAM,
"GET %s failed: %s", reply->element[x]->str, queue->redis()->errstr);
continue;
GEARMAND_QUEUE_ERROR,
"Failed to fetch data for the key: %s", reply->element[x]->str);
}

/* need to make a copy here ... gearman_server_job_free will free it later */
char * data = (char*)malloc(get_reply->len);
if (data == NULL)
{
return gearmand_perror(errno, "malloc");
}
memcpy(data, get_reply->str, get_reply->len);
char *data = strdup(record.data.c_str());
size_t data_size = record.data.size();
gearman_job_priority_t priority = static_cast<gearman_job_priority_t>(record.priority);

(void)(add_fn)(server, add_context,
unique, strlen(unique),
function_name, strlen(function_name),
data, get_reply->len,
GEARMAN_JOB_PRIORITY_NORMAL, 0);
freeReplyObject(get_reply);
data, data_size,
priority, 0);
}

freeReplyObject(reply);

return GEARMAND_SUCCESS;
}
#pragma GCC diagnostic pop
#pragma GCC diagnostic pop

#endif // defined(HAVE_HIREDIS) && HAVE_HIREDIS
#endif // defined(GEARMAND_PLUGINS_QUEUE_REDIS_H)
Loading

0 comments on commit 4ee46f8

Please sign in to comment.