Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding redis common commands #16

Merged
merged 10 commits into from
Aug 19, 2024
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,5 @@ sequenceDiagram
Client-->>Logger: log closing client
Client-->>User: confirm client closure
```

Read more about the project [here](docs/README.md).
1 change: 1 addition & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# CLJ-RQ
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 4 additions & 1 deletion src/com/moclojer/rq.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
(atom pool)))))

(defn close-client
"Disconnect and close redis client"
"Disconnect and close redis client.
If no specific client is passed, the global client stored is closed;"
([] (close-client *redis-pool*))
([client] (.close @client)))


225 changes: 217 additions & 8 deletions src/com/moclojer/rq/queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
(:require
[clojure.edn :as edn]
[clojure.tools.logging :as log]
[com.moclojer.rq.utils :as utils]))
[com.moclojer.rq.utils :as utils]
[clojure.string :as s]))
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved

(defn push!
"Push a message into a queue.
Expand Down Expand Up @@ -34,11 +35,13 @@

(defn pop!
"Pop a message from a queue.

Options:

- direction: Direction to pop the message (:l or :r)
- pattern: Pattern for the queue name"
Parameters:
- client: Redis client
- queue-name: Name of the queue
- options:
- direction: Direction to pop the message (:l or :r)
- pattern: Pattern for the queue name"
[client queue-name & options]
(let [{:keys [direction pattern]
:or {direction :l
Expand All @@ -50,7 +53,6 @@
(.rpop @client packed-queue-name))]

(when message

(log/debug "popped from queue"
{:client client
:queue-name packed-queue-name
Expand All @@ -69,5 +71,212 @@
- pattern: Pattern for the queue name"
[client queue-name & options]
(let [{:keys [pattern]
:or {pattern :rq}} options]
(.llen @client (utils/pack-pattern pattern queue-name))))
:or {pattern :rq}} options
packed-queue-name (utils/pack-pattern pattern queue-name)]
(.llen @client packed-queue-name)))

(defn bpop!
"Pop a message from a queue. Blocking if necessary.

Parameters:
- client: Redis client
- queue-name: Name of the queue
- tmot: Blocking timeout
- options:
- direction: Direction to pop the message (:l or :r)
- pattern: Pattern for the queue name"
[client queue-name tmot & {:keys [direction pattern]
:or {direction :l pattern :rq}}]
(let [packed-queue-name (utils/pack-pattern pattern queue-name)
result (if (= direction :l)
(.blpop @client tmot packed-queue-name)
(.brpop @client tmot packed-queue-name))]
(when result
(let [message (second result)]
(log/debug "popped from queue"
{:client client
:queue-name packed-queue-name
:options {:direction direction :pattern pattern}
:message message})
(edn/read-string message)))))

(defn lrange
"Return an entire range given min and max indexes

Parameters:
- client: Redis client
- queue-name: Name of the queue
- floor: floor index
- ceil: ceiling index"
[client queue-name floor ceil & options]
(let [{:keys [pattern]
:or {pattern :rq}} options
packed-queue (utils/pack-pattern pattern queue-name)
result (.lrange @client packed-queue floor ceil)]
result))

(defn lindex
"Return a element in a specified index

Parameters:
- client: Redis client
- queue-name: Name of the queue
- index: specific index to access"
[client queue-name index & options]
(let [{:keys [pattern]
:or {pattern :rq}} options
packed-queue-name (utils/pack-pattern pattern queue-name)
result (.lindex @client packed-queue-name index)]

(let [message (clojure.edn/read-string result)]
(log/debug "message found"
{:client client
:queue-name packed-queue-name
:index index
:message message})
message)))

(defn lset
"Set a new message in a specified index

Parameters:
- client: Redis client
- queue-name: Name of the queue
- index: specific index to access
- message: new msg to be added"
[client queue-name index message & options]
(let [{:keys [pattern]
:or {pattern :rq} :as opts} options
packed-queue-name (utils/pack-pattern pattern queue-name)
encoded-message (clojure.edn/read-string (str message))
return (.lset @client packed-queue-name index (str encoded-message))]

(log/debug "set in queue"
{:client client
:queue-name packed-queue-name
:message (str encoded-message)
:index index
:options opts
:return return})
return))

(defn lrem
"Removes a specified occurance of the message given (if any)

Parameters:
- client: Redis client
- queue-name: Name of the queue
- msg: new msg to be added
- cnt: count
count > 0: Remove elements equal to element moving from head to tail.
count < 0: Remove elements equal to element moving from tail to head.
count = 0: Remove all elements equal to element."
[client queue-name cnt msg & options]
(let [{:keys [pattern]
:or {pattern :rq}} options
packed-queue-name (utils/pack-pattern pattern queue-name)
return (.lrem @client packed-queue-name cnt (str msg))]

(log/debug "removed from queue"
{:client client
:queue-name queue-name
:msg (clojure.edn/read-string (str msg))
:count cnt
:return return})
return))

(defn linsert
"Insert a message before the first occurance of a pivot given.

Parameters:
- client: Redis client
- queue-name: Name of the queue
- msg: new msg to be added
- pivot: pivot message to be added before or after
- options:
- pos (keywords):
- before: insert the message before the pivot
- after: insert the message after the pivot"
[client queue-name msg pivot & options]
(let [{:keys [pos pattern]
:or {pos :before
pattern :rq} :as opts} options
packed-queue-name (utils/pack-pattern pattern queue-name)
encoded-message (str (clojure.edn/read-string (str msg)))
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved
encoded-pivot (str (clojure.edn/read-string (str pivot)))
encoded-pos (str (s/capitalize (str pivot)))
return (.linsert @client packed-queue-name encoded-pos encoded-pivot encoded-message)]
(log/debug "inserted in queue"
{:client client
:queue-name queue-name
:msg (clojure.edn/read-string (str msg))
:opts opts
:return return})
return))

(defn ltrim
"Trim a list to the specified range.

Parameters:
- client: Redis client
- queue-name: Name of the queue
- start: start index
- stop: stop index
- options:
- pattern: pattern to pack the queue name"
[client queue-name start stop & options]
(let [{:keys [pattern]
:or {pattern :rq}} options
packed-queue-name (utils/pack-pattern pattern queue-name)]
(.ltrim @client packed-queue-name start stop)))

(defn rpoplpush
"Remove the last element in a list and append it to another list.

Parameters:
- client: Redis client
- source-queue: Name of the source queue
- destination-queue: Name of the destination queue
- options:
- pattern: pattern to pack the queue names"
[client source-queue destination-queue & options]
(let [{:keys [pattern]
:or {pattern :rq}} options
packed-source-queue (utils/pack-pattern pattern source-queue)
packed-destination-queue (utils/pack-pattern pattern destination-queue)]
(.rpoplpush @client packed-source-queue packed-destination-queue)))

(defn brpoplpush
"Remove the last element in a list and append it to another list, blocking if necessary.

Parameters:
- client: Redis client
- source-queue: Name of the source queue
- destination-queue: Name of the destination queue
- timeout: timeout in seconds
- options:
- pattern: pattern to pack the queue names"
[client source-queue destination-queue timeout & options]
(let [{:keys [pattern]
:or {pattern :rq}} options
packed-source-queue (utils/pack-pattern pattern source-queue)
packed-destination-queue (utils/pack-pattern pattern destination-queue)]
(.brpoplpush @client packed-source-queue packed-destination-queue timeout)))

(defn lmove
"Atomically return and remove the first/last element (head/tail depending on the wherefrom argument) of the source list, and push the element as the first/last element (head/tail depending on the whereto argument) of the destination list.

Parameters:
- client: Redis client
- source-queue: Name of the source queue
- destination-queue: Name of the destination queue
- wherefrom: 'LEFT' or 'RIGHT'
- whereto: 'LEFT' or 'RIGHT'
- options:
- pattern: pattern to pack the queue names"
[client source-queue destination-queue wherefrom whereto & options]
(let [{:keys [pattern]
:or {pattern :rq}} options
packed-source-queue (utils/pack-pattern pattern source-queue)
packed-destination-queue (utils/pack-pattern pattern destination-queue)]
(.lmove @client packed-source-queue packed-destination-queue wherefrom whereto)))
1 change: 1 addition & 0 deletions test/com/moclojer/rq/pubsub_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
(swap! state conj msg))})
chans-msgs)}))

;;
(t/deftest pubsub-test
(let [client (rq/create-client "redis://localhost:6379")]

Expand Down
76 changes: 73 additions & 3 deletions test/com/moclojer/rq/queue_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
(t/deftest queue-test
(let [client (rq/create-client "redis://localhost:6379")
queue-name (str (random-uuid))
message (utils/gen-message)]
another-queue-name (str (random-uuid))
message (utils/gen-message)
another-message (utils/gen-message)]

(t/testing "raw"
(rq-queue/push! client queue-name message)
(rq-queue/push! client queue-name (utils/gen-message))
(rq-queue/push! client queue-name another-message)
(t/is (= 2 (rq-queue/llen client queue-name)))
(t/is (= message (rq-queue/pop! client queue-name :direction :r))))
(t/is (= message (rq-queue/pop! client queue-name {:direction :r}))))

(t/testing "direction"
(rq-queue/push! client queue-name message :direction :r)
Expand All @@ -24,4 +26,72 @@
(rq-queue/push! client queue-name message :pattern :pending)
(t/is (= message (rq-queue/pop! client queue-name :pattern :pending))))

(t/testing "bpop! left"
(while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l}))))
(rq-queue/push! client queue-name message)
(let [popped-message (rq-queue/bpop! client queue-name 1 {:direction :l})]
(t/is (= message popped-message))
(t/is (= 0 (rq-queue/llen client queue-name)))))

(t/testing "bpop! right"
(while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :r}))))
(rq-queue/push! client queue-name message)
(let [popped-message (rq-queue/bpop! client queue-name 1 {:direction :r})]
(t/is (= message popped-message))
(t/is (= 0 (rq-queue/llen client queue-name)))))

(t/testing "lindex"
(rq-queue/push! client queue-name message)
(t/is (= message (rq-queue/lindex client queue-name 0))))

(t/testing "lset"
(while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l}))))
(rq-queue/push! client queue-name message)
(rq-queue/push! client queue-name another-message)
(rq-queue/lset client queue-name 0 another-message)
(t/is (= another-message (rq-queue/lindex client queue-name 0)))
(rq-queue/lset client queue-name 1 message)
(t/is (= message (rq-queue/lindex client queue-name 1)))
(rq-queue/pop! client queue-name :direction :l)
(rq-queue/pop! client queue-name :direction :l))
(t/testing "lrem"
(rq-queue/push! client queue-name message)
(rq-queue/lrem client queue-name 1 message)
(t/is (= 0 (rq-queue/llen client queue-name))))

(t/testing "linsert"
(rq-queue/push! client queue-name message)
(rq-queue/linsert client queue-name another-message message :pos :before)
(t/is (= another-message (rq-queue/lindex client queue-name 0))))

(t/testing "ltrim"
(rq-queue/push! client queue-name message)
(rq-queue/push! client queue-name another-message)
(rq-queue/ltrim client queue-name 1 -1)
(t/is (= another-message (rq-queue/lindex client queue-name 0))))

(t/testing "lrange"
(while (not (nil? (rq-queue/bpop! client queue-name 1 {:direction :l}))))
(rq-queue/push! client queue-name message)
(rq-queue/push! client queue-name another-message)
(t/is (= [message another-message] (rq-queue/lrange client queue-name 0 1))))

(t/testing "rpoplpush"
(rq-queue/push! client queue-name message)
(rq-queue/rpoplpush client queue-name another-queue-name)
(t/is (= 0 (rq-queue/llen client queue-name)))
(t/is (= message (rq-queue/pop! client another-queue-name :direction :l))))

(t/testing "brpoplpush"
(rq-queue/push! client queue-name message)
(rq-queue/brpoplpush client queue-name another-queue-name 1)
(t/is (= 0 (rq-queue/llen client queue-name)))
(t/is (= message (rq-queue/pop! client another-queue-name :direction :l))))

(t/testing "lmove"
(rq-queue/push! client queue-name message)
(rq-queue/lmove client queue-name another-queue-name "LEFT" "RIGHT")
(t/is (= 0 (rq-queue/llen client queue-name)))
(t/is (= message (rq-queue/pop! client another-queue-name :direction :r))))

(rq/close-client client)))
18 changes: 10 additions & 8 deletions test/com/moclojer/rq/utils_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
[com.moclojer.rq.utils :as utils]))

(t/deftest pattern->str-test
[(t/is "my-queue" (utils/pack-pattern :none "my-queue"))
(t/is "rq:my-queue" (utils/pack-pattern :rq "my-queue"))
(t/is "rq:pubsub:my-queue" (utils/pack-pattern :pubsub "my-queue"))
(t/is "rq:pubsub:pending:my-queue" (utils/pack-pattern :pending "my-queue"))]
(t/testing "packing"
[(t/is "my-queue" (utils/pack-pattern :none "my-queue"))
(t/is "rq:my-queue" (utils/pack-pattern :rq "my-queue"))
(t/is "rq:pubsub:my-queue" (utils/pack-pattern :pubsub "my-queue"))
(t/is "rq:pubsub:pending:my-queue" (utils/pack-pattern :pending "my-queue"))])

[(t/is "my-queue" (utils/unpack-pattern :none "my-queue"))
(t/is "my-queue" (utils/unpack-pattern :rq "rq:my-queue"))
(t/is "my-queue" (utils/unpack-pattern :pubsub "rq:pubsub:my-queue"))
(t/is "my-queue" (utils/unpack-pattern :pending "rq:pubsub:pending:my-queue"))])
(t/testing "unpacking"
[(t/is "my-queue" (utils/unpack-pattern :none "my-queue"))
(t/is "my-queue" (utils/unpack-pattern :rq "rq:my-queue"))
(t/is "my-queue" (utils/unpack-pattern :pubsub "rq:pubsub:my-queue"))
(t/is "my-queue" (utils/unpack-pattern :pending "rq:pubsub:pending:my-queue"))]))
Loading
Loading