diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 6fcfbf0509..41a0c824a9 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -582,8 +582,12 @@ steps: command: julia --project=calibration/test calibration/test/interface.jl - label: "end to end test" command: julia --project=calibration/test calibration/test/e2e_test.jl + agents: + slurm_ntasks: 10 + slurm_cpus_per_task: 1 + slurm_mem: 96GB + slurm_time: "00:10:00" artifact_paths: "calibration_end_to_end_test/*" - soft_fail: true - group: "Diagnostic EDMFX" steps: diff --git a/.github/workflows/calibration_test.yml b/.github/workflows/calibration_test.yml deleted file mode 100644 index 2afece5204..0000000000 --- a/.github/workflows/calibration_test.yml +++ /dev/null @@ -1,31 +0,0 @@ -name: Calibration -on: - push: - tags: '*' - pull_request: - merge_group: - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -# Needed to allow julia-actions/cache to delete old caches that it has created -permissions: - actions: write - contents: read - -jobs: - test: - name: ClimaCalibrate E2E Test - runs-on: ubuntu-latest - timeout-minutes: 30 - steps: - - uses: julia-actions/cache@v2 - - uses: julia-actions/julia-buildpkg@v1 - - uses: actions/checkout@v4 - - uses: julia-actions/setup-julia@v2 - with: - version: '1.10' - - run: | - julia --project=calibration/test -e 'using Pkg; Pkg.develop(;path="."); Pkg.instantiate(;verbose=true)' - julia --project=calibration/test calibration/test/e2e_test.jl diff --git a/.gitignore b/.gitignore index ed7ca00073..e9b2aca027 100644 --- a/.gitignore +++ b/.gitignore @@ -50,6 +50,7 @@ Artifacts.toml *.g *.lock *.loc +*.out # misc .DS_Store diff --git a/calibration/model_interface.jl b/calibration/model_interface.jl index 8ded249837..d239308ea4 100644 --- a/calibration/model_interface.jl +++ b/calibration/model_interface.jl @@ -10,39 +10,15 @@ import YAML import ClimaComms ClimaComms.@import_required_backends using ClimaUtilities.ClimaArtifacts -import ClimaCalibrate: - set_up_forward_model, - run_forward_model, - path_to_ensemble_member, - ExperimentConfig - -""" - set_up_forward_model(member, iteration, experiment_dir::AbstractString) - set_up_forward_model(member, iteration, ::ExperimentConfig; experiment_dir) - set_up_forward_model(member, iteration, config_dict::AbstractDict) - -Return an AtmosConfig object for the given member and iteration. - -Turns off default diagnostics and sets the TOML parameter file to the member's path. -This assumes that the config dictionary has an `output_dir` key. -""" -function set_up_forward_model( - member, - iteration, - ::ExperimentConfig; - experiment_dir = dirname(Base.active_project()), -) - # Assume experiment_dir is project dir - config_dict = YAML.load_file(joinpath(experiment_dir, "model_config.yml")) - set_up_forward_model(member, iteration, config_dict::AbstractDict) -end - -function set_up_forward_model(member, iteration, experiment_dir::AbstractString) - config_dict = YAML.load_file(joinpath(experiment_dir, "model_config.yml")) - set_up_forward_model(member, iteration, config_dict::AbstractDict) -end - -function set_up_forward_model(member, iteration, config_dict::AbstractDict) +import ClimaCalibrate: forward_model, path_to_ensemble_member +import ClimaCalibrate as CAL + +function CAL.forward_model(iteration, member, config_dict = nothing) + experiment_dir = dirname(Base.active_project()) + if isnothing(config_dict) + config_dict = + YAML.load_file(joinpath(experiment_dir, "model_config.yml")) + end output_dir = config_dict["output_dir"] member_path = path_to_ensemble_member(output_dir, iteration, member) config_dict["output_dir"] = member_path @@ -55,16 +31,10 @@ function set_up_forward_model(member, iteration, config_dict::AbstractDict) # Turn off default diagnostics config_dict["output_default_diagnostics"] = false - return CA.AtmosConfig(config_dict) -end - -""" - run_forward_model(atmos_config::CA.AtmosConfig) - -Run the atmosphere model with the given an AtmosConfig object. -Currently only has basic error handling. -""" -function run_forward_model(atmos_config::CA.AtmosConfig) + atmos_config = CA.AtmosConfig( + config_dict; + comms_ctx = ClimaComms.SingletonCommsContext(), + ) simulation = CA.get_simulation(atmos_config) sol_res = CA.solve_atmos!(simulation) if sol_res.ret_code == :simulation_crashed @@ -72,4 +42,5 @@ function run_forward_model(atmos_config::CA.AtmosConfig) "The ClimaAtmos simulation has crashed. See the stack trace for details.", ) end + return simulation end diff --git a/calibration/test/Project.toml b/calibration/test/Project.toml index f76461534a..71312f9ad8 100644 --- a/calibration/test/Project.toml +++ b/calibration/test/Project.toml @@ -5,6 +5,7 @@ ClimaAtmos = "b2c96348-7fb7-4fe0-8da9-78d88439e717" ClimaCalibrate = "4347a170-ebd6-470c-89d3-5c705c0cacc2" ClimaComms = "3a4d1b5c-c61d-41fd-a00a-5873ba7a1b0d" ClimaUtilities = "b3f4f4ca-9299-4f7f-bd9b-81e1242a7513" +ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e" Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f" EnsembleKalmanProcesses = "aa8a2aa5-91d8-4396-bcef-d4f2ec43552d" JLD2 = "033835bb-8acc-5ee8-8aae-3f567f8a3819" @@ -12,4 +13,4 @@ MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195" YAML = "ddb6d928-2868-570f-bddf-ab3f9cf99eb6" [compat] -ClimaCalibrate = "0.0.2 - 0.0.4" +ClimaCalibrate = "0.0.6" diff --git a/calibration/test/e2e_test.jl b/calibration/test/e2e_test.jl index 7ea3223e39..d9255bb5cb 100644 --- a/calibration/test/e2e_test.jl +++ b/calibration/test/e2e_test.jl @@ -2,66 +2,20 @@ Runs a perfect model calibration, calibrating on the parameter `astronomical_unit` with top-of-atmosphere radiative shortwave flux in the loss function. -The calibration is run twice, once on the backend obtained via `get_backend()` -and once on the `JuliaBackend`. The output of each calibration is tested individually -and compared to ensure reproducibility. +Currently uses ClimaCalibrate.SlurmManager, which integrates with Distributed.jl's workers. + +`addprocs(CAL.SlurmManager(10))` starts up an `srun` session consisting of +10 Julia workers with TCP connections to the host process. +`calibrate(CAL.WorkerBackend, ...)` uses `remotecall` to execute the `forward_model` +function (found in `calibration/model_interface.jl`) on the remote workers. + +Further documentation can be found at https://clima.github.io/ClimaCalibrate.jl/dev/backends/ =# +using Distributed import ClimaCalibrate as CAL -import ClimaAtmos as CA +using ClimaCalibrate import ClimaAnalysis: SimDir, get, slice, average_xy -import CairoMakie -import JLD2 -import LinearAlgebra: I -import EnsembleKalmanProcesses as EKP -import Statistics: var, mean -using Test - -using Dates -# Debug plots -function scatter_plot(eki::EKP.EnsembleKalmanProcess) - f = CairoMakie.Figure(resolution = (800, 600)) - ax = CairoMakie.Axis( - f[1, 1], - ylabel = "Parameter Value", - xlabel = "Top of atmosphere radiative SW flux", - ) - - g = vec.(EKP.get_g(eki; return_array = true)) - params = vec.((EKP.get_ϕ(prior, eki))) - - for (gg, uu) in zip(g, params) - CairoMakie.scatter!(ax, gg, uu) - end - - CairoMakie.hlines!(ax, [astronomical_unit], linestyle = :dash) - CairoMakie.vlines!(ax, observations, linestyle = :dash) - - output = joinpath(output_dir, "scatter.png") - CairoMakie.save(output, f) - return output -end - -function param_versus_iter_plot(eki::EKP.EnsembleKalmanProcess) - f = CairoMakie.Figure(resolution = (800, 600)) - ax = CairoMakie.Axis( - f[1, 1], - ylabel = "Parameter Value", - xlabel = "Iteration", - ) - params = EKP.get_ϕ(prior, eki) - for (i, param) in enumerate(params) - CairoMakie.scatter!(ax, fill(i, length(param)), vec(param)) - end - - CairoMakie.hlines!(ax, [astronomical_unit]; color = :red, linestyle = :dash) - - output = joinpath(output_dir, "param_vs_iter.png") - CairoMakie.save(output, f) - return output -end - -# Observation map function CAL.observation_map(iteration) single_member_dims = (1,) G_ensemble = Array{Float64}(undef, single_member_dims..., ensemble_size) @@ -86,113 +40,114 @@ function process_member_data(simdir::SimDir) return slice(average_xy(rsut); time = 30).data end -# EKI test -function minimal_eki_test(eki) - params = EKP.get_ϕ(prior, eki) - spread = map(var, params) - - # Spread should be heavily decreased as particles have converged - @test last(spread) / first(spread) < 0.1 - # Parameter should be close to true value - @test mean(last(params)) ≈ astronomical_unit rtol = 0.02 -end - -# Script: - -if !(@isdefined backend) - backend = CAL.get_backend() -end -# Check that the wait time for the last hour does not exceed 20 minutes. -# This test schedules many slurm jobs and will be prohibitively slow if the cluster is busy -if backend <: CAL.HPCBackend - wait_times = readchomp( - `sacct --allocations -u esmbuild --starttime now-1hour -o Submit,Start -n`, - ) - wait_times = split(wait_times, '\n', keepempty = false) - # Filter jobs that have not been submitted and started - filter!(x -> !(contains(x, "Unknown") || contains(x, "None")), wait_times) - - mean_wait_time_in_mins = - mapreduce(+, wait_times; init = 0) do line - t1_str, t2_str = split(line) - t1 = DateTime(t1_str, dateformat"yyyy-mm-ddTHH:MM:SS") - t2 = DateTime(t2_str, dateformat"yyyy-mm-ddTHH:MM:SS") - Dates.value(t2 - t1) / 1000 / 60 - end / length(wait_times) - - @show mean_wait_time_in_mins - - if mean_wait_time_in_mins > 10 - @warn """Average wait time for esmbuild is $(round(mean_wait_time_in_mins, digits=2)) minutes. \ - Cluster is too busy to run this test, exiting""" - exit() - end +addprocs(CAL.SlurmManager(10)) + +@everywhere begin + import ClimaCalibrate as CAL + import ClimaAtmos as CA + import JLD2 + import EnsembleKalmanProcesses: + I, ParameterDistributions.constrained_gaussian + + experiment_dir = joinpath(pkgdir(CA), "calibration", "test") + model_interface = joinpath(pkgdir(CA), "calibration", "model_interface.jl") + output_dir = "calibration_end_to_end_test" + include(model_interface) + + # Experiment Configuration + ensemble_size = 50 + n_iterations = 10 + astronomical_unit = 149_597_870_000 + noise = 0.1 * I + prior = constrained_gaussian("astronomical_unit", 6e10, 1e11, 2e5, Inf) + obs_path = joinpath(experiment_dir, "observations.jld2") end -# Paths and setup -const experiment_dir = joinpath(pkgdir(CA), "calibration", "test") -const model_interface = - joinpath(pkgdir(CA), "calibration", "model_interface.jl") -const output_dir = "calibration_end_to_end_test" -include(model_interface) -ensemble_size = 15 - -# Generate observations -obs_path = joinpath(experiment_dir, "observations.jld2") +# Generate observations if needed if !isfile(obs_path) + import JLD2 @info "Generating observations" - config = CA.AtmosConfig(joinpath(experiment_dir, "model_config.yml")) - simulation = CA.get_simulation(config) - CA.solve_atmos!(simulation) + @everywhere begin + comms_ctx = ClimaComms.SingletonCommsContext() + model_config = joinpath(experiment_dir, "model_config.yml") + atmos_config = CA.AtmosConfig(model_config; comms_ctx) + simulation = CA.get_simulation(atmos_config) + CA.solve_atmos!(simulation) + end observations = Vector{Float64}(undef, 1) observations .= process_member_data(SimDir(simulation.output_dir)) JLD2.save_object(obs_path, observations) end # Initialize experiment data -astronomical_unit = 149_597_870_000 -observations = JLD2.load_object(obs_path) -noise = 0.1 * I -n_iterations = 4 -prior = CAL.get_prior(joinpath(experiment_dir, "prior.toml")) -experiment_config = CAL.ExperimentConfig(; - n_iterations, +@everywhere observations = JLD2.load_object(obs_path) + +eki = CAL.calibrate( + CAL.WorkerBackend, ensemble_size, + n_iterations, observations, noise, - output_dir, prior, + output_dir, ) -@info "Running calibration E2E test" backend -if backend <: CAL.HPCBackend - test_eki = CAL.calibrate( - backend, - experiment_config; - hpc_kwargs = CAL.kwargs(time = 15), - model_interface, - verbose = true, +# TODO: Enable `calibrate` to checkpoint, rerunning from midway through calibration +# Postprocessing +import EnsembleKalmanProcesses as EKP +import Statistics: var, mean +using Test +import CairoMakie + +function scatter_plot(eki::EKP.EnsembleKalmanProcess) + f = CairoMakie.Figure(resolution = (800, 600)) + ax = CairoMakie.Axis( + f[1, 1], + ylabel = "Parameter Value", + xlabel = "Top of atmosphere radiative SW flux", ) -else - test_eki = CAL.calibrate(backend, experiment_config) -end -scatter_plot(test_eki) -param_versus_iter_plot(test_eki) + g = vec.(EKP.get_g(eki; return_array = true)) + params = vec.((EKP.get_ϕ(prior, eki))) -@testset "Test Calibration on $backend" begin - minimal_eki_test(test_eki) -end + for (gg, uu) in zip(g, params) + CairoMakie.scatter!(ax, gg, uu) + end -# Run calibration -julia_eki = CAL.calibrate(CAL.JuliaBackend, experiment_config) + CairoMakie.hlines!(ax, [astronomical_unit], linestyle = :dash) + CairoMakie.vlines!(ax, observations, linestyle = :dash) -@testset "Julia-only comparison calibration" begin - minimal_eki_test(julia_eki) + output = joinpath(output_dir, "scatter.png") + CairoMakie.save(output, f) + return output end -@testset "Compare $backend output to JuliaBackend" begin - for (uu, slurm_uu) in zip(EKP.get_u(julia_eki), EKP.get_u(test_eki)) - @test uu ≈ slurm_uu rtol = 0.02 +function param_versus_iter_plot(eki::EKP.EnsembleKalmanProcess) + f = CairoMakie.Figure(resolution = (800, 600)) + ax = CairoMakie.Axis( + f[1, 1], + ylabel = "Parameter Value", + xlabel = "Iteration", + ) + params = EKP.get_ϕ(prior, eki) + for (i, param) in enumerate(params) + CairoMakie.scatter!(ax, fill(i, length(param)), vec(param)) end + + CairoMakie.hlines!(ax, [astronomical_unit]; color = :red, linestyle = :dash) + + output = joinpath(output_dir, "param_vs_iter.png") + CairoMakie.save(output, f) + return output end + +scatter_plot(eki) +param_versus_iter_plot(eki) + +params = EKP.get_ϕ(prior, eki) +spread = map(var, params) + +# Spread should be heavily decreased as particles have converged +@test last(spread) / first(spread) < 0.1 +# Parameter should be close to true value +@test mean(last(params)) ≈ astronomical_unit rtol = 0.02 diff --git a/calibration/test/interface.jl b/calibration/test/interface.jl index 1f4ebbff1d..7d118c4790 100644 --- a/calibration/test/interface.jl +++ b/calibration/test/interface.jl @@ -14,20 +14,28 @@ mktempdir() do output_dir mkpath(member_path) # We need to check that a "base" TOML is support alongside the TOML sampled from the prior base_toml_file = touch(joinpath(output_dir, "default_parameters.toml")) - sampled_toml_file = touch(joinpath(member_path, "parameters.toml")) + # Write our own parameter file + open(joinpath(member_path, "parameters.toml"), "w") do file + toml_contents = """ + [gravitational_acceleration] + value = 10.0 + """ + write(file, toml_contents) + end config_dict = Dict( "output_default_diagnostics" => true, "moist" => "equil", "toml" => [base_toml_file], "output_dir" => output_dir, + "t_end" => "600secs", ) - (; parsed_args) = - ClimaCalibrate.set_up_forward_model(member, iter, config_dict) + simulation = ClimaCalibrate.forward_model(iter, member, config_dict) @testset "Atmos Configuration" begin - @test parsed_args["moist"] == "equil" - @test parsed_args["output_dir"] == member_path - @test parsed_args["output_default_diagnostics"] == false - @test parsed_args["toml"] == [base_toml_file, sampled_toml_file] + @test simulation.t_end == 600 + @test simulation.output_dir == joinpath(member_path, "output_0000") + @test simulation.integrator.p.atmos.moisture_model == + CA.EquilMoistModel() + @test simulation.integrator.p.params.rrtmgp_params.grav == 10.0 end end diff --git a/calibration/test/prior.toml b/calibration/test/prior.toml deleted file mode 100644 index fd3822c661..0000000000 --- a/calibration/test/prior.toml +++ /dev/null @@ -1,3 +0,0 @@ -[astronomical_unit] -prior = "constrained_gaussian(astronomical_unit, 60_000_000_000, 100_000_000_000, 200_000, Inf)" -type = "float"