Skip to content

Commit

Permalink
feature(fiber): add [Fiber.map_reduce_seq]
Browse files Browse the repository at this point in the history
Map reduce a sequence in parallel without intermediate data structures

<!-- ps-id: 3e833707-4b3b-41a0-bc11-5f363522d287 -->

Signed-off-by: Rudi Grinberg <[email protected]>
  • Loading branch information
rgrinberg committed Apr 4, 2024
1 parent 5e9bfce commit e12c6c2
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 6 deletions.
15 changes: 15 additions & 0 deletions fiber/src/core.ml
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,21 @@ let rec sequential_iter_seq (seq : _ Seq.t) ~f =
sequential_iter_seq seq ~f
;;

(* [map_reduce_seq seq ~f ~empty ~commutative_combine k] maps [f] over the
   elements of [seq] in parallel and folds every result into a single
   accumulator with [commutative_combine], finally passing the accumulated
   value to the continuation [k]. No intermediate collection is built;
   results are folded in completion order, hence the commutativity
   requirement on the combining function. *)
let map_reduce_seq (seq : _ Seq.t) ~f ~empty ~commutative_combine k =
  match seq () with
  | Seq.Nil ->
    (* Nothing to run: deliver the neutral element immediately. *)
    k empty
  | Seq.Cons (head, tail) ->
    (* [acc] is the shared accumulator; [pending] counts fibers still
       running. NOTE(review): assumes [nfork_seq] maintains [pending] for
       the fibers it forks, so the completion that drives it to 0 is the
       last one and may fire [k] — confirm against [nfork_seq]. *)
    let acc = ref empty in
    let pending = ref 1 in
    let run elem =
      f elem (fun result ->
        acc := commutative_combine !acc result;
        decr pending;
        match !pending with
        | 0 -> k !acc
        | _ -> end_of_fiber)
    in
    nfork_seq pending head tail run
;;

let parallel_iter_set
(type a s)
(module S : Set.S with type elt = a and type t = s)
Expand Down
19 changes: 13 additions & 6 deletions fiber/src/fiber.mli
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,13 @@ end

val repeat_while : f:('a -> 'a option t) -> init:'a -> unit t

(** [map_reduce_seq seq ~f ~empty ~commutative_combine] applies [f] to every
    element of [seq] in parallel and combines the results with
    [commutative_combine], starting from [empty], without building an
    intermediate data structure. Results are folded in whichever order the
    fibers complete, so [commutative_combine] must be commutative (with
    [empty] as a neutral element). *)
val map_reduce_seq
  : 'a Seq.t
  -> f:('a -> 'm t)
  -> empty:'m
  -> commutative_combine:('m -> 'm -> 'm)
  -> 'm t

module Stream : sig
(** Destructive streams that can be composed to pipelines.
Expand Down Expand Up @@ -459,29 +466,29 @@ end
module Lazy : sig
  (** An asynchronous computation which is executed once only when forced. *)
  type 'a t

  (** Create an already evaluated lazy computation. *)
  val of_value : 'a -> 'a t

  (** An already evaluated lazy computation of unit type (a more efficient
      shortcut for [of_value ()]). *)
  val unit : unit t

  (** Create a lazy computation from a thunk which will only be executed when
      forced. *)
  val create : (unit -> 'a fiber) -> 'a t

  (** Check if a lazy computation has successfully finished. Note that this
      does not force the computation and a [false] result does not guarantee
      that the computation hasn't finished. *)
  val is_value : 'a t -> bool

  (** Force the lazy computation and return its result or reraise its
      exceptions. *)
  val force : 'a t -> 'a fiber

  (** Concurrently force multiple lazy computations and wait until they all
      finish, reraising any exceptions. *)
  val force_all_unit : unit t list -> unit fiber
end
end

module Expert : sig
(** This module offers no safety protections. It is only needed for maximizing
Expand Down
65 changes: 65 additions & 0 deletions fiber/test/map_reduce_tests.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
open Stdune
open Fiber.O

(* Shared test helpers. NOTE(review): [printf] and [print_dyn] are not used
   in this file — presumably kept for parity with the other fiber test
   files; confirm before removing. *)
let printf = Printf.printf
let print_dyn dyn = Dyn.to_string dyn |> print_endline

(* Keep backtraces out of expect-test output so failure output stays stable
   across environments. *)
let () = Printexc.record_backtrace false

(* Thin wrapper around a single [Test_scheduler] instance so the tests below
   can run fibers ([run]) and cooperatively suspend ([yield]) against one
   shared scheduler. *)
module Scheduler = struct
  let t = Test_scheduler.create ()
  let yield () = Test_scheduler.yield t
  let run f = Test_scheduler.run t f
end

(* Exercises [Fiber.map_reduce_seq] twice: first with already-determined
   fibers (each element is printed as it is mapped, then summed), then with
   fibers that block on ivars filled by a concurrently running fiber, so the
   mapped results only become available as the filler progresses. Both runs
   reduce [1; 2; 3] with [( + )] and must print [final: 6]. *)
let%expect_test "map_reduce_seq" =
  let test =
    let+ res =
      Fiber.map_reduce_seq
        (List.to_seq [ 1; 2; 3 ])
        ~f:(fun x ->
          printfn "x: %d" x;
          Fiber.return x)
        ~empty:0
        ~commutative_combine:( + )
    in
    printfn "final: %d" res
  in
  Scheduler.run test;
  [%expect {|
    x: 1
    x: 2
    x: 3
    final: 6 |}];
  (* Second scenario: [f] suspends on [Ivar.read], so the reduction is driven
     by the sibling fiber filling the ivars via [parallel_iter]. *)
  let test =
    let ivars = List.init 3 ~f:(fun _ -> Fiber.Ivar.create ()) in
    Fiber.fork_and_join_unit
      (fun () ->
        let+ res =
          Fiber.map_reduce_seq
            (List.to_seq ivars)
            ~f:(fun ivar ->
              let+ x = Fiber.Ivar.read ivar in
              printfn "x: %d" x;
              x)
            ~empty:0
            ~commutative_combine:( + )
        in
        printfn "final: %d" res)
      (fun () ->
        let i = ref 0 in
        Fiber.parallel_iter ivars ~f:(fun ivar ->
          incr i;
          printfn "filling ivar %d" !i;
          Fiber.Ivar.fill ivar !i))
  in
  Scheduler.run test;
  [%expect
    {|
    filling ivar 1
    filling ivar 2
    filling ivar 3
    x: 1
    x: 2
    x: 3
    final: 6 |}]
;;

0 comments on commit e12c6c2

Please sign in to comment.