Skip to content

Commit

Permalink
Refine the Awaitable abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Oct 24, 2024
1 parent 1ba3357 commit 2ab880c
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 52 deletions.
86 changes: 86 additions & 0 deletions bench/bench_stack.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
open Multicore_bench
open Picos_std_sync

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Stack.create ~padded:true () in

let op push =
if push then Stack.push t 101
else match Stack.pop_exn t with _ -> () | exception Stack.Empty -> ()
in

let init _ =
assert (
match Stack.pop_exn t with _ -> false | exception Stack.Empty -> true);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ~n_adders ~n_takers () =
let n_domains = n_adders + n_takers in

let n_msgs = 50 * Util.iter_factor in

let t = Stack.create ~padded:true () in

let n_msgs_to_add = Countdown.create ~n_domains:n_adders () in
let n_msgs_to_take = Countdown.create ~n_domains:n_takers () in

let init _ =
assert (
match Stack.pop_exn t with _ -> false | exception Stack.Empty -> true);
Countdown.non_atomic_set n_msgs_to_add n_msgs;
Countdown.non_atomic_set n_msgs_to_take n_msgs
in
let work i () =
if i < n_adders then
let rec work () =
let n = Countdown.alloc n_msgs_to_add ~domain_index:i ~batch:1000 in
if 0 < n then begin
for i = 1 to n do
Stack.push t i
done;
work ()
end
in
work ()
else
let i = i - n_adders in
let rec work () =
let n = Countdown.alloc n_msgs_to_take ~domain_index:i ~batch:1000 in
if 0 < n then
let rec loop n =
if 0 < n then begin
match Stack.pop_exn t with
| _ -> loop (n - 1)
| exception Stack.Empty ->
Backoff.once Backoff.default |> ignore;
loop n
end
else work ()
in
loop n
in
work ()
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s"
(format "nb adder" n_adders)
(format "nb taker" n_takers)
in
Times.record ~budgetf ~n_domains ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
if Picos_domain.recommended_domain_count () < n_adders + n_takers then []
else run_one ~budgetf ~n_adders ~n_takers ())
1 change: 1 addition & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
(run %{test} -brief "Fib")
(run %{test} -brief "Picos binaries")
(run %{test} -brief "Bounded_q with Picos_sync")
(run %{test} -brief "Stack")
(run %{test} -brief "Memory usage")))
(foreign_stubs
(language c)
Expand Down
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ let benchmarks =
("Picos binaries", Bench_binaries.run_suite);
("Bounded_q with Picos_sync", Bench_bounded_q.run_suite);
("Memory usage", Bench_memory.run_suite);
("Stack", Bench_stack.run_suite);
]

let () = Multicore_bench.Cmd.run ~benchmarks ()
6 changes: 5 additions & 1 deletion lib/picos_std.awaitable/dune
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
(library
(name picos_std_awaitable)
(public_name picos_std.awaitable)
(libraries picos picos_aux.htbl backoff multicore-magic))
(libraries
(re_export picos)
picos_aux.htbl
backoff
multicore-magic))

