box async scan methods to avoid large futures on the stack
westonpace committed Dec 31, 2024
1 parent fe355d6 commit cc08f25
Showing 1 changed file with 56 additions and 47 deletions.
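
Background on the pattern (a standalone sketch, not code from this diff): an async fn compiles to a state machine that stores every local held across an .await point inline, so the resulting future can be large, and it is created on the caller's stack before it is polled or spawned. Boxing with FutureExt::boxed moves that state machine to the heap, leaving only a pointer-sized handle on the stack. A runnable illustration, assuming only the futures crate:

    use futures::future::{BoxFuture, FutureExt};

    // This future is large: `buf` is alive across the await, so it becomes
    // part of the compiler-generated state machine.
    async fn large_future() -> u64 {
        let buf = [0u8; 16 * 1024]; // 16 KiB held across the await below
        async {}.await;
        buf.iter().map(|b| *b as u64).sum()
    }

    // Boxing trades one heap allocation for a small value on the stack.
    fn boxed_variant() -> BoxFuture<'static, u64> {
        large_future().boxed()
    }

    fn main() {
        println!("unboxed: {} bytes", std::mem::size_of_val(&large_future())); // ~16 KiB
        println!("boxed:   {} bytes", std::mem::size_of_val(&boxed_variant())); // two words
    }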
rust/lance/src/dataset/scanner.rs: 103 changes (56 additions & 47 deletions)
@@ -36,8 +36,9 @@ use datafusion::physical_plan::{
 use datafusion::scalar::ScalarValue;
 use datafusion_physical_expr::aggregate::AggregateExprBuilder;
 use datafusion_physical_expr::{Partitioning, PhysicalExpr};
+use futures::future::BoxFuture;
 use futures::stream::{Stream, StreamExt};
-use futures::TryStreamExt;
+use futures::{FutureExt, TryStreamExt};
 use lance_arrow::floats::{coerce_float_vector, FloatType};
 use lance_arrow::DataTypeExt;
 use lance_core::datatypes::{Field, OnMissing, Projection};
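
The import changes pull in the two pieces the boxing needs: BoxFuture<'a, T> is futures' alias for Pin<Box<dyn Future<Output = T> + Send + 'a>>, and FutureExt supplies the .boxed() combinator that produces it. Roughly, a hand-rolled equivalent for illustration (not the crate's actual code, and limited to 'static futures):

    use std::future::Future;
    use std::pin::Pin;

    // Approximately what `FutureExt::boxed` does: pin the future into a heap
    // allocation and erase its concrete type behind `dyn Future`.
    fn boxed_manually<F>(fut: F) -> Pin<Box<dyn Future<Output = F::Output> + Send + 'static>>
    where
        F: Future + Send + 'static,
    {
        Box::pin(fut)
    }

    fn main() {
        let fut = boxed_manually(async { 1 + 1 });
        // block_on (from the futures crate) drives the demo to completion.
        assert_eq!(futures::executor::block_on(fut), 2);
    }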
@@ -944,13 +945,17 @@ impl Scanner {
 
     /// Create a stream from the Scanner.
     #[instrument(skip_all)]
-    pub async fn try_into_stream(&self) -> Result<DatasetRecordBatchStream> {
-        let plan = self.create_plan().await?;
+    pub fn try_into_stream(&self) -> BoxFuture<Result<DatasetRecordBatchStream>> {
+        // Future intentionally boxed here to avoid large futures on the stack
+        async move {
+            let plan = self.create_plan().await?;
 
-        Ok(DatasetRecordBatchStream::new(execute_plan(
-            plan,
-            LanceExecutionOptions::default(),
-        )?))
+            Ok(DatasetRecordBatchStream::new(execute_plan(
+                plan,
+                LanceExecutionOptions::default(),
+            )?))
+        }
+        .boxed()
     }
 
     pub(crate) async fn try_into_dfstream(
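
Because BoxFuture itself implements Future, dropping the async keyword from these signatures is source-compatible: callers still write scanner.try_into_stream().await as before. A sketch with a stand-in Scanner type (names assumed, not the Lance API):

    use futures::future::{BoxFuture, FutureExt};

    struct Scanner;

    impl Scanner {
        // Same shape as the change above: a plain fn returning a boxed future.
        fn count_rows(&self) -> BoxFuture<'_, u64> {
            async move { 42 }.boxed()
        }
    }

    fn main() {
        futures::executor::block_on(async {
            let scanner = Scanner;
            // The call site is identical to when this was an `async fn`.
            let n = scanner.count_rows().await;
            assert_eq!(n, 42);
        });
    }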
Expand All @@ -970,46 +975,50 @@ impl Scanner {

/// Scan and return the number of matching rows
#[instrument(skip_all)]
pub async fn count_rows(&self) -> Result<u64> {
let plan = self.create_plan().await?;
// Datafusion interprets COUNT(*) as COUNT(1)
let one = Arc::new(Literal::new(ScalarValue::UInt8(Some(1))));

let input_phy_exprs: &[Arc<dyn PhysicalExpr>] = &[one];
let schema = plan.schema();

let mut builder = AggregateExprBuilder::new(count_udaf(), input_phy_exprs.to_vec());
builder = builder.schema(schema);
builder = builder.alias("count_rows".to_string());

let count_expr = builder.build()?;

let plan_schema = plan.schema();
let count_plan = Arc::new(AggregateExec::try_new(
AggregateMode::Single,
PhysicalGroupBy::new_single(Vec::new()),
vec![count_expr],
vec![None],
plan,
plan_schema,
)?);
let mut stream = execute_plan(count_plan, LanceExecutionOptions::default())?;

// A count plan will always return a single batch with a single row.
if let Some(first_batch) = stream.next().await {
let batch = first_batch?;
let array = batch
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.ok_or(Error::io(
"Count plan did not return a UInt64Array".to_string(),
location!(),
))?;
Ok(array.value(0) as u64)
} else {
Ok(0)
pub fn count_rows(&self) -> BoxFuture<Result<u64>> {
// Future intentionally boxed here to avoid large futures on the stack
async move {
let plan = self.create_plan().await?;
// Datafusion interprets COUNT(*) as COUNT(1)
let one = Arc::new(Literal::new(ScalarValue::UInt8(Some(1))));

let input_phy_exprs: &[Arc<dyn PhysicalExpr>] = &[one];
let schema = plan.schema();

let mut builder = AggregateExprBuilder::new(count_udaf(), input_phy_exprs.to_vec());
builder = builder.schema(schema);
builder = builder.alias("count_rows".to_string());

let count_expr = builder.build()?;

let plan_schema = plan.schema();
let count_plan = Arc::new(AggregateExec::try_new(
AggregateMode::Single,
PhysicalGroupBy::new_single(Vec::new()),
vec![count_expr],
vec![None],
plan,
plan_schema,
)?);
let mut stream = execute_plan(count_plan, LanceExecutionOptions::default())?;

// A count plan will always return a single batch with a single row.
if let Some(first_batch) = stream.next().await {
let batch = first_batch?;
let array = batch
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.ok_or(Error::io(
"Count plan did not return a UInt64Array".to_string(),
location!(),
))?;
Ok(array.value(0) as u64)
} else {
Ok(0)
}
}
.boxed()
}

/// Given a base schema and a list of desired fields figure out which fields, if any, still need loaded
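
One detail in the moved count_rows body, unchanged by this commit: DataFusion's COUNT aggregate produces an arrow Int64Array, which is why the code downcasts to Int64Array and then casts the value to u64 (the error string's mention of a UInt64Array appears to be a leftover). A minimal sketch of that extraction, assuming the arrow_array crate:

    use arrow_array::{Array, Int64Array};

    // Pull the single COUNT value out of a result column; hypothetical helper
    // mirroring the downcast in `count_rows` above.
    fn count_from_column(col: &dyn Array) -> Option<u64> {
        let arr = col.as_any().downcast_ref::<Int64Array>()?;
        Some(arr.value(0) as u64)
    }

    fn main() {
        let col = Int64Array::from(vec![12_345_i64]);
        assert_eq!(count_from_column(&col), Some(12_345));
    }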