Skip to content

Commit

Permalink
Merge pull request #27 from tmcgilchrist/ocaml_5_port
Browse files Browse the repository at this point in the history
Add Michael Scott queue and Treiber stack
  • Loading branch information
lyrm authored Dec 8, 2022
2 parents 4f7dfa4 + 08cbcf6 commit bf8f500
Show file tree
Hide file tree
Showing 21 changed files with 1,258 additions and 26 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

A collection of Concurrent Lockfree Data Structures for OCaml 5. It contains:

* [Treiber Stack](src/treiber_stack.mli) A classic multi-producer multi-consumer stack, robust and flexible. Recommended starting point when needing LIFO structure.

* [Michael-Scott Queue](src/michael_scott_queue.mli) A classic multi-producer multi-consumer queue, robust and flexible. Recommended starting point when needing FIFO structure. It is based on [Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms](https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf).

* [Chase-Lev Work-Stealing Deque](src/ws_deque.mli) Single-producer, multi-consumer dynamic-size double-ended queue (deque) (see [Dynamic circular work-stealing deque](https://dl.acm.org/doi/10.1145/1073970.1073974) and [Correct and efficient work-stealing for weak memory models](https://dl.acm.org/doi/abs/10.1145/2442516.2442524)). Ideal for throughput-focused scheduling using per-core work distribution. Note, [pop] and [steal] follow different ordering (respectively LIFO and FIFO) and have different linearization contraints.

* [SPSC Queue](src/spsc_queue.mli) Simple single-producer single-consumer fixed-size queue. Thread-safe as long as at most one thread acts as producer and at most one as consumer at any single point in time.
Expand All @@ -11,7 +15,6 @@ A collection of Concurrent Lockfree Data Structures for OCaml 5. It contains:

* [MPSC Queue](src/mpsc_queue.mli) A multi-producer, single-consumer, thread-safe queue without support for cancellation. This makes a good data structure for a scheduler's run queue. It is used in [Eio](https://github.com/ocaml-multicore/eio). It is a single consumer version of the queue described in [Implementing lock-free queues](https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf).


## Usage

lockfree cam be installed from `opam`: `opam install lockfree`. Sample usage of
Expand Down
120 changes: 120 additions & 0 deletions bench/backoff.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
let item_count = 3_000_000

type 'a t = { value : 'a; next : 'a t option Atomic.t }

let empty () = { value = Obj.magic (); next = Atomic.make None }

let push ~backoff_once t value =
let b = Lockfree.Backoff.create () in
let new_head = ({ value; next = Atomic.make None } : 'a t) in
let rec push_f () =
let head = Atomic.get t.next in
Atomic.set new_head.next head;
if Atomic.compare_and_set t.next head (Some new_head) then ()
else (
backoff_once b;
push_f ())
in
push_f ()

let rec pop ?min_wait ~backoff_once t =
let b = Lockfree.Backoff.create ?min_wait () in
let head = Atomic.get t.next in
match head with
| None -> None
| Some node ->
if Atomic.compare_and_set t.next head (Atomic.get node.next) then
Some node.value
else (
backoff_once b;
pop ~backoff_once t)

let run_basic ~backoff_once () =
let stack = empty () in
let pusher =
Domain.spawn (fun () ->
let start_time = Unix.gettimeofday () in
for i = 1 to item_count do
push ~backoff_once stack i
done;
start_time)
in
for _ = 1 to item_count do
while Option.is_none (pop ~backoff_once stack) do
()
done
done;
let end_time = Unix.gettimeofday () in
let start_time = Domain.join pusher in
let time_diff = end_time -. start_time in
time_diff

let run_artificial ~backoff_once () =
let threads = 6 in
let stack = empty () in

(* prepare stack *)
for i = 1 to item_count do
push ~backoff_once stack i
done;

(* *)
let counter = Atomic.make 0 in
let domains =
List.init threads (fun _ ->
Domain.spawn (fun () ->
Atomic.incr counter;
(* wait for all ready *)
while Atomic.get counter <= threads do
()
done;

(* bench !*)
while Option.is_some (pop ~min_wait:100 ~backoff_once stack) do
()
done;

Unix.gettimeofday ()))
in

(* wait for all domains to start *)
while Atomic.get counter < threads do
()
done;
let start_time = Unix.gettimeofday () in

(* let them run! *)
Atomic.incr counter;

(* wait for finish *)
let end_time =
List.map Domain.join domains |> List.fold_left Float.min Float.max_float
in
let time_diff = end_time -. start_time in
time_diff

let bench ~run_type ~with_backoff () =
let backoff_once =
if with_backoff then Lockfree.Backoff.once
else fun (_ : Lockfree.Backoff.t) -> ()
in
let results = ref [] in
let run =
match run_type with `Artificial -> run_artificial | `Basic -> run_basic
in
for i = 1 to 10 do
let time = run ~backoff_once () in
if i > 1 then results := time :: !results
done;
let results = List.sort Float.compare !results in
let median_time = List.nth results 4 in
let median_throughput = Float.of_int item_count /. median_time in
let name =
Printf.sprintf "backoff-%s-%s"
(if with_backoff then "on" else "off")
(match run_type with `Artificial -> "artificial" | `Basic -> "basic")
in
Benchmark_result.create_generic ~median_time ~median_throughput name

let bench_artificial = bench ~run_type:`Artificial
let bench_basic = bench ~run_type:`Basic
2 changes: 2 additions & 0 deletions bench/backoff.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
val bench_artificial : with_backoff:bool -> unit -> Benchmark_result.t
val bench_basic : with_backoff:bool -> unit -> Benchmark_result.t
26 changes: 2 additions & 24 deletions bench/bench_spsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,6 @@ let run () =
let time_diff = end_time -. start_time in
time_diff

let create_output median_time median_throughput =
let time =
({
name = "time";
value = `Numeric median_time;
units = "s";
description = "median time result";
}
: Benchmark_result.Metric.t)
in
let throughput =
({
name = "throughput";
value = `Numeric median_throughput;
units = "item/s";
description = "median throughput result";
}
: Benchmark_result.Metric.t)
in
let metrics = [ time; throughput ] in
({ name = "spsc-queue"; metrics } : Benchmark_result.t)

let bench () =
let results = ref [] in
for i = 1 to 10 do
Expand All @@ -55,5 +33,5 @@ let bench () =
done;
let results = List.sort Float.compare !results in
let median_time = List.nth results 4 in
let throughput = Float.of_int item_count /. median_time in
create_output median_time throughput
let median_throughput = Float.of_int item_count /. median_time in
Benchmark_result.create_generic ~median_time ~median_throughput "spsc-queue"
28 changes: 28 additions & 0 deletions bench/benchmark_result.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,31 @@ type t = { name : string; metrics : Metric.t list }
let to_json { name; metrics } =
let metrics = List.map Metric.to_json metrics |> String.concat ", " in
Printf.sprintf {| {"name": "%s", "metrics": [%s]} |} name metrics

let create_generic ?median_time ?median_throughput name =
let time =
Option.map
(fun median_time : Metric.t ->
{
name = "time";
value = `Numeric median_time;
units = "s";
description = "median time result";
})
median_time
in
let throughput =
Option.map
(fun median_throughput : Metric.t ->
{
name = "throughput";
value = `Numeric median_throughput;
units = "item/s";
description = "median throughput result";
})
median_throughput
in
let metrics = [ time; throughput ] |> List.filter_map (fun v -> v) in
if metrics = [] then
failwith "Benchmark_result.create: require at least one metric";
({ name; metrics } : t)
3 changes: 3 additions & 0 deletions bench/benchmark_result.mli
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ end
type t = { name : String.t; metrics : Metric.t list }

val to_json : t -> string

val create_generic :
?median_time:float -> ?median_throughput:float -> string -> t
10 changes: 10 additions & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
let backoff_benchmarks =
let open Backoff in
[
bench_basic ~with_backoff:true;
bench_basic ~with_backoff:false;
bench_artificial ~with_backoff:true;
bench_artificial ~with_backoff:false;
]

let benchmark_list =
[
Bench_spsc_queue.bench;
Expand All @@ -8,6 +17,7 @@ let benchmark_list =
Mpmc_queue.bench ~use_cas:true ~takers:1 ~pushers:8;
Mpmc_queue.bench ~use_cas:true ~takers:8 ~pushers:1;
]
@ backoff_benchmarks

let () =
let results =
Expand Down
17 changes: 17 additions & 0 deletions src/backoff.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
type t = { min_wait : int; max_wait : int; current : int ref }

let k = Domain.DLS.new_key Random.State.make_self_init

let create ?(min_wait = 17) ?(max_wait = 32 * 4096) () =
{ max_wait; min_wait; current = ref min_wait }

let once { max_wait; current; _ } =
let t = Random.State.int (Domain.DLS.get k) !current in
current := min (2 * !current) max_wait;
if t = 0 then ()
else
for _ = 1 to t do
Domain.cpu_relax ()
done

let reset { min_wait; current; _ } = current := min_wait
24 changes: 24 additions & 0 deletions src/backoff.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
(** Truncated exponential backoff.
Generally, a backoff mechanism adjusts time waited between retries to ensure the retries
will not put too much pressure on some underlying system. This particular implementation
is built for reducing contention in lockfree algorithms.
Under the hood, it uses relevant pause instruction to avoid adverse
microarchitectural effects present in naive busy-looping.
*)

type t
(** [t] type of a backoff object. *)

val create : ?min_wait:int -> ?max_wait:int -> unit -> t
(** [create] creates a new instance of backoff. [max_wait], [min_wait] override
the upper and lower bound on the number of spins executed by [once]. *)

val once : t -> unit
(** [once] executes one wait, whose length increases for every consecutive attempt
(until [max] is reached). *)

val reset : t -> unit
(** [reset] resets the attempt counter in [t]. *)
4 changes: 3 additions & 1 deletion src/lockfree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
Copyright (c) 2017, Nicolas ASSOUAD <[email protected]>
########
*)

module Ws_deque = Ws_deque
module Spsc_queue = Spsc_queue
module Mpsc_queue = Mpsc_queue
module Treiber_stack = Treiber_stack
module Michael_scott_queue = Michael_scott_queue
module Backoff = Backoff
module Mpmc_relaxed_queue = Mpmc_relaxed_queue
3 changes: 3 additions & 0 deletions src/lockfree.mli
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ Copyright (c) 2017, Nicolas ASSOUAD <[email protected]>
module Ws_deque = Ws_deque
module Spsc_queue = Spsc_queue
module Mpsc_queue = Mpsc_queue
module Treiber_stack = Treiber_stack
module Michael_scott_queue = Michael_scott_queue
module Mpmc_relaxed_queue = Mpmc_relaxed_queue
module Backoff = Backoff
96 changes: 96 additions & 0 deletions src/michael_scott_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
(*
* Copyright (c) 2015, Théo Laurent <[email protected]>
* Copyright (c) 2015, KC Sivaramakrishnan <[email protected]>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

(* Michael-Scott queue *)

type 'a node = Nil | Next of 'a * 'a node Atomic.t
type 'a t = { head : 'a node Atomic.t; tail : 'a node Atomic.t }

let create () =
let head = Next (Obj.magic (), Atomic.make Nil) in
{ head = Atomic.make head; tail = Atomic.make head }

let is_empty q =
match Atomic.get q.head with
| Nil -> failwith "MSQueue.is_empty: impossible"
| Next (_, x) -> ( match Atomic.get x with Nil -> true | _ -> false)

let pop q =
let b = Backoff.create () in
let rec loop () =
let s = Atomic.get q.head in
let nhead =
match s with
| Nil -> failwith "MSQueue.pop: impossible"
| Next (_, x) -> Atomic.get x
in
match nhead with
| Nil -> None
| Next (v, _) when Atomic.compare_and_set q.head s nhead -> Some v
| _ ->
Backoff.once b;
loop ()
in
loop ()

let push q v =
let rec find_tail_and_enq curr_end node =
if Atomic.compare_and_set curr_end Nil node then ()
else
match Atomic.get curr_end with
| Nil -> find_tail_and_enq curr_end node
| Next (_, n) -> find_tail_and_enq n node
in
let newnode = Next (v, Atomic.make Nil) in
let tail = Atomic.get q.tail in
match tail with
| Nil -> failwith "HW_MSQueue.push: impossible"
| Next (_, n) ->
find_tail_and_enq n newnode;
ignore (Atomic.compare_and_set q.tail tail newnode)

let clean_until q f =
let b = Backoff.create () in
let rec loop () =
let s = Atomic.get q.head in
let nhead =
match s with
| Nil -> failwith "MSQueue.pop: impossible"
| Next (_, x) -> Atomic.get x
in
match nhead with
| Nil -> ()
| Next (v, _) ->
if not (f v) then
if Atomic.compare_and_set q.head s nhead then (
Backoff.reset b;
loop ())
else (
Backoff.once b;
loop ())
else ()
in
loop ()

type 'a cursor = 'a node

let snapshot q =
match Atomic.get q.head with
| Nil -> failwith "MSQueue.snapshot: impossible"
| Next (_, n) -> Atomic.get n

let next c = match c with Nil -> None | Next (a, n) -> Some (a, Atomic.get n)
Loading

0 comments on commit bf8f500

Please sign in to comment.