Skip to content

Commit

Permalink
Add pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
utkinis committed Oct 5, 2023
1 parent 5eaf613 commit 9e18317
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 34 deletions.
22 changes: 22 additions & 0 deletions scripts_future_API/test_pipeline.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using FastIce.Utils
using Profile

function main()
pipes = [Pipeline() for _ in 1:6]
for iter in 1:100
for ip in eachindex(pipes)
put!(pipes[ip]) do
sleep(1/60)
# println(" inside pipeline #$(ip)!")
end
end
sleep(1/10)
# println("outside, iter #$(iter)!")
take!.(pipes)
# println()
end
setdone!(pipe)
return
end

main()
3 changes: 2 additions & 1 deletion src/FastIce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ include("Grids/Grids.jl")
include("grid_operators.jl")
include("logging.jl")
include("fields.jl")
include("utils.jl")

include("Utils/Utils.jl")

include("physics.jl")

Expand Down
35 changes: 35 additions & 0 deletions src/Utils/Utils.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
module Utils

export remove_dim, insert_dim
export Pipeline, setdone!
export extrapolate!

using FastIce.Fields
using KernelAbstractions

include("pipelines.jl")
include("extrapolate.jl")

# Returns a copy of the tuple `A` with element in position `D` removed
@inline remove_dim(::Val{D}, A::NTuple{N}) where {D,N} = ntuple(I -> I < D ? A[I] : A[I+1], Val(N - 1))
@inline remove_dim(::Val{1}, I::NTuple{1}) = 1

# Same as `remove_dim`, but for `CartesianIndex`
@inline remove_dim(dim, I::CartesianIndex) = remove_dim(dim, Tuple(I)) |> CartesianIndex

# Returns a copy of tuple `A`, but inserts `i` into position `D`
@inline insert_dim(::Val{D}, A::NTuple{N}, i) where {D, N} = ntuple(Val(N + 1)) do I
if I < D
A[I]
elseif I == D
i
else
A[I-1]
end
end

# Same as `insert_dim`, but for `CartesianIndex`
@inline insert_dim(dim, A::CartesianIndex, i) = insert_dim(dim, Tuple(A), i) |> CartesianIndex


end
33 changes: 0 additions & 33 deletions src/utils.jl → src/Utils/extrapolate.jl
Original file line number Diff line number Diff line change
@@ -1,34 +1,3 @@
module Utils

export remove_dim, insert_dim

# Returns a copy of the tuple `A` with element in position `D` removed
@inline remove_dim(::Val{D}, A::NTuple{N}) where {D,N} = ntuple(I -> I < D ? A[I] : A[I+1], Val(N - 1))
@inline remove_dim(::Val{1}, I::NTuple{1}) = 1

# Same as `remove_dim`, but for `CartesianIndex`
@inline remove_dim(dim, I::CartesianIndex) = remove_dim(dim, Tuple(I)) |> CartesianIndex

# Returns a copy of tuple `A`, but inserts `i` into position `D`
@inline insert_dim(::Val{D}, A::NTuple{N}, i) where {D, N} = ntuple(Val(N + 1)) do I
if I < D
A[I]
elseif I == D
i
else
A[I-1]
end
end

# Same as `insert_dim`, but for `CartesianIndex`
@inline insert_dim(dim, A::CartesianIndex, i) = insert_dim(dim, Tuple(A), i) |> CartesianIndex

export extrapolate!

using FastIce.Fields

using KernelAbstractions

@kernel function kernel_extrapolate_x!(A, I, N)
iy, iz = @index(Global, NTuple)
iy -= 1; iz -= 1
Expand Down Expand Up @@ -63,6 +32,4 @@ function extrapolate!(A::Field; async=true)
kernel_extrapolate_z!(backend, 256, (size(A, 1)+2, size(A, 2)+2))(A, I[3], N[3])
async || synchronize(backend)
return
end

end
52 changes: 52 additions & 0 deletions src/Utils/pipelines.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
mutable struct Pipeline
@atomic done::Bool
in::Channel
out::Base.Event
task::Task

function Pipeline(; pre=nothing, post=nothing)
in = Channel()
out = Base.Event(true)
this = new(false, in, out)

this.task = Threads.@spawn begin
isnothing(pre) || pre()
try
while !(@atomic this.done)
work = take!(in)
work()
notify(out)
end
catch err
@atomic this.done = true
rethrow(err)
finally
isnothing(post) || post()
end
end
errormonitor(this.task)
return this
end
end

setdone!(p::Pipeline) = @atomic p.done = true

Base.isdone(p::Pipeline) = @atomic p.done

function Base.put!(work::F, p::Pipeline) where {F}
if !(@atomic p.done)
put!(p.in, work)
else
error("Pipeline is not running")
end
return
end

function Base.take!(p::Pipeline)
if !(@atomic p.done)
wait(p.out)
else
error("Pipeline is not running")
end
return
end

0 comments on commit 9e18317

Please sign in to comment.