First version of FolderDict (#1005)
* wip building FolderDict

* fix insert of FolderDict

* better standardize FolderDict keys and ids, API

* add delete! to FolderDict, fix KeyError

* add docs to FolderDict

* FolderDict better docs

* fix FolderDict test

* FolderDict experimental multithreaded support

* FolderDict better thread safety
dehann authored Feb 17, 2024
1 parent bfa158a commit 854a350
Showing 4 changed files with 283 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/Caesar.jl
@@ -91,6 +91,9 @@ include("objects/ObjectAffordanceSubcloud.jl")
# ImageDraw functionality, used by many extensions and therefore a regular (but minimum able) dependency
include("images/imagedraw.jl")

# experimentals
include("dev/FolderDict.jl")

# weakdeps
include("../ext/factors/Pose2AprilTag4Corners.jl")
include("../ext/factors/ScanMatcherPose2.jl")
219 changes: 219 additions & 0 deletions src/dev/FolderDict.jl
@@ -0,0 +1,219 @@
# a large dict that offloads values to a backing store to reduce RAM utilization


using DataStructures
using UUIDs
using DocStringExtensions
using Serialization

import Base: getindex, setindex!, delete!, keys

##

"""
$TYPEDEF
Walks and talks like a Dict but actually maps most of the data volume to a folder.
Includes semi-intelligent cache management for faster operations.
Special Features:
- User can set cache_size
- User can set working folder for storage
- User can set serialization and deserialization functions, e.g. use JSON3 or Serialization
- User can set how Dict keys map to stored id's (see DFG)
- EXPERIMENTAL: thread safe
Developer Notes
- all keys must always be in `.keydict`, regardless of cache or priority
WIP Constraints:
- FIXME, had trouble inheriting from `Base.AbstractDict`
- TODO, better use of thread-safe locks/mutexes
"""
@kwdef struct FolderDict{K,V}
""" regular dict elements kept in memory for rapid access """
cache::Dict{K,V} = Dict{K,V}()
""" priority queue for managing most important cache elements """
pqueue::PriorityQueue{K, Int} = PriorityQueue{K,Int}()
""" cache size """
cache_size::Int = 100
""" unique labels for dict elements sent to the store """
keydict::Dict{K, UUID} = Dict{K, UUID}()
""" mapping keys and ids for different use cases, default is always new uuid.
overwrite with `(k) -> k` to make keys and ids identical """
key_to_id::Function = (k) -> uuid4()
""" read lock via Tasks """
readtasks::Dict{K, Task} = Dict{K, Task}()
""" write lock via Tasks """
writetasks::Dict{K, Task} = Dict{K, Task}()
""" working directory where elemental files are stored """
wdir::String = begin
wdir_ = joinpath(tempdir(), "$(uuid4())")
mkpath(wdir_)
wdir_
end
""" serialization function with default """
serialize::Function = Serialization.serialize
""" deserialization function with default """
deserialize::Function = Serialization.deserialize
end
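
# Usage sketch (illustrative only, not part of this commit; the values below
# are assumptions):
#
#   fd = FolderDict{Symbol, Int}(; cache_size = 10)
#   fd[:a] = 1     # value is serialized to a uuid-named file under fd.wdir
#   fd[:a]         # cache hit: served from memory without touching disk
#   # once more than 10 distinct keys exist, the lowest-priority entry is
#   # evicted from cache, and a later read deserializes it back from disk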

##


function Base.getindex(
sd::FolderDict,
f
)
# first check if there is an ongoing reader on this key
if haskey(sd.readtasks, f)
# NOTE remote possibility that the task is deleted between this dict lookup and the start of wait
wait(sd.readtasks[f])
# values should now be cached for multithreaded reads
end
# also check if there is an ongoing writetask on this key
if haskey(sd.writetasks, f)
wait(sd.writetasks[f])
# now it is safe to proceed in reading newly written value to this key
# TODO slightly excessive lock, since it only unlocks once the storage write is done, though the cached value was available sooner.
end

# if already cached, and assuming a write task has not deleted the cache element yet (MUST delete from pqueue first)
if haskey(sd.pqueue, f)
# increase this key's priority, but be wary of over-emphasis relative to newcomer keys
sd.pqueue[f] += 1
# Assume key must be in the cache
return sd.cache[f]
end
# get id associated with this key (also throw KeyError if not present)
flb = sd.keydict[f]
# performance trick, start async load from IO resources while doing some housekeeping
sd.readtasks[f] = @async begin
# All keys must always be present in keydict
toload = joinpath(sd.wdir, "$flb")
# fetch from cold storage
sd.deserialize(toload)
end

# cache miss: decay existing priorities and make space for the new element
_typemin(::PriorityQueue{P,T}) where {P,T} = typemin(T)
maxpriority = _typemin(sd.pqueue)
for k in keys(sd.pqueue)
# decrement all priorities by one - to help newcomer keys compete for priority
sd.pqueue[k] -= 1
# preemptively find the maxpriority value for later use
maxpriority = maxpriority < sd.pqueue[k] ? sd.pqueue[k] : maxpriority
end

# remove excess cache if necessary
if sd.cache_size <= length(sd.pqueue)
# by dropping lowest priority cache element
dropkey = first(sd.pqueue)[1]
delete!(sd.pqueue, dropkey)
delete!(sd.cache, dropkey)
end

# assume middle-of-the-pack priority for this cache-miss
sd.pqueue[f] = round(Int, maxpriority/2) # pqueue is the arbiter, hence populated last
# block on IO resources fetching data
# add to previously missed data to cache and pqueue
sd.cache[f] = fetch(sd.readtasks[f])
# TODO, possible race condition in the slight delay between the fetch unblocking and the write to cache[f].
delete!(sd.readtasks, f)

# return data to user
return sd.cache[f]
end
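
# A quick sketch of the PriorityQueue semantics relied on above (DataStructures'
# default ordering is min-first, so `first(pq)` peeks the lowest-priority pair,
# i.e. the next eviction candidate):
#
#   pq = PriorityQueue{Symbol, Int}()
#   pq[:a] = 0; pq[:b] = 5
#   first(pq)      # :a => 0
#   pq[:a] += 6    # priorities can be bumped in place
#   first(pq)      # :b => 5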


function setindex!(
sd::FolderDict,
v,
k
)
# first check if there is an ongoing reader on this key
if haskey(sd.readtasks, k)
# NOTE remote possibility that the task is deleted between this dict lookup and the start of wait
wait(sd.readtasks[k])
end
# also check if there is an ongoing writetask on this key
if haskey(sd.writetasks, k)
wait(sd.writetasks[k])
# now it is safe to proceed in reading newly written value to this key
# TODO slightly excessive lock, since it only unlocks once the storage write is done, though the cached value was available sooner.
end

# immediately/always insert new data into folder store with a unique id
id = sd.key_to_id(k)
flb = joinpath(sd.wdir, "$id")
sd.writetasks[k] = @async sd.serialize(flb, v) # for sluggish IO

# should any obsolete files be deleted from the filesystem?
dtsk = if haskey(sd.keydict, k)
# delete this store location
dlb = sd.keydict[k]
delete!(sd.keydict, k)
@async Base.Filesystem.rm(joinpath(sd.wdir, "$dlb")) # for sluggish IO
else
# dummy task for type stability
@async nothing
end
# set new uuid in keydict only after potential overwrite delete
sd.keydict[k] = id

# ensure pqueue has the correct value in all cases
maxpriority = 0
# truncate for cache_size, and also allow for cache_size == 0
if 0 < sd.cache_size <= length(sd.pqueue)
# find the current largest priority for later reuse
for (_, p) in sd.pqueue
maxpriority = max(maxpriority, p)
end
# drop the lowest-priority element (min-first ordering) to make room
rmk = first(sd.pqueue)[1]
delete!(sd.pqueue, rmk)
delete!(sd.cache, rmk)
end
# assume middle-of-the-pack priority for this new entry
sd.pqueue[k] = round(Int, maxpriority/2)
# Reminder: pqueue is the arbiter over cache
sd.cache[k] = v

# wait for any disk mutations to finish
wait(dtsk)
# finally, wait on and free the write task lock, in case there are waiting readers
wait(sd.writetasks[k])
delete!(sd.writetasks, k)

# return the value
return v
end
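
# Overwrite sketch (illustrative): with the default `key_to_id`, assigning to an
# existing key stores the value under a fresh uuid and removes the old file:
#
#   fd[:a] = 1;  id1 = fd.keydict[:a]
#   fd[:a] = 10; id2 = fd.keydict[:a]
#   # id1 != id2, and joinpath(fd.wdir, "$id1") has been removed from disk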


# # if multiple access occurs (i.e. shared memory)
# if haskey(sd.writetask, f)
# # wait until the underlying task is complete
# wait(sd.writetask[f]) # COUNTER MUST USE FETCH
# end


function delete!(
sd::FolderDict,
k
)
dlb = sd.keydict[k]
delete!(sd.keydict, k)
dtsk = @async Base.Filesystem.rm(joinpath(sd.wdir, "$dlb")) # for sluggish IO

if haskey(sd.pqueue, k)
delete!(sd.pqueue,k)
delete!(sd.cache, k)
end

wait(dtsk)

# following Julia convention, return the collection itself
return sd
end
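
# delete! sketch (illustrative): drops the key from keydict, pqueue, and cache,
# and removes the backing file, so a later read throws KeyError:
#
#   delete!(fd, :a)
#   fd[:a]    # KeyError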


keys(sd::FolderDict) = keys(sd.keydict)

##
1 change: 1 addition & 0 deletions test/runtests.jl
@@ -13,6 +13,7 @@ TEST_GROUP = get(ENV, "IIF_TEST_GROUP", "all")
if TEST_GROUP in ["all", "basic_functional_group"]
println("Starting tests...")
# highly multipackage tests that don't fit well in specific library dependencies.
include("testFolderDict.jl")
include("testScatterAlignParched.jl")
include("testScatterAlignPose2.jl")
include("testScatterAlignPose3.jl")
60 changes: 60 additions & 0 deletions test/testFolderDict.jl
@@ -0,0 +1,60 @@

## test
using Test
using UUIDs
using Caesar
import Caesar: FolderDict

##

@testset "Basic functional tests of FolderDict" begin
##

fd = FolderDict{Symbol, Int}(;cache_size=2)

@show fd.wdir

fd[:a] = 1

@test 1 == length(fd.keydict)
@test fd.keydict[:a] isa UUID
@test 1 == length(fd.pqueue)
@test 1 == length(fd.cache)
@test 1 == fd.cache[:a]
@test 1 == fd[:a] # all up test for getindex when key in cache

fd[:b] = 2

@test 2 == length(fd.keydict)
@test fd.keydict[:a] != fd.keydict[:b]
@test 2 == length(fd.pqueue)
@test 2 == length(fd.cache)
@test 2 == fd.cache[:b]
@test 2 == fd[:b] # all up test for getindex when key in cache

fd[:c] = 3

@test 3 == length(fd.keydict)
@test fd.keydict[:a] != fd.keydict[:c]
@test 2 == length(fd.pqueue)
@test 2 == length(fd.cache)
@test 3 == fd.cache[:c]
@test 3 == fd[:c] # all up test for getindex when key in cache


delete!(fd, :b)

@test 2 == length(fd.keydict)
@test fd.keydict[:a] != fd.keydict[:c]
@test 2 == length(fd.pqueue)
@test 2 == length(fd.cache)
@test 3 == fd.cache[:c]
@test 3 == fd[:c] # all up test for getindex when key in cache

@test_throws KeyError fd[:b]


@test 2 == length(intersect([:a; :c], collect(keys(fd))))

##
end
