diff --git a/docs/README.md b/docs/README.md index 809b5f2..a666a52 100644 --- a/docs/README.md +++ b/docs/README.md @@ -28,4 +28,4 @@ lein test com.moclojer.rq.queue-test ``` This commands will run all the test cases defined in the our tests namespaces and provide feedback on their status. -By running these tests, you can verify the correctness and reliability of the queue operations, such as seeing some important info output on debug mode. +By running these tests, you can verify the correctness and reliability of the queue operations, such as seeing some important info output on debug mode. \ No newline at end of file diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index fac174d..aa15028 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -3,7 +3,8 @@ (:require [clojure.edn :as edn] [clojure.tools.logging :as log] - [com.moclojer.rq.utils :as utils])) + [com.moclojer.rq.utils :as utils] + [clojure.string :as s])) (defn push! "Push a message into a queue. @@ -34,11 +35,13 @@ (defn pop! "Pop a message from a queue. - - Options: - - direction: Direction to pop the message (:l or :r) - - pattern: Pattern for the queue name" + Parameters: + - client: Redis client + - queue-name: Name of the queue + - options: + - direction: Direction to pop the message (:l or :r) + - pattern: Pattern for the queue name" [client queue-name & options] (let [{:keys [direction pattern] :or {direction :l @@ -50,7 +53,6 @@ (.rpop @client packed-queue-name))] (when message - (log/debug "popped from queue" {:client client :queue-name packed-queue-name @@ -69,5 +71,253 @@ - pattern: Pattern for the queue name" [client queue-name & options] (let [{:keys [pattern] - :or {pattern :rq}} options] - (.llen @client (utils/pack-pattern pattern queue-name)))) + :or {pattern :rq}} options + packed-queue-name (utils/pack-pattern pattern queue-name)] + (.llen @client packed-queue-name))) + +(defn bpop! + "Pop a message from a queue. Blocking if necessary. + + Parameters: + - client: Redis client + - queue-name: Name of the queue + - tmot: Blocking timeout + - options: + - direction: Direction to pop the message (:l or :r) + - pattern: Pattern for the queue name" + [client queue-name tmot & {:keys [direction pattern] + :or {direction :l pattern :rq}}] + (let [packed-queue-name (utils/pack-pattern pattern queue-name) + return (if (= direction :l) + (.blpop @client tmot packed-queue-name) + (.brpop @client tmot packed-queue-name))] + (when return + (let [message (second return)] + (log/debug "popped from queue" + {:client client + :queue-name packed-queue-name + :options {:direction direction :pattern pattern} + :message message}) + (edn/read-string message))))) + +(defn lindex + "Return a element in a specified index + + Parameters: + - client: Redis client + - queue-name: Name of the queue + - index: specific index to access" + [client queue-name index & options] + (let [{:keys [pattern] + :or {pattern :rq}} options + packed-queue-name (utils/pack-pattern pattern queue-name) + return (.lindex @client packed-queue-name index)] + + (let [message (clojure.edn/read-string return)] + (log/debug "message found" + {:client client + :queue-name packed-queue-name + :index index + :message message}) + message))) + +(defn lset + "Set a new message in a specified index + + Parameters: + - client: Redis client + - queue-name: Name of the queue + - index: specific index to access + - message: new msg to be added" + [client queue-name index message & options] + (let [{:keys [pattern] + :or {pattern :rq} :as opts} options + packed-queue-name (utils/pack-pattern pattern queue-name) + encoded-message (pr-str message) + return (.lset @client packed-queue-name index encoded-message)] + + (log/debug "set in queue" + {:client client + :queue-name packed-queue-name + :message (str encoded-message) + :index index + :options opts + :return return}) + return)) + +(defn lrem + "Removes a specified occurance of the message given (if any) + + Parameters: + - client: Redis client + - queue-name: Name of the queue + - msg: new msg to be added + - cnt: count + count > 0: Remove elements equal to element moving from head to tail. + count < 0: Remove elements equal to element moving from tail to head. + count = 0: Remove all elements equal to element." + [client queue-name cnt msg & options] + (let [{:keys [pattern] + :or {pattern :rq}} options + packed-queue-name (utils/pack-pattern pattern queue-name) + encoded-message (pr-str msg) + return (.lrem @client packed-queue-name cnt encoded-message)] + + (log/debug "removed from queue" + {:client client + :queue-name queue-name + :msg msg + :count cnt + :return return}) + return)) + +(defn linsert + "insert a message before the first occurance of a pivot given. + + parameters: + - client: redis client + - queue-name: name of the queue + - msg: new msg to be added + - pivot: pivot message to be added before or after + - options: + - pos (keywords): + - before: insert the message before the pivot + - after: insert the message after the pivot" + [client queue-name pivot msg & options] + (let [{:keys [pos pattern] + :or {pos :before + pattern :rq} :as opts} options + packed-queue-name (utils/pack-pattern pattern queue-name) + encoded-message (pr-str msg) + encoded-pivot (pr-str pivot) + encoded-pos (if (= pos :before) + redis.clients.jedis.args.ListPosition/BEFORE + redis.clients.jedis.args.ListPosition/AFTER) + return (.linsert @client packed-queue-name encoded-pos encoded-pivot encoded-message)] + (log/debug "inserted in queue" + {:client client + :queue-name queue-name + :msg encoded-message + :opts opts + :return return}) + return)) + +(defn lrange + "Return an entire range given min and max indexes + + Parameters: + - client: Redis client + - queue-name: Name of the queue + - floor: floor index + - ceil: ceiling index" + [client queue-name floor ceil & options] + (let [{:keys [pattern] + :or {pattern :rq} :as opts} options + packed-queue-name (utils/pack-pattern pattern queue-name) + return (.lrange @client packed-queue-name floor ceil)] + (log/debug "queue specified range" + {:client client + :queue-name packed-queue-name + :opts opts + :result return}) + (mapv clojure.edn/read-string return))) + +(defn ltrim + "Trim a list to the specified range. + + Parameters: + - client: Redis client + - queue-name: Name of the queue + - start: start index + - stop: stop index + - options: + - pattern: pattern to pack the queue name" + [client queue-name start stop & options] + (let [{:keys [pattern] + :or {pattern :rq} :as opts} options + packed-queue-name (utils/pack-pattern pattern queue-name)] + (let [return (.ltrim @client packed-queue-name start stop)] + (log/debug "queue trimmed" + {:client client + :queue-name queue-name + :opts opts + :result return}) + return))) + +(defn rpoplpush + "Remove the last element in a list and append it to another list. + + Parameters: + - client: Redis client + - source-queue: Name of the source queue + - destination-queue: Name of the destination queue + - options: + - pattern: pattern to pack the queue names" + [client source-queue destination-queue & options] + (let [{:keys [pattern] + :or {pattern :rq}} options + packed-source-queue (utils/pack-pattern pattern source-queue) + packed-destination-queue (utils/pack-pattern pattern destination-queue) + return (.rpoplpush @client packed-source-queue packed-destination-queue)] + (log/debug "rpoplpush operation" + {:client client + :source-queue packed-source-queue + :destination-queue packed-destination-queue + :result return}) + return)) + +(defn brpoplpush + "Remove the last element in a list and append it to another list, blocking if necessary. + + Parameters: + - client: Redis client + - source-queue: Name of the source queue + - destination-queue: Name of the destination queue + - timeout: timeout in seconds + - options: + - pattern: pattern to pack the queue names" + [client source-queue destination-queue timeout & options] + (let [{:keys [pattern] + :or {pattern :rq}} options + packed-source-queue (utils/pack-pattern pattern source-queue) + packed-destination-queue (utils/pack-pattern pattern destination-queue) + result (.brpoplpush @client packed-source-queue packed-destination-queue timeout)] + (log/debug "brpoplpush operation" + {:client client + :source-queue packed-source-queue + :destination-queue packed-destination-queue + :timeout timeout + :result result}) + result)) + +(defn lmove + "Atomically return and remove the first/last element of the source list, and push the element as the first/last element of the destination list. + + Parameters: + - client: Redis client + - source-queue: Name of the source queue + - destination-queue: Name of the destination queue + - wherefrom: 'LEFT' or 'RIGHT' + - whereto: 'LEFT' or 'RIGHT' + - options: + - pattern: pattern to pack the queue names" + [client source-queue destination-queue wherefrom whereto & options] + (let [{:keys [pattern] + :or {pattern :rq}} options + packed-source-queue (utils/pack-pattern pattern source-queue) + packed-destination-queue (utils/pack-pattern pattern destination-queue) + from-direction (if (= wherefrom "LEFT") + redis.clients.jedis.args.ListDirection/LEFT + redis.clients.jedis.args.ListDirection/RIGHT) + to-direction (if (= whereto "LEFT") + redis.clients.jedis.args.ListDirection/LEFT + redis.clients.jedis.args.ListDirection/RIGHT) + result (.lmove @client packed-source-queue packed-destination-queue from-direction to-direction)] + (log/debug "lmove operation" + {:client client + :source-queue packed-source-queue + :destination-queue packed-destination-queue + :from-direction from-direction + :to-direction to-direction + :result result}) + result)) diff --git a/src/com/moclojer/rq/utils.clj b/src/com/moclojer/rq/utils.clj index c6f9dab..1bf2eca 100644 --- a/src/com/moclojer/rq/utils.clj +++ b/src/com/moclojer/rq/utils.clj @@ -1,4 +1,6 @@ -(ns com.moclojer.rq.utils) +(ns com.moclojer.rq.utils + (:require + [clojure.string :as s])) (defn- pattern->str "Adapts given pattern keyword to a know internal pattern. Raises @@ -21,3 +23,6 @@ (defn unpack-pattern [pattern queue-name] (subs queue-name (count (pattern->str pattern)))) + + + diff --git a/test/com/moclojer/rq/queue_test.clj b/test/com/moclojer/rq/queue_test.clj index aa9f651..2c39a86 100644 --- a/test/com/moclojer/rq/queue_test.clj +++ b/test/com/moclojer/rq/queue_test.clj @@ -8,13 +8,15 @@ (t/deftest queue-test (let [client (rq/create-client "redis://localhost:6379") queue-name (str (random-uuid)) - message (utils/gen-message)] + another-queue-name (str (random-uuid)) + message (utils/gen-message) + another-message (utils/gen-message)] (t/testing "raw" (rq-queue/push! client queue-name message) - (rq-queue/push! client queue-name (utils/gen-message)) + (rq-queue/push! client queue-name another-message) (t/is (= 2 (rq-queue/llen client queue-name))) - (t/is (= message (rq-queue/pop! client queue-name :direction :r)))) + (t/is (= message (rq-queue/pop! client queue-name {:direction :r})))) (t/testing "direction" (rq-queue/push! client queue-name message :direction :r) @@ -24,4 +26,84 @@ (rq-queue/push! client queue-name message :pattern :pending) (t/is (= message (rq-queue/pop! client queue-name :pattern :pending)))) + (t/testing "bpop! left" + (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l})))) + (rq-queue/push! client queue-name message) + (let [popped-message (rq-queue/bpop! client queue-name 1 {:direction :l})] + (t/is (= message popped-message)) + (t/is (= 0 (rq-queue/llen client queue-name))))) + + (t/testing "bpop! right" + (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :r})))) + (rq-queue/push! client queue-name message) + (let [popped-message (rq-queue/bpop! client queue-name 1 {:direction :r})] + (t/is (= message popped-message)) + (t/is (= 0 (rq-queue/llen client queue-name))))) + + (t/testing "lindex" + (rq-queue/push! client queue-name message) + (t/is (= message (rq-queue/lindex client queue-name 0)))) + + (t/testing "lset" + (while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l})))) + (rq-queue/push! client queue-name message) + (rq-queue/push! client queue-name another-message) + (rq-queue/lset client queue-name 0 another-message) + (t/is (= another-message (rq-queue/lindex client queue-name 0))) + (rq-queue/lset client queue-name 1 message) + (t/is (= message (rq-queue/lindex client queue-name 1))) + (rq-queue/pop! client queue-name :direction :l) + (rq-queue/pop! client queue-name :direction :l)) + + (t/testing "lrem" + (rq-queue/push! client queue-name message) + (rq-queue/lrem client queue-name 1 message) + (t/is (= 0 (rq-queue/llen client queue-name)))) + + (t/testing "linsert" + (rq-queue/push! client queue-name message) + (rq-queue/linsert client queue-name message another-message :pos :before) + (t/is (= another-message (rq-queue/lindex client queue-name 0))) + (rq-queue/pop! client queue-name :direction :l) + (rq-queue/pop! client queue-name :direction :l)) + + (t/testing "lrange" + (rq-queue/push! client queue-name message) + (rq-queue/push! client queue-name another-message) + (let [result (rq-queue/lrange client queue-name 0 1)] + (t/is (= [message another-message] (reverse result)))) + (rq-queue/pop! client queue-name :direction :l) + (rq-queue/pop! client queue-name :direction :l)) + + (t/testing "ltrim" + (let [base-message {:test "hello", :my/test2 "123", :foobar ["321"]} + message (assoc base-message :uuid (java.util.UUID/randomUUID)) + another-message (assoc base-message :uuid (java.util.UUID/randomUUID))] + (rq-queue/push! client queue-name message) + (rq-queue/push! client queue-name another-message) + (t/is (= "OK" (rq-queue/ltrim client queue-name 1 -1))) + (let [result (rq-queue/lrange client queue-name 0 -1)] + (t/is (= [(dissoc another-message :uuid)] + (map #(dissoc % :uuid) result))))) + (rq-queue/pop! client queue-name :direction :l) + (rq-queue/pop! client queue-name :direction :l)) + + (t/testing "rpoplpush" + (rq-queue/push! client queue-name message) + (rq-queue/rpoplpush client queue-name another-queue-name) + (t/is (= 0 (rq-queue/llen client queue-name))) + (t/is (= message (rq-queue/pop! client another-queue-name :direction :l)))) + + (t/testing "brpoplpush" + (rq-queue/push! client queue-name message) + (rq-queue/brpoplpush client queue-name another-queue-name 1) + (t/is (= 0 (rq-queue/llen client queue-name))) + (t/is (= message (rq-queue/pop! client another-queue-name :direction :l)))) + + (t/testing "lmove" + (rq-queue/push! client queue-name message) + (rq-queue/lmove client queue-name another-queue-name "LEFT" "RIGHT") + (t/is (= 0 (rq-queue/llen client queue-name))) + (t/is (= message (rq-queue/pop! client another-queue-name :direction :r)))) + (rq/close-client client)))