From b63a94d74bf39fb8051b9cdb59559e6d8df04f34 Mon Sep 17 00:00:00 2001 From: Alexei Pastuchov Date: Wed, 21 Dec 2016 09:38:17 +0100 Subject: [PATCH 1/5] returns GEARMAND_QUEUE_ERROR with a message --- libgearman-server/plugins/queue/redis/queue.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libgearman-server/plugins/queue/redis/queue.cc b/libgearman-server/plugins/queue/redis/queue.cc index ce5eb6911..98f48b67c 100644 --- a/libgearman-server/plugins/queue/redis/queue.cc +++ b/libgearman-server/plugins/queue/redis/queue.cc @@ -224,7 +224,7 @@ static gearmand_error_t _hiredis_add(gearman_server_st *, void *context, if (when) // No support for EPOCH jobs { - return GEARMAND_QUEUE_ERROR; + return gearmand_gerror("hiredis queue does not support epoch jobs", GEARMAND_QUEUE_ERROR); } gearmand_log_debug( From fd9a6171c569029de715df8f77ecaf5a17fc760f Mon Sep 17 00:00:00 2001 From: Alexei Pastuchov Date: Tue, 24 Jan 2017 07:38:24 +0100 Subject: [PATCH 2/5] add 'Building with hiredis y/n' into build configuration summary --- configure.ac | 1 + 1 file changed, 1 insertion(+) diff --git a/configure.ac b/configure.ac index 7190921bc..b2af60c66 100644 --- a/configure.ac +++ b/configure.ac @@ -355,6 +355,7 @@ echo " * LDFLAGS Flags: $LDFLAGS" echo " * Assertions enabled: $ax_enable_assert" echo " * Debug enabled: $ax_enable_debug" echo " * Warnings as failure: $ac_cv_warnings_as_errors" +echo " * Building with hiredis $ac_enable_hiredis" echo " * Building with libsqlite3 $WANT_SQLITE3" echo " * Building with libdrizzle $ac_enable_libdrizzle" echo " * Building with libmemcached $ax_enable_libmemcached" From c95959102f3476c9a7d0dcb7289605887e8c0432 Mon Sep 17 00:00:00 2001 From: Alexei Pastuchov Date: Tue, 24 Jan 2017 14:48:56 +0100 Subject: [PATCH 3/5] redis plugin refactoring: - move declarations into header file - Hiredis class provides getter, setter methods - build_key prepares a key without \0 characters at the end --- .../plugins/queue/redis/queue.cc | 216 +++++++++++------- libgearman-server/plugins/queue/redis/queue.h | 60 ++++- 2 files changed, 194 insertions(+), 82 deletions(-) diff --git a/libgearman-server/plugins/queue/redis/queue.cc b/libgearman-server/plugins/queue/redis/queue.cc index 98f48b67c..4bb462034 100644 --- a/libgearman-server/plugins/queue/redis/queue.cc +++ b/libgearman-server/plugins/queue/redis/queue.cc @@ -41,14 +41,9 @@ */ #include -#include - #include -#include - -#if defined(HAVE_HIREDIS) && HAVE_HIREDIS -#include +#if defined(GEARMAND_PLUGINS_QUEUE_REDIS_H) /* Queue callback functions. */ static gearmand_error_t _hiredis_add(gearman_server_st *server, void *context, @@ -72,38 +67,90 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context gearman_queue_add_fn *add_fn, void *add_context); +/** + * gearmand::plugins::queue::Hiredis::redis() + * + * returns _redis + */ +redisContext* gearmand::plugins::queue::Hiredis::redis() +{ + return this->_redis; +} -namespace gearmand { namespace plugins { namespace queue { class Hiredis; }}} +/* + * gearmand::plugins::queue::Hiredis::hmset(vchar_t key, const void *data, size_t data_size, uint32_t priority) + * + * returns true if hiredis HMSET succeeded + */ +bool gearmand::plugins::queue::Hiredis::hmset(vchar_t key, const void *data, size_t data_size, uint32_t priority) { + redisContext* context = this->redis(); + int argc = 6; + std::string _priority = std::to_string((uint32_t)priority); + + const size_t argvlen[argc] = { + (const size_t)5, + (const size_t)key.size(), + (const size_t)4, + (const size_t)data_size, + (const size_t)8, + _priority.size() + }; + + std::vector argv {"HMSET"}; + argv.push_back( &key[0] ); + argv.push_back( "data" ); + argv.push_back( static_cast(data) ); + argv.push_back( "priority" ); + argv.push_back( _priority.c_str() ); + + redisReply *reply = (redisReply *)redisCommandArgv(context, argv.size(), &(argv[0]), &(argvlen[0]) ); + if (reply == nullptr) + return false; + + bool res = (reply->type == REDIS_REPLY_STATUS); -namespace gearmand { -namespace plugins { -namespace queue { + freeReplyObject(reply); + + return res; +} -class Hiredis : public Queue { -public: - Hiredis(); - ~Hiredis(); +/* + * bool gearmand::plugins::queue::Hiredis::fetch(char *key, gearmand::plugins::queue::redis_record_t &req) + * + * fetch redis result for the key by HGETALL command and put it into the redis_record_t + * + * returns true on success + */ +bool gearmand::plugins::queue::Hiredis::fetch(char *key, gearmand::plugins::queue::redis_record_t &req) +{ + redisContext * context = this->redis(); + redisReply * reply= (redisReply*)redisCommand(context, "HGETALL %s", key); + if (reply == nullptr) + return false; - gearmand_error_t initialize(); + // 2 x (key + value) + assert(reply->elements == 4); - redisContext* redis() - { - return _redis; - } + std::string s{reply->element[1]->str}; + req.data = s; + req.priority = (uint32_t)std::stoi(reply->element[3]->str); - std::string server; - std::string service; - std::string password; + freeReplyObject(reply); -private: - redisContext *_redis; -}; + return true; +} -Hiredis::Hiredis() : +/** + * gearmand::plugins::queue::Hiredis::Hiredis() + * + * setup server, service and password properties + * + */ +gearmand::plugins::queue::Hiredis::Hiredis() : Queue("redis"), + _redis(nullptr), server("127.0.0.1"), - service("6379"), - _redis(NULL) + service("6379") { command_line_options().add_options() ("redis-server", boost::program_options::value(&server), "Redis server") @@ -111,14 +158,21 @@ Hiredis::Hiredis() : ("redis-password", boost::program_options::value(&password), "Redis server password/service"); } -Hiredis::~Hiredis() +/** + * gearmand::plugins::queue::Hiredis::~Hiredis() + * + * free _redis context + */ +gearmand::plugins::queue::Hiredis::~Hiredis() { + if(this->_redis) + redisFree(this->_redis); } -gearmand_error_t Hiredis::initialize() +gearmand_error_t gearmand::plugins::queue::Hiredis::initialize() { int service_port= atoi(service.c_str()); - if ((_redis= redisConnect(server.c_str(), service_port)) == NULL) + if ((_redis= redisConnect(server.c_str(), service_port)) == nullptr) { return gearmand_log_gerror( GEARMAN_DEFAULT_LOG_PARAM, @@ -129,7 +183,7 @@ gearmand_error_t Hiredis::initialize() if (password.size()) { redisReply *reply = (redisReply*)redisCommand(_redis, "AUTH %s", password.c_str()); - if(reply == NULL) + if(reply == nullptr) { return gearmand_log_gerror( GEARMAN_DEFAULT_LOG_PARAM, @@ -154,42 +208,49 @@ gearmand_error_t Hiredis::initialize() gearmand_info("Initializing hiredis module"); - gearman_server_set_queue(Gearmand()->server, this, _hiredis_add, _hiredis_flush, _hiredis_done, _hiredis_replay); + gearman_server_set_queue(Gearmand()->server, this, _hiredis_add, _hiredis_flush, _hiredis_done, _hiredis_replay); return GEARMAND_SUCCESS; } -void initialize_redis() +/** + * define static gearmand::plugins::queue::Hiredis + */ +void gearmand::plugins::queue::initialize_redis() { - static Hiredis local_instance; + static gearmand::plugins::queue::Hiredis local_instance; } -} // namespace queue -} // namespace plugins -} // namespace gearmand - -typedef std::vector vchar_t; #define GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX "_gear_" #define GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE sizeof(GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX) #define GEARMAND_KEY_LITERAL "%s-%.*s-%*s" static size_t build_key(vchar_t &key, const char *unique, - size_t unique_size, + size_t unique_size, const char *function_name, size_t function_name_size) { - key.resize(function_name_size +unique_size +GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE +4); - int key_size= snprintf(&key[0], key.size(), GEARMAND_KEY_LITERAL, + size_t buf_size = function_name_size + unique_size + GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE + 4; + char buf[buf_size]; + // buf size is overestimated + // so buf contains some \0 at the end + int key_size= snprintf(buf, buf_size, GEARMAND_KEY_LITERAL, GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX, (int)function_name_size, function_name, (int)unique_size, unique); - if (size_t(key_size) >= key.size() or key_size <= 0) + if (size_t(key_size) >= buf_size or key_size <= 0) { assert(0); return -1; } + // std::string removes all \0 at the end of buf + std::string s{buf}; + + key.resize(0); + std::copy(s.begin(), s.end(), std::back_inserter(key)); + return key.size(); } @@ -217,11 +278,9 @@ static gearmand_error_t _hiredis_add(gearman_server_st *, void *context, const char *function_name, size_t function_name_size, const void *data, size_t data_size, - gearman_job_priority_t, + gearman_job_priority_t priority, int64_t when) { - gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context; - if (when) // No support for EPOCH jobs { return gearmand_gerror("hiredis queue does not support epoch jobs", GEARMAND_QUEUE_ERROR); @@ -229,26 +288,25 @@ static gearmand_error_t _hiredis_add(gearman_server_st *, void *context, gearmand_log_debug( GEARMAN_DEFAULT_LOG_PARAM, - "hires add: %.*s", (uint32_t)unique_size, (char *)unique); + "hires add func: %.*s, unique: %.*s", + (uint32_t)function_name_size, function_name, + (uint32_t)unique_size, (char *)unique); - std::vector key; + vchar_t key; build_key(key, unique, unique_size, function_name, function_name_size); + gearmand_log_debug( GEARMAN_DEFAULT_LOG_PARAM, "hires key: %u", (uint32_t)key.size()); - redisReply *reply= (redisReply*)redisCommand(queue->redis(), "SET %s %b", &key[0], data, data_size); - gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "got reply"); - if (reply == NULL) - { - return gearmand_log_gerror( - GEARMAN_DEFAULT_LOG_PARAM, - GEARMAND_QUEUE_ERROR, - "failed to insert '%.*s' into redis", key.size(), &key[0]); - } - freeReplyObject(reply); + gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context; + if (queue->hmset(key, data, data_size, (uint32_t)priority)) + return GEARMAND_SUCCESS; - return GEARMAND_SUCCESS; + return gearmand_log_gerror( + GEARMAN_DEFAULT_LOG_PARAM, + GEARMAND_QUEUE_ERROR, + "failed to insert '%.*s' into redis", key.size(), &key[0]); } static gearmand_error_t _hiredis_flush(gearman_server_st *, void *) @@ -258,7 +316,7 @@ static gearmand_error_t _hiredis_flush(gearman_server_st *, void *) static gearmand_error_t _hiredis_done(gearman_server_st *, void *context, const char *unique, - size_t unique_size, + size_t unique_size, const char *function_name, size_t function_name_size) { @@ -268,11 +326,11 @@ static gearmand_error_t _hiredis_done(gearman_server_st *, void *context, GEARMAN_DEFAULT_LOG_PARAM, "hires done: %.*s", (uint32_t)unique_size, (char *)unique); - std::vector key; + vchar_t key; build_key(key, unique, unique_size, function_name, function_name_size); - redisReply *reply= (redisReply*)redisCommand(queue->redis(), "DEL %s", &key[0]); - if (reply == NULL) + redisReply *reply= (redisReply*)redisCommand(queue->redis(), "DEL %b", &key[0], key.size()); + if (reply == nullptr) { return gearmand_log_gerror( GEARMAN_DEFAULT_LOG_PARAM, @@ -296,7 +354,7 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context gearmand_info("hiredis replay start"); redisReply *reply= (redisReply*)redisCommand(queue->redis(), "KEYS %s*", GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX); - if (reply == NULL) + if (reply == nullptr) { return gearmand_log_gerror( GEARMAN_DEFAULT_LOG_PARAM, @@ -332,35 +390,31 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context continue; } - redisReply *get_reply= (redisReply*)redisCommand(queue->redis(), "GET %s", reply->element[x]->str); - if (get_reply == NULL) + gearmand::plugins::queue::redis_record_t record; + if(!queue->fetch(reply->element[x]->str, record)) { - gearmand_log_debug( + return gearmand_log_gerror( GEARMAN_DEFAULT_LOG_PARAM, - "GET %s failed: %s", reply->element[x]->str, queue->redis()->errstr); - continue; + GEARMAND_QUEUE_ERROR, + "Failed to fetch data for the key: %s", reply->element[x]->str); } /* need to make a copy here ... gearman_server_job_free will free it later */ - char * data = (char*)malloc(get_reply->len); - if (data == NULL) - { - return gearmand_perror(errno, "malloc"); - } - memcpy(data, get_reply->str, get_reply->len); + char *data = strdup(record.data.c_str()); + size_t data_size = record.data.size(); + gearman_job_priority_t priority = static_cast(record.priority); (void)(add_fn)(server, add_context, unique, strlen(unique), function_name, strlen(function_name), - data, get_reply->len, - GEARMAN_JOB_PRIORITY_NORMAL, 0); - freeReplyObject(get_reply); + data, data_size, + priority, 0); } + freeReplyObject(reply); return GEARMAND_SUCCESS; } #pragma GCC diagnostic pop #pragma GCC diagnostic pop - -#endif // defined(HAVE_HIREDIS) && HAVE_HIREDIS +#endif // defined(GEARMAND_PLUGINS_QUEUE_REDIS_H) diff --git a/libgearman-server/plugins/queue/redis/queue.h b/libgearman-server/plugins/queue/redis/queue.h index 0d24d311c..8a87aeac1 100644 --- a/libgearman-server/plugins/queue/redis/queue.h +++ b/libgearman-server/plugins/queue/redis/queue.h @@ -37,13 +37,71 @@ #pragma once +#if defined(HAVE_HIREDIS) && HAVE_HIREDIS +#ifndef GEARMAND_PLUGINS_QUEUE_REDIS_H +#define GEARMAND_PLUGINS_QUEUE_REDIS_H + +#include +#include +#include + +typedef std::vector vchar_t; namespace gearmand { namespace plugins { namespace queue { +/** + * redis result record struct + */ +struct redis_record_t { + uint32_t priority; + std::string data; +}; + +/** + * redis queue class + * provides redisContex object + * and a set of helper methods + */ +class Hiredis : public Queue { + private: + redisContext *_redis; + public: + std::string server; + std::string service; + std::string password; + + Hiredis(); + ~Hiredis(); + + gearmand_error_t initialize(); + + redisContext* redis(); + + /* + * hmset(vchar_t key, const void *data, size_t data_size, uint32_t) + * + * returns true if hiredis HMSET succeeded + */ + bool hmset(vchar_t, const void *, size_t, uint32_t); + + /* + * bool fetch(char *key, redis_record_t &req) + * + * fetch redis data for the key + * and put it into record + * + * returns true on success + */ + bool fetch(char *, redis_record_t &); +}; // class Hiredis + void initialize_redis(); } // namespace queue -} // namespace plugin +} // namespace plugins } // namespace gearmand + +#endif // ifndef GEARMAND_PLUGINS_QUEUE_REDIS_H +#endif // if defined(HAVE_HIREDIS) && HAVE_HIREDIS From 7853f812f01216a16d34131bb093716653a13d8c Mon Sep 17 00:00:00 2001 From: Alexei Pastuchov Date: Tue, 2 May 2017 15:37:34 +0200 Subject: [PATCH 4/5] for backwards compatibility redis queue supports old fashion job data --- .../plugins/queue/redis/queue.cc | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/libgearman-server/plugins/queue/redis/queue.cc b/libgearman-server/plugins/queue/redis/queue.cc index 4bb462034..3042fa8cf 100644 --- a/libgearman-server/plugins/queue/redis/queue.cc +++ b/libgearman-server/plugins/queue/redis/queue.cc @@ -124,16 +124,39 @@ bool gearmand::plugins::queue::Hiredis::hmset(vchar_t key, const void *data, siz bool gearmand::plugins::queue::Hiredis::fetch(char *key, gearmand::plugins::queue::redis_record_t &req) { redisContext * context = this->redis(); - redisReply * reply= (redisReply*)redisCommand(context, "HGETALL %s", key); + redisReply * reply = (redisReply*)redisCommand(context, "HGETALL %s", key); if (reply == nullptr) return false; - // 2 x (key + value) - assert(reply->elements == 4); + //FIXME remove workaround + if(reply->type == REDIS_REPLY_ERROR) { + // workaround to ensure gearmand upgrade. + // gearmand <=1.1.15 stores data in string, not in hash. + gearmand_log_warning(GEARMAN_DEFAULT_LOG_PARAM, "redis replies for HGETALL: %s", reply->str); - std::string s{reply->element[1]->str}; - req.data = s; - req.priority = (uint32_t)std::stoi(reply->element[3]->str); + reply = (redisReply*)redisCommand(context, "TYPE %s", key); + if (reply == nullptr) + return false; + + if(strcmp(reply->str, "string") != 0) { + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "unexpected type of the value stored in key: %s", reply->str); + return false; + } + + reply = (redisReply*)redisCommand(context, "GET %s", key); + if (reply == nullptr) + return false; + + std::string s{reply->str}; + req.data = s; + req.priority = GEARMAN_JOB_PRIORITY_NORMAL; + } else { + // 2 x (key + value) + assert(reply->elements == 4); + std::string s{reply->element[1]->str}; + req.data = s; + req.priority = (uint32_t)std::stoi(reply->element[3]->str); + } freeReplyObject(reply); From fda1c636bfbf3de4053e4cea87e231fd83129e35 Mon Sep 17 00:00:00 2001 From: Alexei Pastuchov Date: Thu, 18 May 2017 08:36:03 +0200 Subject: [PATCH 5/5] reduce logging level to info on HGETALL reply --- libgearman-server/plugins/queue/redis/queue.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libgearman-server/plugins/queue/redis/queue.cc b/libgearman-server/plugins/queue/redis/queue.cc index 3042fa8cf..d54d114d1 100644 --- a/libgearman-server/plugins/queue/redis/queue.cc +++ b/libgearman-server/plugins/queue/redis/queue.cc @@ -132,7 +132,7 @@ bool gearmand::plugins::queue::Hiredis::fetch(char *key, gearmand::plugins::queu if(reply->type == REDIS_REPLY_ERROR) { // workaround to ensure gearmand upgrade. // gearmand <=1.1.15 stores data in string, not in hash. - gearmand_log_warning(GEARMAN_DEFAULT_LOG_PARAM, "redis replies for HGETALL: %s", reply->str); + gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "redis replies for HGETALL: %s", reply->str); reply = (redisReply*)redisCommand(context, "TYPE %s", key); if (reply == nullptr)