Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement queue using a new two stack representation #132

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 160 additions & 130 deletions src/kcas_data/queue.ml
Original file line number Diff line number Diff line change
@@ -1,169 +1,199 @@
open Kcas

type 'a t = {
front : 'a Elems.t Loc.t;
middle : 'a Elems.t Loc.t;
back : 'a Elems.t Loc.t;
}

let alloc ~front ~middle ~back =
(* We allocate locations in specific order to make most efficient use of the
splay-tree based transaction log. *)
let front = Loc.make ~padded:true front
and middle = Loc.make ~padded:true middle
and back = Loc.make ~padded:true back in
Multicore_magic.copy_as_padded { back; middle; front }

let create () = alloc ~front:Elems.empty ~middle:Elems.empty ~back:Elems.empty

let copy q =
let tx ~xt = (Xt.get ~xt q.front, Xt.get ~xt q.middle, Xt.get ~xt q.back) in
let front, middle, back = Xt.commit { tx } in
alloc ~front ~middle ~back
module Elems = struct
type 'a t = { value : 'a; tl : 'a t; length : int }

module Xt = struct
let is_empty ~xt t =
(* We access locations in order of allocation to make most efficient use of
the splay-tree based transaction log. *)
Xt.get ~xt t.front == Elems.empty
&& Xt.get ~xt t.middle == Elems.empty
&& Xt.get ~xt t.back == Elems.empty

let length ~xt { back; middle; front } =
Elems.length (Xt.get ~xt front)
+ Elems.length (Xt.get ~xt middle)
+ Elems.length (Xt.get ~xt back)

let add ~xt x q = Xt.modify ~xt q.back @@ Elems.cons x
let push = add
let rec empty = { value = Obj.magic (); tl = empty; length = 0 }
let[@inline] length t = t.length lxor (t.length asr (Sys.int_size - 1))

(** Cooperative helper to move elems from back to middle. *)
let back_to_middle ~middle ~back =
let tx ~xt =
let xs = Xt.exchange ~xt back Elems.empty in
if xs == Elems.empty || Xt.exchange ~xt middle xs != Elems.empty then
raise_notrace Exit
in
try Xt.commit { tx } with Exit -> ()
let rec rev_append length t tl =
if length = 0 then tl
else rev_append (length - 1) t.tl { value = t.value; tl; length }

let take_opt_finish ~xt front elems =
let elems = Elems.rev elems in
Xt.set ~xt front (Elems.tl_safe elems);
Elems.hd_opt elems
let rec head i t = if i = -2 then t.value else head (i + 1) t.tl
let[@inline] head t = if t.length < 0 then head t.length t else t.value

let take_opt ~xt t =
let front = t.front in
let elems = Xt.update ~xt front Elems.tl_safe in
if elems != Elems.empty then Elems.hd_opt elems
let[@inline] tl t =
if -2 <= t.length then t.tl
else
let middle = t.middle and back = t.back in
if not (Xt.is_in_log ~xt middle || Xt.is_in_log ~xt back) then
back_to_middle ~middle ~back;
let elems = Xt.exchange ~xt middle Elems.empty in
if elems != Elems.empty then take_opt_finish ~xt front elems
else
let elems = Xt.exchange ~xt back Elems.empty in
if elems != Elems.empty then take_opt_finish ~xt front elems else None
let length = lnot t.length - 1 in
rev_append (length - 1) t.tl { value = t.value; tl = empty; length }

let[@inline] peek t =
if -2 <= t.length then t
else
let length = lnot t.length in
rev_append (length - 1) t.tl { value = t.value; tl = empty; length }

let take_blocking ~xt q = Xt.to_blocking ~xt (take_opt q)
let rec prepend_to_seq t tl =
(* TODO: handle reverse! *)
if t == empty then tl
else fun () -> Seq.Cons (t.value, prepend_to_seq t.tl tl)
end

let peek_opt_finish ~xt front elems =
let elems = Elems.rev elems in
Xt.set ~xt front elems;
Elems.hd_opt elems
module Back = struct
type 'a t = { length : int; front : 'a; elems : 'a Elems.t }

let peek_opt ~xt t =
let front = t.front in
let elems = Xt.get ~xt front in
if elems != Elems.empty then Elems.hd_opt elems
let empty = { length = -1; front = Obj.magic (); elems = Elems.empty }
let[@inline] length t = lnot t.length

