Implement limit
alamb committed Aug 9, 2023
1 parent dbf92e0 commit 15463b3
Showing 2 changed files with 81 additions and 18 deletions.
91 changes: 74 additions & 17 deletions datafusion/core/src/physical_plan/sorts/topk.rs
@@ -17,19 +17,22 @@

//! TopK: Combination of Sort / LIMIT

use std::{sync::Arc, cmp::Ordering};
use std::{cmp::Ordering, sync::Arc};

use arrow::row::{RowConverter, SortField, OwnedRow, Row};
use arrow_array::{RecordBatch, ArrayRef};
use arrow_schema::{Schema, SchemaRef};
use arrow::{
compute::interleave,
row::{OwnedRow, Row, RowConverter, SortField},
};
use arrow_array::{Array, ArrayRef, RecordBatch};
use arrow_schema::SchemaRef;
use datafusion_common::Result;
use datafusion_execution::{
memory_pool::{MemoryConsumer, MemoryReservation},
runtime_env::RuntimeEnv,
};
use datafusion_physical_expr::PhysicalSortExpr;

use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};

/// TopK
///
@@ -103,7 +106,7 @@ impl TopK {
.register(&runtime.memory_pool);

let expr: Arc<[PhysicalSortExpr]> = expr.into();
let storage = create_storage(k, expr.clone(), schema.as_ref())?;
let storage = create_storage(k, expr.clone(), schema.clone())?;

Ok(Self {
schema,
@@ -122,15 +125,39 @@

/// Returns the top k results broken into batch_size record batches
pub fn emit(self) -> Result<SendableRecordBatchStream> {
todo!()
let Self {
schema,
expr,
k,
reservation,
batch_size,
mut storage,
} = self;
let mut batch = storage.emit()?;

// break into record batches as needed
let mut batches = vec![];
loop {
if batch.num_rows() <= batch_size {
batches.push(Ok(batch));
break;
} else {
batches.push(Ok(batch.slice(0, batch_size)));
let remaining = batch.num_rows() - batch_size;
batch = batch.slice(batch_size, remaining);
}
}
Ok(Box::pin(RecordBatchStreamAdapter::new(
schema,
futures::stream::iter(batches),
)))
}
}
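
The batching loop in emit relies on RecordBatch::slice(offset, length), which is zero-copy: each chunk shares the parent batch's buffers rather than copying data. A minimal standalone sketch of that behavior (illustrative only, not part of this commit):

use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let batch = RecordBatch::try_new(schema, vec![col]).unwrap();

    // slice(offset, length) reuses the underlying buffers of the parent batch
    let first = batch.slice(0, 2);
    let rest = batch.slice(2, batch.num_rows() - 2);
    assert_eq!(first.num_rows(), 2);
    assert_eq!(rest.num_rows(), 3);
}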

/// Instantiates a specialized [`TopKStorage`] instance based on the exprs
fn create_storage(
k: usize,
expr: Arc<[PhysicalSortExpr]>,
schema: &Schema,
schema: SchemaRef,
) -> Result<Box<dyn TopKStorage + Send>> {
// TODO make specialized primitive versions.

@@ -143,7 +170,7 @@ trait TopKStorage {
fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>;

/// Returns the top k results in a single batch
fn emit(self) -> Result<RecordBatch>;
fn emit(&mut self) -> Result<RecordBatch>;

/// How many bytes of memory are used
fn size(&self) -> usize;
@@ -153,7 +180,9 @@ trait TopKStorage {
struct RowStorage {
k: usize,
expr: Arc<[PhysicalSortExpr]>,
// row converter, just for the sort keys
row_converter: RowConverter,
schema: SchemaRef,
/// stores the top k values, in order
top_k: Vec<TopKRow>,
}
@@ -162,21 +191,27 @@ impl RowStorage {
fn try_new(
k: usize,
expr: Arc<[PhysicalSortExpr]>,
schema: &Schema
schema: SchemaRef,
) -> Result<Self> {
let sort_fields: Vec<_> = expr
.iter()
.map(|e| {
Ok(SortField::new_with_options(
e.expr.data_type(schema)?,
e.expr.data_type(&schema)?,
e.options,
))
})
.collect::<Result<_>>()?;

let row_converter = RowConverter::new(sort_fields)?;

Ok(Self { k, expr, row_converter, top_k: vec![] })
Ok(Self {
k,
expr,
row_converter,
schema,
top_k: vec![],
})
}
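
RowConverter encodes the sort-key columns into the Arrow row format, where sort order reduces to comparing opaque byte strings; that is what lets RowStorage keep its top_k vector ordered. A small sketch of the conversion (assumed setup, not taken from this commit):

use std::sync::Arc;
use arrow::row::{RowConverter, SortField};
use arrow_array::{ArrayRef, Int32Array};
use arrow_schema::DataType;

fn main() {
    // one SortField per sort key, mirroring the fields built in try_new above
    let mut converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    let keys: ArrayRef = Arc::new(Int32Array::from(vec![3, 1, 2]));
    let rows = converter.convert_columns(&[keys]).unwrap();

    // rows compare directly as bytes, preserving the declared sort order
    assert!(rows.row(1) < rows.row(2)); // key 1 sorts before key 2
    assert!(rows.row(2) < rows.row(0)); // key 2 sorts before key 3
}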

/// Adds the specified row to internal storage and self.top_k
@@ -207,7 +242,6 @@ impl RowStorage {
self.top_k.pop().unwrap();
assert_eq!(self.k, self.top_k.len());
}

}

impl TopKStorage for RowStorage {
@@ -241,12 +275,35 @@ impl TopKStorage for RowStorage {
Ok(())
}

fn emit(self) -> Result<RecordBatch> {
todo!()
fn emit(&mut self) -> Result<RecordBatch> {
// the idea is we will interleave the values
let indicies: Vec<_> = self
.top_k
.iter()
.enumerate()
.map(|(i, k)| (i, k.index))
.collect();

let num_columns = self.top_k.get(0).expect("k > 0").batch.num_columns();

// build the output columns one at time
let output_columns: Vec<_> = (0..num_columns)
.map(|col| {
let input_arrays: Vec<_> = self
.top_k
.iter()
.map(|k| k.batch.column(col) as &dyn Array)
.collect();
Ok(interleave(&input_arrays, &indicies)?)
})
.collect::<Result<_>>()?;

Ok(RecordBatch::try_new(self.schema.clone(), output_columns)?)
}

fn size(&self) -> usize {
todo!()
// TODO include buffered batches (but don't double count them)
self.row_converter.size()
}
}
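
The emit implementation above leans on the interleave kernel: each (array_index, row_index) pair picks one row from one of the buffered batches, so the output columns come out in the stored top-k order. A hedged sketch of the kernel on plain arrays (illustrative only):

use arrow::compute::interleave;
use arrow_array::{Array, Int32Array};

fn main() {
    let a = Int32Array::from(vec![10, 20, 30]);
    let b = Int32Array::from(vec![100, 200]);
    let arrays: Vec<&dyn Array> = vec![&a, &b];

    // each (array_index, row_index) pair selects a single row from the inputs
    let result = interleave(&arrays, &[(1, 0), (0, 2), (0, 0)]).unwrap();
    let result = result.as_any().downcast_ref::<Int32Array>().unwrap();
    let values: Vec<i32> = result.values().iter().copied().collect();
    assert_eq!(values, vec![100, 30, 10]);
}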

@@ -261,7 +318,7 @@ struct TopKRow {
batch: RecordBatch,
}

impl Eq for TopKRow{}
impl Eq for TopKRow {}

impl PartialOrd for TopKRow {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
8 changes: 7 additions & 1 deletion datafusion/core/tests/sqllogictests/test_files/aal.slt
@@ -51,5 +51,11 @@ GlobalLimitExec: skip=0, fetch=5
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true


query error DataFusion error: Arrow error: Invalid argument error: Incorrect number of arrays provided to RowConverter, expected 1 got 13
query TIIIIIIIITRRT
select * from aggregate_test_100 ORDER BY c13 desc limit 5;
----
b 5 -82 22080 1824882165 7373730676428214987 208 34331 3342719438 3330177516592499461 0.82634634 0.409753835253 Ig1QcuKsjHXkproePdERo2w0mYzIqd
d 5 -40 22614 706441268 -7542719935673075327 155 14337 3373581039 11720144131976083864 0.69632107 0.311471253986 C2GT5KVyOPZpgKVl110TyZO0NcJ434
b 1 29 -18218 994303988 5983957848665088916 204 9489 3275293996 14857091259186476033 0.53840446 0.179090351188 AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz
c 2 1 18109 2033001162 -6513304855495910254 25 43062 1491205016 5863949479783605708 0.110830784 0.929409733247 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW
a 1 -85 -15154 1171968280 1919439543497968449 77 52286 774637006 12101411955859039553 0.12285209 0.686439196277 0keZ5G8BffGwgF2RwQD59TFzMStxCB
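
The query TIIIIIIIITRRT line declares one sqllogictest type code per output column (T = text, I = integer, R = floating point), so the test now verifies the actual top five rows of ORDER BY c13 DESC LIMIT 5 instead of expecting a RowConverter error.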
