From 7f19d244771f05cb81e19743c223daa3c0cd2a34 Mon Sep 17 00:00:00 2001
From: jiangzhx
Date: Wed, 2 Aug 2023 17:50:54 +0800
Subject: [PATCH] add retention example work with parquet file

---
 examples/retention_parquet.rs  | 59 ++++++++++++++++++++++++++++++++++
 src/retention/retention_sum.rs |  1 -
 2 files changed, 59 insertions(+), 1 deletion(-)
 create mode 100644 examples/retention_parquet.rs

diff --git a/examples/retention_parquet.rs b/examples/retention_parquet.rs
new file mode 100644
index 0000000..2dfb0f2
--- /dev/null
+++ b/examples/retention_parquet.rs
@@ -0,0 +1,59 @@
+use std::sync::Arc;
+
+use datafusion::arrow::util::pretty::print_batches;
+use datafusion::datasource::MemTable;
+use datafusion::error::Result;
+use datafusion::prelude::*;
+use datafusion_uba::retention::{create_retention_count, create_retention_sum};
+use datafusion_uba::test_util;
+
+/// Retention example driven by a parquet file.
+///
+/// Registers `event.parquet` as table `event`, computes `retention_count` per
+/// `distinct_id`, loads that result into the in-memory table
+/// `retention_count_result`, and prints `retention_sum(stats)` over it.
+///
+/// # Errors
+///
+/// Returns the first error produced by any of the `?`-propagated calls below.
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    // Propagate a registration failure to the caller instead of panicking.
+    let parquet_path = format!("{}/event.parquet", test_util::parquet_test_data());
+    ctx.register_parquet("event", &parquet_path, Default::default())
+        .await?;
+
+    ctx.register_udaf(create_retention_count());
+    ctx.register_udaf(create_retention_sum());
+
+    let df = ctx
+        .sql(
+            "select distinct_id,retention_count(\
+            case when xwhat='$startup' then true else false end,\
+            case when xwhat='$pageview' then true else false end,\
+            20230107-20230101,\
+            ds-20230101 \
+            ) as stats \
+            from event group by distinct_id order by distinct_id",
+        )
+        .await?;
+
+    // Take the schema first: `collect` consumes the DataFrame.
+    let schema = df.schema().clone();
+    let results = df.collect().await?;
+
+    let provider = MemTable::try_new(schema.into(), vec![results])?;
+    ctx.register_table("retention_count_result", Arc::new(provider))?;
+
+    let results = ctx
+        .sql("select retention_sum(stats) from retention_count_result")
+        .await?
+        .collect()
+        .await?;
+
+    print_batches(&results)?;
+
+    Ok(())
+}
diff --git a/src/retention/retention_sum.rs b/src/retention/retention_sum.rs
index 32926a5..d781717 100644
--- a/src/retention/retention_sum.rs
+++ b/src/retention/retention_sum.rs
@@ -131,7 +131,6 @@ impl Accumulator for RetentionSum {
     fn evaluate(&self) -> Result {
         let arr_ref = &ScalarValue::iter_to_array(self.total_active.clone()).unwrap();
         let mut final_result: Vec> = Vec::new();
-
         for index in 0..arr_ref.len() {
             if let ScalarValue::List(Some(per_user), _) =
                 ScalarValue::try_from_array(arr_ref, index)?