Skip to content

Commit

Permalink
add retention sum udaf
Browse files Browse the repository at this point in the history
move retention to sub mod
  • Loading branch information
jiangzhx committed Jul 30, 2023
1 parent b0d7672 commit c8dab31
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 420 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ path = "tests/sqllogictests/src/main.rs"

[[example]]
name = "retention"
path = "examples/retention.rs"
path = "examples/retention.rs"
29 changes: 16 additions & 13 deletions examples/retention.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use datafusion::arrow::util::pretty::print_batches;
use datafusion::arrow::{datatypes::DataType, record_batch::RecordBatch};
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_uba::retention::create_retention_count;
use datafusion_uba::retention::{create_retention_count, create_retention_sum};

#[tokio::main]
async fn main() -> Result<()> {
Expand All @@ -15,15 +15,18 @@ async fn main() -> Result<()> {
ctx.table("event").await?;

ctx.register_udaf(create_retention_count());
ctx.register_udaf(create_retention_sum());

let results = ctx
.sql(
"select distinct_id,retention_count(\
case when event='add' and ds=20230101 then true else false end,\
case when event='buy' and ds between 20230101 and 20230102 then true else false end,\
20230102-20230101,\
ds-20230101 \
) as stats from event group by distinct_id",
"select retention_sum(stats) from (\
select distinct_id,retention_count(\
case when event='add' then true else false end,\
case when event='buy' then true else false end,\
20230102-20230101,\
ds-20230101 \
) as stats from event group by distinct_id order by distinct_id\
)",
)
.await?
.collect()
Expand All @@ -46,18 +49,18 @@ fn create_context() -> Result<SessionContext> {
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec!["add", "add", "add"])),
Arc::new(Int32Array::from(vec![20230101, 20230101, 20230101])),
Arc::new(Int32Array::from(vec![1, 1, 1])),
Arc::new(StringArray::from(vec!["add", "add", "buy"])),
Arc::new(Int32Array::from(vec![20230101, 20230102, 20230101])),
],
)?;

let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec!["buy", "buy", "buy"])),
Arc::new(Int32Array::from(vec![20230101, 20230101, 20230101])),
Arc::new(Int32Array::from(vec![2, 2])),
Arc::new(StringArray::from(vec!["add", "buy"])),
Arc::new(Int32Array::from(vec![20230101, 20230102])),
],
)?;

Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pub mod retention;
pub mod simple_udaf;
180 changes: 0 additions & 180 deletions src/retention.rs

This file was deleted.

Loading

0 comments on commit c8dab31

Please sign in to comment.