From 79db9d97a9b86af109027955ac323b6b07c1b223 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Thu, 24 Oct 2024 23:49:14 +0300 Subject: [PATCH] Refine the `Awaitable` abstraction --- bench/bench_stack.ml | 86 +++++++++++++++++++ bench/dune | 1 + bench/main.ml | 1 + lib/picos_std.awaitable/dune | 6 +- .../picos_std_awaitable.ml | 52 +++++------ .../picos_std_awaitable.mli | 43 ++++------ lib/picos_std.sync/dune | 2 +- lib/picos_std.sync/picos_std_sync.ml | 1 + lib/picos_std.sync/picos_std_sync.mli | 19 ++++ lib/picos_std.sync/stack.ml | 60 +++++++++++++ 10 files changed, 219 insertions(+), 52 deletions(-) create mode 100644 bench/bench_stack.ml create mode 100644 lib/picos_std.sync/stack.ml diff --git a/bench/bench_stack.ml b/bench/bench_stack.ml new file mode 100644 index 00000000..5ccea8c3 --- /dev/null +++ b/bench/bench_stack.ml @@ -0,0 +1,86 @@ +open Multicore_bench +open Picos_std_sync + +let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () = + let t = Stack.create ~padded:true () in + + let op push = + if push then Stack.push t 101 + else match Stack.pop_exn t with _ -> () | exception Stack.Empty -> () + in + + let init _ = + assert ( + match Stack.pop_exn t with _ -> false | exception Stack.Empty -> true); + Util.generate_push_and_pop_sequence n_msgs + in + let work _ bits = Util.Bits.iter op bits in + + Times.record ~budgetf ~n_domains:1 ~init ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" + +let run_one ~budgetf ~n_adders ~n_takers () = + let n_domains = n_adders + n_takers in + + let n_msgs = 50 * Util.iter_factor in + + let t = Stack.create ~padded:true () in + + let n_msgs_to_add = Countdown.create ~n_domains:n_adders () in + let n_msgs_to_take = Countdown.create ~n_domains:n_takers () in + + let init _ = + assert ( + match Stack.pop_exn t with _ -> false | exception Stack.Empty -> true); + Countdown.non_atomic_set n_msgs_to_add n_msgs; + Countdown.non_atomic_set n_msgs_to_take n_msgs + in + let work i () = + if i < n_adders then + let rec work () = + let n = Countdown.alloc n_msgs_to_add ~domain_index:i ~batch:1000 in + if 0 < n then begin + for i = 1 to n do + Stack.push t i + done; + work () + end + in + work () + else + let i = i - n_adders in + let rec work () = + let n = Countdown.alloc n_msgs_to_take ~domain_index:i ~batch:1000 in + if 0 < n then + let rec loop n = + if 0 < n then begin + match Stack.pop_exn t with + | _ -> loop (n - 1) + | exception Stack.Empty -> + Backoff.once Backoff.default |> ignore; + loop n + end + else work () + in + loop n + in + work () + in + + let config = + let format role n = + Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s") + in + Printf.sprintf "%s, %s" + (format "nb adder" n_adders) + (format "nb taker" n_takers) + in + Times.record ~budgetf ~n_domains ~init ~work () + |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config + +let run_suite ~budgetf = + run_one_domain ~budgetf () + @ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ] + |> List.concat_map @@ fun (n_adders, n_takers) -> + if Picos_domain.recommended_domain_count () < n_adders + n_takers then [] + else run_one ~budgetf ~n_adders ~n_takers ()) diff --git a/bench/dune b/bench/dune index 26f77b0e..51b4bc58 100644 --- a/bench/dune +++ b/bench/dune @@ -22,6 +22,7 @@ (run %{test} -brief "Fib") (run %{test} -brief "Picos binaries") (run %{test} -brief "Bounded_q with Picos_sync") + (run %{test} -brief "Stack") (run %{test} -brief "Memory usage"))) (foreign_stubs (language c) diff --git a/bench/main.ml b/bench/main.ml index 423c3774..cccdbaf3 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -22,6 +22,7 @@ let benchmarks = ("Picos binaries", Bench_binaries.run_suite); ("Bounded_q with Picos_sync", Bench_bounded_q.run_suite); ("Memory usage", Bench_memory.run_suite); + ("Stack", Bench_stack.run_suite); ] let () = Multicore_bench.Cmd.run ~benchmarks () diff --git a/lib/picos_std.awaitable/dune b/lib/picos_std.awaitable/dune index 6b61e758..27e12a26 100644 --- a/lib/picos_std.awaitable/dune +++ b/lib/picos_std.awaitable/dune @@ -1,7 +1,11 @@ (library (name picos_std_awaitable) (public_name picos_std.awaitable) - (libraries picos picos_aux.htbl backoff multicore-magic)) + (libraries + (re_export picos) + picos_aux.htbl + backoff + multicore-magic)) (mdx (package picos_meta) diff --git a/lib/picos_std.awaitable/picos_std_awaitable.ml b/lib/picos_std.awaitable/picos_std_awaitable.ml index 64d73808..95751f80 100644 --- a/lib/picos_std.awaitable/picos_std_awaitable.ml +++ b/lib/picos_std.awaitable/picos_std_awaitable.ml @@ -378,46 +378,46 @@ module Awaitable = struct done with Not_found -> () - let add_as (type a) (t : a awaitable) value = - let trigger = Trigger.create () in - let one : Awaiters.is1 = - One { awaitable = t; value; trigger; counter = 0; next = Min0 Zero } - in - let backoff = ref Backoff.default in - while - not - (match Htbl.find_exn awaiters (Packed t) with - | before -> - let many = Awaiters.snoc before one in - Htbl.try_compare_and_set awaiters (Packed t) before (Min1 many) - | exception Not_found -> Htbl.try_add awaiters (Packed t) (Min1 one)) - do - backoff := Backoff.once !backoff - done; - one - module Awaiter = struct type t = Awaiters.is1 - let add (type a) (t : a awaitable) = - add_as t (Sys.opaque_identity (Obj.magic awaiters : a)) + let add_as (type a) (t : a awaitable) trigger value = + let one : Awaiters.is1 = + One { awaitable = t; value; trigger; counter = 0; next = Min0 Zero } + in + let backoff = ref Backoff.default in + while + not + (match Htbl.find_exn awaiters (Packed t) with + | before -> + let many = Awaiters.snoc before one in + Htbl.try_compare_and_set awaiters (Packed t) before (Min1 many) + | exception Not_found -> Htbl.try_add awaiters (Packed t) (Min1 one)) + do + backoff := Backoff.once !backoff + done; + one + + let add (type a) (t : a awaitable) trigger = + let unique_value = Sys.opaque_identity (Obj.magic awaiters : a) in + add_as t trigger unique_value let remove one = Awaiters.signal_and_clear one; update (Awaiters.awaitable_of one) ~signal:false ~count:1 + end - let await one = + let await t value = + let trigger = Trigger.create () in + let one = Awaiter.add_as t trigger value in + if Awaiters.is_signalable one then Awaiter.remove one + else match Awaiters.await one with | None -> () | Some exn_bt -> Awaiters.clear one; update (Awaiters.awaitable_of one) ~signal:true ~count:1; Printexc.raise_with_backtrace (fst exn_bt) (snd exn_bt) - end - - let await t value = - let one = add_as t value in - if Awaiters.is_signalable one then Awaiter.remove one else Awaiter.await one let[@inline] broadcast t = update (Packed t) ~signal:true ~count:Int.max_int let[@inline] signal t = update (Packed t) ~signal:true ~count:1 diff --git a/lib/picos_std.awaitable/picos_std_awaitable.mli b/lib/picos_std.awaitable/picos_std_awaitable.mli index 135669b8..c348d54b 100644 --- a/lib/picos_std.awaitable/picos_std_awaitable.mli +++ b/lib/picos_std.awaitable/picos_std_awaitable.mli @@ -1,6 +1,8 @@ (** Basic {{:https://en.wikipedia.org/wiki/Futex} futex}-like awaitable atomic location for {!Picos}. *) +open Picos + (** {1 Modules} *) module Awaitable : sig @@ -18,7 +20,7 @@ module Awaitable : sig (** {1 Atomic API} *) - type 'a t + type !'a t (** Represents an awaitable atomic location. *) val make : ?padded:bool -> 'a -> 'a t @@ -90,12 +92,7 @@ module Awaitable : sig implicitly wake up awaiters. *) module Awaiter : sig - (** Ability to await for a signal from the past. - - {!Awaitable.await} only receives a signal at or after the point of - calling it. This API allows the awaiting process to be broken into two - steps, {!add} and {!await}, such that a signal after {!add} can be - received by {!await}. *) + (** Low level interface for more flexible waiting. *) type 'a awaitable := 'a t (** An erased type alias for {!Awaitable.t}. *) @@ -103,21 +100,18 @@ module Awaitable : sig type t (** Represents a single use awaiter of a signal to an {!awaitable}. *) - val add : 'a awaitable -> t - (** [add awaitable] create a single use awaiter, adds it to the FIFO - associated with the awaitable, and returns the awaiter. *) - - val await : t -> unit - (** [await awaiter] awaits for the association awaitable to be signaled. *) + val add : 'a awaitable -> Trigger.t -> t + (** [add awaitable trigger] creates a single use awaiter, adds it to the + FIFO associated with the awaitable, and returns the awaiter. *) val remove : t -> unit (** [remove awaiter] marks the awaiter as having been signaled and removes it from the FIFO associated with the awaitable. - ⚠️ An explicit call of [remove] is needed when an {!add}ed awaiter is not - {!await}ed for. In such a case, from the point of view of lost signals, - the caller of [remove] should be considered to have received or consumed - a signal before the call of [remove]. *) + ℹ️ If the associated trigger is used with only one awaiter and the + {!Trigger.await await} on the trigger returns [None], there is no need + to explicitly remove the awaiter, because it has already been + removed. *) end end @@ -164,7 +158,7 @@ end {2 [Condition]} Let's also implement a condition variable. For that we'll also make use of - low level operations in the {!Picos} core library: + low level abstractions and operations from the {!Picos} core library: {[ # open Picos @@ -180,7 +174,8 @@ end let create () = Awaitable.make () let wait t mutex = - let awaiter = Awaitable.Awaiter.add t in + let trigger = Trigger.create () in + let awaiter = Awaitable.Awaiter.add t trigger in Mutex.unlock mutex; let lock_forbidden mutex = let fiber = Fiber.current () in @@ -188,12 +183,12 @@ end Mutex.lock mutex; Fiber.set fiber ~forbid in - match Awaitable.Awaiter.await awaiter with - | () -> lock_forbidden mutex - | exception exn -> - let bt = Printexc.get_raw_backtrace () in + match Trigger.await trigger with + | None -> lock_forbidden mutex + | Some exn_bt -> + Awaitable.Awaiter.remove awaiter; lock_forbidden mutex; - Printexc.raise_with_backtrace exn bt + Printexc.raise_with_backtrace (fst exn_bt) (snd exn_bt) let signal = Awaitable.signal let broadcast = Awaitable.broadcast diff --git a/lib/picos_std.sync/dune b/lib/picos_std.sync/dune index 39b0cbf5..70a2ef97 100644 --- a/lib/picos_std.sync/dune +++ b/lib/picos_std.sync/dune @@ -3,7 +3,7 @@ (public_name picos_std.sync) (libraries (re_export picos_std.event) - picos + picos_std.awaitable backoff multicore-magic)) diff --git a/lib/picos_std.sync/picos_std_sync.ml b/lib/picos_std.sync/picos_std_sync.ml index 007b8cf0..ee5f6d9a 100644 --- a/lib/picos_std.sync/picos_std_sync.ml +++ b/lib/picos_std.sync/picos_std_sync.ml @@ -5,3 +5,4 @@ module Lazy = Lazy module Latch = Latch module Ivar = Ivar module Stream = Stream +module Stack = Stack diff --git a/lib/picos_std.sync/picos_std_sync.mli b/lib/picos_std.sync/picos_std_sync.mli index c4cd77d8..2e7dd370 100644 --- a/lib/picos_std.sync/picos_std_sync.mli +++ b/lib/picos_std.sync/picos_std_sync.mli @@ -409,6 +409,25 @@ module Stream : sig the [cursor] position. *) end +module Stack : sig + (** *) + + type !'a t + (** *) + + val create : ?padded:bool -> ?capacity:int -> unit -> 'a t + (** *) + + val push : 'a t -> 'a -> unit + (** *) + + exception Empty + (** *) + + val pop_exn : 'a t -> 'a + (** *) +end + (** {1 Examples} {2 A simple bounded queue} diff --git a/lib/picos_std.sync/stack.ml b/lib/picos_std.sync/stack.ml new file mode 100644 index 00000000..d11d6e52 --- /dev/null +++ b/lib/picos_std.sync/stack.ml @@ -0,0 +1,60 @@ +open Picos_std_awaitable + +type 'a state = + | Nil of { mutable capacity : int } + | Cons of { mutable capacity : int; value : 'a; rest : 'a state } + +type 'a t = 'a state Awaitable.t + +let one = 0b10 +let busy_bit = 0b01 + +let create ?padded ?capacity () = + let capacity = + match capacity with + | None -> Int.max_int land lnot busy_bit + | Some capacity -> capacity * one + in + Awaitable.make ?padded (Nil { capacity }) + +let rec push t value backoff = + match Awaitable.get t with + | Nil r as before -> + let capacity = r.capacity land lnot busy_bit in + if + Awaitable.compare_and_set t before + (Cons { capacity = capacity - one; value; rest = Nil { capacity } }) + then begin + if r.capacity land busy_bit <> 0 then Awaitable.broadcast t + end + else push t value (Backoff.once backoff) + | Cons r as before -> + let capacity = r.capacity in + if one <= capacity then begin + if + not + (Awaitable.compare_and_set t before + (Cons { capacity = capacity - one; value; rest = before })) + then push t value (Backoff.once backoff) + end + else begin + if capacity <> capacity lor busy_bit then + r.capacity <- capacity lor busy_bit; + Awaitable.await t before; + push t value Backoff.default + end + +exception Empty + +let rec pop_exn t backoff = + match Awaitable.get t with + | Nil _ -> raise_notrace Empty + | Cons r as before -> + if Awaitable.compare_and_set t before r.rest then begin + if r.capacity land busy_bit <> 0 then Awaitable.broadcast t; + r.value + end + else pop_exn t (Backoff.once backoff) + +let[@inline] pop_exn t = pop_exn t Backoff.default +let[@inline] push t value = push t value Backoff.default