(mdx
(package picos_meta)
Expand Down
52 changes: 26 additions & 26 deletions lib/picos_std.awaitable/picos_std_awaitable.ml
Original file line number Diff line number Diff line change
Expand Up @@ -378,46 +378,46 @@ module Awaitable = struct
done
with Not_found -> ()

let add_as (type a) (t : a awaitable) value =
let trigger = Trigger.create () in
let one : Awaiters.is1 =
One { awaitable = t; value; trigger; counter = 0; next = Min0 Zero }
in
let backoff = ref Backoff.default in
while
not
(match Htbl.find_exn awaiters (Packed t) with
| before ->
let many = Awaiters.snoc before one in
Htbl.try_compare_and_set awaiters (Packed t) before (Min1 many)
| exception Not_found -> Htbl.try_add awaiters (Packed t) (Min1 one))
do
backoff := Backoff.once !backoff
done;
one

module Awaiter = struct
type t = Awaiters.is1

let add (type a) (t : a awaitable) =
add_as t (Sys.opaque_identity (Obj.magic awaiters : a))
let add_as (type a) (t : a awaitable) trigger value =
let one : Awaiters.is1 =
One { awaitable = t; value; trigger; counter = 0; next = Min0 Zero }
in
let backoff = ref Backoff.default in
while
not
(match Htbl.find_exn awaiters (Packed t) with
| before ->
let many = Awaiters.snoc before one in
Htbl.try_compare_and_set awaiters (Packed t) before (Min1 many)
| exception Not_found -> Htbl.try_add awaiters (Packed t) (Min1 one))
do
backoff := Backoff.once !backoff
done;
one

let add (type a) (t : a awaitable) trigger =
let unique_value = Sys.opaque_identity (Obj.magic awaiters : a) in
add_as t trigger unique_value

let remove one =
Awaiters.signal_and_clear one;
update (Awaiters.awaitable_of one) ~signal:false ~count:1
end

let await one =
let await t value =
let trigger = Trigger.create () in
let one = Awaiter.add_as t trigger value in
if Awaiters.is_signalable one then Awaiter.remove one
else
match Awaiters.await one with
| None -> ()
| Some exn_bt ->
Awaiters.clear one;
update (Awaiters.awaitable_of one) ~signal:true ~count:1;
Printexc.raise_with_backtrace (fst exn_bt) (snd exn_bt)
end

let await t value =
let one = add_as t value in
if Awaiters.is_signalable one then Awaiter.remove one else Awaiter.await one

let[@inline] broadcast t = update (Packed t) ~signal:true ~count:Int.max_int
let[@inline] signal t = update (Packed t) ~signal:true ~count:1
Expand Down
43 changes: 19 additions & 24 deletions lib/picos_std.awaitable/picos_std_awaitable.mli
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
(** Basic {{:https://en.wikipedia.org/wiki/Futex} futex}-like awaitable atomic
location for {!Picos}. *)

open Picos

(** {1 Modules} *)

module Awaitable : sig
Expand All @@ -18,7 +20,7 @@ module Awaitable : sig

(** {1 Atomic API} *)

type 'a t
type !'a t
(** Represents an awaitable atomic location. *)

val make : ?padded:bool -> 'a -> 'a t
Expand Down Expand Up @@ -90,34 +92,26 @@ module Awaitable : sig
implicitly wake up awaiters. *)

module Awaiter : sig
(** Ability to await for a signal from the past.
{!Awaitable.await} only receives a signal at or after the point of
calling it. This API allows the awaiting process to be broken into two
steps, {!add} and {!await}, such that a signal after {!add} can be
received by {!await}. *)
(** Low level interface for more flexible waiting. *)

type 'a awaitable := 'a t
(** An erased type alias for {!Awaitable.t}. *)

type t
(** Represents a single use awaiter of a signal to an {!awaitable}. *)

val add : 'a awaitable -> t
(** [add awaitable] create a single use awaiter, adds it to the FIFO
associated with the awaitable, and returns the awaiter. *)

val await : t -> unit
(** [await awaiter] awaits for the association awaitable to be signaled. *)
val add : 'a awaitable -> Trigger.t -> t
(** [add awaitable trigger] creates a single use awaiter, adds it to the
FIFO associated with the awaitable, and returns the awaiter. *)

val remove : t -> unit
(** [remove awaiter] marks the awaiter as having been signaled and removes it
from the FIFO associated with the awaitable.
⚠️ An explicit call of [remove] is needed when an {!add}ed awaiter is not
{!await}ed for. In such a case, from the point of view of lost signals,
the caller of [remove] should be considered to have received or consumed
a signal before the call of [remove]. *)
ℹ️ If the associated trigger is used with only one awaiter and the
{!Trigger.await await} on the trigger returns [None], there is no need
to explicitly remove the awaiter, because it has already been
removed. *)
end
end

Expand Down Expand Up @@ -164,7 +158,7 @@ end
{2 [Condition]}
Let's also implement a condition variable. For that we'll also make use of
low level operations in the {!Picos} core library:
low level abstractions and operations from the {!Picos} core library:
{[
# open Picos
Expand All @@ -180,20 +174,21 @@ end
let create () = Awaitable.make ()
let wait t mutex =
let awaiter = Awaitable.Awaiter.add t in
let trigger = Trigger.create () in
let awaiter = Awaitable.Awaiter.add t trigger in
Mutex.unlock mutex;
let lock_forbidden mutex =
let fiber = Fiber.current () in
let forbid = Fiber.exchange fiber ~forbid:true in
Mutex.lock mutex;
Fiber.set fiber ~forbid
in
match Awaitable.Awaiter.await awaiter with
| () -> lock_forbidden mutex
| exception exn ->
let bt = Printexc.get_raw_backtrace () in
match Trigger.await trigger with
| None -> lock_forbidden mutex
| Some exn_bt ->
Awaitable.Awaiter.remove awaiter;
lock_forbidden mutex;
Printexc.raise_with_backtrace exn bt
Printexc.raise_with_backtrace (fst exn_bt) (snd exn_bt)
let signal = Awaitable.signal
let broadcast = Awaitable.broadcast
Expand Down
2 changes: 1 addition & 1 deletion lib/picos_std.sync/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
(public_name picos_std.sync)
(libraries
(re_export picos_std.event)
picos
picos_std.awaitable
backoff
multicore-magic))

Expand Down
1 change: 1 addition & 0 deletions lib/picos_std.sync/picos_std_sync.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ module Lazy = Lazy
module Latch = Latch
module Ivar = Ivar
module Stream = Stream
module Stack = Stack
19 changes: 19 additions & 0 deletions lib/picos_std.sync/picos_std_sync.mli
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,25 @@ module Stream : sig
the [cursor] position. *)
end

module Stack : sig
(** *)

type !'a t
(** *)

val create : ?padded:bool -> ?capacity:int -> unit -> 'a t
(** *)

val push : 'a t -> 'a -> unit
(** *)

exception Empty
(** *)

val pop_exn : 'a t -> 'a
(** *)
end

(** {1 Examples}
{2 A simple bounded queue}
Expand Down
60 changes: 60 additions & 0 deletions lib/picos_std.sync/stack.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
open Picos_std_awaitable

type 'a state =
| Nil of { mutable capacity : int }
| Cons of { mutable capacity : int; value : 'a; rest : 'a state }

type 'a t = 'a state Awaitable.t

exception Empty

let one = 0b10
let busy_bit = 0b01

let create ?padded ?capacity () =
let capacity =
match capacity with
| None -> Int.max_int land lnot busy_bit
| Some capacity -> capacity * one
in
Awaitable.make ?padded (Nil { capacity })

let rec push t value backoff =
match Awaitable.get t with
| Nil r as before ->
let capacity = r.capacity land lnot busy_bit in
if
Awaitable.compare_and_set t before
(Cons { capacity = capacity - one; value; rest = Nil { capacity } })
then begin
if r.capacity land busy_bit <> 0 then Awaitable.broadcast t
end
else push t value (Backoff.once backoff)
| Cons r as before ->
let capacity = r.capacity in
if one <= capacity then begin
if
not
(Awaitable.compare_and_set t before
(Cons { capacity = capacity - one; value; rest = before }))
then push t value (Backoff.once backoff)
end
else begin
if capacity <> capacity lor busy_bit then
r.capacity <- capacity lor busy_bit;
Awaitable.await t before;
push t value Backoff.default
end

let rec pop_exn t backoff =
match Awaitable.get t with
| Nil _ -> raise_notrace Empty
| Cons r as before ->
if Awaitable.compare_and_set t before r.rest then begin
if r.capacity land busy_bit <> 0 then Awaitable.broadcast t;
r.value
end
else pop_exn t (Backoff.once backoff)

let[@inline] push t value = push t value Backoff.default
let[@inline] pop_exn t = pop_exn t Backoff.default

0 comments on commit 2ab880c

Please sign in to comment.