let[@inline] snoc x t =
let length = t.length in
if length = -1 then { length = length - 1; front = x; elems = Elems.empty }
else
let middle = t.middle and back = t.back in
if not (Xt.is_in_log ~xt middle || Xt.is_in_log ~xt back) then
back_to_middle ~middle ~back;
let elems = Xt.exchange ~xt middle Elems.empty in
if elems != Elems.empty then peek_opt_finish ~xt front elems
{
length = length - 1;
front = t.front;
elems = { value = x; tl = t.elems; length };
}

let rev_prepend_to_seq t tl =
let tl =
if t.length >= -2 then Elems.prepend_to_seq t.elems tl
else
let elems = Xt.exchange ~xt back Elems.empty in
if elems != Elems.empty then peek_opt_finish ~xt front elems else None
let t = ref (Either.Left t.elems) in
fun () ->
let t =
match !t with
| Left t' ->
(* This is parallelism safe as the result is always equivalent. *)
let t' = Elems.rev_append (lnot t'.length) t' Elems.empty in
t := Right t';
t'
| Right t' -> t'
in
Elems.prepend_to_seq t tl ()
in
if t.length <= -2 then fun () -> Seq.Cons (t.front, tl) else tl
end

type 'a t = { front : 'a Elems.t Loc.t; back : 'a Back.t Loc.t }

let alloc ~front ~back =
let front = Loc.make ~padded:true front in
let back = Loc.make ~padded:true back in
Multicore_magic.copy_as_padded { front; back }

let create () = alloc ~front:Elems.empty ~back:Back.empty

let peek_blocking ~xt q = Xt.to_blocking ~xt (peek_opt q)
let copy t =
let tx ~xt = (Xt.get ~xt t.front, Xt.get ~xt t.back) in
let front, back = Xt.commit { tx } in
alloc ~front ~back

module Xt = struct
let is_empty ~xt t =
Xt.get ~xt t.front == Elems.empty && Xt.get ~xt t.back == Back.empty

let length ~xt t =
Elems.length (Xt.get ~xt t.front) + Back.length (Xt.get ~xt t.back)

let add ~xt x t = Xt.modify ~xt t.back @@ Back.snoc x
let push = add

let peek_opt ~xt t =
let front = Xt.update ~xt t.front Elems.peek in
if front.length = 0 then
let back = Xt.get ~xt t.back in
if back.length = -1 then None else Some back.front
else Some (Elems.head front)

let peek_blocking ~xt t =
let front = Xt.update ~xt t.front Elems.peek in
if front.length = 0 then
let back = Xt.get ~xt t.back in
if back.length = -1 then Retry.later () else back.front
else Elems.head front

let take_opt ~xt t =
let front = Xt.update ~xt t.front Elems.tl in
if front.length = 0 then
let back = Xt.exchange ~xt t.back Back.empty in
if back.length = -1 then None
else begin
if back.length <> -2 then Xt.set ~xt t.front back.elems;
Some back.front
end
else Some (Elems.head front)

let take_blocking ~xt t =
let front = Xt.update ~xt t.front Elems.tl in
if front.length = 0 then
let back = Xt.exchange ~xt t.back Back.empty in
if back.length = -1 then Retry.later ()
else begin
if back.length <> -2 then Xt.set ~xt t.front back.elems;
back.front
end
else Elems.head front

let clear ~xt t =
Xt.set ~xt t.front Elems.empty;
Xt.set ~xt t.middle Elems.empty;
Xt.set ~xt t.back Elems.empty

let swap ~xt q1 q2 =
let front = Xt.get ~xt q1.front
and middle = Xt.get ~xt q1.middle
and back = Xt.get ~xt q1.back in
let front = Xt.exchange ~xt q2.front front
and middle = Xt.exchange ~xt q2.middle middle
and back = Xt.exchange ~xt q2.back back in
Xt.set ~xt q1.front front;
Xt.set ~xt q1.middle middle;
Xt.set ~xt q1.back back

let seq_of ~front ~middle ~back =
(* Sequence construction is lazy, so this function is O(1). *)
Seq.empty
|> Elems.rev_prepend_to_seq back
|> Elems.rev_prepend_to_seq middle
|> Elems.prepend_to_seq front
Xt.set ~xt t.back Back.empty

let swap ~xt t1 t2 =
let front = Xt.get ~xt t1.front and back = Xt.get ~xt t1.back in
let front = Xt.exchange ~xt t2.front front
and back = Xt.exchange ~xt t2.back back in
Xt.set ~xt t1.front front;
Xt.set ~xt t1.back back

let seq_of ~front ~back =
Seq.empty |> Back.rev_prepend_to_seq back |> Elems.prepend_to_seq front

