From a2add0cc9c8ce646eff1333061072006d60cc152 Mon Sep 17 00:00:00 2001 From: Felipe-gsilva Date: Mon, 5 Aug 2024 15:29:56 -0300 Subject: [PATCH 1/8] doc: improves in-code documentation add: - testing fucntions for better reviewing - TODO: readme.md for better descriptions --- README.md | 2 ++ docs/README.md | 1 + src/com/moclojer/rq.clj | 5 ++++- test/com/moclojer/rq/pubsub_test.clj | 2 ++ test/com/moclojer/rq/utils_test.clj | 6 ++++-- test/com/moclojer/rq_test.clj | 10 ++++++---- 6 files changed, 19 insertions(+), 7 deletions(-) create mode 100644 docs/README.md diff --git a/README.md b/README.md index cf833e8..647a4ed 100644 --- a/README.md +++ b/README.md @@ -87,3 +87,5 @@ sequenceDiagram Client-->>Logger: log closing client Client-->>User: confirm client closure ``` + +Read more about the project [here](docs/README.md). diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..64a477a --- /dev/null +++ b/docs/README.md @@ -0,0 +1 @@ +# CLJ-RQ diff --git a/src/com/moclojer/rq.clj b/src/com/moclojer/rq.clj index 5b03476..ec79966 100644 --- a/src/com/moclojer/rq.clj +++ b/src/com/moclojer/rq.clj @@ -23,6 +23,9 @@ (atom pool))))) (defn close-client - "Disconnect and close redis client" + "Disconnect and close redis client. + If no specific client is passed, the global client stored is closed;" ([] (close-client *redis-pool*)) ([client] (.close @client))) + + diff --git a/test/com/moclojer/rq/pubsub_test.clj b/test/com/moclojer/rq/pubsub_test.clj index 593df38..573186a 100644 --- a/test/com/moclojer/rq/pubsub_test.clj +++ b/test/com/moclojer/rq/pubsub_test.clj @@ -18,6 +18,8 @@ (swap! state conj msg))}) chans-msgs)})) + +;; (t/deftest pubsub-test (let [client (rq/create-client "redis://localhost:6379")] diff --git a/test/com/moclojer/rq/utils_test.clj b/test/com/moclojer/rq/utils_test.clj index 64b1232..37290d8 100644 --- a/test/com/moclojer/rq/utils_test.clj +++ b/test/com/moclojer/rq/utils_test.clj @@ -4,12 +4,14 @@ [com.moclojer.rq.utils :as utils])) (t/deftest pattern->str-test + (t/testing "packing" [(t/is "my-queue" (utils/pack-pattern :none "my-queue")) (t/is "rq:my-queue" (utils/pack-pattern :rq "my-queue")) (t/is "rq:pubsub:my-queue" (utils/pack-pattern :pubsub "my-queue")) - (t/is "rq:pubsub:pending:my-queue" (utils/pack-pattern :pending "my-queue"))] + (t/is "rq:pubsub:pending:my-queue" (utils/pack-pattern :pending "my-queue"))]) + (t/testing "unpacking" [(t/is "my-queue" (utils/unpack-pattern :none "my-queue")) (t/is "my-queue" (utils/unpack-pattern :rq "rq:my-queue")) (t/is "my-queue" (utils/unpack-pattern :pubsub "rq:pubsub:my-queue")) - (t/is "my-queue" (utils/unpack-pattern :pending "rq:pubsub:pending:my-queue"))]) + (t/is "my-queue" (utils/unpack-pattern :pending "rq:pubsub:pending:my-queue"))])) diff --git a/test/com/moclojer/rq_test.clj b/test/com/moclojer/rq_test.clj index 18cf275..d4fbb58 100644 --- a/test/com/moclojer/rq_test.clj +++ b/test/com/moclojer/rq_test.clj @@ -4,8 +4,10 @@ [com.moclojer.rq :as rq])) ;; WARNING: redis needs to be runing. - (t/deftest create-client-test - (let [client (rq/create-client "redis://localhost:6379")] - (t/is (.. @client getPool getResource)) - (rq/close-client client))) + (t/testing "redis-client being created" + (let [client (rq/create-client "redis://localhost:6379")] + (t/is (.. @client getPool getResource)) + (rq/close-client client)))) + + From 0ee6fbff74f979ebd9cb68c29d29dd968260eba3 Mon Sep 17 00:00:00 2001 From: Felipe-gsilva Date: Mon, 5 Aug 2024 17:43:49 -0300 Subject: [PATCH 2/8] feat: common redis commands wrapped to clojure adds: - bpop! - lrange - lindex - lrem - lset --- src/com/moclojer/rq/queue.clj | 103 ++++++++++++++++++++++++++-- test/com/moclojer/rq/queue_test.clj | 6 ++ 2 files changed, 103 insertions(+), 6 deletions(-) diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index fac174d..2b62866 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -3,7 +3,8 @@ (:require [clojure.edn :as edn] [clojure.tools.logging :as log] - [com.moclojer.rq.utils :as utils])) + [com.moclojer.rq.utils :as utils] + [clojure.core.async :as async])) (defn push! "Push a message into a queue. @@ -34,11 +35,13 @@ (defn pop! "Pop a message from a queue. - - Options: - - direction: Direction to pop the message (:l or :r) - - pattern: Pattern for the queue name" + Parameters: + - client: Redis client + - queue-name: Name of the queue + - options: + - direction: Direction to pop the message (:l or :r) + - pattern: Pattern for the queue name" [client queue-name & options] (let [{:keys [direction pattern] :or {direction :l @@ -50,7 +53,6 @@ (.rpop @client packed-queue-name))] (when message - (log/debug "popped from queue" {:client client :queue-name packed-queue-name @@ -71,3 +73,92 @@ (let [{:keys [pattern] :or {pattern :rq}} options] (.llen @client (utils/pack-pattern pattern queue-name)))) + +(defn bpop! + "Block pop a message + + Parameters: + - same as pop! + - client: Redis client + - queue-name: Name of the queue + - options: Optional parameters, including: + - pattern: Pattern for the queue name + - timeout: Timeout for retrying to pop!" + ([client queue-name timeout & options] + (or (apply (partial pop! client queue-name) options) + (if (>= timeout 1000) + (do + (Thread/sleep 1000) + (bpop! client queue-name (- timeout 1000) options)) + nil)))) + +(defn lrange + "Return an entire range given min and max indexes + + Parameters: + - client: Redis client + - queue-name: Name of the queue + - min: floor index + - max: ceiling index" + [client queue-name min max & options] + (let [{:keys [pattern] + :or {pattern :rq}} options + packed-queue (utils/pack-pattern pattern queue-name)] + (.range @client packed-queue min max))) + + +(defn lindex + "Return a element in a specified index + + Parameters: + - client: Redis client + - queue-name: Name of the queue + - index: specific index to access" + [client queue-name index & options] + (let [{:keys [pattern] + :or {pattern :rq}} options + packed-queue (utils/pack-pattern pattern queue-name)] + (.lindex @client packed-queue index))) + +(defn lset + "Set a new message in the specified index + + Parameters: + - client: Redis client + - queue-name: Name of the queue + - index: specific index to access + - msg: new msg to be added" + [client queue-name index msg & options] + ;TODO remove (let [old-msg (lindex client queue-name index)](println old-msg)) + (let [{:keys [pattern] + :or {pattern :rq}} options + packed-queue (utils/pack-pattern pattern queue-name)] + (.lset @client packed-queue index msg)))) + +(defn lrem + "removes a specified occurance (defined by count) of the message given (if any) + + Parameters: + - client: Redis client + - queue-name: Name of the queue + - msg: new msg to be added + - cnt: count + count > 0: Remove elements equal to element moving from head to tail. + count < 0: Remove elements equal to element moving from tail to head. + count = 0: Remove all elements equal to element." + [client queue-name cnt msg & options] + (let [{:keys [pattern] + :or {pattern :rq}} options + packed-queue (utils/pack-pattern pattern queue-name)] + (.lrem @client packed-queue cnt msg)))) + +(comment + (defn linsert []) + + (defn ltrim []) + + (defn rpoplpush []) + + (defn brpoplpsuh []) + + (defn lmove [])) diff --git a/test/com/moclojer/rq/queue_test.clj b/test/com/moclojer/rq/queue_test.clj index aa9f651..1fec968 100644 --- a/test/com/moclojer/rq/queue_test.clj +++ b/test/com/moclojer/rq/queue_test.clj @@ -24,4 +24,10 @@ (rq-queue/push! client queue-name message :pattern :pending) (t/is (= message (rq-queue/pop! client queue-name :pattern :pending)))) + ;; TODO + (t/testing "block" + (rq-queue/bpop! client queue-name 10000 message) + (t/is (= message (rq-queue/pop! client queue-name message))) + ) + (rq/close-client client))) From d57030c403fa37164113827863d3ee61ffa8a9c0 Mon Sep 17 00:00:00 2001 From: Felipe-gsilva Date: Tue, 6 Aug 2024 19:30:06 -0300 Subject: [PATCH 3/8] feat: adds all the commands we need --- src/com/moclojer/rq/queue.clj | 162 +++++++++++++++++++++------- test/com/moclojer/rq/queue_test.clj | 79 ++++++++++++-- 2 files changed, 193 insertions(+), 48 deletions(-) diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index 2b62866..c2bbcdb 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -3,8 +3,7 @@ (:require [clojure.edn :as edn] [clojure.tools.logging :as log] - [com.moclojer.rq.utils :as utils] - [clojure.core.async :as async])) + [com.moclojer.rq.utils :as utils])) (defn push! "Push a message into a queue. @@ -71,26 +70,34 @@ - pattern: Pattern for the queue name" [client queue-name & options] (let [{:keys [pattern] - :or {pattern :rq}} options] - (.llen @client (utils/pack-pattern pattern queue-name)))) + :or {pattern :rq}} options + packed-queue-name (utils/pack-pattern pattern queue-name)] + (.llen @client packed-queue-name))) (defn bpop! - "Block pop a message - + "Pop a message from a queue. Blocking if necessary. + Parameters: - - same as pop! - - client: Redis client - - queue-name: Name of the queue - - options: Optional parameters, including: - - pattern: Pattern for the queue name - - timeout: Timeout for retrying to pop!" - ([client queue-name timeout & options] - (or (apply (partial pop! client queue-name) options) - (if (>= timeout 1000) - (do - (Thread/sleep 1000) - (bpop! client queue-name (- timeout 1000) options)) - nil)))) + - client: Redis client + - queue-name: Name of the queue + - tmot: Blocking timeout + - options: + - direction: Direction to pop the message (:l or :r) + - pattern: Pattern for the queue name" + [client queue-name tmot & {:keys [direction pattern] + :or {direction :l pattern :rq}}] + (let [packed-queue-name (utils/pack-pattern pattern queue-name) + result (if (= direction :l) + (.blpop @client tmot packed-queue-name) + (.brpop @client tmot packed-queue-name))] + (when result + (let [message (second result)] + (log/debug "popped from queue" + {:client client + :queue-name packed-queue-name + :options {:direction direction :pattern pattern} + :message message}) + (edn/read-string message))))) (defn lrange "Return an entire range given min and max indexes @@ -98,14 +105,13 @@ Parameters: - client: Redis client - queue-name: Name of the queue - - min: floor index - - max: ceiling index" - [client queue-name min max & options] + - floor: floor index + - ceil: ceiling index" + [client queue-name floor ceil & options] (let [{:keys [pattern] :or {pattern :rq}} options packed-queue (utils/pack-pattern pattern queue-name)] - (.range @client packed-queue min max))) - + (.lrange @client packed-queue floor ceil))) (defn lindex "Return a element in a specified index @@ -120,7 +126,7 @@ packed-queue (utils/pack-pattern pattern queue-name)] (.lindex @client packed-queue index))) -(defn lset +(defn lset "Set a new message in the specified index Parameters: @@ -128,15 +134,14 @@ - queue-name: Name of the queue - index: specific index to access - msg: new msg to be added" - [client queue-name index msg & options] - ;TODO remove (let [old-msg (lindex client queue-name index)](println old-msg)) - (let [{:keys [pattern] - :or {pattern :rq}} options - packed-queue (utils/pack-pattern pattern queue-name)] - (.lset @client packed-queue index msg)))) + [client queue-name index msg & {:keys [pattern] + :or {pattern :rq}}] + (let [packed-queue-name (utils/pack-pattern pattern queue-name)] + (.lset @client packed-queue-name index msg))) + (defn lrem - "removes a specified occurance (defined by count) of the message given (if any) + "Removes a specified occurance of the message given (if any) Parameters: - client: Redis client @@ -149,16 +154,93 @@ [client queue-name cnt msg & options] (let [{:keys [pattern] :or {pattern :rq}} options - packed-queue (utils/pack-pattern pattern queue-name)] - (.lrem @client packed-queue cnt msg)))) + packed-queue-name (utils/pack-pattern pattern queue-name)] + (.lrem @client packed-queue-name cnt msg))) -(comment - (defn linsert []) - (defn ltrim []) +(defn linsert + "Insert a message before the first occurance of a pivot given. - (defn rpoplpush []) + Parameters: + - client: Redis client + - queue-name: Name of the queue + - msg: new msg to be added + - pivot: pivot message to be added before or after + - options: + - pos (keywords): + - before: insert the message before the pivot + - after: insert the message after the pivot" + [client queue-name msg pivot & options] + (let [{:keys [pos pattern] + :or {pos :before + pattern :rq}} options + packed-queue-name (utils/pack-pattern pattern queue-name)] + (.linsert @client packed-queue-name pos pivot msg))) + + +(defn ltrim + "Trim a list to the specified range. - (defn brpoplpsuh []) + Parameters: + - client: Redis client + - queue-name: Name of the queue + - start: start index + - stop: stop index + - options: + - pattern: pattern to pack the queue name" + [client queue-name start stop & options] + (let [{:keys [pattern] + :or {pattern :rq}} options + packed-queue-name (utils/pack-pattern pattern queue-name)] + (.ltrim @client packed-queue-name start stop))) + +(defn rpoplpush + "Remove the last element in a list and append it to another list. + + Parameters: + - client: Redis client + - source-queue: Name of the source queue + - destination-queue: Name of the destination queue + - options: + - pattern: pattern to pack the queue names" + [client source-queue destination-queue & options] + (let [{:keys [pattern] + :or {pattern :rq}} options + packed-source-queue (utils/pack-pattern pattern source-queue) + packed-destination-queue (utils/pack-pattern pattern destination-queue)] + (.rpoplpush @client packed-source-queue packed-destination-queue))) - (defn lmove [])) +(defn brpoplpush + "Remove the last element in a list and append it to another list, blocking if necessary. + + Parameters: + - client: Redis client + - source-queue: Name of the source queue + - destination-queue: Name of the destination queue + - timeout: timeout in seconds + - options: + - pattern: pattern to pack the queue names" + [client source-queue destination-queue timeout & options] + (let [{:keys [pattern] + :or {pattern :rq}} options + packed-source-queue (utils/pack-pattern pattern source-queue) + packed-destination-queue (utils/pack-pattern pattern destination-queue)] + (.brpoplpush @client packed-source-queue packed-destination-queue timeout))) + +(defn lmove + "Atomically return and remove the first/last element (head/tail depending on the wherefrom argument) of the source list, and push the element as the first/last element (head/tail depending on the whereto argument) of the destination list. + + Parameters: + - client: Redis client + - source-queue: Name of the source queue + - destination-queue: Name of the destination queue + - wherefrom: 'LEFT' or 'RIGHT' + - whereto: 'LEFT' or 'RIGHT' + - options: + - pattern: pattern to pack the queue names" + [client source-queue destination-queue wherefrom whereto & options] + (let [{:keys [pattern] + :or {pattern :rq}} options + packed-source-queue (utils/pack-pattern pattern source-queue) + packed-destination-queue (utils/pack-pattern pattern destination-queue)] + (.lmove @client packed-source-queue packed-destination-queue wherefrom whereto))) diff --git a/test/com/moclojer/rq/queue_test.clj b/test/com/moclojer/rq/queue_test.clj index 1fec968..674438c 100644 --- a/test/com/moclojer/rq/queue_test.clj +++ b/test/com/moclojer/rq/queue_test.clj @@ -8,13 +8,15 @@ (t/deftest queue-test (let [client (rq/create-client "redis://localhost:6379") queue-name (str (random-uuid)) - message (utils/gen-message)] + another-queue-name (str (random-uuid)) + message (utils/gen-message) + another-message (utils/gen-message)] (t/testing "raw" (rq-queue/push! client queue-name message) - (rq-queue/push! client queue-name (utils/gen-message)) + (rq-queue/push! client queue-name another-message) (t/is (= 2 (rq-queue/llen client queue-name))) - (t/is (= message (rq-queue/pop! client queue-name :direction :r)))) + (t/is (= message (rq-queue/pop! client queue-name {:direction :r})))) (t/testing "direction" (rq-queue/push! client queue-name message :direction :r) @@ -24,10 +26,71 @@ (rq-queue/push! client queue-name message :pattern :pending) (t/is (= message (rq-queue/pop! client queue-name :pattern :pending)))) - ;; TODO - (t/testing "block" - (rq-queue/bpop! client queue-name 10000 message) - (t/is (= message (rq-queue/pop! client queue-name message))) - ) + (t/testing "bpop! left" + (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l})))) + (rq-queue/push! client queue-name message) + (let [popped-message (rq-queue/bpop! client queue-name 1 {:direction :l})] + (t/is (= message popped-message)) + (t/is (= 0 (rq-queue/llen client queue-name))))) + + (t/testing "bpop! right" + (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :r})))) + (rq-queue/push! client queue-name message) + (let [popped-message (rq-queue/bpop! client queue-name 1 {:direction :r})] + (t/is (= message popped-message)) + (t/is (= 0 (rq-queue/llen client queue-name))))) + + (t/testing "lset" + (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l})))) + (rq-queue/push! client queue-name message) + (rq-queue/push! client queue-name another-message) + (rq-queue/lset client queue-name 0 another-message) + (t/is (= another-message (rq-queue/lindex client queue-name 0))) + (rq-queue/lset client queue-name 1 message) + (t/is (= message (rq-queue/lindex client queue-name 1)))) + + (t/testing "lrange" + (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l})))) + (rq-queue/push! client queue-name message) + (rq-queue/push! client queue-name another-message) + (t/is (= [message another-message] (rq-queue/lrange client queue-name 0 1)))) + + (t/testing "lindex" + (rq-queue/lset client queue-name 0 message) + (t/is (= message (rq-queue/lindex client queue-name 0)))) + + (t/testing "lrem" + (rq-queue/push! client queue-name message) + (rq-queue/lrem client queue-name 1 message) + (t/is (= 0 (rq-queue/llen client queue-name)))) + + (t/testing "linsert" + (rq-queue/push! client queue-name message) + (rq-queue/linsert client queue-name another-message message :pos :before) + (t/is (= another-message (rq-queue/lindex client queue-name 0)))) + + (t/testing "ltrim" + (rq-queue/push! client queue-name message) + (rq-queue/push! client queue-name another-message) + (rq-queue/ltrim client queue-name 1 -1) + (t/is (= another-message (rq-queue/lindex client queue-name 0)))) + + (t/testing "rpoplpush" + (rq-queue/push! client queue-name message) + (rq-queue/rpoplpush client queue-name another-queue-name) + (t/is (= 0 (rq-queue/llen client queue-name))) + (t/is (= message (rq-queue/pop! client another-queue-name :direction :l)))) + + (t/testing "brpoplpush" + (rq-queue/push! client queue-name message) + (rq-queue/brpoplpush client queue-name another-queue-name 1) + (t/is (= 0 (rq-queue/llen client queue-name))) + (t/is (= message (rq-queue/pop! client another-queue-name :direction :l)))) + + (t/testing "lmove" + (rq-queue/push! client queue-name message) + (rq-queue/lmove client queue-name another-queue-name "LEFT" "RIGHT") + (t/is (= 0 (rq-queue/llen client queue-name))) + (t/is (= message (rq-queue/pop! client another-queue-name :direction :r)))) (rq/close-client client))) From be2431d1669e3758f942ffb6619d5cc1fd84d20f Mon Sep 17 00:00:00 2001 From: Felipe-gsilva Date: Wed, 7 Aug 2024 13:14:29 -0300 Subject: [PATCH 4/8] add: tests for redis commands --- src/com/moclojer/rq/queue.clj | 83 +++++++++++++++++++++-------- test/com/moclojer/rq/queue_test.clj | 27 +++++----- 2 files changed, 76 insertions(+), 34 deletions(-) diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index c2bbcdb..e2dd7e6 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -3,7 +3,8 @@ (:require [clojure.edn :as edn] [clojure.tools.logging :as log] - [com.moclojer.rq.utils :as utils])) + [com.moclojer.rq.utils :as utils] + [clojure.string :as s])) (defn push! "Push a message into a queue. @@ -30,7 +31,7 @@ :options opts :pushed-count pushed-count}) - pushed-count)) + pushed-count)) (defn pop! "Pop a message from a queue. @@ -110,8 +111,9 @@ [client queue-name floor ceil & options] (let [{:keys [pattern] :or {pattern :rq}} options - packed-queue (utils/pack-pattern pattern queue-name)] - (.lrange @client packed-queue floor ceil))) + packed-queue (utils/pack-pattern pattern queue-name) + result (.lrange @client packed-queue floor ceil)] + result)) (defn lindex "Return a element in a specified index @@ -123,21 +125,40 @@ [client queue-name index & options] (let [{:keys [pattern] :or {pattern :rq}} options - packed-queue (utils/pack-pattern pattern queue-name)] - (.lindex @client packed-queue index))) + packed-queue-name (utils/pack-pattern pattern queue-name) + result (.lindex @client packed-queue-name index)] + + (let [message (clojure.edn/read-string result)] + (log/debug "message found" + {:client client + :queue-name packed-queue-name + :index index + :message message}) + message))) (defn lset - "Set a new message in the specified index - + "Set a new message in a specified index + Parameters: - client: Redis client - queue-name: Name of the queue - index: specific index to access - - msg: new msg to be added" - [client queue-name index msg & {:keys [pattern] - :or {pattern :rq}}] - (let [packed-queue-name (utils/pack-pattern pattern queue-name)] - (.lset @client packed-queue-name index msg))) + - message: new msg to be added" + [client queue-name index message & options] + (let [{:keys [pattern] + :or {pattern :rq} :as opts} options + packed-queue-name (utils/pack-pattern pattern queue-name) + encoded-message (clojure.edn/read-string (str message)) + return (.lset @client packed-queue-name index (str encoded-message))] + + (log/debug "set in queue" + {:client client + :queue-name packed-queue-name + :message (str encoded-message) + :index index + :options opts + :return return}) + return)) (defn lrem @@ -154,8 +175,16 @@ [client queue-name cnt msg & options] (let [{:keys [pattern] :or {pattern :rq}} options - packed-queue-name (utils/pack-pattern pattern queue-name)] - (.lrem @client packed-queue-name cnt msg))) + packed-queue-name (utils/pack-pattern pattern queue-name) + return (.lrem @client packed-queue-name cnt (str msg))] + + (log/debug "removed from queue" + {:client client + :queue-name queue-name + :msg (clojure.edn/read-string (str msg)) + :count cnt + :return return}) + return)) (defn linsert @@ -173,9 +202,19 @@ [client queue-name msg pivot & options] (let [{:keys [pos pattern] :or {pos :before - pattern :rq}} options - packed-queue-name (utils/pack-pattern pattern queue-name)] - (.linsert @client packed-queue-name pos pivot msg))) + pattern :rq} :as opts} options + packed-queue-name (utils/pack-pattern pattern queue-name) + encoded-message (str (clojure.edn/read-string (str msg))) + encoded-pivot (str (clojure.edn/read-string (str pivot))) + encoded-pos (str (s/capitalize (str pivot))) + return (.linsert @client packed-queue-name encoded-pos encoded-pivot encoded-message)] + (log/debug "inserted in queue" + {:client client + :queue-name queue-name + :msg (clojure.edn/read-string (str msg)) + :opts opts + :return return}) + return)) (defn ltrim @@ -187,7 +226,7 @@ - start: start index - stop: stop index - options: - - pattern: pattern to pack the queue name" + - pattern: pattern to pack the queue name" [client queue-name start stop & options] (let [{:keys [pattern] :or {pattern :rq}} options @@ -202,7 +241,7 @@ - source-queue: Name of the source queue - destination-queue: Name of the destination queue - options: - - pattern: pattern to pack the queue names" + - pattern: pattern to pack the queue names" [client source-queue destination-queue & options] (let [{:keys [pattern] :or {pattern :rq}} options @@ -219,7 +258,7 @@ - destination-queue: Name of the destination queue - timeout: timeout in seconds - options: - - pattern: pattern to pack the queue names" + - pattern: pattern to pack the queue names" [client source-queue destination-queue timeout & options] (let [{:keys [pattern] :or {pattern :rq}} options @@ -237,7 +276,7 @@ - wherefrom: 'LEFT' or 'RIGHT' - whereto: 'LEFT' or 'RIGHT' - options: - - pattern: pattern to pack the queue names" + - pattern: pattern to pack the queue names" [client source-queue destination-queue wherefrom whereto & options] (let [{:keys [pattern] :or {pattern :rq}} options diff --git a/test/com/moclojer/rq/queue_test.clj b/test/com/moclojer/rq/queue_test.clj index 674438c..7ecd7f8 100644 --- a/test/com/moclojer/rq/queue_test.clj +++ b/test/com/moclojer/rq/queue_test.clj @@ -26,6 +26,7 @@ (rq-queue/push! client queue-name message :pattern :pending) (t/is (= message (rq-queue/pop! client queue-name :pattern :pending)))) + (t/testing "bpop! left" (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l})))) (rq-queue/push! client queue-name message) @@ -40,6 +41,10 @@ (t/is (= message popped-message)) (t/is (= 0 (rq-queue/llen client queue-name))))) + (t/testing "lindex" + (rq-queue/push! client queue-name message) + (t/is (= message (rq-queue/lindex client queue-name 0)))) + (t/testing "lset" (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l})))) (rq-queue/push! client queue-name message) @@ -47,18 +52,10 @@ (rq-queue/lset client queue-name 0 another-message) (t/is (= another-message (rq-queue/lindex client queue-name 0))) (rq-queue/lset client queue-name 1 message) - (t/is (= message (rq-queue/lindex client queue-name 1)))) - - (t/testing "lrange" - (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l})))) - (rq-queue/push! client queue-name message) - (rq-queue/push! client queue-name another-message) - (t/is (= [message another-message] (rq-queue/lrange client queue-name 0 1)))) - - (t/testing "lindex" - (rq-queue/lset client queue-name 0 message) - (t/is (= message (rq-queue/lindex client queue-name 0)))) - + (t/is (= message (rq-queue/lindex client queue-name 1))) + (rq-queue/pop! client queue-name :direction :l) + (rq-queue/pop! client queue-name :direction :l) + ) (t/testing "lrem" (rq-queue/push! client queue-name message) (rq-queue/lrem client queue-name 1 message) @@ -75,6 +72,12 @@ (rq-queue/ltrim client queue-name 1 -1) (t/is (= another-message (rq-queue/lindex client queue-name 0)))) + (t/testing "lrange" + (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l})))) + (rq-queue/push! client queue-name message) + (rq-queue/push! client queue-name another-message) + (t/is (= [message another-message] (rq-queue/lrange client queue-name 0 1)))) + (t/testing "rpoplpush" (rq-queue/push! client queue-name message) (rq-queue/rpoplpush client queue-name another-queue-name) From 36af507415bd8bd2bf502340038ee406093e84b3 Mon Sep 17 00:00:00 2001 From: Felipe-gsilva Date: Wed, 7 Aug 2024 16:45:49 -0300 Subject: [PATCH 5/8] fix: identation issues --- src/com/moclojer/rq/queue.clj | 33 +++++++++++++--------------- test/com/moclojer/rq/pubsub_test.clj | 1 - test/com/moclojer/rq/queue_test.clj | 4 +--- test/com/moclojer/rq/utils_test.clj | 16 +++++++------- 4 files changed, 24 insertions(+), 30 deletions(-) diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index e2dd7e6..96d8ecc 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -31,7 +31,7 @@ :options opts :pushed-count pushed-count}) - pushed-count)) + pushed-count)) (defn pop! "Pop a message from a queue. @@ -73,7 +73,7 @@ (let [{:keys [pattern] :or {pattern :rq}} options packed-queue-name (utils/pack-pattern pattern queue-name)] - (.llen @client packed-queue-name))) + (.llen @client packed-queue-name))) (defn bpop! "Pop a message from a queue. Blocking if necessary. @@ -100,7 +100,7 @@ :message message}) (edn/read-string message))))) -(defn lrange +(defn lrange "Return an entire range given min and max indexes Parameters: @@ -127,7 +127,7 @@ :or {pattern :rq}} options packed-queue-name (utils/pack-pattern pattern queue-name) result (.lindex @client packed-queue-name index)] - + (let [message (clojure.edn/read-string result)] (log/debug "message found" {:client client @@ -160,8 +160,7 @@ :return return}) return)) - -(defn lrem +(defn lrem "Removes a specified occurance of the message given (if any) Parameters: @@ -173,21 +172,20 @@ count < 0: Remove elements equal to element moving from tail to head. count = 0: Remove all elements equal to element." [client queue-name cnt msg & options] - (let [{:keys [pattern] + (let [{:keys [pattern] :or {pattern :rq}} options packed-queue-name (utils/pack-pattern pattern queue-name) return (.lrem @client packed-queue-name cnt (str msg))] - (log/debug "removed from queue" - {:client client - :queue-name queue-name - :msg (clojure.edn/read-string (str msg)) - :count cnt - :return return}) - return)) - + (log/debug "removed from queue" + {:client client + :queue-name queue-name + :msg (clojure.edn/read-string (str msg)) + :count cnt + :return return}) + return)) -(defn linsert +(defn linsert "Insert a message before the first occurance of a pivot given. Parameters: @@ -201,7 +199,7 @@ - after: insert the message after the pivot" [client queue-name msg pivot & options] (let [{:keys [pos pattern] - :or {pos :before + :or {pos :before pattern :rq} :as opts} options packed-queue-name (utils/pack-pattern pattern queue-name) encoded-message (str (clojure.edn/read-string (str msg))) @@ -216,7 +214,6 @@ :return return}) return)) - (defn ltrim "Trim a list to the specified range. diff --git a/test/com/moclojer/rq/pubsub_test.clj b/test/com/moclojer/rq/pubsub_test.clj index 573186a..c3a6ab0 100644 --- a/test/com/moclojer/rq/pubsub_test.clj +++ b/test/com/moclojer/rq/pubsub_test.clj @@ -18,7 +18,6 @@ (swap! state conj msg))}) chans-msgs)})) - ;; (t/deftest pubsub-test (let [client (rq/create-client "redis://localhost:6379")] diff --git a/test/com/moclojer/rq/queue_test.clj b/test/com/moclojer/rq/queue_test.clj index 7ecd7f8..e628699 100644 --- a/test/com/moclojer/rq/queue_test.clj +++ b/test/com/moclojer/rq/queue_test.clj @@ -26,7 +26,6 @@ (rq-queue/push! client queue-name message :pattern :pending) (t/is (= message (rq-queue/pop! client queue-name :pattern :pending)))) - (t/testing "bpop! left" (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l})))) (rq-queue/push! client queue-name message) @@ -54,8 +53,7 @@ (rq-queue/lset client queue-name 1 message) (t/is (= message (rq-queue/lindex client queue-name 1))) (rq-queue/pop! client queue-name :direction :l) - (rq-queue/pop! client queue-name :direction :l) - ) + (rq-queue/pop! client queue-name :direction :l)) (t/testing "lrem" (rq-queue/push! client queue-name message) (rq-queue/lrem client queue-name 1 message) diff --git a/test/com/moclojer/rq/utils_test.clj b/test/com/moclojer/rq/utils_test.clj index 37290d8..9854f32 100644 --- a/test/com/moclojer/rq/utils_test.clj +++ b/test/com/moclojer/rq/utils_test.clj @@ -5,13 +5,13 @@ (t/deftest pattern->str-test (t/testing "packing" - [(t/is "my-queue" (utils/pack-pattern :none "my-queue")) - (t/is "rq:my-queue" (utils/pack-pattern :rq "my-queue")) - (t/is "rq:pubsub:my-queue" (utils/pack-pattern :pubsub "my-queue")) - (t/is "rq:pubsub:pending:my-queue" (utils/pack-pattern :pending "my-queue"))]) + [(t/is "my-queue" (utils/pack-pattern :none "my-queue")) + (t/is "rq:my-queue" (utils/pack-pattern :rq "my-queue")) + (t/is "rq:pubsub:my-queue" (utils/pack-pattern :pubsub "my-queue")) + (t/is "rq:pubsub:pending:my-queue" (utils/pack-pattern :pending "my-queue"))]) (t/testing "unpacking" - [(t/is "my-queue" (utils/unpack-pattern :none "my-queue")) - (t/is "my-queue" (utils/unpack-pattern :rq "rq:my-queue")) - (t/is "my-queue" (utils/unpack-pattern :pubsub "rq:pubsub:my-queue")) - (t/is "my-queue" (utils/unpack-pattern :pending "rq:pubsub:pending:my-queue"))])) + [(t/is "my-queue" (utils/unpack-pattern :none "my-queue")) + (t/is "my-queue" (utils/unpack-pattern :rq "rq:my-queue")) + (t/is "my-queue" (utils/unpack-pattern :pubsub "rq:pubsub:my-queue")) + (t/is "my-queue" (utils/unpack-pattern :pending "rq:pubsub:pending:my-queue"))])) From 2f7cf565a70699aa931ffd4b06f9993672f3d495 Mon Sep 17 00:00:00 2001 From: Felipe-gsilva Date: Fri, 16 Aug 2024 10:51:20 -0300 Subject: [PATCH 6/8] fix: updates remaining static impl add: - all the remaining fixes into redis static functions on order to fit the tests. - tests it all --- src/com/moclojer/rq/queue.clj | 132 +++++++++++++++++++--------- src/com/moclojer/rq/utils.clj | 7 +- test/com/moclojer/rq/queue_test.clj | 33 +++++-- 3 files changed, 120 insertions(+), 52 deletions(-) diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index 96d8ecc..4061945 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -88,32 +88,18 @@ [client queue-name tmot & {:keys [direction pattern] :or {direction :l pattern :rq}}] (let [packed-queue-name (utils/pack-pattern pattern queue-name) - result (if (= direction :l) + return (if (= direction :l) (.blpop @client tmot packed-queue-name) (.brpop @client tmot packed-queue-name))] - (when result - (let [message (second result)] + (when return + (let [message (second return)] (log/debug "popped from queue" {:client client :queue-name packed-queue-name :options {:direction direction :pattern pattern} :message message}) - (edn/read-string message))))) + (edn/read-string message))))) -(defn lrange - "Return an entire range given min and max indexes - - Parameters: - - client: Redis client - - queue-name: Name of the queue - - floor: floor index - - ceil: ceiling index" - [client queue-name floor ceil & options] - (let [{:keys [pattern] - :or {pattern :rq}} options - packed-queue (utils/pack-pattern pattern queue-name) - result (.lrange @client packed-queue floor ceil)] - result)) (defn lindex "Return a element in a specified index @@ -126,9 +112,9 @@ (let [{:keys [pattern] :or {pattern :rq}} options packed-queue-name (utils/pack-pattern pattern queue-name) - result (.lindex @client packed-queue-name index)] + return (.lindex @client packed-queue-name index)] - (let [message (clojure.edn/read-string result)] + (let [message (clojure.edn/read-string return)] (log/debug "message found" {:client client :queue-name packed-queue-name @@ -148,8 +134,8 @@ (let [{:keys [pattern] :or {pattern :rq} :as opts} options packed-queue-name (utils/pack-pattern pattern queue-name) - encoded-message (clojure.edn/read-string (str message)) - return (.lset @client packed-queue-name index (str encoded-message))] + encoded-message (str (clojure.edn/read-string (str message))) + return (.lset @client packed-queue-name index encoded-message)] (log/debug "set in queue" {:client client @@ -186,33 +172,57 @@ return)) (defn linsert - "Insert a message before the first occurance of a pivot given. + "insert a message before the first occurance of a pivot given. - Parameters: - - client: Redis client - - queue-name: Name of the queue + parameters: + - client: redis client + - queue-name: name of the queue - msg: new msg to be added - pivot: pivot message to be added before or after - options: - pos (keywords): - before: insert the message before the pivot - after: insert the message after the pivot" - [client queue-name msg pivot & options] + [client queue-name pivot msg & options] (let [{:keys [pos pattern] :or {pos :before pattern :rq} :as opts} options packed-queue-name (utils/pack-pattern pattern queue-name) encoded-message (str (clojure.edn/read-string (str msg))) encoded-pivot (str (clojure.edn/read-string (str pivot))) - encoded-pos (str (s/capitalize (str pivot))) + encoded-pos (if (= pos :before) + redis.clients.jedis.args.ListPosition/BEFORE + redis.clients.jedis.args.ListPosition/AFTER) return (.linsert @client packed-queue-name encoded-pos encoded-pivot encoded-message)] - (log/debug "inserted in queue" + (log/debug "inserted in queue" + {:client client + :queue-name queue-name + :msg encoded-message + :opts opts + :return return}) + return)) + + +(defn lrange + "Return an entire range given min and max indexes + + Parameters: + - client: Redis client + - queue-name: Name of the queue + - floor: floor index + - ceil: ceiling index" + [client queue-name floor ceil & options] + (let [{:keys [pattern] + :or {pattern :rq} :as opts} options + packed-queue-name (utils/pack-pattern pattern queue-name) + return (.lrange @client packed-queue-name floor ceil)] + (log/debug "queue specified range" {:client client - :queue-name queue-name - :msg (clojure.edn/read-string (str msg)) + :queue-name packed-queue-name :opts opts - :return return}) - return)) + :result return}) + (mapv clojure.edn/read-string return))) + (defn ltrim "Trim a list to the specified range. @@ -226,9 +236,17 @@ - pattern: pattern to pack the queue name" [client queue-name start stop & options] (let [{:keys [pattern] - :or {pattern :rq}} options + :or {pattern :rq} :as opts} options packed-queue-name (utils/pack-pattern pattern queue-name)] - (.ltrim @client packed-queue-name start stop))) + (let [return (.ltrim @client packed-queue-name start stop)] + (log/debug "queue trimmed" + {:client client + :queue-name queue-name + :opts opts + :result return}) + return))) + + (defn rpoplpush "Remove the last element in a list and append it to another list. @@ -243,8 +261,15 @@ (let [{:keys [pattern] :or {pattern :rq}} options packed-source-queue (utils/pack-pattern pattern source-queue) - packed-destination-queue (utils/pack-pattern pattern destination-queue)] - (.rpoplpush @client packed-source-queue packed-destination-queue))) + packed-destination-queue (utils/pack-pattern pattern destination-queue) + return (.rpoplpush @client packed-source-queue packed-destination-queue)] + (log/debug "rpoplpush operation" + {:client client + :source-queue packed-source-queue + :destination-queue packed-destination-queue + :result return}) + return)) + (defn brpoplpush "Remove the last element in a list and append it to another list, blocking if necessary. @@ -260,11 +285,20 @@ (let [{:keys [pattern] :or {pattern :rq}} options packed-source-queue (utils/pack-pattern pattern source-queue) - packed-destination-queue (utils/pack-pattern pattern destination-queue)] - (.brpoplpush @client packed-source-queue packed-destination-queue timeout))) + packed-destination-queue (utils/pack-pattern pattern destination-queue) + result (.brpoplpush @client packed-source-queue packed-destination-queue timeout)] + (log/debug "brpoplpush operation" + {:client client + :source-queue packed-source-queue + :destination-queue packed-destination-queue + :timeout timeout + :result result}) + result)) + + (defn lmove - "Atomically return and remove the first/last element (head/tail depending on the wherefrom argument) of the source list, and push the element as the first/last element (head/tail depending on the whereto argument) of the destination list. + "Atomically return and remove the first/last element of the source list, and push the element as the first/last element of the destination list. Parameters: - client: Redis client @@ -278,5 +312,19 @@ (let [{:keys [pattern] :or {pattern :rq}} options packed-source-queue (utils/pack-pattern pattern source-queue) - packed-destination-queue (utils/pack-pattern pattern destination-queue)] - (.lmove @client packed-source-queue packed-destination-queue wherefrom whereto))) + packed-destination-queue (utils/pack-pattern pattern destination-queue) + from-direction (if (= wherefrom "LEFT") + redis.clients.jedis.args.ListDirection/LEFT + redis.clients.jedis.args.ListDirection/RIGHT) + to-direction (if (= whereto "LEFT") + redis.clients.jedis.args.ListDirection/LEFT + redis.clients.jedis.args.ListDirection/RIGHT) + result (.lmove @client packed-source-queue packed-destination-queue from-direction to-direction)] + (log/debug "lmove operation" + {:client client + :source-queue packed-source-queue + :destination-queue packed-destination-queue + :from-direction from-direction + :to-direction to-direction + :result result}) + result)) diff --git a/src/com/moclojer/rq/utils.clj b/src/com/moclojer/rq/utils.clj index c6f9dab..1bf2eca 100644 --- a/src/com/moclojer/rq/utils.clj +++ b/src/com/moclojer/rq/utils.clj @@ -1,4 +1,6 @@ -(ns com.moclojer.rq.utils) +(ns com.moclojer.rq.utils + (:require + [clojure.string :as s])) (defn- pattern->str "Adapts given pattern keyword to a know internal pattern. Raises @@ -21,3 +23,6 @@ (defn unpack-pattern [pattern queue-name] (subs queue-name (count (pattern->str pattern)))) + + + diff --git a/test/com/moclojer/rq/queue_test.clj b/test/com/moclojer/rq/queue_test.clj index e628699..29fd70c 100644 --- a/test/com/moclojer/rq/queue_test.clj +++ b/test/com/moclojer/rq/queue_test.clj @@ -54,6 +54,7 @@ (t/is (= message (rq-queue/lindex client queue-name 1))) (rq-queue/pop! client queue-name :direction :l) (rq-queue/pop! client queue-name :direction :l)) + (t/testing "lrem" (rq-queue/push! client queue-name message) (rq-queue/lrem client queue-name 1 message) @@ -61,20 +62,34 @@ (t/testing "linsert" (rq-queue/push! client queue-name message) - (rq-queue/linsert client queue-name another-message message :pos :before) - (t/is (= another-message (rq-queue/lindex client queue-name 0)))) + (rq-queue/linsert client queue-name message another-message :pos :before) + (t/is (= another-message (rq-queue/lindex client queue-name 0))) + (rq-queue/pop! client queue-name :direction :l) + (rq-queue/pop! client queue-name :direction :l)) - (t/testing "ltrim" - (rq-queue/push! client queue-name message) - (rq-queue/push! client queue-name another-message) - (rq-queue/ltrim client queue-name 1 -1) - (t/is (= another-message (rq-queue/lindex client queue-name 0)))) (t/testing "lrange" - (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l})))) (rq-queue/push! client queue-name message) (rq-queue/push! client queue-name another-message) - (t/is (= [message another-message] (rq-queue/lrange client queue-name 0 1)))) + (let [result (rq-queue/lrange client queue-name 0 1)] + (t/is (= [message another-message] (reverse result)))) + (rq-queue/pop! client queue-name :direction :l) + (rq-queue/pop! client queue-name :direction :l)) + + + (t/testing "ltrim" + (let [base-message {:test "hello", :my/test2 "123", :foobar ["321"]} + message (assoc base-message :uuid (java.util.UUID/randomUUID)) + another-message (assoc base-message :uuid (java.util.UUID/randomUUID))] + (rq-queue/push! client queue-name message) + (rq-queue/push! client queue-name another-message) + (t/is (= "OK" (rq-queue/ltrim client queue-name 1 -1))) + (let [result (rq-queue/lrange client queue-name 0 -1)] + (t/is (= [(dissoc another-message :uuid)] + (map #(dissoc % :uuid) result))))) + (rq-queue/pop! client queue-name :direction :l) + (rq-queue/pop! client queue-name :direction :l)) + (t/testing "rpoplpush" (rq-queue/push! client queue-name message) From 63b5de96a9ba8b1bfc339f015da2e0c5be97ea62 Mon Sep 17 00:00:00 2001 From: Felipe-gsilva Date: Fri, 16 Aug 2024 18:06:41 -0300 Subject: [PATCH 7/8] fix: identation --- src/com/moclojer/rq/queue.clj | 24 ++++++++---------------- test/com/moclojer/rq/queue_test.clj | 3 --- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index 4061945..1347a19 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -98,8 +98,7 @@ :queue-name packed-queue-name :options {:direction direction :pattern pattern} :message message}) - (edn/read-string message))))) - + (edn/read-string message))))) (defn lindex "Return a element in a specified index @@ -194,14 +193,13 @@ redis.clients.jedis.args.ListPosition/BEFORE redis.clients.jedis.args.ListPosition/AFTER) return (.linsert @client packed-queue-name encoded-pos encoded-pivot encoded-message)] - (log/debug "inserted in queue" - {:client client - :queue-name queue-name - :msg encoded-message - :opts opts - :return return}) - return)) - + (log/debug "inserted in queue" + {:client client + :queue-name queue-name + :msg encoded-message + :opts opts + :return return}) + return)) (defn lrange "Return an entire range given min and max indexes @@ -223,7 +221,6 @@ :result return}) (mapv clojure.edn/read-string return))) - (defn ltrim "Trim a list to the specified range. @@ -246,8 +243,6 @@ :result return}) return))) - - (defn rpoplpush "Remove the last element in a list and append it to another list. @@ -270,7 +265,6 @@ :result return}) return)) - (defn brpoplpush "Remove the last element in a list and append it to another list, blocking if necessary. @@ -295,8 +289,6 @@ :result result}) result)) - - (defn lmove "Atomically return and remove the first/last element of the source list, and push the element as the first/last element of the destination list. diff --git a/test/com/moclojer/rq/queue_test.clj b/test/com/moclojer/rq/queue_test.clj index 29fd70c..2c39a86 100644 --- a/test/com/moclojer/rq/queue_test.clj +++ b/test/com/moclojer/rq/queue_test.clj @@ -67,7 +67,6 @@ (rq-queue/pop! client queue-name :direction :l) (rq-queue/pop! client queue-name :direction :l)) - (t/testing "lrange" (rq-queue/push! client queue-name message) (rq-queue/push! client queue-name another-message) @@ -76,7 +75,6 @@ (rq-queue/pop! client queue-name :direction :l) (rq-queue/pop! client queue-name :direction :l)) - (t/testing "ltrim" (let [base-message {:test "hello", :my/test2 "123", :foobar ["321"]} message (assoc base-message :uuid (java.util.UUID/randomUUID)) @@ -90,7 +88,6 @@ (rq-queue/pop! client queue-name :direction :l) (rq-queue/pop! client queue-name :direction :l)) - (t/testing "rpoplpush" (rq-queue/push! client queue-name message) (rq-queue/rpoplpush client queue-name another-queue-name) From 059eae2243fca8d31d05275d592c83825573550e Mon Sep 17 00:00:00 2001 From: Felipe-gsilva Date: Mon, 19 Aug 2024 16:56:02 -0300 Subject: [PATCH 8/8] fix: string parsing on messages --- src/com/moclojer/rq/queue.clj | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index 1347a19..aa15028 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -133,7 +133,7 @@ (let [{:keys [pattern] :or {pattern :rq} :as opts} options packed-queue-name (utils/pack-pattern pattern queue-name) - encoded-message (str (clojure.edn/read-string (str message))) + encoded-message (pr-str message) return (.lset @client packed-queue-name index encoded-message)] (log/debug "set in queue" @@ -160,12 +160,13 @@ (let [{:keys [pattern] :or {pattern :rq}} options packed-queue-name (utils/pack-pattern pattern queue-name) - return (.lrem @client packed-queue-name cnt (str msg))] + encoded-message (pr-str msg) + return (.lrem @client packed-queue-name cnt encoded-message)] (log/debug "removed from queue" {:client client :queue-name queue-name - :msg (clojure.edn/read-string (str msg)) + :msg msg :count cnt :return return}) return)) @@ -187,8 +188,8 @@ :or {pos :before pattern :rq} :as opts} options packed-queue-name (utils/pack-pattern pattern queue-name) - encoded-message (str (clojure.edn/read-string (str msg))) - encoded-pivot (str (clojure.edn/read-string (str pivot))) + encoded-message (pr-str msg) + encoded-pivot (pr-str pivot) encoded-pos (if (= pos :before) redis.clients.jedis.args.ListPosition/BEFORE redis.clients.jedis.args.ListPosition/AFTER)