diff --git a/lib/picos/picos.ocaml5.ml b/lib/picos/picos.ocaml5.ml index 970f8eb9..1dd78998 100644 --- a/lib/picos/picos.ocaml5.ml +++ b/lib/picos/picos.ocaml5.ml @@ -219,7 +219,7 @@ module Computation = struct let trigger = Trigger.from_action x y action in Atomic.make (S (Continue { balance_and_mode; triggers = [ trigger ] })) - let is_canceled t = + let[@inline] is_canceled t = match Atomic.get t with | S (Canceled { tx; _ }) -> tx == Stopped | S (Returned _) | S (Continue _) -> false @@ -460,13 +460,13 @@ module Fiber = struct let has_forbidden (Fiber r : t) = r.forbid - let is_canceled (Fiber r : t) = + let[@inline] is_canceled (Fiber r : t) = (not r.forbid) && let (Packed computation) = r.packed in Computation.is_canceled computation - let canceled (Fiber r : t) = + let[@inline] canceled (Fiber r : t) = if r.forbid then None else let (Packed computation) = r.packed in diff --git a/lib/picos_mux.fifo/picos_mux_fifo.ml b/lib/picos_mux.fifo/picos_mux_fifo.ml index 49575358..0c77b191 100644 --- a/lib/picos_mux.fifo/picos_mux_fifo.ml +++ b/lib/picos_mux.fifo/picos_mux_fifo.ml @@ -1,6 +1,6 @@ open Picos -let[@inline never] quota_non_positive () = invalid_arg "quota must be positive" +let[@inline never] quota_non_positive _ = invalid_arg "quota must be positive" type ready = | Spawn of Fiber.t * (Fiber.t -> unit) @@ -17,16 +17,16 @@ type t = { needs_wakeup : bool Atomic.t; mutex : Mutex.t; condition : Condition.t; - resume : + mutable resume : Trigger.t -> Fiber.t -> ((exn * Printexc.raw_backtrace) option, unit) Effect.Deep.continuation -> unit; - current : ((Fiber.t, unit) Effect.Deep.continuation -> unit) option; - yield : ((unit, unit) Effect.Deep.continuation -> unit) option; - return : ((unit, unit) Effect.Deep.continuation -> unit) option; - discontinue : ((unit, unit) Effect.Deep.continuation -> unit) option; - handler : (unit, unit) Effect.Deep.handler; + mutable current : ((Fiber.t, unit) Effect.Deep.continuation -> unit) option; + mutable yield : ((unit, unit) Effect.Deep.continuation -> unit) option; + mutable return : ((unit, unit) Effect.Deep.continuation -> unit) option; + mutable discontinue : ((unit, unit) Effect.Deep.continuation -> unit) option; + mutable handler : (unit, unit) Effect.Deep.handler; quota : int; mutable fiber : Fiber.Maybe.t; mutable remaining_quota : int; @@ -67,48 +67,112 @@ let rec next t = next t end -let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main = - let quota = - match quota with - | None -> Int.max_int - | Some quota -> - if quota <= 0 then quota_non_positive (); - quota - in +let run_fiber ?quota ?fatal_exn_handler fiber main = Select.check_configured (); - let ready = Mpscq.create ~padded:true () - and needs_wakeup = Atomic.make false |> Multicore_magic.copy_as_padded - and mutex = Mutex.create () - and condition = Condition.create () in - let rec t = + let t = + let quota = + match quota with + | None -> Int.max_int + | Some quota -> if quota <= 0 then quota_non_positive quota else quota + in { - ready; - fiber = Fiber.Maybe.of_fiber fiber; - needs_wakeup; - mutex; - condition; - resume; - current; - yield; - return; - discontinue; - handler; + ready = Mpscq.create ~padded:true (); + needs_wakeup = Atomic.make false |> Multicore_magic.copy_as_padded; + mutex = Mutex.create (); + condition = Condition.create (); + resume = Obj.magic (); + current = Obj.magic (); + yield = Obj.magic (); + return = Obj.magic (); + discontinue = Obj.magic (); + handler = Obj.magic (); quota; + fiber = Fiber.Maybe.of_fiber fiber; remaining_quota = quota; num_alive_fibers = 1; } - and current = + in + t.handler <- + { + exnc = (match fatal_exn_handler with None -> raise | Some exnc -> exnc); + effc = + (fun (type a) (e : a Effect.t) : + ((a, _) Effect.Deep.continuation -> _) option -> + match e with + | Fiber.Current -> t.current + | Fiber.Spawn r -> + let fiber = Fiber.Maybe.to_fiber t.fiber in + if Fiber.is_canceled fiber then t.discontinue + else begin + t.num_alive_fibers <- t.num_alive_fibers + 1; + Mpscq.push t.ready (Spawn (r.fiber, r.main)); + t.return + end + | Fiber.Yield -> t.yield + | Computation.Cancel_after r -> begin + let fiber = Fiber.Maybe.to_fiber t.fiber in + if Fiber.is_canceled fiber then t.discontinue + else + match + Select.cancel_after r.computation ~seconds:r.seconds r.exn + r.bt + with + | () -> t.return + | exception exn -> + let bt = Printexc.get_raw_backtrace () in + Some + (fun k -> Effect.Deep.discontinue_with_backtrace k exn bt) + end + | Trigger.Await trigger -> + Some + (fun k -> + let fiber = Fiber.Maybe.to_fiber t.fiber in + if Fiber.try_suspend fiber trigger fiber k t.resume then + next t + else + let remaining_quota = t.remaining_quota - 1 in + if 0 < remaining_quota then begin + t.remaining_quota <- remaining_quota; + Fiber.resume fiber k + end + else begin + Mpscq.push t.ready (Resume (fiber, k)); + next t + end) + | _ -> None); + retc = + (fun () -> + t.num_alive_fibers <- t.num_alive_fibers - 1; + next t); + }; + t.resume <- + (fun trigger fiber k -> + let resume = Resume (fiber, k) in + if Fiber.unsuspend fiber trigger then Mpscq.push t.ready resume + else Mpscq.push_head t.ready resume; + if + Atomic.get t.needs_wakeup + && Atomic.compare_and_set t.needs_wakeup true false + then begin + begin + match Mutex.lock t.mutex with + | () -> Mutex.unlock t.mutex + | exception Sys_error _ -> () + end; + Condition.broadcast t.condition + end); + t.current <- Some (fun k -> let fiber = Fiber.Maybe.to_fiber t.fiber in - Effect.Deep.continue k fiber) - and yield = + Effect.Deep.continue k fiber); + t.yield <- Some (fun k -> let fiber = Fiber.Maybe.to_fiber t.fiber in Mpscq.push t.ready (Continue (fiber, k)); - next t) - and return = + next t); + t.return <- Some (fun k -> let remaining_quota = t.remaining_quota - 1 in @@ -119,78 +183,21 @@ let run_fiber ?quota ?fatal_exn_handler:(exnc : _ = raise) fiber main = else begin Mpscq.push t.ready (Return (Fiber.Maybe.to_fiber t.fiber, k)); next t - end) - and discontinue = + end); + t.discontinue <- Some (fun k -> let fiber = Fiber.Maybe.to_fiber t.fiber in - Fiber.continue fiber k ()) - and handler = { retc; exnc; effc } - and[@alert "-handler"] effc : - type a. a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option = - function - | Fiber.Current -> t.current - | Fiber.Spawn r -> - let fiber = Fiber.Maybe.to_fiber t.fiber in - if Fiber.is_canceled fiber then t.discontinue - else begin - t.num_alive_fibers <- t.num_alive_fibers + 1; - Mpscq.push t.ready (Spawn (r.fiber, r.main)); - t.return - end - | Fiber.Yield -> t.yield - | Computation.Cancel_after r -> begin - let fiber = Fiber.Maybe.to_fiber t.fiber in - if Fiber.is_canceled fiber then t.discontinue - else - match - Select.cancel_after r.computation ~seconds:r.seconds r.exn r.bt - with - | () -> t.return - | exception exn -> - let bt = Printexc.get_raw_backtrace () in - Some (fun k -> Effect.Deep.discontinue_with_backtrace k exn bt) - end - | Trigger.Await trigger -> - Some - (fun k -> - let fiber = Fiber.Maybe.to_fiber t.fiber in - if Fiber.try_suspend fiber trigger fiber k t.resume then next t - else - let remaining_quota = t.remaining_quota - 1 in - if 0 < remaining_quota then begin - t.remaining_quota <- remaining_quota; - Fiber.resume fiber k - end - else begin - Mpscq.push t.ready (Resume (fiber, k)); - next t - end) - | _ -> None - and retc () = - t.num_alive_fibers <- t.num_alive_fibers - 1; - next t - and resume trigger fiber k = - let resume = Resume (fiber, k) in - if Fiber.unsuspend fiber trigger then Mpscq.push t.ready resume - else Mpscq.push_head t.ready resume; - if - Atomic.get t.needs_wakeup - && Atomic.compare_and_set t.needs_wakeup true false - then begin - begin - match Mutex.lock t.mutex with - | () -> Mutex.unlock t.mutex - | exception Sys_error _ -> () - end; - Condition.broadcast t.condition - end - in + Fiber.continue fiber k ()); Effect.Deep.match_with main fiber t.handler -let run ?quota ?fatal_exn_handler ?(forbid = false) main = +let[@inline never] run ?quota ?fatal_exn_handler fiber main computation = + run_fiber ?quota ?fatal_exn_handler fiber main; + Computation.peek_exn computation + +let run ?quota ?fatal_exn_handler ?forbid main = + let forbid = match forbid with None -> false | Some forbid -> forbid in let computation = Computation.create ~mode:`LIFO () in let fiber = Fiber.create ~forbid computation in let main _ = Computation.capture computation main () in - run_fiber ?quota ?fatal_exn_handler fiber main; - Computation.peek_exn computation + run ?quota ?fatal_exn_handler fiber main computation diff --git a/lib/picos_mux.multififo/picos_mux_multififo.ml b/lib/picos_mux.multififo/picos_mux_multififo.ml index 1b87224e..141ff9d9 100644 --- a/lib/picos_mux.multififo/picos_mux_multififo.ml +++ b/lib/picos_mux.multififo/picos_mux_multififo.ml @@ -1,8 +1,8 @@ open Picos -let[@inline never] quota_non_positive () = invalid_arg "quota must be positive" +let[@inline never] quota_non_positive _ = invalid_arg "quota must be positive" let[@inline never] already_running () = invalid_arg "already running" -let[@inline never] not_worker () = invalid_arg "not a worker thread" +let[@inline never] not_worker _ = invalid_arg "not a worker thread" module Mpmcq = Picos_aux_mpmcq @@ -31,13 +31,14 @@ and _ tdt = | Nothing : [> `Nothing ] tdt | Per_thread : { ready : ready Mpmcq.t; - resume : + mutable resume : Trigger.t -> Fiber.t -> ((exn * Printexc.raw_backtrace) option, unit) Effect.Deep.continuation -> unit; - return : ((unit, unit) Effect.Deep.continuation -> unit) option; - discontinue : ((unit, unit) Effect.Deep.continuation -> unit) option; + mutable return : ((unit, unit) Effect.Deep.continuation -> unit) option; + mutable discontinue : + ((unit, unit) Effect.Deep.continuation -> unit) option; context : t; mutable index : int; mutable num_started : int; @@ -53,12 +54,12 @@ let per_thread_key = Picos_thread.TLS.create () let[@inline] get_per_thread () : per_thread = match Picos_thread.TLS.get_exn per_thread_key with - | Nothing -> not_worker () + | Nothing as any -> not_worker any | Per_thread _ as pt -> pt let get_thread t i : per_thread = match Array.unsafe_get t.threads i with - | Nothing -> not_worker () + | Nothing as any -> not_worker any | Per_thread _ as pt -> pt let any_fibers_alive t = @@ -181,13 +182,13 @@ let default_fatal_exn_handler exn = let per_thread context = let ready = Mpmcq.create ~padded:true () in - let rec pt : per_thread = + let (Per_thread p as pt : per_thread) = Per_thread { ready; - resume; - return; - discontinue; + resume = Obj.magic (); + return = Obj.magic (); + discontinue = Obj.magic (); context; index = 0; num_started = 0; @@ -195,33 +196,37 @@ let per_thread context = fiber = Fiber.Maybe.nothing; remaining_quota = 0; } - and resume trigger fiber k = - let resume = Resume (fiber, k) in - let (Per_thread p_original) = pt in - match Picos_thread.TLS.get_exn per_thread_key with - | Per_thread p_current when p_original.context == p_current.context -> - (* We are running on a thread of this scheduler *) - if Fiber.unsuspend fiber trigger then Mpmcq.push p_current.ready resume - else Mpmcq.push_head p_current.ready resume; - relaxed_wakeup p_current.context ~known_not_empty:true p_current.ready - | _ | (exception Picos_thread.TLS.Not_set) -> - (* We are running on a foreign thread *) - if Fiber.unsuspend fiber trigger then Mpmcq.push p_original.ready resume - else Mpmcq.push_head p_original.ready resume; - let t = p_original.context in - let non_zero = - match Mutex.lock t.mutex with - | () -> - let non_zero = t.num_waiters_non_zero in - Mutex.unlock t.mutex; - non_zero - | exception Sys_error _ -> false - in - if non_zero then Condition.signal t.condition - and return = + in + p.resume <- + (fun trigger fiber k -> + let resume = Resume (fiber, k) in + let (Per_thread p_original) = (pt : per_thread) in + match Picos_thread.TLS.get_exn per_thread_key with + | Per_thread p_current when p_original.context == p_current.context -> + (* We are running on a thread of this scheduler *) + if Fiber.unsuspend fiber trigger then + Mpmcq.push p_current.ready resume + else Mpmcq.push_head p_current.ready resume; + relaxed_wakeup p_current.context ~known_not_empty:true p_current.ready + | _ | (exception Picos_thread.TLS.Not_set) -> + (* We are running on a foreign thread *) + if Fiber.unsuspend fiber trigger then + Mpmcq.push p_original.ready resume + else Mpmcq.push_head p_original.ready resume; + let t = p_original.context in + let non_zero = + match Mutex.lock t.mutex with + | () -> + let non_zero = t.num_waiters_non_zero in + Mutex.unlock t.mutex; + non_zero + | exception Sys_error _ -> false + in + if non_zero then Condition.signal t.condition); + p.return <- Some (fun k -> - let (Per_thread p) = pt in + let (Per_thread p) = (pt : per_thread) in let remaining_quota = p.remaining_quota - 1 in if 0 < remaining_quota then begin p.remaining_quota <- remaining_quota; @@ -230,15 +235,29 @@ let per_thread context = else begin Mpmcq.push p.ready (Return (Fiber.Maybe.to_fiber p.fiber, k)); next pt - end) - and discontinue = + end); + p.discontinue <- Some (fun k -> - let (Per_thread p) = pt in + let (Per_thread p) = (pt : per_thread) in let fiber = Fiber.Maybe.to_fiber p.fiber in - Fiber.continue fiber k ()) - in - pt + Fiber.continue fiber k ()); + (pt : per_thread) + +let[@inline never] returned value old_p = + (* TODO: maybe remove from [t.threads]? *) + Picos_thread.TLS.set per_thread_key old_p; + value + +let[@inline never] raised exn old_p = + let bt = Printexc.get_raw_backtrace () in + Picos_thread.TLS.set per_thread_key old_p; + Printexc.raise_with_backtrace exn bt + +let[@inline never] with_per_thread new_pt fn old_p = + match fn (new_pt : per_thread) with + | value -> returned value old_p + | exception exn -> raised exn old_p let with_per_thread t fn = let (Per_thread new_p as new_pt) = per_thread t in @@ -269,14 +288,7 @@ let with_per_thread t fn = with Picos_thread.TLS.Not_set -> Nothing in Picos_thread.TLS.set per_thread_key new_pt; - match fn (new_pt : per_thread) with - | value -> - (* TODO: maybe remove from [t.threads]? *) - Picos_thread.TLS.set per_thread_key old_p; - value - | exception exn -> - Picos_thread.TLS.set per_thread_key old_p; - raise exn + with_per_thread new_pt fn old_p let current = Some @@ -351,9 +363,7 @@ let context ?quota ?fatal_exn_handler () = let quota = match quota with | None -> Int.max_int - | Some quota -> - if quota <= 0 then quota_non_positive (); - quota + | Some quota -> if quota <= 0 then quota_non_positive quota else quota in let exnc = match fatal_exn_handler with @@ -368,21 +378,18 @@ let context ?quota ?fatal_exn_handler () = and condition = Condition.create () and num_waiters = ref 0 |> Multicore_magic.copy_as_padded and num_started = Atomic.make 0 |> Multicore_magic.copy_as_padded in - let rec context = - { - num_waiters_non_zero = false; - num_waiters; - num_started; - mutex; - condition; - handler; - quota; - run = false; - threads = Array.make 15 Nothing; - threads_num = 0; - } - and handler = { retc; exnc; effc } in - context + { + num_waiters_non_zero = false; + num_waiters; + num_started; + mutex; + condition; + handler = { retc; exnc; effc }; + quota; + run = false; + threads = Array.make 15 Nothing; + threads_num = 0; + } let runner_on_this_thread t = Select.check_configured (); @@ -398,18 +405,22 @@ let run_fiber ?context:t_opt fiber main = end else begin t.run <- true; - Mutex.unlock t.mutex - end; - p.remaining_quota <- t.quota; - p.fiber <- Fiber.Maybe.of_fiber fiber; - Effect.Deep.match_with main fiber t.handler + Mutex.unlock t.mutex; + p.remaining_quota <- t.quota; + p.fiber <- Fiber.Maybe.of_fiber fiber; + Effect.Deep.match_with main fiber t.handler + end + +let[@inline never] run ?context fiber main computation = + run_fiber ?context fiber main; + Computation.peek_exn computation -let run ?context ?(forbid = false) main = +let run ?context ?forbid main = + let forbid = match forbid with None -> false | Some forbid -> forbid in let computation = Computation.create ~mode:`LIFO () in let fiber = Fiber.create ~forbid computation in let main _ = Computation.capture computation main () in - run_fiber ?context fiber main; - Computation.peek_exn computation + run ?context fiber main computation let rec run_fiber_on n fiber main runner_main context = if n <= 1 then run_fiber ~context fiber main @@ -455,9 +466,14 @@ let run_fiber_on ?quota ?fatal_exn_handler ~n_domains fiber main = in run_fiber_on n_domains fiber main runner_main context -let run_on ?quota ?fatal_exn_handler ~n_domains ?(forbid = false) main = +let[@inline never] run_on ?quota ?fatal_exn_handler ~n_domains fiber main + computation = + run_fiber_on ?quota ?fatal_exn_handler ~n_domains fiber main; + Computation.peek_exn computation + +let run_on ?quota ?fatal_exn_handler ~n_domains ?forbid main = + let forbid = match forbid with None -> false | Some forbid -> forbid in let computation = Computation.create ~mode:`LIFO () in let fiber = Fiber.create ~forbid computation in let main _ = Computation.capture computation main () in - run_fiber_on ?quota ?fatal_exn_handler ~n_domains fiber main; - Computation.peek_exn computation + run_on ?quota ?fatal_exn_handler ~n_domains fiber main computation diff --git a/lib/picos_mux.random/picos_mux_random.ml b/lib/picos_mux.random/picos_mux_random.ml index 71827cc5..9948b416 100644 --- a/lib/picos_mux.random/picos_mux_random.ml +++ b/lib/picos_mux.random/picos_mux_random.ml @@ -43,7 +43,7 @@ type t = { ready : ready Collection.t; mutable num_waiters_non_zero : bool; num_alive_fibers : int Atomic.t; - resume : + mutable resume : Trigger.t -> Fiber.t -> ((exn * Printexc.raw_backtrace) option, unit) Effect.Deep.continuation -> @@ -51,10 +51,10 @@ type t = { num_waiters : int ref; condition : Condition.t; mutex : Mutex.t; - current : ((Fiber.t, unit) Effect.Deep.continuation -> unit) option; - yield : ((unit, unit) Effect.Deep.continuation -> unit) option; - return : ((unit, unit) Effect.Deep.continuation -> unit) option; - handler : (unit, unit) Effect.Deep.handler; + mutable current : ((Fiber.t, unit) Effect.Deep.continuation -> unit) option; + mutable yield : ((unit, unit) Effect.Deep.continuation -> unit) option; + mutable return : ((unit, unit) Effect.Deep.continuation -> unit) option; + mutable handler : (unit, unit) Effect.Deep.handler; mutable run : bool; } @@ -159,103 +159,111 @@ let context ?fatal_exn_handler () = handler exn; raise exn in - let rec t = + let t = { ready = Collection.create (); num_waiters_non_zero = false; num_alive_fibers = Atomic.make 1 |> Multicore_magic.copy_as_padded; - resume; + resume = Obj.magic (); num_waiters = ref 0 |> Multicore_magic.copy_as_padded; condition = Condition.create (); mutex = Mutex.create (); - current; - yield; - return; - handler; + current = Obj.magic (); + yield = Obj.magic (); + return = Obj.magic (); + handler = Obj.magic (); run = false; } - and resume trigger fiber k = - let resume = Resume (fiber, k) in - Fiber.unsuspend fiber trigger |> ignore; - Collection.push t.ready resume; - let non_zero = - match Mutex.lock t.mutex with - | () -> - let non_zero = t.num_waiters_non_zero in - Mutex.unlock t.mutex; - non_zero - | exception Sys_error _ -> false - in - if non_zero then Condition.signal t.condition - and current = + in + t.resume <- + (fun trigger fiber k -> + let resume = Resume (fiber, k) in + Fiber.unsuspend fiber trigger |> ignore; + Collection.push t.ready resume; + let non_zero = + match Mutex.lock t.mutex with + | () -> + let non_zero = t.num_waiters_non_zero in + Mutex.unlock t.mutex; + non_zero + | exception Sys_error _ -> false + in + if non_zero then Condition.signal t.condition); + t.current <- Some (fun k -> let p = Picos_thread.TLS.get_exn fiber_key in let fiber = Fiber.Maybe.to_fiber !p in Collection.push t.ready (Current (fiber, k)); - next p t) - and yield = + next p t); + t.yield <- Some (fun k -> let p = Picos_thread.TLS.get_exn fiber_key in let fiber = Fiber.Maybe.to_fiber !p in Collection.push t.ready (Continue (fiber, k)); - next p t) - and return = + next p t); + t.return <- Some (fun k -> let p = Picos_thread.TLS.get_exn fiber_key in let fiber = Fiber.Maybe.to_fiber !p in Collection.push t.ready (Return (fiber, k)); - next p t) - and handler = { retc; exnc; effc } - and[@alert "-handler"] effc : - type a. a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option = - function - | Fiber.Current -> t.current - | Fiber.Spawn r -> - let p = Picos_thread.TLS.get_exn fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in - if Fiber.is_canceled fiber then t.yield - else begin - Atomic.incr t.num_alive_fibers; - Collection.push t.ready (Spawn (r.fiber, r.main)); - relaxed_wakeup t ~known_not_empty:true; - t.return - end - | Fiber.Yield -> t.yield - | Computation.Cancel_after r -> begin - let p = Picos_thread.TLS.get_exn fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in - if Fiber.is_canceled fiber then t.yield - else - match - Select.cancel_after r.computation ~seconds:r.seconds r.exn r.bt - with - | () -> t.return - | exception exn -> - let bt = Printexc.get_raw_backtrace () in + next p t); + t.handler <- + { + retc = + (fun () -> + Atomic.decr t.num_alive_fibers; + let p = Picos_thread.TLS.get_exn fiber_key in + next p t); + exnc; + effc = + (fun (type a) (e : a Effect.t) : + ((a, _) Effect.Deep.continuation -> _) option -> + match e with + | Fiber.Current -> t.current + | Fiber.Spawn r -> + let p = Picos_thread.TLS.get_exn fiber_key in + let fiber = Fiber.Maybe.to_fiber !p in + if Fiber.is_canceled fiber then t.yield + else begin + Atomic.incr t.num_alive_fibers; + Collection.push t.ready (Spawn (r.fiber, r.main)); + relaxed_wakeup t ~known_not_empty:true; + t.return + end + | Fiber.Yield -> t.yield + | Computation.Cancel_after r -> begin + let p = Picos_thread.TLS.get_exn fiber_key in + let fiber = Fiber.Maybe.to_fiber !p in + if Fiber.is_canceled fiber then t.yield + else + match + Select.cancel_after r.computation ~seconds:r.seconds r.exn + r.bt + with + | () -> t.return + | exception exn -> + let bt = Printexc.get_raw_backtrace () in + Some + (fun k -> + Collection.push t.ready (Raise (fiber, k, exn, bt)); + next p t) + end + | Trigger.Await trigger -> Some (fun k -> - Collection.push t.ready (Raise (fiber, k, exn, bt)); - next p t) - end - | Trigger.Await trigger -> - Some - (fun k -> - let p = Picos_thread.TLS.get_exn fiber_key in - let fiber = Fiber.Maybe.to_fiber !p in - if Fiber.try_suspend fiber trigger fiber k t.resume then next p t - else begin - Collection.push t.ready (Resume (fiber, k)); - next p t - end) - | _ -> None - and retc () = - Atomic.decr t.num_alive_fibers; - let p = Picos_thread.TLS.get_exn fiber_key in - next p t - in + let p = Picos_thread.TLS.get_exn fiber_key in + let fiber = Fiber.Maybe.to_fiber !p in + if Fiber.try_suspend fiber trigger fiber k t.resume then + next p t + else begin + Collection.push t.ready (Resume (fiber, k)); + next p t + end) + | _ -> None); + }; t let runner_on_this_thread t = @@ -294,12 +302,16 @@ let run_fiber ?context:t_opt fiber main = await t end -let run ?context ?(forbid = false) main = +let[@inline never] run ?context fiber main computation = + run_fiber ?context fiber main; + Computation.peek_exn computation + +let run ?context ?forbid main = + let forbid = match forbid with None -> false | Some forbid -> forbid in let computation = Computation.create ~mode:`LIFO () in let fiber = Fiber.create ~forbid computation in let main _ = Computation.capture computation main () in - run_fiber ?context fiber main; - Computation.peek_exn computation + run ?context fiber main computation let rec run_fiber_on n fiber main runner_main context = if n <= 1 then run_fiber ~context fiber main @@ -343,12 +355,15 @@ let run_fiber_on ?fatal_exn_handler ~n_domains fiber main = let bt = Printexc.get_raw_backtrace () in Some (exn, bt) in - run_fiber_on n_domains fiber main runner_main context -let run_on ?fatal_exn_handler ~n_domains ?(forbid = false) main = +let[@inline never] run_on ?fatal_exn_handler ~n_domains fiber main computation = + run_fiber_on ?fatal_exn_handler ~n_domains fiber main; + Computation.peek_exn computation + +let run_on ?fatal_exn_handler ~n_domains ?forbid main = + let forbid = match forbid with None -> false | Some forbid -> forbid in let computation = Computation.create ~mode:`LIFO () in let fiber = Fiber.create ~forbid computation in let main _ = Computation.capture computation main () in - run_fiber_on ?fatal_exn_handler ~n_domains fiber main; - Computation.peek_exn computation + run_on ?fatal_exn_handler ~n_domains fiber main computation