From 2e2b8330962d12c3ca1696e467ac9e2df86ae084 Mon Sep 17 00:00:00 2001 From: gbaraldi Date: Fri, 16 Aug 2024 16:43:11 -0300 Subject: [PATCH 1/3] Use atomics for the StickyWorkqueue array instead of a lock --- base/task.jl | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/base/task.jl b/base/task.jl index 5e4af6747f128..c1d3bec8deea9 100644 --- a/base/task.jl +++ b/base/task.jl @@ -921,30 +921,30 @@ function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T end const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task} -global Workqueues::Vector{StickyWorkqueue} = [StickyWorkqueue()] -const Workqueues_lock = Threads.SpinLock() -const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable +global Workqueues::Memory{StickyWorkqueue} = Memory{StickyWorkqueue}([StickyWorkqueue()]) # TODO: Is the extra allocation here worth extra code +const Workqueue = (@atomic :acquire Base.Workqueues)[1] # default work queue is thread 1 // TODO: deprecate this variable function workqueue_for(tid::Int) - qs = Workqueues + qs = @atomic :acquire Base.Workqueues if length(qs) >= tid && isassigned(qs, tid) - return @inbounds qs[tid] + return @inbounds qs[tid] # This assumes that threads don't get deleted and that once an index in set + # all following Workqueue Memorys have the same queues at those indices end - # slow path to allocate it - @assert tid > 0 - l = Workqueues_lock - @lock l begin - qs = Workqueues - if length(qs) < tid - nt = Threads.maxthreadid() - @assert tid <= nt - global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs) - end - if !isassigned(qs, tid) - @inbounds qs[tid] = StickyWorkqueue() + while length(qs) < tid + nt = Threads.maxthreadid() + @assert tid <= nt + new_qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs) + if (@atomicreplace :acquire_release :monotonic Base.Workqueues qs => new_qs).success + qs = new_qs + break + else + qs = @atomic :acquire Base.Workqueues end - return @inbounds qs[tid] end + if !isassigned(qs, tid) + @inbounds qs[tid] = StickyWorkqueue() + end + return @inbounds qs[tid] # This assumes that threads don't get deleted and that once an index in set end function enq_work(t::Task) From 4f2105bdea4e95e12fae14fb041f840ae1155231 Mon Sep 17 00:00:00 2001 From: gbaraldi Date: Fri, 16 Aug 2024 17:45:34 -0300 Subject: [PATCH 2/3] Apply suggestions from code review --- base/task.jl | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/base/task.jl b/base/task.jl index c1d3bec8deea9..f3c7ad90219b5 100644 --- a/base/task.jl +++ b/base/task.jl @@ -921,30 +921,32 @@ function list_deletefirst!(W::IntrusiveLinkedListSynchronized{T}, t::T) where T end const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task} -global Workqueues::Memory{StickyWorkqueue} = Memory{StickyWorkqueue}([StickyWorkqueue()]) # TODO: Is the extra allocation here worth extra code +global Workqueues::Memory{StickyWorkqueue} = Memory{StickyWorkqueue}([StickyWorkqueue()]) const Workqueue = (@atomic :acquire Base.Workqueues)[1] # default work queue is thread 1 // TODO: deprecate this variable function workqueue_for(tid::Int) qs = @atomic :acquire Base.Workqueues if length(qs) >= tid && isassigned(qs, tid) - return @inbounds qs[tid] # This assumes that threads don't get deleted and that once an index in set - # all following Workqueue Memorys have the same queues at those indices + return @inbounds qs[tid] # This assumes that threads don't get deleted and that once an index is set + # all following Workqueue Memorys have the same queues at those indices end while length(qs) < tid + # slow path to allocate it nt = Threads.maxthreadid() @assert tid <= nt - new_qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs) - if (@atomicreplace :acquire_release :monotonic Base.Workqueues qs => new_qs).success + new_qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs) # This overallocates by nt-1 + xchg = @atomicreplace :acquire_release :acquire Base.Workqueues qs => new_qs + if xchg.success qs = new_qs break else - qs = @atomic :acquire Base.Workqueues + qs = xchg.old end end if !isassigned(qs, tid) @inbounds qs[tid] = StickyWorkqueue() end - return @inbounds qs[tid] # This assumes that threads don't get deleted and that once an index in set + return @inbounds qs[tid] end function enq_work(t::Task) From e6ac5feedb62bafd17315c5667d139c2dfff68e8 Mon Sep 17 00:00:00 2001 From: Gabriel Baraldi Date: Tue, 20 Aug 2024 17:32:56 -0300 Subject: [PATCH 3/3] Revert changes except the switch to a Memory --- base/task.jl | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/base/task.jl b/base/task.jl index f3c7ad90219b5..721d8b929dc91 100644 --- a/base/task.jl +++ b/base/task.jl @@ -922,31 +922,29 @@ end const StickyWorkqueue = IntrusiveLinkedListSynchronized{Task} global Workqueues::Memory{StickyWorkqueue} = Memory{StickyWorkqueue}([StickyWorkqueue()]) -const Workqueue = (@atomic :acquire Base.Workqueues)[1] # default work queue is thread 1 // TODO: deprecate this variable +const Workqueues_lock = Threads.SpinLock() +const Workqueue = Workqueues[1] # default work queue is thread 1 // TODO: deprecate this variable function workqueue_for(tid::Int) - qs = @atomic :acquire Base.Workqueues + qs = Workqueues if length(qs) >= tid && isassigned(qs, tid) - return @inbounds qs[tid] # This assumes that threads don't get deleted and that once an index is set - # all following Workqueue Memorys have the same queues at those indices + return @inbounds qs[tid] end - while length(qs) < tid - # slow path to allocate it - nt = Threads.maxthreadid() - @assert tid <= nt - new_qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs) # This overallocates by nt-1 - xchg = @atomicreplace :acquire_release :acquire Base.Workqueues qs => new_qs - if xchg.success - qs = new_qs - break - else - qs = xchg.old + # slow path to allocate it + @assert tid > 0 + l = Workqueues_lock + @lock l begin + qs = Workqueues + if length(qs) < tid + nt = Threads.maxthreadid() + @assert tid <= nt + global Workqueues = qs = copyto!(typeof(qs)(undef, length(qs) + nt - 1), qs) end + if !isassigned(qs, tid) + @inbounds qs[tid] = StickyWorkqueue() + end + return @inbounds qs[tid] end - if !isassigned(qs, tid) - @inbounds qs[tid] = StickyWorkqueue() - end - return @inbounds qs[tid] end function enq_work(t::Task)