Skip to content

Commit

Permalink
deepcopy for FolderDict (#1007)
Browse files Browse the repository at this point in the history
* deepcopy for FolderDict

* fix haskey FolderDict

* add show FolderDict and test

* more todo dev docs for FolderDict
  • Loading branch information
dehann authored Feb 18, 2024
1 parent 854a350 commit 209e9f0
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 2 deletions.
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ Major changes and news in Caesar.jl.
- Updates for IncrementalInference upgrades relating to StaticArray variable values.
- Manifold updates to factors.
- Downstreamed std ROS handlers to PyCaesar.
- Fix `saveLAS` to use `Int32`.
- Several compat updates for dependencies.
- Restore Docs build, and update links for NavAbility at WhereWhen.ai Technologies Inc.
- Introduce `FolderDict` as data structure for lower memory consumption, also as potential BlobStore.

## Changes in v0.13

Expand Down
60 changes: 59 additions & 1 deletion src/dev/FolderDict.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ using UUIDs
using DocStringExtensions
using Serialization

import Base: getindex, setindex!, delete!, keys
import Base: getindex, setindex!, delete!, keys, haskey, deepcopy, show

##

Expand All @@ -25,10 +25,13 @@ Special Features:
Developer Notes
- all keys must always be in `.keydict`, regardless of cache or priority
- pqueue is the arbiter, so it is assumed that `.cache` will mirror the happenings of pqueue
WIP Constraints:
- FIXME, had trouble inheriting from `Base.AbstractDict`
- TODO, better use of thread-safe locks/mutexes
- TODO, allow mapping to existing directory of elementals
- will only work for `key_to_id = (k::UUID) -> k`
"""
@kwdef struct FolderDict{K,V}
""" regular dict elements kept in memory for rapid access """
Expand All @@ -46,6 +49,12 @@ WIP Constraints:
readtasks::Dict{K, Task} = Dict{K, Task}()
""" write lock via Tasks """
writetasks::Dict{K, Task} = Dict{K, Task}()
""" event signal for deepcopy synchronization. Blocks new setindex! during a deepcopy """
copyevent::Base.Event = begin
_e = Base.Event()
notify(_e) # dont start with blocking event, requires a reset for use
_e
end
""" working directory where elemental files are stored """
wdir::String = begin
wdir_ = joinpath(tempdir(), "$(uuid4())")
Expand All @@ -61,6 +70,20 @@ end
##


"""
    show(io::IO, sd::FolderDict)

Print a short summary of `sd` to `io`: its key/value type parameters and working
directory, how many of its keys are currently cached, and up to five of the
cached `key => value` entries as examples.
"""
function show(io::IO, sd::FolderDict{K,V}) where {K,V}
  println(io, "FolderDict{$K,$V} at $(sd.wdir)")
  ncached = length(sd.pqueue)
  ntotal = length(sd.keydict)
  println(io, " with $(ncached) of $(ntotal) entries cached, e.g.:")
  # only the first few cached keys are shown as examples
  for tk in Iterators.take(keys(sd.cache), 5)
    println(io, " ", tk, " => ", sd.cache[tk])
  end
end
# Multi-line (`text/plain`) display forwards to the two-argument `show` for `FolderDict`.
function Base.show(io::IO, ::MIME"text/plain", fd::FolderDict)
  return show(io, fd)
end

function Base.getindex(
sd::FolderDict,
f
Expand Down Expand Up @@ -131,6 +154,8 @@ function setindex!(
v,
k
)
# don't start a new write if a copy is in progress
wait(sd.copyevent)
# first check if there is an ongoing reader on this key
if haskey(sd.readtasks, k)
# NOTE super remote possibility that a task is deleted before this dict lookup and wait starts
Expand Down Expand Up @@ -215,5 +240,38 @@ end


# The keys of a FolderDict are the keys of `.keydict`, which tracks every key
# regardless of whether the element is currently cached.
function keys(sd::FolderDict)
  return keys(sd.keydict)
end
# Membership is decided by `.keydict` alone, not by what is currently in the cache.
function haskey(sd::FolderDict, k)
  return haskey(sd.keydict, k)
end

"""
    deepcopy(sd::FolderDict{K,V})

Return a new `FolderDict{K,V}` holding a copy of `sd`'s key dictionary and a full
copy of its working folder on disk.

New `setindex!` calls on `sd` are held back (via `sd.copyevent`) while the copy is
in progress, and are released again when this function exits, whether it returns
normally or throws.

The new object gets its own `wdir`, `pqueue` and `cache` from the constructor
defaults; `pqueue` and `cache` are deliberately not copied so that they stay in
lock step in the new object.

Throws whatever `wait` raises if a pending write task or the folder copy fails.
"""
function deepcopy(
  sd::FolderDict{K,V}
) where {K,V}
  # block any new writes that want to start
  reset(sd.copyevent)
  try
    # wait for any remaining write tasks to finish.
    # Iterate over a snapshot so that entries removed from `writetasks` while we
    # are waiting cannot disturb the iteration.
    for t in collect(values(sd.writetasks))
      wait(t)
    end

    # copy or duplicate all but pqueue and cache, which must be newly cached in the
    # new copy of FolderDict (to ensure pqueue and cache remain in lock step).
    # The destination object is built BEFORE the folder copy is started so that its
    # `wdir` is known when the copy task runs.
    sd_ = FolderDict{K,V}(;
      keydict = deepcopy(sd.keydict),
      cache_size = sd.cache_size,
      key_to_id = sd.key_to_id,
      serialize = sd.serialize,
      deserialize = sd.deserialize,
    )

    # actually make a full copy of the working folder.
    # Kept as a task + wait so a failed copy still surfaces through `wait`, as before.
    dst = sd_.wdir
    tsk = @async Base.Filesystem.cp(sd.wdir, dst; force=true)
    wait(tsk)

    # return new deepcopy of FolderDict
    return sd_
  finally
    # always release pending writes, otherwise a failed copy would leave every
    # later `setindex!` on `sd` blocked on `copyevent` forever
    notify(sd.copyevent)
  end
end


##
30 changes: 29 additions & 1 deletion test/testFolderDict.jl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ fd = FolderDict{Symbol, Int}(;cache_size=2)

fd[:a] = 1

@test haskey(fd, :a)
@test 1 == length(fd.keydict)
@test fd.keydict[:a] isa UUID
@test 1 == length(fd.pqueue)
Expand All @@ -25,6 +26,7 @@ fd[:a] = 1

fd[:b] = 2

@test haskey(fd, :b)
@test 2 == length(fd.keydict)
@test fd.keydict[:a] != fd.keydict[:b]
@test 2 == length(fd.pqueue)
Expand All @@ -34,16 +36,24 @@ fd[:b] = 2

fd[:c] = 3

@test haskey(fd, :c)
@test 3 == length(fd.keydict)
@test fd.keydict[:a] != fd.keydict[:c]
@test 2 == length(fd.pqueue)
@test 2 == length(fd.cache)
@test 3 == fd.cache[:c]
@test 3 == fd[:c] # all up test for getindex when key in cache

# make sure folder recovery works by fetching from all three keys, with cache_size set to 2
@test fd[:a] != fd[:b]
@test fd[:b] != fd[:c]

@show fd;

delete!(fd, :b)

# TODO check that the actual folder stored was deleted from permanent storage after `delete!( ,:b)`

@test 2 == length(fd.keydict)
@test fd.keydict[:a] != fd.keydict[:c]
@test 2 == length(fd.pqueue)
Expand All @@ -53,8 +63,26 @@ delete!(fd, :b)

@test_throws KeyError fd[:b]


@test 2 == length(intersect([:a; :c], collect(keys(fd))))
@test !haskey(fd, :b)
@test haskey(fd, :a)
@test haskey(fd, :c)


fd_copy = deepcopy(fd)

@show fd_copy;

@test !haskey(fd_copy, :b)
@test haskey(fd_copy, :a)
@test haskey(fd_copy, :c)

# make sure folder recovery works in the copy by fetching both remaining keys, with cache_size set to 2
@test fd_copy[:a] != fd_copy[:c]

# make sure the copy returns the same values as the original for every remaining key
@test fd[:a] == fd_copy[:a]
@test fd[:c] == fd_copy[:c]

##
end

0 comments on commit 209e9f0

Please sign in to comment.