let to_seq ~xt t =
let front = Xt.get ~xt t.front
and middle = Xt.get ~xt t.middle
and back = Xt.get ~xt t.back in
seq_of ~front ~middle ~back
let front = Xt.get ~xt t.front and back = Xt.get ~xt t.back in
seq_of ~front ~back

let take_all ~xt t =
let front = Xt.exchange ~xt t.front Elems.empty
and middle = Xt.exchange ~xt t.middle Elems.empty
and back = Xt.exchange ~xt t.back Elems.empty in
seq_of ~front ~middle ~back
and back = Xt.exchange ~xt t.back Back.empty in
seq_of ~front ~back
end

let is_empty q = Kcas.Xt.commit { tx = Xt.is_empty q }
let length q = Kcas.Xt.commit { tx = Xt.length q }
let is_empty t = Kcas.Xt.commit { tx = Xt.is_empty t }
let length t = Kcas.Xt.commit { tx = Xt.length t }

let add x q =
let add x t =
(* Fenceless is safe as we always update. *)
Loc.fenceless_modify q.back @@ Elems.cons x
Loc.fenceless_modify t.back @@ Back.snoc x

let push = add

let take_opt q =
let take_opt t =
(* Fenceless is safe as we revert to a transaction in case we didn't update. *)
match Loc.fenceless_update q.front Elems.tl_safe |> Elems.hd_opt with
| None -> Kcas.Xt.commit { tx = Xt.take_opt q }
| some -> some
let front = Loc.fenceless_update t.front Elems.tl in
if front.length = 0 then Kcas.Xt.commit { tx = Xt.take_opt t }
else Some (Elems.head front)

let take_blocking ?timeoutf q =
let take_blocking ?timeoutf t =
(* Fenceless is safe as we revert to a transaction in case we didn't update. *)
match Loc.fenceless_update q.front Elems.tl_safe |> Elems.hd_opt with
| None -> Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking q }
| Some elem -> elem
let front = Loc.fenceless_update t.front Elems.tl in
if front.length = 0 then Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking t }
else Elems.head front

let take_all q = Kcas.Xt.commit { tx = Xt.take_all q }

let peek_opt q =
match Loc.get q.front |> Elems.hd_opt with
| None -> Kcas.Xt.commit { tx = Xt.peek_opt q }
| some -> some
let peek_opt t =
(* Fenceless is safe as we revert to a transaction in case we didn't update. *)
let front = Loc.fenceless_update t.front Elems.peek in
if front.length = 0 then Kcas.Xt.commit { tx = Xt.peek_opt t }
else Some (Elems.head front)

let peek_blocking ?timeoutf q =
Kcas.Xt.commit ?timeoutf { tx = Xt.peek_blocking q }
let peek_blocking ?timeoutf t =
(* Fenceless is safe as we revert to a transaction in case we didn't update. *)
let front = Loc.fenceless_update t.front Elems.peek in
if front.length = 0 then Kcas.Xt.commit ?timeoutf { tx = Xt.peek_blocking t }
else Elems.head front

let clear q = Kcas.Xt.commit { tx = Xt.clear q }
let swap q1 q2 = Kcas.Xt.commit { tx = Xt.swap q1 q2 }
let to_seq q = Kcas.Xt.commit { tx = Xt.to_seq q }
let iter f q = Seq.iter f @@ to_seq q
let fold f a q = Seq.fold_left f a @@ to_seq q
let take_all t = Kcas.Xt.commit { tx = Xt.take_all t }
let clear t = Kcas.Xt.commit { tx = Xt.clear t }
let swap t1 t2 = Kcas.Xt.commit { tx = Xt.swap t1 t2 }
let to_seq t = Kcas.Xt.commit { tx = Xt.to_seq t }
let iter f t = Seq.iter f @@ to_seq t
let fold f a t = Seq.fold_left f a @@ to_seq t

exception Empty

let[@inline] of_option = function None -> raise Empty | Some value -> value
let peek s = peek_opt s |> of_option
let peek t = peek_opt t |> of_option
let top = peek
let take s = take_opt s |> of_option
let take t = take_opt t |> of_option
3 changes: 1 addition & 2 deletions test/kcas_data/queue_test_stm.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ module Spec = struct
[
Gen.int |> Gen.map (fun x -> Push x);
Gen.return Take_opt;
Gen.return Peek_opt;
Gen.return Length;
Gen.return Length |> Gen.map (fun _ -> Peek_opt);
]
|> Gen.oneof |> make ~print:show_cmd

Expand Down
Loading