Skip to content

Commit

Permalink
start working on merge
Browse files Browse the repository at this point in the history
  • Loading branch information
olivierlabayle committed Nov 29, 2023
1 parent c59ba13 commit 25d0496
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 139 deletions.
77 changes: 34 additions & 43 deletions src/merge.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,55 +12,46 @@ end
# Read a results CSV file into a DataFrame, forcing every join-key column
# to `String` so later joins on those columns cannot fail on inferred-type
# mismatches between files.
function read_output_with_types(file)
    coltypes = Dict(key => String for key in joining_keys())
    return CSV.read(file, DataFrame, types=coltypes)
end

"""
    load_csv_files(data, files)

Read each CSV file in `files` (via `read_output_with_types`), skip files with
no rows, and return `data` with all non-empty tables appended.

# Arguments
- `data`: DataFrame used as the accumulator (typically an empty output table
  defining the expected columns).
- `files`: iterable of file paths to read.
"""
function load_csv_files(data, files)
    # Collect the non-empty tables first, then concatenate once: repeated
    # `vcat` inside the loop reallocates the whole accumulated table on every
    # iteration (quadratic in total rows), while a single splatted `vcat`
    # allocates the result once. Behavior is unchanged: empty files are skipped.
    tables = [table for table in (read_output_with_types(file) for file in files) if size(table, 1) > 0]
    return isempty(tables) ? data : vcat(data, tables...)
end

# Column names shared by TMLE and sieve output files; results are joined on
# these keys when both kinds of file are merged.
function joining_keys()
    return [
        "PARAMETER_TYPE",
        "TREATMENTS",
        "CASE",
        "CONTROL",
        "OUTCOME",
        "CONFOUNDERS",
        "COVARIATES",
    ]
end

