From 051bab36c309be3a9ef09ed5b464cf67fec4ee0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20Kr=C3=B6pelin?= <42342396+andreasKroepelin@users.noreply.github.com> Date: Fri, 20 Jan 2023 21:51:44 +0100 Subject: [PATCH] Use internal buffer for vecorized writing and do not close IO after writing (#12) * Use buffer for writing * Allow providing predefined buffer * Finalize transcoding stream without closing underlying io * Add tests for buffered writing * Update src/io.jl Co-authored-by: Seth Axen * Update src/io.jl Co-authored-by: Seth Axen * Update src/io.jl Co-authored-by: Seth Axen * Move @inbounds and @views outwards * Update test/io.jl Co-authored-by: Seth Axen * Rename `unit_vsize` to `buffer_size` * Extend test * Run formatter * Change `unit_vsize` to `buffer_size` in documentation * Use flush instead of closewrite * Add Manifest.toml to .gitignore Co-authored-by: Seth Axen --- .gitignore | 1 + src/io.jl | 58 +++++++++++++++++++++++++++++++++++++++----------- test/header.jl | 4 ++-- test/io.jl | 40 ++++++++++++++++++++++++++++++++-- 4 files changed, 86 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index 3af67b1..aec6d51 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ *.jl.cov *.jl.mem /docs/build/ +Manifest.toml \ No newline at end of file diff --git a/src/io.jl b/src/io.jl index 34a4717..d769ba1 100644 --- a/src/io.jl +++ b/src/io.jl @@ -74,8 +74,8 @@ function read_mmap(path::AbstractString, T::Type{MRCData}) end """ - write(io::IO, ::MRCData; compress = :none) - write(fn::AbstractString, ::MRCData; compress = :auto) + write(io::IO, ::MRCData; compress = :none, buffer_size = 4096, buffer = nothing) + write(fn::AbstractString, ::MRCData; compress = :auto, buffer_size = 4096, buffer = nothing) Write an instance of [`MRCData`](@ref) to an IO stream or new file. Use `compress` to specify the compression with the following options: @@ -84,9 +84,22 @@ Use `compress` to specify the compression with the following options: - `:bz2`: BZ2 - `:xz`: XZ - `:none`: no compression + +The parameter `buffer_size` specifies the size (in bytes) of an intermediate +buffer that is used to speed up the writing by utilizing vectorized writes. + +You can also directly provide a preallocated buffer as a `Vector`. +In that case, `buffer_size` has no effect. +Note that `eltype(buffer)` must match the data type of the MRC data. """ write(::Any, ::MRCData) -function Base.write(io::IO, d::MRCData; compress=:none, unit_vsize=4096) +function Base.write( + io::IO, + d::MRCData; + compress=:none, + buffer_size=4096, + buffer::Union{Nothing,Vector}=nothing, +) newio = compressstream(io, compress) h = header(d) sz = write(newio, h) @@ -94,24 +107,43 @@ function Base.write(io::IO, d::MRCData; compress=:none, unit_vsize=4096) T = datatype(h) data = parent(d) fswap = bswapfromh(h.machst) - unit_vsize = div(unit_vsize, sizeof(T)) - vlen = length(data) - vrem = vlen % unit_vsize - if vrem != 0 - @inbounds @views sz += write(newio, fswap.(T.(data[1:vrem]))) + buffer_size = div(buffer_size, sizeof(T)) + if buffer === nothing + buffer = Vector{T}(undef, buffer_size) + elseif !(buffer isa Vector{T}) + throw( + ArgumentError( + "`buffer` must be `nothing` or a `Vector{$T}`, but `$(typeof(buffer))` was provided", + ), + ) end - for i in (vrem + 1):unit_vsize:vlen - @inbounds @views sz += write(newio, fswap.(T.(data[i:(i + unit_vsize - 1)]))) + # If `buffer` was provided as a parameter then `buffer_size` is redundant and + # we must make sure that it matches `buffer`. + buffer_size = length(buffer) + vlen = length(data) + vrem = vlen % buffer_size + @inbounds @views begin + if vrem != 0 + buffer[1:vrem] .= fswap.(T.(data[1:vrem])) + sz += write(newio, buffer[1:vrem]) + end + for i in (vrem + 1):buffer_size:vlen + buffer .= fswap.(T.(data[i:(i + buffer_size - 1)])) + sz += write(newio, buffer) + end end - close(newio) + write(newio, TranscodingStreams.TOKEN_END) + flush(newio) return sz end -function Base.write(fn::AbstractString, object::T; compress=:auto, unit_vsize=4096) where {T<:Union{MRCData}} +function Base.write( + fn::AbstractString, object::T; compress=:auto, kwargs... +) where {T<:Union{MRCData}} if compress == :auto compress = checkextension(fn) end return open(fn; write=true) do io - return write(io, object; compress=compress, unit_vsize=unit_vsize) + return write(io, object; compress=compress, kwargs...) end end diff --git a/test/header.jl b/test/header.jl index 0193613..d222cef 100644 --- a/test/header.jl +++ b/test/header.jl @@ -128,11 +128,11 @@ end @test MRC.entrytobytes(:map, "abc") == [0x61, 0x62, 0x63, 0x00] @test MRC.entrytobytes(:map, "abcde") == [0x61, 0x62, 0x63, 0x64] @test MRC.entrytobytes(:label, ("abcd", ntuple(_ -> "", 9)...))[1:80] == - [0x61; 0x62; 0x63; 0x64; zeros(UInt8, 76)] + [0x61; 0x62; 0x63; 0x64; zeros(UInt8, 76)] @test MRC.entrytobytes(:testfloat, Float32(3)) == reinterpret(UInt8, [Float32(3)]) @test MRC.entrytobytes(:testint, Int32(3)) == reinterpret(UInt8, [Int32(3)]) @test MRC.entrytobytes(:testuintbool, (0x01, 0x02, 0x03, 0x04)) == - [0x01, 0x02, 0x03, 0x04] + [0x01, 0x02, 0x03, 0x04] end @testset "fieldoffsets" begin diff --git a/test/io.jl b/test/io.jl index bdd0307..0fcac91 100644 --- a/test/io.jl +++ b/test/io.jl @@ -43,7 +43,8 @@ @test h.machst === (0x44, 0x41, 0x0, 0x0) @test h.rms === Float32(0.15705723) @test h.nlabl === Int32(1) - @test h.label == ("::::EMDATABANK.org::::EMD-3001::::", "", "", "", "", "", "", "", "", "") + @test h.label == + ("::::EMDATABANK.org::::EMD-3001::::", "", "", "", "", "", "", "", "", "") end @testset "emd3197.map" begin @@ -89,7 +90,42 @@ @test h.machst === (0x44, 0x41, 0x0, 0x0) @test h.rms === Float32(2.399953) @test h.nlabl === Int32(1) - @test h.label == ("::::EMDATABANK.org::::EMD-3197::::", "", "", "", "", "", "", "", "", "") + @test h.label == + ("::::EMDATABANK.org::::EMD-3197::::", "", "", "", "", "", "", "", "", "") end end end + +@testset "write" begin + @testset "buffer size has no effect on data" begin + emd3001 = read("$(@__DIR__)/testdata/emd_3001.map", MRCData) + buffer_sizes = [1024, 2048, 4096, 100, 42] + + for buffer_size in buffer_sizes + @testset "buffer size: $buffer_size" begin + # no preallocation + io = IOBuffer(; read=true, write=true) + write(io, emd3001; buffer_size=buffer_size) + flush(io) + seekstart(io) + @test read(io, MRCData) == emd3001 + close(io) + + # with preallocation + io = IOBuffer(; read=true, write=true) + buffer = Vector{Float32}(undef, buffer_size) + write(io, emd3001; buffer=buffer) + flush(io) + seekstart(io) + @test read(io, MRCData) == emd3001 + close(io) + end + end + end + + @testset "buffer eltype must match data type" begin + emd3001 = read("$(@__DIR__)/testdata/emd_3001.map", MRCData) + buffer = Vector{Float64}(undef, 1) + @test_throws ArgumentError write(IOBuffer(), emd3001; buffer=buffer) + end +end