Skip to content

Commit

Permalink
code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
JonasIsensee committed Aug 26, 2024
1 parent 99782ad commit 4db6d39
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 149 deletions.
62 changes: 16 additions & 46 deletions src/datasets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ function read_empty(rr::ReadRepresentation{T}, f::JLDFile,
dimensions_attr.datatype == h5fieldtype(f, Int64, Int64, Val{true}) || throw(UnsupportedFeatureException())

seek(io, dimensions_attr.data_offset)
v = construct_array(io, T, Val(ndims))
v = construct_array(io, T, ndims)
if isconcretetype(T)
for i = 1:length(v)
@inbounds v[i] = jlconvert(rr, f, Ptr{Cvoid}(0), header_offset)
Expand Down Expand Up @@ -244,30 +244,12 @@ function get_ndims_offset(f::JLDFile, dataspace::ReadDataspace, attributes::Abst
end

"""
construct_array{T}(io::IO, ::Type{T}, ::Val{ndims})
construct_array(io::IO, eltype, ndims::Int)
Construct array by reading `ndims` dimensions from `io`. Assumes `io` has already been
seeked to the correct position.
"""
function construct_array(io::IO, ::Type{T}, ::Val{1}) where {T}
n = jlread(io, Int64)
Vector{T}(undef, n)
end

function construct_array(io::IO, ::Type{T}, ::Val{2}) where {T}
d2 = jlread(io, Int64)
d1 = jlread(io, Int64)
Matrix{T}(undef, d1, d2)
end

function construct_array(io::IO, ::Type{T}, ::Val{3}) where {T}
d3 = jlread(io, Int64)
d2 = jlread(io, Int64)
d1 = jlread(io, Int64)
Array{T,3}(undef, d1, d2, d3)
end

function construct_array(io::IO, ::Type{T}, ::Val{N})::Array{T,N} where {T,N}
function construct_array(io::IO, ::Type{T}, N::Int) where {T}
ds = reverse(ntuple(i->jlread(io, Int64), Val(N)))
Array{T,N}(undef, ds...)
end
Expand All @@ -283,7 +265,7 @@ end
ndims, offset = get_ndims_offset(f, dataspace, attributes)

seek(io, offset)
v = construct_array(io, T, Val(Int(ndims)))
v = construct_array(io, T, Int(ndims))
n = length(v)
seek(io, data_offset)
if iscompressed(filters)
Expand All @@ -296,7 +278,7 @@ end
else
ndims, offset = get_ndims_offset(f, dataspace, attributes)
seek(io, offset)
v = construct_array(io, T, Val(Int(ndims)))
v = construct_array(io, T, Int(ndims))
if layout.version == 3
# version 1 B-tree
# This version appears to be padding incomplete chunks
Expand Down Expand Up @@ -385,14 +367,17 @@ end
seek(io, header_offset)
f.end_of_data = header_offset + fullsz

if ismutabletype(typeof(data)) && !isa(wsession, JLDWriteSession{Union{}})
wsession.h5offset[objectid(data)] = h5offset(f, header_offset)
push!(wsession.objects, data)
end
track!(wsession, data, header_offset)

cio = begin_checksum_write(io, fullsz - 4)
write_object_header_and_dataspace_message(cio, f, psz, dataspace)
write_datatype_message(cio, datatype)
jlwrite(cio, ObjectStart(size_flag(psz)))
write_size(cio, psz)
write_header_message(cio, Val(HmFillValue); flags=0x09)
write_header_message(cio, Val(HmDataspace); dataspace.dataspace_type, dimensions=dataspace.size)
for attr in dataspace.attributes
write_header_message(cio, f, attr)
end
write_header_message(cio, Val(HmDatatype), 1 | (2*isa(dt, CommittedDatatype)); dt)

# Data storage layout
if layout_class == LcCompact
Expand Down Expand Up @@ -429,25 +414,10 @@ end
h5offset(f, header_offset)
end

function write_object_header_and_dataspace_message(cio::IO, f::JLDFile, psz::Int, dataspace::WriteDataspace)
jlwrite(cio, ObjectStart(size_flag(psz)))
write_size(cio, psz)
write_header_message(cio, Val(HmFillValue); flags=0x09)
write_header_message(cio, Val(HmDataspace); dataspace.dataspace_type, dimensions=dataspace.size)
for attr in dataspace.attributes
write_header_message(cio, f, attr)
end
end

write_datatype_message(cio::IO, dt::H5Datatype) =
write_header_message(cio, Val(HmDatatype), 1 | (2*isa(dt, CommittedDatatype)); dt)


@nospecializeinfer function write_dataset(f::JLDFile, @nospecialize(x), wsession::JLDWriteSession)::RelOffset
if ismutabletype(typeof(x)) && !isa(wsession, JLDWriteSession{Union{}})
offset = get(wsession.h5offset, objectid(x), UNDEFINED_ADDRESS)
offset != UNDEFINED_ADDRESS && return offset
end
offset = get_tracked(wsession, x)
offset != UNDEFINED_ADDRESS && return offset
odr = objodr(x)
write_dataset(f, WriteDataspace(f, x, odr), h5type(f, x), odr, x, wsession)::RelOffset
end
Expand Down
113 changes: 17 additions & 96 deletions src/explicit_datasets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,11 @@ end
Write data to file using metadata prepared in the `dataset`.
"""
function write_dataset(dataset::Dataset, data)
function write_dataset(dataset::Dataset, data, wsession::JLDWriteSession=JLDWriteSession())
f = dataset.parent.f
if dataset.offset != UNDEFINED_ADDRESS
throw(ArgumentError("Dataset has already been written to file"))
end
wsession = JLDWriteSession()
# first need to figure out if data type and dataspace are defined / correct
if isnothing(dataset.datatype)
dataset.datatype = h5type(f, data)
Expand All @@ -134,103 +133,24 @@ function write_dataset(dataset::Dataset, data)
dataset.dataspace = WriteDataspace(f, data, odr)
end
dataspace = dataset.dataspace
# Attributes
attributes = map(collect(dataset.attributes)) do (name, attr)
attr isa WrittenAttribute && return attr
return WrittenAttribute(dataset.parent.f, name, attr)
throw(ArgumentError("Invalid attribute: $a"))
end
io = f.io
odr = objodr(data)
datasz = odr_sizeof(odr)::Int * numel(dataspace)::Int

psz = payload_size_without_storage_message(dataspace, datatype)::Int

psz += sum(message_size.(attributes), init=0)

# minimum extra space for continuation message
psz += jlsizeof(HeaderMessage) + jlsizeof(RelOffset) + jlsizeof(Length)


# determine layout class
# DataLayout object is only available after the data is written
if datasz == 0 || (!(data isa Array) && datasz < 8192)
layout_class = LcCompact
psz += jlsizeof(Val(HmDataLayout); layout_class, data_size=datasz)
elseif !isnothing(dataset.chunk) || !isempty(dataset.filters.filters)
if !isempty(dataset.filters.filters)
filter_id = dataset.filters.filters[1].id
invoke_again, compressor = get_compressor(filter_id)
if invoke_again
return Base.invokelatest(write_dataset, dset, data)::RelOffset
end
# Do some additional checks on the data here
layout_class = LcChunked
# improve filter support here
psz += chunked_storage_message_size(ndims(data)) + pipeline_message_size(filter_id::UInt16)
else
layout_class = LcContiguous
psz += jlsizeof(Val(HmDataLayout); layout_class)
compressor = nothing
end
fullsz = jlsizeof(ObjectStart) + size_size(psz) + psz + 4
offset = write_dataset(f, dataspace, datatype, odr, data, wsession, compressor)
!isempty(dataset.name) && (dataset.parent[dataset.name] = offset)

header_offset = f.end_of_data
seek(io, header_offset)
f.end_of_data = header_offset + fullsz

if ismutabletype(typeof(data)) && !isa(wsession, JLDWriteSession{Union{}})
wsession.h5offset[objectid(data)] = h5offset(f, header_offset)
push!(wsession.objects, data)
end

cio = begin_checksum_write(io, fullsz - 4)
write_object_header_and_dataspace_message(cio, f, psz, dataspace)
write_datatype_message(cio, datatype)
for a in attributes
write_header_message(cio, f, a, wsession)
end
# Data storage layout
if layout_class == LcCompact
write_header_message(cio, Val(HmDataLayout); layout_class, data_size=datasz)
if datasz != 0
write_data(cio, f, data, odr, datamode(odr), wsession)
end
dataset.header_chunk_info = (header_offset, position(cio)+20, position(cio))
# Add NIL message replacable by continuation message
write_continuation_placeholder(cio)
jlwrite(io, end_checksum(cio))
elseif layout_class == LcChunked
write_filter_pipeline_message(cio, filter_id)

# deflate first
deflated = deflate_data(f, data, odr, wsession, compressor)

write_chunked_storage_message(cio, odr_sizeof(odr), size(data), length(deflated), h5offset(f, f.end_of_data))
dataset.header_chunk_info = (header_offset, position(cio)+20, position(cio))
write_continuation_placeholder(cio)
jlwrite(f.io, end_checksum(cio))

seek(f.io, f.end_of_data)
f.end_of_data += length(deflated)
jlwrite(f.io, deflated)
else
# Align contiguous chunk to 8 bytes in the file
address = f.end_of_data + 8 - mod1(f.end_of_data, 8)
data_address = h5offset(f, address)
write_header_message(cio, Val(HmDataLayout);
layout_class, data_address, data_size=datasz)

dataset.header_chunk_info = (header_offset, position(cio)+20, position(cio))
# Add NIL message replacable by continuation message
write_continuation_placeholder(cio)
jlwrite(io, end_checksum(cio))

f.end_of_data = address + datasz
seek(io, address)
write_data(io, f, data, odr, datamode(odr), wsession)
# Attributes
# TODO: this can be optimized by writing all attributes at once
for (name, attr) in pairs(dataset.attributes)
    # Forward the attribute bound by this loop together with the shared write session.
    # (The loop binds `name` and `attr`; there is no `value` in this scope.)
    add_attribute(dataset, name, attr, wsession)
end

offset = h5offset(f, header_offset)
!isempty(dataset.name) && (dataset.parent[dataset.name] = offset)
return offset
end

Expand Down Expand Up @@ -301,11 +221,6 @@ function get_dataset(f::JLDFile, offset::RelOffset, g=f.root_group, name="")
return dset
end

function add_attribute(dset::Dataset, name::String, data::Dataset)
# link an existing dataset as attribute
throw(UnsupportedFeatureException("Not implemented"))
end

# Attributes
message_size(msg::WrittenAttribute) = jlsizeof(HeaderMessage) + jlsizeof(msg)
function write_header_message(io,f::JLDFile, msg::WrittenAttribute, wsession=JLDWriteSession())
Expand Down Expand Up @@ -562,8 +477,14 @@ function allocate_early(dset::Dataset, T::DataType)
f.end_of_data = header_offset + fullsz

cio = begin_checksum_write(io, fullsz - 4)
write_object_header_and_dataspace_message(cio, f, psz, dataspace)
write_datatype_message(cio, datatype)
jlwrite(cio, ObjectStart(size_flag(psz)))
write_size(cio, psz)
write_header_message(cio, Val(HmFillValue); flags=0x09)
write_header_message(cio, Val(HmDataspace); dataspace.dataspace_type, dimensions=dataspace.size)
for attr in dataspace.attributes
write_header_message(cio, f, attr)
end
write_header_message(cio, Val(HmDatatype), 1 | (2*isa(dt, CommittedDatatype)); dt)
for a in attributes
write_header_message(cio, f, a, wsession)
end
Expand Down
8 changes: 3 additions & 5 deletions src/inlineunion.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ end

@nospecializeinfer function write_dataset(f::JLDFile, @nospecialize(x::Array), wsession::JLDWriteSession, @nospecialize(compress=f.compress))
T = eltype(x)
if !isa(wsession, JLDWriteSession{Union{}})
offset = get(wsession.h5offset, objectid(x), UNDEFINED_ADDRESS)
offset != UNDEFINED_ADDRESS && return offset
end
offset = get_tracked(wsession, x)
offset != UNDEFINED_ADDRESS && return offset
if T isa Union && writeasbits(T)
# Conversion has to be done earlier here because
# vectors are special cased in dispatch
Expand All @@ -60,7 +58,7 @@ function read_array(f::JLDFile, dataspace::ReadDataspace,
io = f.io
ndims, offset = get_ndims_offset(f, dataspace, attributes)
seek(io, offset)
v = construct_array(io, InlineUnionEl{T1,T2}, Val(Int(ndims)))
v = construct_array(io, InlineUnionEl{T1,T2}, Int(ndims))
n = length(v)
seek(io, layout.data_offset)
if iscompressed(filters)
Expand Down
17 changes: 15 additions & 2 deletions src/types.jl
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,21 @@ struct JLDWriteSession{T<:Union{Dict{UInt,RelOffset},Union{}}}
JLDWriteSession{T}(h5offset, objects) where T = new(h5offset, objects)
end
JLDWriteSession() = JLDWriteSession{Dict{UInt,RelOffset}}(Dict{UInt,RelOffset}(), Any[])


# Sessions of type `JLDWriteSession{Union{}}` record nothing: tracking is a no-op.
track!(s::JLDWriteSession{Union{}}, args...) = nothing

"""
    track!(s::JLDWriteSession, data, offset::RelOffset)

Remember in the write session `s` that `data` has been written at `offset`.

Only instances of mutable types are recorded; for anything else the call does nothing.
The entry is keyed by `objectid(data)`, which is the key `get_tracked` looks up.

`offset` must already be a `RelOffset`. NOTE(review): callers holding a raw file position
presumably need to convert it first (e.g. `h5offset(f, header_offset)`) — confirm at the
call sites.

Always returns `nothing`.
"""
function track!(s::JLDWriteSession, data, offset::RelOffset)
    if ismutabletype(typeof(data))
        # Use the function's own arguments: `s` and `offset` are the only names in scope.
        s.h5offset[objectid(data)] = offset
        # Also store the object itself in the session
        # (presumably so its objectid stays valid while the session lives — confirm).
        push!(s.objects, data)
    end
    return nothing
end
# A `JLDWriteSession{Union{}}` never reports a previously written object.
get_tracked(wsession::JLDWriteSession{Union{}}, data) = UNDEFINED_ADDRESS

"""
    get_tracked(wsession::JLDWriteSession, data)

Look up the offset recorded for `data` in `wsession`.

Returns the entry stored under `objectid(data)` in `wsession.h5offset` when `data` is an
instance of a mutable type and such an entry exists; otherwise returns `UNDEFINED_ADDRESS`.
"""
function get_tracked(wsession::JLDWriteSession, data)
    # Instances of immutable types are never looked up.
    ismutabletype(typeof(data)) || return UNDEFINED_ADDRESS
    key = objectid(data)
    return get(wsession.h5offset, key, UNDEFINED_ADDRESS)
end
"""
GlobalHeap
Expand Down

0 comments on commit 4db6d39

Please sign in to comment.