function merge_csv_files(parsed_args)
tmle_files = files_matching_prefix_and_suffix(
parsed_args["tmle-prefix"],
".csv"

"""
make_summary(prefix; out="summary.json")
# Args
- `prefix`: Prefix to .hdf5 files to be used to create the summary file
# Options
- `-o, --out`: Output JSON file
"""
@task function make_summary(prefix; output=JSONOutput(filename="summary.json"))
dirname_, prefix_ = splitdir(prefix)
dirname__ = dirname_ == "" ? "." : dirname_
files = filter(
x -> startswith(x, prefix_),
readdir(dirname__)
)
# Load tmle data
data = load_csv_files(empty_tmle_output(), tmle_files)
# Load sieve data
sieveprefix = parsed_args["sieve-prefix"]
if sieveprefix !== nothing
sieve_files = files_matching_prefix_and_suffix(
parsed_args["sieve-prefix"],
".csv"
)
sieve_data = load_csv_files(empty_sieve_output(), sieve_files)
if size(sieve_data, 1) > 0
data = leftjoin(data, sieve_data, on=joining_keys(), matchmissing=:equal)
# Initialize JSON output
initialize(output)
# Write all but last batch
for filename in files[1:end-1]
filepath = joinpath(dirname_, filename)
jldopen(filepath) do io
for batch_key in keys(io)
update_file(output, io[batch_key])
end
end
end

# Pvalue Adjustment by Target
for gp in groupby(data, :OUTCOME)
gp.TRAIT_ADJUSTED_TMLE_PVALUE = gp[:, :TMLE_PVALUE]
pvalues = collect(skipmissing(gp.TMLE_PVALUE))
if length(pvalues) > 0
adjusted_pvalues = adjust(pvalues, BenjaminiHochberg())
adjusted_pval_index = 1
for index in eachindex(gp.TRAIT_ADJUSTED_TMLE_PVALUE)
gp.TRAIT_ADJUSTED_TMLE_PVALUE[index] === missing && continue
gp.TRAIT_ADJUSTED_TMLE_PVALUE[index] = adjusted_pvalues[adjusted_pval_index]
adjusted_pval_index += 1
end
# Write last batch
filepath = joinpath(dirname_, files[end])
jldopen(filepath) do io
nkeys = length(keys(io))
for (batch_index, batch_key) in enumerate(keys(io))
finalize = batch_index == nkeys ? true : false
update_file(output, io[batch_key], finalize=finalize)
end
end

# Write to output file
CSV.write(parsed_args["out"], data)

return 0
end
97 changes: 1 addition & 96 deletions test/merge.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,102 +6,7 @@ using CSV
using DataFrames

# Merging TMLE result files only (no sieve prefix supplied): the merged CSV
# should contain the TMLE estimate columns plus the per-outcome BH-adjusted
# p-value column, with one row per estimated parameter.
# NOTE(review): relies on fixture files under data/merge/ — not visible here.
@testset "Test merge_csv_files, no sieve file" begin
parsed_args = Dict(
"tmle-prefix" => joinpath("data", "merge", "tmle"),
"sieve-prefix" => nothing,
"out" => "output.csv"
)
merge_csv_files(parsed_args)
output = CSV.read(parsed_args["out"], DataFrame)
# Expected column layout of the merged output file.
@test names(output) == [
"PARAMETER_TYPE", "TREATMENTS", "CASE",
"CONTROL", "OUTCOME", "CONFOUNDERS",
"COVARIATES", "INITIAL_ESTIMATE",
"TMLE_ESTIMATE", "TMLE_STD", "TMLE_PVALUE", "TMLE_LWB", "TMLE_UPB",
"ONESTEP_ESTIMATE", "ONESTEP_STD", "ONESTEP_PVALUE", "ONESTEP_LWB", "ONESTEP_UPB",
"LOG", "TRAIT_ADJUSTED_TMLE_PVALUE"
]
@test size(output, 1) == 8

# Benjamini-Hochberg adjustment can only increase a p-value, and rows with a
# missing p-value must stay missing after adjustment.
for (pval, adjusted_pval) in zip(output.TMLE_PVALUE, output.TRAIT_ADJUSTED_TMLE_PVALUE)
if pval === missing
@test adjusted_pval === missing
else
@test pval <= adjusted_pval
end
end

@test output.PARAMETER_TYPE == [
"IATE", "IATE", "ATE",
"IATE", "IATE", "ATE",
"ATE", "CM"
]
# Remove the file written by merge_csv_files.
rm(parsed_args["out"])
end

# Merging TMLE results together with sieve results: sieve columns
# (SIEVE_STD/PVALUE/LWB/UPB) should be left-joined onto the TMLE rows.
# NOTE(review): relies on fixture files under data/merge/ — not visible here.
@testset "Test merge_csv_files, sieve file" begin
# Expected column layout when sieve results are joined in.
sieve_colnames = [
"PARAMETER_TYPE", "TREATMENTS", "CASE",
"CONTROL", "OUTCOME", "CONFOUNDERS",
"COVARIATES", "INITIAL_ESTIMATE",
"TMLE_ESTIMATE", "TMLE_STD", "TMLE_PVALUE", "TMLE_LWB", "TMLE_UPB",
"ONESTEP_ESTIMATE", "ONESTEP_STD", "ONESTEP_PVALUE", "ONESTEP_LWB", "ONESTEP_UPB",
"LOG", "SIEVE_STD", "SIEVE_PVALUE", "SIEVE_LWB", "SIEVE_UPB", "TRAIT_ADJUSTED_TMLE_PVALUE"
]
parsed_args = Dict(
"tmle-prefix" => joinpath("data", "merge", "tmle"),
"sieve-prefix" => joinpath("data", "merge", "sieve"),
"out" => "output.csv"
)
merge_csv_files(parsed_args)
output = CSV.read(parsed_args["out"], DataFrame)
@test names(output) == sieve_colnames
@test size(output, 1) == 8
# Full sieve coverage: every row has a concrete (non-missing) sieve std.
@test output.SIEVE_STD isa Vector{Float64}
@test output.PARAMETER_TYPE == [
"IATE", "IATE", "ATE",
"IATE", "IATE", "ATE",
"ATE", "CM"
]

# Second sieve fixture covers only a subset of rows: the left join should
# leave the uncovered rows' sieve columns missing.
parsed_args = Dict(
"tmle-prefix" => joinpath("data", "merge", "tmle"),
"sieve-prefix" => joinpath("data", "merge", "sieve_output_2"),
"out" => "output.csv"
)
merge_csv_files(parsed_args)
output = CSV.read(parsed_args["out"], DataFrame)
@test names(output) == sieve_colnames
@test size(output, 1) == 8
@test all(x===missing for x in output.SIEVE_STD[3:end])

# Remove the file written by merge_csv_files.
rm(parsed_args["out"])
end

# Merging when the sieve prefix matches only an empty file: the output should
# fall back to the TMLE-only column layout (no sieve columns joined).
# NOTE(review): relies on fixture files under data/merge/ — not visible here.
@testset "Test merge_csv_files, empty sieve file" begin
parsed_args = Dict(
"tmle-prefix" => joinpath("data", "merge", "tmle"),
"sieve-prefix" => joinpath("data", "merge", "empty"),
"out" => "output.csv"
)
merge_csv_files(parsed_args)
output = CSV.read(parsed_args["out"], DataFrame)
# Same layout as the no-sieve case: sieve columns are absent.
@test names(output) == [
"PARAMETER_TYPE", "TREATMENTS", "CASE",
"CONTROL", "OUTCOME", "CONFOUNDERS",
"COVARIATES", "INITIAL_ESTIMATE",
"TMLE_ESTIMATE", "TMLE_STD", "TMLE_PVALUE", "TMLE_LWB", "TMLE_UPB",
"ONESTEP_ESTIMATE", "ONESTEP_STD", "ONESTEP_PVALUE", "ONESTEP_LWB", "ONESTEP_UPB",
"LOG", "TRAIT_ADJUSTED_TMLE_PVALUE"
]
@test size(output, 1) == 8
@test output.PARAMETER_TYPE == [
"IATE", "IATE", "ATE",
"IATE", "IATE", "ATE",
"ATE", "CM"
]

rm(parsed_args["out"])
# NOTE(review): this call appears to be a new line from the diff spliced into
# the old test body — confirm whether the testset should keep the old
# merge_csv_files assertions or be replaced by a make_summary-based test.
make_summary("tmle_out")
end


Expand Down

0 comments on commit 25d0496

Please sign in to comment.