-
Notifications
You must be signed in to change notification settings - Fork 0
/
async.lisp
73 lines (61 loc) · 2.86 KB
/
async.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
;;;; Package definition for the user-facing asynchronous-job macros.
;;;; The package is named :async/async and is also reachable as :async.
(uiop:define-package :async/async
(:nicknames :async)
;; Symbols come from :async/job and :cl. The macros in this file rely on
;; names they do not define themselves (`*job-queue*', `make-job',
;; `make-job-queue', `await-condition', `yield-condition') -- presumably
;; supplied by :async/job; verify against that package.
(:mix :async/job :cl)
;; NOTE(review): none of these three symbols is referenced by the forms
;; visible in this file -- confirm before pruning the import.
(:import-from :gefjon-utils
#:typedec #:func #:void)
;; Continuation support used by the `async' macro.
(:import-from :cl-cont
#:with-call/cc #:let/cc)
;; Public API. `async', `await', `yield', `with-executor' and
;; `asynchronously' are defined in this file; the remaining exports are not
;; defined here and are presumably re-exported from :async/job.
(:export
#:job-queue #:job
#:async #:await #:yield
#:cancel-job-queue
#:wait-for #:job-seq
#:with-executor #:asynchronously))
(in-package :async/async)
(defmacro async ((&optional (job-queue '*job-queue*))
                 &body body)
  "Enqueue BODY as an asynchronous `job' into JOB-QUEUE.
JOB-QUEUE is a form evaluated once to obtain the executor; it defaults to the variable `*job-queue*'.
BODY is wrapped in a zero-argument function, converted by `with-call/cc', and handed to `make-job';
the value of the `async' form is whatever `make-job' returns.
Within BODY, the `await' and `yield' macros are available to delay computation."
  `(macrolet ((await (job)
                "Block the current job on JOB, causing the current job to not be executed again until JOB finishes, and returning JOB's return values.
May only occur within the lexical and dynamic extent of an `async' block."
                ;; The continuation is bound to a fresh uninterned symbol so
                ;; that the user-supplied JOB form, which is evaluated inside
                ;; the `let/cc', cannot accidentally refer to (or be shadowed
                ;; by) the continuation variable.
                (let ((callback (gensym "CALLBACK")))
                  `(let/cc ,callback
                     (error 'await-condition
                            :upon ,job
                            :callback ,callback))))
              (yield ()
                "Pause the current job and place it at the end of the job queue.
May only occur within the lexical and dynamic extent of an `async' block."
                ;; No user code appears inside this expansion, so a constant
                ;; form with a fixed continuation name is safe here.
                '(progn
                  (let/cc callback
                    (error 'yield-condition
                           :callback callback))
                  ;; a resumed `yield' produces no values
                  (values))))
     (make-job :executor ,job-queue
               :body (lambda ()
                       (with-call/cc
                         ,@body)))))
(defmacro await (job)
  "Suspend the current job until JOB has finished, then continue with JOB's return values.
This global definition exists only to reject misuse: expanding it always signals an error.
The usable `await' is the local macro that `async' establishes around its body."
  (declare (ignore job))
  ;; Spelled out as an explicit SIMPLE-ERROR; the message has no format arguments.
  (error 'simple-error
         :format-control "`await' may only appear inside of an `async' block"
         :format-arguments '()))
(defmacro yield ()
  "Suspend the current job and requeue it at the end of the job queue.
This global definition exists only to reject misuse: expanding it always signals an error.
The usable `yield' is the local macro that `async' establishes around its body."
  ;; Spelled out as an explicit SIMPLE-ERROR; the message has no format arguments.
  (error 'simple-error
         :format-control "`yield' may only appear inside of an `async' block"
         :format-arguments '()))
(defmacro with-executor ((nthreads) &body body)
  "Execute BODY in a context where `*job-queue*' is bound to an asynchronous executor with NTHREADS worker threads.
The executor is created by `make-job-queue' and is passed to `cancel-job-queue' when BODY exits,
whether normally or through a non-local exit, so jobs which have not been started at that time
will be cancelled.
If creating the executor itself signals, no executor exists and no cancellation is attempted,
so the original error is the one the caller sees.
Returns the values of BODY."
  ;; The executor is remembered in an uninterned variable so that BODY can
  ;; neither observe nor shadow it.
  (let ((queue (gensym "QUEUE")))
    `(let (,queue)
       (unwind-protect
            (let ((*job-queue* (make-job-queue ,nthreads)))
              ;; Keep a lexical reference: cleanup must target the executor
              ;; created here even if BODY assigns or rebinds `*job-queue*'.
              (setf ,queue *job-queue*)
              ,@body)
         ;; The variable is still NIL when `make-job-queue' failed; there is
         ;; nothing to cancel in that case.
         (when ,queue
           (cancel-job-queue ,queue))))))
(defmacro asynchronously ((nthreads) &body body)
  "Run BODY as a job submitted via `async' to a fresh executor with NTHREADS worker threads, and return BODY's values.
The executor is set up by `with-executor' and the job's result is obtained with `wait-for'.
Unless BODY itself spawns and awaits further asynchronous jobs, this is just a very expensive `progn'."
  ;; Build the job-spawning form first, then wrap it in the executor scope.
  (let ((spawn-form `(async () ,@body)))
    `(with-executor (,nthreads)
       (wait-for ,spawn-form))))