From 9e18317ba9dfc8595be3fb6a515b6d8fa9161876 Mon Sep 17 00:00:00 2001 From: Ivan Utkin Date: Thu, 5 Oct 2023 19:00:52 +0200 Subject: [PATCH] Add pipelines --- scripts_future_API/test_pipeline.jl | 22 +++++++++++ src/FastIce.jl | 3 +- src/Utils/Utils.jl | 35 +++++++++++++++++ src/{utils.jl => Utils/extrapolate.jl} | 33 ---------------- src/Utils/pipelines.jl | 52 ++++++++++++++++++++++++++ 5 files changed, 111 insertions(+), 34 deletions(-) create mode 100644 scripts_future_API/test_pipeline.jl create mode 100644 src/Utils/Utils.jl rename src/{utils.jl => Utils/extrapolate.jl} (62%) create mode 100644 src/Utils/pipelines.jl diff --git a/scripts_future_API/test_pipeline.jl b/scripts_future_API/test_pipeline.jl new file mode 100644 index 00000000..6485bb4e --- /dev/null +++ b/scripts_future_API/test_pipeline.jl @@ -0,0 +1,22 @@ +using FastIce.Utils +using Profile + +function main() + pipes = [Pipeline() for _ in 1:6] + for iter in 1:100 + for ip in eachindex(pipes) + put!(pipes[ip]) do + sleep(1/60) + # println(" inside pipeline #$(ip)!") + end + end + sleep(1/10) + # println("outside, iter #$(iter)!") + take!.(pipes) + # println() + end + setdone!(pipe) + return +end + +main() diff --git a/src/FastIce.jl b/src/FastIce.jl index 72fb62ca..f1772e42 100644 --- a/src/FastIce.jl +++ b/src/FastIce.jl @@ -7,7 +7,8 @@ include("Grids/Grids.jl") include("grid_operators.jl") include("logging.jl") include("fields.jl") -include("utils.jl") + +include("Utils/Utils.jl") include("physics.jl") diff --git a/src/Utils/Utils.jl b/src/Utils/Utils.jl new file mode 100644 index 00000000..89dcf88d --- /dev/null +++ b/src/Utils/Utils.jl @@ -0,0 +1,35 @@ +module Utils + +export remove_dim, insert_dim +export Pipeline, setdone! +export extrapolate! + +using FastIce.Fields +using KernelAbstractions + +include("pipelines.jl") +include("extrapolate.jl") + +# Returns a copy of the tuple `A` with element in position `D` removed +@inline remove_dim(::Val{D}, A::NTuple{N}) where {D,N} = ntuple(I -> I < D ? A[I] : A[I+1], Val(N - 1)) +@inline remove_dim(::Val{1}, I::NTuple{1}) = 1 + +# Same as `remove_dim`, but for `CartesianIndex` +@inline remove_dim(dim, I::CartesianIndex) = remove_dim(dim, Tuple(I)) |> CartesianIndex + +# Returns a copy of tuple `A`, but inserts `i` into position `D` +@inline insert_dim(::Val{D}, A::NTuple{N}, i) where {D, N} = ntuple(Val(N + 1)) do I + if I < D + A[I] + elseif I == D + i + else + A[I-1] + end +end + +# Same as `insert_dim`, but for `CartesianIndex` +@inline insert_dim(dim, A::CartesianIndex, i) = insert_dim(dim, Tuple(A), i) |> CartesianIndex + + +end \ No newline at end of file diff --git a/src/utils.jl b/src/Utils/extrapolate.jl similarity index 62% rename from src/utils.jl rename to src/Utils/extrapolate.jl index eeae9d21..06513dea 100644 --- a/src/utils.jl +++ b/src/Utils/extrapolate.jl @@ -1,34 +1,3 @@ -module Utils - -export remove_dim, insert_dim - -# Returns a copy of the tuple `A` with element in position `D` removed -@inline remove_dim(::Val{D}, A::NTuple{N}) where {D,N} = ntuple(I -> I < D ? A[I] : A[I+1], Val(N - 1)) -@inline remove_dim(::Val{1}, I::NTuple{1}) = 1 - -# Same as `remove_dim`, but for `CartesianIndex` -@inline remove_dim(dim, I::CartesianIndex) = remove_dim(dim, Tuple(I)) |> CartesianIndex - -# Returns a copy of tuple `A`, but inserts `i` into position `D` -@inline insert_dim(::Val{D}, A::NTuple{N}, i) where {D, N} = ntuple(Val(N + 1)) do I - if I < D - A[I] - elseif I == D - i - else - A[I-1] - end -end - -# Same as `insert_dim`, but for `CartesianIndex` -@inline insert_dim(dim, A::CartesianIndex, i) = insert_dim(dim, Tuple(A), i) |> CartesianIndex - -export extrapolate! - -using FastIce.Fields - -using KernelAbstractions - @kernel function kernel_extrapolate_x!(A, I, N) iy, iz = @index(Global, NTuple) iy -= 1; iz -= 1 @@ -63,6 +32,4 @@ function extrapolate!(A::Field; async=true) kernel_extrapolate_z!(backend, 256, (size(A, 1)+2, size(A, 2)+2))(A, I[3], N[3]) async || synchronize(backend) return -end - end \ No newline at end of file diff --git a/src/Utils/pipelines.jl b/src/Utils/pipelines.jl new file mode 100644 index 00000000..8d24e4ea --- /dev/null +++ b/src/Utils/pipelines.jl @@ -0,0 +1,52 @@ +mutable struct Pipeline + @atomic done::Bool + in::Channel + out::Base.Event + task::Task + + function Pipeline(; pre=nothing, post=nothing) + in = Channel() + out = Base.Event(true) + this = new(false, in, out) + + this.task = Threads.@spawn begin + isnothing(pre) || pre() + try + while !(@atomic this.done) + work = take!(in) + work() + notify(out) + end + catch err + @atomic this.done = true + rethrow(err) + finally + isnothing(post) || post() + end + end + errormonitor(this.task) + return this + end +end + +setdone!(p::Pipeline) = @atomic p.done = true + +Base.isdone(p::Pipeline) = @atomic p.done + +function Base.put!(work::F, p::Pipeline) where {F} + if !(@atomic p.done) + put!(p.in, work) + else + error("Pipeline is not running") + end + return +end + +function Base.take!(p::Pipeline) + if !(@atomic p.done) + wait(p.out) + else + error("Pipeline is not running") + end + return +end