Skip to content

Commit

Permalink
Update to Bordeaux-Threads 2 (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
hxzrx authored May 5, 2024
1 parent bcd08c5 commit 9d20a59
Show file tree
Hide file tree
Showing 15 changed files with 86 additions and 86 deletions.
12 changes: 6 additions & 6 deletions bench.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
(defun runner-bt (&optional (withreply-p nil) (asyncask nil) (queue-size 0))
(declare (ignore queue-size))
;; dispatchers used for the async-ask
(setf *per-thread* 125000)
(setf *per-thread* 125000)
(setf *system* (asys:make-actor-system '(:dispatchers (:shared (:workers 8)))))
(setf *actor* (ac:actor-of *system*
:receive (lambda (msg)
Expand All @@ -32,9 +32,9 @@
(format t "Times: ~a~%" (max-loop))
(time
(progn
(map nil #'bt:join-thread
(map nil #'bt2:join-thread
(mapcar (lambda (x)
(bt:make-thread
(bt2:make-thread
(lambda ()
(dotimes (n *per-thread*)
(if withreply-p
Expand Down Expand Up @@ -68,9 +68,9 @@
(format t "Times: ~a~%" (max-loop))
(time
(progn
(map nil #'bt:join-thread
(map nil #'bt2:join-thread
(mapcar (lambda (x)
(bt:make-thread
(bt2:make-thread
(lambda ()
(dotimes (n *per-thread*)
(if withreply-p
Expand Down Expand Up @@ -112,7 +112,7 @@

;; (unwind-protect
;; (time
;; (progn
;; (progn
;; (map nil #'lparallel:force
;; (mapcar (lambda (x)
;; (lparallel:future
Expand Down
16 changes: 8 additions & 8 deletions src/actor-cell.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ one can decide what kind of message-box/dispatcher should be used for the new `a
See `actor-context` `actor-of` method for more information on this.
To stop an `actor` message handling and you can send the `:stop` message
To stop an `actor` message handling and you can send the `:stop` message
either via `call` (which will respond with `:stopped`) or `cast`.
This is to cleanup thread resources when the actor is not needed anymore.
Expand Down Expand Up @@ -104,12 +104,12 @@ State of the cell can be changed via `setf`ing `*state*` variable."))

(defgeneric stop (actor-cell &optional wait)
(:documentation "Stops the actor-cells message processing gracefully.
This is not an immediate stop.
This is not an immediate stop.
There are two ways to stop an actor (cell).
1. by calling this function.
It is not an immediate stop. The actor will finish the current message processing.
`wait`: waits until the cell is stopped.
It is not an immediate stop. The actor will finish the current message processing.
`wait`: waits until the cell is stopped.
2. by sending `:stop` to the actor (cell).
This won't allow to wait when the actor is stopped, even not with `ask-s`.
Expand All @@ -124,9 +124,9 @@ The `:stop` message (symbol) is normally processed by the actors message process
Specify a timeout in seconds if you require a result within a certain period of time.
Be aware though that this is a resource intensive wait based on a waiting thread.
The result can be of different types.
Normal result: the last expression of `handle-call` (or `receive` in `act:actor`) implementation.
Normal result: the last expression of `handle-call` (or `receive` in `act:actor`) implementation.
Error result: `(cons :handler-error <condition>)'
In case of time-out the error condition is a bt:timeout."
In case of time-out the error condition is a bt2:timeout."
(when message
(let ((result (submit-message actor-cell message t nil time-out)))
(log:debug "~a: message process result: ~a" (name actor-cell) result)
Expand All @@ -138,7 +138,7 @@ If a `sender' is specified the result will be sent to the sender."
(when message
(let ((result (submit-message actor-cell message nil sender nil)))
(log:debug "~a: message process result: ~a" (name actor-cell) result)
result)))
result)))

(defun running-p (actor-cell)
"Returns true if this server is running. `nil` otherwise."
Expand All @@ -153,7 +153,7 @@ If a `sender' is specified the result will be sent to the sender."
(when msgbox
(mesgb:stop msgbox wait)))))

;; -----------------------------------------------
;; -----------------------------------------------
;; internal functions
;; -----------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion src/actor.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ In any case stop the actor-cell. See `actor-cell:stop` for more info on stopping
(handler-case
(timeutils:with-waitfor (time-out)
(timeutils:wait-cond (lambda () result-received-p) 0.1))
(bt:timeout (c)
(bt2:timeout (c)
(handle-timeout c))))))))))

;; -------------------------------
Expand Down
8 changes: 4 additions & 4 deletions src/atomic/atomic-clisp.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
lock)

(defun make-atomic-reference (&key (value nil))
(%make-atomic-reference value (bt:make-lock)))
(%make-atomic-reference value (bt2:make-lock)))

(defmethod print-object ((ref atomic-reference) stream)
(print-unreadable-object (ref stream :type t :identity t)
Expand All @@ -20,7 +20,7 @@

(defmethod atomic-cas ((ref atomic-reference) old new)
(declare (ignore old))
(bt:with-lock-held ((atomic-reference-lock ref))
(bt2:with-lock-held ((atomic-reference-lock ref))
(setf (atomic-place ref) new)))

(defmethod atomic-get ((ref atomic-reference))
Expand All @@ -39,7 +39,7 @@
lock)

(defun make-atomic-integer (&key (value 0))
(%make-atomic-reference value (bt:make-lock)))
(%make-atomic-reference value (bt2:make-lock)))

(defmethod print-object ((int atomic-integer) stream)
(print-unreadable-object (int stream :type t :identity t)
Expand All @@ -50,7 +50,7 @@

(defmethod atomic-cas ((int atomic-integer) old new)
(declare (ignore old))
(bt:with-lock-held ((atomic-reference-lock int))
(bt2:with-lock-held ((atomic-reference-lock int))
(setf (atomic-place int) new)))

(defmethod atomic-swap ((int atomic-integer) fn &rest args)
Expand Down
18 changes: 9 additions & 9 deletions src/fcomputation.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
(defclass future ()
((promise :initform nil))
(:documentation
"The wrapped [blackbird](https://orthecreedence.github.io/blackbird/) `promise`, here called `future`.
Not all features of blackbird's `promise` are supported.
"The wrapped [blackbird](https://orthecreedence.github.io/blackbird/) `promise`, here called `future`.
Not all features of blackbird's `promise` are supported.
This `future` wrapper changes the terminology. A `future` is a delayed computation.
A `promise` is the fulfillment of the delayed computation."))

Expand All @@ -47,7 +47,7 @@ Example:
```
(with-fut-resolve
(bt:make-thread
(bt2:make-thread
(lambda ()
(let ((result (do-some-lengthy-calculation)))
(fresolve result)))))
Expand All @@ -62,15 +62,15 @@ Example:
"Creates a future. `execute-fun` is the lambda that is executed when the future is created.
`execute-fun` takes a parameter which is the `execute-fun` funtion. `execute-fun` function
takes the `promise` as parameter which is the computed value. Calling `execute-fun` with the promise
will fulfill the `future`.
will fulfill the `future`.
Manually calling `execute-fun` to fulfill the `future` is in contrast to just fulfill the `future` from a return value. The benefit of the `execute-fun` is flexibility. In a multi-threaded environment `execute-fun` could spawn a thread, in which case `execute-fun` would return immediately but no promise-value can be given at that time. The `execute-fun` can be called from a thread and provide the promise.
Create a future with:
```elisp
(make-future (lambda (execute-fun)
(make-future (lambda (execute-fun)
(let ((promise (delayed-computation)))
(bt:make-thread (lambda ()
(bt2:make-thread (lambda ()
(sleep 0.5)
(funcall execute-fun promise))))))
```
Expand Down Expand Up @@ -106,7 +106,7 @@ Create a future with:
(defmacro fcompleted (future (result) &body body)
"Completion handler on the given `future`.
If the `future` is already complete then the `body` executed immediately.
If the `future` is already complete then the `body` executed immediately.
`result` represents the future result.
`body` is executed when future completed.
Returns the future.
Expand All @@ -124,9 +124,9 @@ Example:
`(%fcompleted ,future (lambda (,result) ,@body)))

(defun fawait (fut &key timeout (sleep-time 0.1))
"Wait for the future `FUT` to be ready. Returns `VALUES` with `result' of the future and `FUT'.
"Wait for the future `FUT` to be ready. Returns `VALUES` with `result' of the future and `FUT'.
If the future is not ready after `TIMEOUT` seconds the `result' is `NIL'.
The `SLEEP-TIME` parameter specifies the time to sleep between checks of the future.
The `SLEEP-TIME` parameter specifies the time to sleep between checks of the future.
The wait is based on attempts. To be accurate in terms of `TIMEOUT` the `SLEEP-TIME` should be a divisor of `TIMEOUT`.
Disclaimer: naive implementation. There may be better solutions."
(assert (and timeout (>= timeout 0)) (timeout) "Timeout must be greater or equal to 0")
Expand Down
28 changes: 14 additions & 14 deletions src/mbox/message-box.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ The default name is concatenated of \"mesgb-\" and a `gensym` generated random n
:initarg :max-queue-size
:reader max-queue-size
:documentation
"0 or nil will make an unbounded queue.
"0 or nil will make an unbounded queue.
A value `> 0` will make a bounded queue.
Don't make it too small. A queue size of 1000 might be a good choice."))
(:documentation "The user does not need to create a message-box manually. It is automatically created and added to the `actor` when the actor is created through `ac:actor-of`."))
Expand Down Expand Up @@ -121,7 +121,7 @@ this kind of queue because each message-box (and with that each actor) requires

(defmethod initialize-instance :after ((self message-box/bt) &key)
(with-slots (name queue-thread) self
(setf queue-thread (bt:make-thread
(setf queue-thread (bt2:make-thread
(lambda () (message-processing-loop self))
:name (mkstr "message-thread-" name))))
(when (next-method-p)
Expand Down Expand Up @@ -159,23 +159,23 @@ This function sets the result as `handler-result' in `item'. The return of this
(when cancelled-p
(log:warn "~a: item got cancelled: ~a" (name msgbox) item)
(when withreply-p
(bt:condition-notify withreply-cvar))
(bt2:condition-notify withreply-cvar))
(return-from process-queue-item :cancelled))

(flet ((handler-fun ()
(log:trace "~a: withreply: handler-fun-args..." (name msgbox))
(setf handler-result
(call-handler-fun handler-fun-args message))
(log:trace "~a: withreply: handler-fun-args result: ~a"
(name msgbox) handler-result)))
(if withreply-p
(bt:with-lock-held (withreply-lock)
(bt2:with-lock-held (withreply-lock)
;; make sure we release the lock also on error
(unwind-protect
(if time-out
(unless cancelled-p (handler-fun))
(handler-fun))
(bt:condition-notify withreply-cvar)))
(bt2:condition-notify withreply-cvar)))
(handler-fun)))))

(defmethod submit ((self message-box/bt) message withreply-p time-out handler-fun-args)
Expand All @@ -200,8 +200,8 @@ The submitting code has to await the side-effect and possibly handle a timeout."

(defun submit/reply (msgbox queue message time-out handler-fun-args)
"This function has to provide a result and so it has to wait until the queue thread has processed the message. Processing of the queue item is done in `process-queue-item'."
(let* ((withreply-lock (bt:make-lock))
(withreply-cvar (bt:make-condition-variable))
(let* ((withreply-lock (bt2:make-lock))
(withreply-cvar (bt2:make-condition-variable))
(push-item (make-message-item/bt
:message message
:withreply-p t
Expand All @@ -211,14 +211,14 @@ The submitting code has to await the side-effect and possibly handle a timeout."
:handler-fun-args handler-fun-args
:handler-result 'no-result)))
(log:trace "~a: withreply: waiting for arrival of result..." (name msgbox))
(bt:with-lock-held (withreply-lock)
(bt2:with-lock-held (withreply-lock)
(log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item)
(queue:pushq queue push-item)

(if time-out
(wait-and-probe-for-msg-handler-result msgbox push-item)
(bt:condition-wait withreply-cvar withreply-lock)))
(bt2:condition-wait withreply-cvar withreply-lock)))

(with-slots (handler-result) push-item
(log:trace "~a: withreply: result should be available: ~a" (name msgbox) handler-result)
handler-result)))
Expand Down Expand Up @@ -256,7 +256,7 @@ The submitting code has to await the side-effect and possibly handle a timeout."
:reader dispatcher
:documentation
"The dispatcher from the system.")
(lock :initform (bt:make-lock)))
(lock :initform (bt2:make-lock)))
(:documentation
"This message box is a message-box that uses the `system`s `dispatcher`.
This has the advantage that an almost unlimited actors/agents can be created.
Expand Down Expand Up @@ -286,12 +286,12 @@ The `handler-fun-args' is part of the message item."
(log:warn "~a: item got cancelled: ~a" name popped-item))
(unless cancelled-p
;; protect the actor from concurrent state changes on the shared dispatcher
(bt:acquire-lock lock t)
(bt2:acquire-lock lock :wait t)
(unwind-protect
(unless cancelled-p
(setf handler-result (call-handler-fun handler-fun-args message))
handler-result)
(bt:release-lock lock))))))
(bt2:release-lock lock))))))

(defmethod submit ((self message-box/dp) message withreply-p time-out handler-fun-args)
"Submitting a message on a multi-threaded `dispatcher` is different as submitting on a single threaded message-box. On a single threaded message-box the order of message processing is guaranteed even when submitting from multiple threads. On the `dispatcher` this is not the case. The order cannot be guaranteed when messages are processed by different `dispatcher` threads. However, we still guarantee a 'single-threadedness' regarding the state of the actor. This is achieved here by protecting the `handler-fun-args` execution with a lock.
Expand Down
12 changes: 6 additions & 6 deletions src/queue/queue-locked.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,24 @@ Copyright (c) 2011-2012, James M. Lawrence. All rights reserved.

(defclass queue-unbounded (queue-base)
((queue :initform (make-queue))
(lock :initform (bt:make-lock))
(cvar :initform (bt:make-condition-variable)))
(lock :initform (bt2:make-lock))
(cvar :initform (bt2:make-condition-variable)))
(:documentation "Unbounded queue."))

(defmethod pushq ((self queue-unbounded) element)
(with-slots (queue lock cvar) self
(bt:with-lock-held (lock)
(bt2:with-lock-held (lock)
(enqueue element queue)
(bt:condition-notify cvar))))
(bt2:condition-notify cvar))))

(defmethod popq ((self queue-unbounded))
(with-slots (queue lock cvar) self
(bt:with-lock-held (lock)
(bt2:with-lock-held (lock)
(loop (multiple-value-bind (value presentp)
(dequeue queue)
(if presentp
(return value)
(bt:condition-wait cvar lock)))))))
(bt2:condition-wait cvar lock)))))))

(defmethod emptyq-p ((self queue-unbounded))
(with-slots (queue) self
Expand Down
12 changes: 6 additions & 6 deletions src/queue/queue.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@

(defclass queue-bounded (queue-base)
((queue :initform nil)
(lock :initform (bt:make-lock))
(cvar :initform (bt:make-condition-variable))
(lock :initform (bt2:make-lock))
(cvar :initform (bt2:make-condition-variable))
(max-items :initform 1000 :initarg :max-items)
(fill-count :initform 0)) ; cl-speedy-queue has issues with queued items count
(:documentation "Bounded queue."))
Expand All @@ -58,16 +58,16 @@
(with-slots (queue lock cvar fill-count max-items) self
(when (>= fill-count max-items)
(error 'queue-full-error :queue self))
(bt:with-lock-held (lock)
(bt2:with-lock-held (lock)
(cl-speedy-queue:enqueue element queue)
(incf fill-count)
(bt:condition-notify cvar))))
(bt2:condition-notify cvar))))

(defmethod popq ((self queue-bounded))
(with-slots (queue lock cvar) self
(bt:with-lock-held (lock)
(bt2:with-lock-held (lock)
(loop :while (cl-speedy-queue:queue-empty-p queue)
:do (bt:condition-wait cvar lock)
:do (bt2:condition-wait cvar lock)
:finally (return
(progn
(decf (slot-value self 'fill-count))
Expand Down
8 changes: 4 additions & 4 deletions src/timeutils.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ This blocks the calling thread."
"Spawns thread with timeout. Blocks until computation is done, or timeout elapsed."
(with-gensyms (c)
`(handler-case
(bt:with-timeout (,wait-time)
(bt2:with-timeout (,wait-time)
,@body)
(bt:timeout (,c)
(bt2:timeout (,c)
(error ,c))
;; the below is not needed anymore with SBCL 2.1. Will keep it anyway for compatibility.
#+sbcl
(sb-ext:timeout (,c)
(declare (ignore ,c))
(log:warn "sb-ext:timeout, wrapping to 'expired'.")
(error 'bt:timeout :length ,wait-time)))))
(error 'bt2:timeout :seconds ,wait-time)))))

(defun make-timer (delay run-fun)
(bt:make-thread (lambda ()
(bt2:make-thread (lambda ()
(sleep delay)
(funcall run-fun))
:name (string (gensym "timer-"))))
Loading

0 comments on commit 9d20a59

Please sign in to comment.