Skip to content

Commit

Permalink
fill columns with nans, preserving original schema
Browse files Browse the repository at this point in the history
  • Loading branch information
Schwarzam committed Jun 3, 2024
1 parent ccbcfbc commit b6ae52b
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions src/loaders/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ pub async fn process_and_return_parquet_file_lazy(
// Scan with hive options switched off (`enabled: false`, no schema).
args.hive_options = HiveOptions{enabled:false, schema: None};

// NOTE(review): `unwrap()` panics if the scan fails, whereas the schema lookup
// just below propagates its error with `?` — consider using `?` here as well.
let lf = LazyFrame::scan_parquet(file_path, args).unwrap();

// Snapshot the LazyFrame's schema as owned (name, dtype) pairs before any
// column selection happens. Further down, this list is used to re-add columns
// missing from the collected frame and to restore the original column order.
let schema = lf.schema()?;
let all_columns: Vec<(String, DataType)> = schema
.iter_fields()
.map(|field| (field.name().to_string(), field.data_type().clone()))
.collect();

// Columns requested through `params`; an empty list is used when the parser
// yields no value.
let mut selected_cols = parse_params::parse_columns_from_params(&params).unwrap_or(Vec::new());
// Let the exclude-columns parser override that list; if it yields no value the
// list from the previous line is kept unchanged.
// NOTE(review): the exact semantics of both parsers live in `parse_params` — confirm there.
selected_cols = parse_params::parse_exclude_columns_from_params(&params, &lf).unwrap_or(selected_cols);

Expand Down Expand Up @@ -57,6 +65,14 @@ pub async fn process_and_return_parquet_file_lazy(
df = lf.drop(["_hipscat_index"]).collect()?;
}

// Re-create every column of the original schema that is absent from the
// collected frame: each missing column becomes an all-null Series of the
// original dtype with one entry per row of `df`.
// NOTE(review): `get_column_names()` is re-evaluated on every iteration; fine for
// narrow schemas, worth hoisting into a set if very wide tables are expected.
for (col, dtype) in &all_columns {
if !df.get_column_names().contains(&col.as_str()) {
let series = Series::full_null(col, df.height(), &dtype);
df.with_column(series)?;
}
}
// Re-select using the original schema's names so the output column order
// matches `all_columns`, with the null-filled columns in their original places.
df = df.select(&all_columns.iter().map(|(col, _)| col.as_str()).collect::<Vec<_>>())?;

// Serialize the resulting frame as Parquet into an in-memory byte buffer.
let mut buf = Vec::new();
ParquetWriter::new(&mut buf)
.finish(&mut df)?;
Expand Down

0 comments on commit b6ae52b

Please sign in to comment.