Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Fiber.Lazy #36

Merged
merged 1 commit into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Unreleased

- Add [Fiber.Lazy]

- Rename `Fiber.Pool.stop` to `Fiber.Pool.close` (#13, @rgrinberg)

- Remove `Make_map_traversals` and introduce `Make_parallel_map`. The new
Expand Down
1 change: 1 addition & 0 deletions fiber/src/fiber.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Svar = Svar
module Throttle = Throttle
module Mutex = Mutex
module Scheduler = Scheduler
module Lazy = Lazy

let run =
let rec loop ~iter (s : _ Scheduler.step) =
Expand Down
28 changes: 28 additions & 0 deletions fiber/src/fiber.mli
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,34 @@ module Cancel : sig
-> ('a * 'b outcome) fiber
end

(** {1 Lazy fibers} *)
module Lazy : sig
(** An asynchronous computation which is executed once only when forced. *)
type 'a t

(** Create an already evaluated lazy computation. *)
val of_value : 'a -> 'a t

(** An already evaluated lazy computation of unit type (a more efficient shortcut for
[of_value ()]. *)
val unit : unit t

(** Create a lazy computation from a thunk which will only be executed when forced. *)
val create : (unit -> 'a fiber) -> 'a t

(** Check if a lazy computation has successfully finished. Note that this does not
force the computation and a [false] result does not guarantee that the computation
hasn't finished. *)
val is_value : 'a t -> bool

(** Force the lazy computation and return its result or reraise its exceptions. *)
val force : 'a t -> 'a fiber

(** Concurrently force multiple lazy computation and wait until they all finish,
reraising any exceptions. *)
val force_all_unit : unit t list -> unit fiber
end

module Expert : sig
(** This module offers no safety protections. It is only needed for maximizing
performance in certain situations *)
Expand Down
104 changes: 104 additions & 0 deletions fiber/src/lazy.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
open! Stdune
open Core
open Core.O

(** State of a lazy computation. Note that for efficiency, we have a separate variant for
successes ([Done]). We don't have one for errors since it doesn't seem worth it; we
get those from the [Ivar] instead. *)
type 'a state =
| (* Finished successfully *)
Done of 'a
| (* Might still be running *)
Running of ('a, Exns.t) Result.t Ivar.t
| (* Hasn't been forced yet *)
Init of (unit -> 'a Core.t)

type 'a t = 'a state ref

let of_value x = ref (Done x)
let unit = ref (Done ())
let create f = ref (Init f)

let prep t =
let v = Ivar.create () in
t := Running v;
v
;;

let execute t v f =
let* r = collect_errors_appendable_list f in
(match r with
| Ok x -> t := Done x
| Error _ -> ());
Ivar.fill v r
;;

let read v =
let* r = Ivar.read v in
match r with
| Ok x -> return x
| Error exns ->
(* Subsequent computations will always force the appendable list into a proper list
and therefore not share this work. For now, this doesn't matter to us. *)
Exns.reraise_all exns
;;

let force t =
let* () = return () in
match !t with
| Done x -> return x
| Running v -> read v
| Init f ->
let v = prep t in
let* () = execute t v f in
read v
;;

let is_value t =
match !t with
| Done _ -> true
| Running _ | Init _ -> false
;;

let force_all_unit =
let stop () = end_of_fiber in
(* Fork all computations that haven't been forced yet. Note that this should be
substantially more efficient that [parallel_map ~f:force] since we ignore
computations which have already been forced. *)
let start ts =
sequential_iter ts ~f:(fun t ->
match !t with
| Done _ | Running _ -> return ()
| Init f ->
let v = prep t in
fork (fun () -> (execute t v f) stop))
in
(* Wait for all computations, collecting all exceptions. *)
(* CR-someday rgrinberg: use [Appendable.t] for [acc] rather than [Appendable.t option]. *)
let rec collect acc = function
| [] -> return acc
| t :: ts ->
(match !t with
| Done _ -> collect acc ts
| Running v ->
let* r = Ivar.read v in
(match r with
| Ok _ -> collect acc ts
| Error exns ->
let exns =
match acc with
| None -> exns
| Some exns' -> Exns.combine exns exns'
in
collect (Some exns) ts)
| Init _ ->
(* We forked all computations previously so this should be impossible. *)
assert false)
in
fun ts ->
let* () = start ts in
let* r = collect None ts in
match r with
| None -> return ()
| Some exns -> Exns.reraise_all exns
;;
Loading