Skip to content

Commit

Permalink
Faster parquet reading implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Viktor Petukhov committed Dec 10, 2024
1 parent 7e05ffa commit 386e312
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 3 deletions.
3 changes: 2 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ NearestNeighbors = "b8a86587-4115-5ab1-83bc-aa920d37bbce"
OrderedCollections = "bac558e1-5e72-5ebc-8fee-abe8a469f55d"
PackageCompiler = "9b87118b-4619-50d2-8e1e-99f35a4d4d9d"
Parquet = "626c502c-15b0-58ad-a749-f091afb673ae"
Parquet2 = "98572fba-bba0-415d-956f-fa77e587d26d"
Pipe = "b98c9c47-44ae-5843-9183-064241ee97a0"
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca"
Expand Down Expand Up @@ -82,7 +83,7 @@ MultivariateStats = "~0.10"
NearestNeighbors = "~0.4"
PackageCompiler = "^2.1.22"
Parquet = "~0.8"
ProgressMeter = "1.7 - 1.10"
ProgressMeter = "1.10" # Required for @showprogress @threads
StaticArrays = "1.5 - 1.9"
StatsBase = "0.33 - 0.34"
UMAP = "^0.1.9"
Expand Down
53 changes: 51 additions & 2 deletions src/data_loading/data.jl
Original file line number Diff line number Diff line change
@@ -1,8 +1,57 @@
import StatsBase
using Parquet: read_parquet
@lazy import Parquet2 = "98572fba-bba0-415d-956f-fa77e587d26d"

using ProgressMeter: @showprogress
using Base.Threads

## Internals

"""
    copy_slice!(target, source; start_id)

Write every element of `source` into `target` at position `start_id + i`,
where `i` runs over the indices of `source`. All other entries of `target`
are left untouched. Returns `nothing`.
"""
function copy_slice!(
    target::AbstractVector{T1}, source::AbstractVector{T2} where T2 <: Union{Missing, T1}; start_id::Int
) where T1
    # `pairs` yields the same (index, value) stream as indexing over
    # `eachindex(source)`, just without the explicit lookup.
    for (idx, val) in pairs(source)
        target[start_id + idx] = val
    end
    return nothing
end

"""
    read_parquet!(target, dataset; progress=true)

Fill the pre-allocated column vectors in `target` with the data from every
fragment of `dataset`. Each key of `target` names a column to load; its
vector must be long enough to hold the dataset's total row count
(assumes fragments expose their row count via the `nrows` field — TODO confirm
against the Parquet2 version in use).

Fragments are processed sequentially (with an optional progress bar), while
the columns of each fragment are loaded and copied in parallel via `@threads`.
Threading over distinct columns is safe: each task writes into a different
vector of `target`.
"""
function read_parquet!(target::Dict{Symbol, Vector}, dataset::Parquet2.Dataset; progress::Bool=true)
    # Prefix sums of per-fragment row counts: start_ids[i] is the offset at
    # which fragment `i` begins in the concatenated output vectors.
    start_ids = vcat([0], cumsum(d.nrows for d in dataset));
    col_names = collect(keys(target))

    @showprogress enabled=progress for i in 1:length(dataset)
        si = start_ids[i]
        @threads for k in col_names
            # Load a single column of fragment `i` directly, instead of
            # materializing the whole fragment as a DataFrame first.
            col = Parquet2.load(dataset[i], String(k))
            copy_slice!(target[k], col; start_id=si)
        end
    end
end

"""
    read_parquet_fast(data_path; columns=nothing, progress=true, normalize_types=true)

Read a parquet file into a `DataFrame`, assembling multi-fragment files by
loading the columns of each fragment in parallel (see `read_parquet!`).

# Arguments
- `data_path`: path to the parquet file.
- `columns`: subset of columns to load; `nothing` loads all columns.
- `progress`: show a progress bar while reading multi-fragment files.
- `normalize_types`: apply `normalize_df_types!` to the result.
"""
function read_parquet_fast(
    data_path::String; columns::Union{Vector{Symbol}, Nothing}=nothing, progress::Bool=true, normalize_types::Bool=true
)
    dataset = Parquet2.Dataset(data_path);

    if length(dataset) == 1
        # Single-fragment file: no parallel assembly needed. Still honor the
        # `columns` selection and type normalization — the previous version
        # returned all columns un-normalized on this path, inconsistently
        # with the multi-fragment path below.
        df = DataFrame(dataset);
        if columns !== nothing
            df = df[!, columns]
        end
        if normalize_types
            normalize_df_types!(df)
        end
        return df
    end

    # The first fragment determines column names and element types for the
    # pre-allocated output vectors.
    rdf = DataFrame(dataset[1]);
    if normalize_types
        normalize_df_types!(rdf)
    end

    if columns === nothing
        columns = propertynames(rdf)
    end

    # Total row count across fragments (assumes each fragment exposes an
    # `nrows` field — TODO confirm against the Parquet2 version in use).
    n_rows_total = sum(d.nrows for d in dataset);
    # Pre-allocate one full-length output vector per requested column.
    data = Dict(k => Vector{eltype(rdf[!, k])}(undef, n_rows_total) for k in columns);

    read_parquet!(data, dataset; progress);

    return DataFrame(data);
end

function match_gene_names(gene_masks::Vector{String}, gene_names::Vector{String})
matches = Set{String}()
missing_genes = String[]
Expand Down Expand Up @@ -47,7 +96,7 @@ function read_spatial_df(
if ext == "csv"
df_spatial = CSV.read(data_path, DataFrame);
elseif ext == "parquet"
df_spatial = DataFrame(read_parquet(data_path));
df_spatial = read_parquet_fast(data_path);
else
error("Unsupported file format: $ext. Please, provide a CSV or Parquet file")
end
Expand Down

0 comments on commit 386e312

Please sign in to comment.