From b6ae52b6cd05896691183babcf0fdbe8097f185d Mon Sep 17 00:00:00 2001 From: schwarzam Date: Mon, 3 Jun 2024 18:31:17 +0000 Subject: [PATCH] fill columns with nans, preserving original schema --- src/loaders/parquet.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/loaders/parquet.rs b/src/loaders/parquet.rs index 6527498..c6831ef 100644 --- a/src/loaders/parquet.rs +++ b/src/loaders/parquet.rs @@ -16,6 +16,14 @@ pub async fn process_and_return_parquet_file_lazy( args.hive_options = HiveOptions{enabled:false, schema: None}; let lf = LazyFrame::scan_parquet(file_path, args).unwrap(); + + // Retrieve the schema of the LazyFrame + let schema = lf.schema()?; + let all_columns: Vec<(String, DataType)> = schema + .iter_fields() + .map(|field| (field.name().to_string(), field.data_type().clone())) + .collect(); + let mut selected_cols = parse_params::parse_columns_from_params(¶ms).unwrap_or(Vec::new()); selected_cols = parse_params::parse_exclude_columns_from_params(¶ms, &lf).unwrap_or(selected_cols); @@ -57,6 +65,14 @@ pub async fn process_and_return_parquet_file_lazy( df = lf.drop(["_hipscat_index"]).collect()?; } + for (col, dtype) in &all_columns { + if !df.get_column_names().contains(&col.as_str()) { + let series = Series::full_null(col, df.height(), &dtype); + df.with_column(series)?; + } + } + df = df.select(&all_columns.iter().map(|(col, _)| col.as_str()).collect::>())?; + let mut buf = Vec::new(); ParquetWriter::new(&mut buf) .finish(&mut df)?;