From f0646cf1036e6dda742d23f605571823632b67ac Mon Sep 17 00:00:00 2001 From: ritchie Date: Tue, 23 Jan 2024 10:02:19 +0100 Subject: [PATCH 1/4] fix: ensure order is preserved if streaming from different sources --- .../src/executors/sinks/output/file_sink.rs | 3 +++ crates/polars-pipe/src/executors/sinks/sort/source.rs | 3 ++- crates/polars-pipe/src/executors/sources/csv.rs | 2 +- crates/polars-pipe/src/executors/sources/frame.rs | 10 ++++++++-- crates/polars-pipe/src/executors/sources/mod.rs | 11 +++++++++++ crates/polars-pipe/src/executors/sources/parquet.rs | 3 ++- 6 files changed, 27 insertions(+), 5 deletions(-) diff --git a/crates/polars-pipe/src/executors/sinks/output/file_sink.rs b/crates/polars-pipe/src/executors/sinks/output/file_sink.rs index a4ee0bb3d07b..33bdd1953d87 100644 --- a/crates/polars-pipe/src/executors/sinks/output/file_sink.rs +++ b/crates/polars-pipe/src/executors/sinks/output/file_sink.rs @@ -39,6 +39,9 @@ pub(super) fn init_writer_thread( if maintain_order { chunks.sort_by_key(|chunk| chunk.chunk_index); } + dbg!(maintain_order); + let idx = chunks.iter().map(|c| c.chunk_index).collect::>(); + dbg!(idx); for chunk in chunks.iter() { writer._write_batch(&chunk.data).unwrap() diff --git a/crates/polars-pipe/src/executors/sinks/sort/source.rs b/crates/polars-pipe/src/executors/sinks/sort/source.rs index 2c0414f50703..160e70152718 100644 --- a/crates/polars-pipe/src/executors/sinks/sort/source.rs +++ b/crates/polars-pipe/src/executors/sinks/sort/source.rs @@ -7,6 +7,7 @@ use rayon::prelude::*; use crate::executors::sinks::sort::ooc::read_df; use crate::executors::sinks::sort::sink::sort_accumulated; +use crate::executors::sources::get_source_offset; use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult}; pub struct SortSource { @@ -41,7 +42,7 @@ impl SortSource { n_threads, sort_idx, descending, - chunk_offset: 0, + chunk_offset: get_source_offset(), slice, finished: false, } diff --git a/crates/polars-pipe/src/executors/sources/csv.rs b/crates/polars-pipe/src/executors/sources/csv.rs index 11af352e991a..5c594f58def9 100644 --- a/crates/polars-pipe/src/executors/sources/csv.rs +++ b/crates/polars-pipe/src/executors/sources/csv.rs @@ -112,7 +112,7 @@ impl CsvSource { reader: None, batched_reader: None, n_threads: POOL.current_num_threads(), - chunk_index: 0, + chunk_index: get_source_offset(), path: Some(path), options: Some(options), file_options: Some(file_options), diff --git a/crates/polars-pipe/src/executors/sources/frame.rs b/crates/polars-pipe/src/executors/sources/frame.rs index b8a4ff4bae20..4c0d260c41ce 100644 --- a/crates/polars-pipe/src/executors/sources/frame.rs +++ b/crates/polars-pipe/src/executors/sources/frame.rs @@ -7,11 +7,13 @@ use polars_core::utils::split_df; use polars_core::POOL; use polars_utils::IdxSize; +use crate::executors::sources::get_source_offset; use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult}; pub struct DataFrameSource { dfs: Enumerate>, n_threads: usize, + source_offset: IdxSize, } impl DataFrameSource { @@ -19,7 +21,11 @@ impl DataFrameSource { let n_threads = POOL.current_num_threads(); let dfs = split_df(&mut df, n_threads).unwrap(); let dfs = dfs.into_iter().enumerate(); - Self { dfs, n_threads } + Self { + dfs, + n_threads, + source_offset: get_source_offset(), + } } } @@ -27,7 +33,7 @@ impl Source for DataFrameSource { fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult { let chunks = (&mut self.dfs) .map(|(chunk_index, data)| DataChunk { - chunk_index: chunk_index as IdxSize, + chunk_index: chunk_index as IdxSize + self.source_offset, data, }) .take(self.n_threads) diff --git a/crates/polars-pipe/src/executors/sources/mod.rs b/crates/polars-pipe/src/executors/sources/mod.rs index 72878b1911dc..e846945adc87 100644 --- a/crates/polars-pipe/src/executors/sources/mod.rs +++ b/crates/polars-pipe/src/executors/sources/mod.rs @@ -7,6 +7,8 @@ mod parquet; mod reproject; mod union; +use std::sync::atomic::{AtomicU32, Ordering}; + #[cfg(feature = "csv")] pub(crate) use csv::CsvSource; pub(crate) use frame::*; @@ -18,3 +20,12 @@ pub(crate) use union::*; #[cfg(feature = "csv")] use super::*; + +static CHUNK_INDEX: AtomicU32 = AtomicU32::new(0); + +pub(super) fn get_source_offset() -> IdxSize { + // New pipelines are ~1M chunks apart before they have the same count. + // We don't want chunks with the same count from different pipelines are fed + // into the same sink as we cannot determine the order. + (CHUNK_INDEX.fetch_add(1, Ordering::Relaxed).wrapping_shl(20)) as IdxSize +} diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index 82c3f1858926..aab5f5e758e8 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -21,6 +21,7 @@ use polars_plan::logical_plan::FileInfo; use polars_plan::prelude::{FileScanOptions, ParquetOptions}; use polars_utils::IdxSize; +use crate::executors::sources::get_source_offset; use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult}; use crate::pipeline::determine_chunk_size; @@ -209,7 +210,7 @@ impl ParquetSource { let mut source = ParquetSource { batched_readers: VecDeque::new(), n_threads, - chunk_index: 0, + chunk_index: get_source_offset(), processed_paths: 0, options, file_options, From 679a209ca224606a4921a5fab6d434677de16b88 Mon Sep 17 00:00:00 2001 From: ritchie Date: Tue, 23 Jan 2024 10:11:24 +0100 Subject: [PATCH 2/4] clippy --- crates/polars-pipe/src/executors/sinks/output/file_sink.rs | 3 --- crates/polars-pipe/src/executors/sources/mod.rs | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/polars-pipe/src/executors/sinks/output/file_sink.rs b/crates/polars-pipe/src/executors/sinks/output/file_sink.rs index 33bdd1953d87..a4ee0bb3d07b 100644 --- a/crates/polars-pipe/src/executors/sinks/output/file_sink.rs +++ b/crates/polars-pipe/src/executors/sinks/output/file_sink.rs @@ -39,9 +39,6 @@ pub(super) fn init_writer_thread( if maintain_order { chunks.sort_by_key(|chunk| chunk.chunk_index); } - dbg!(maintain_order); - let idx = chunks.iter().map(|c| c.chunk_index).collect::>(); - dbg!(idx); for chunk in chunks.iter() { writer._write_batch(&chunk.data).unwrap() diff --git a/crates/polars-pipe/src/executors/sources/mod.rs b/crates/polars-pipe/src/executors/sources/mod.rs index e846945adc87..9d159ecfb3b0 100644 --- a/crates/polars-pipe/src/executors/sources/mod.rs +++ b/crates/polars-pipe/src/executors/sources/mod.rs @@ -15,6 +15,7 @@ pub(crate) use frame::*; pub(crate) use ipc_one_shot::*; #[cfg(feature = "parquet")] pub(crate) use parquet::*; +use polars_utils::IdxSize; pub(crate) use reproject::*; pub(crate) use union::*; From 30dab7a7bd8ec17df68afff9884d2b402cfb6fa7 Mon Sep 17 00:00:00 2001 From: ritchie Date: Tue, 23 Jan 2024 11:40:59 +0100 Subject: [PATCH 3/4] fix --- .../src/executors/sinks/sort/source.rs | 4 +- .../polars-pipe/src/executors/sources/csv.rs | 25 ++++++------ .../src/executors/sources/frame.rs | 13 +++---- .../polars-pipe/src/executors/sources/mod.rs | 7 +--- .../src/executors/sources/parquet.rs | 39 ++++++++++--------- 5 files changed, 42 insertions(+), 46 deletions(-) diff --git a/crates/polars-pipe/src/executors/sinks/sort/source.rs b/crates/polars-pipe/src/executors/sinks/sort/source.rs index 160e70152718..4d40efc8a5bb 100644 --- a/crates/polars-pipe/src/executors/sinks/sort/source.rs +++ b/crates/polars-pipe/src/executors/sinks/sort/source.rs @@ -7,7 +7,7 @@ use rayon::prelude::*; use crate::executors::sinks::sort::ooc::read_df; use crate::executors::sinks::sort::sink::sort_accumulated; -use crate::executors::sources::get_source_offset; +use crate::executors::sources::get_source_index; use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult}; pub struct SortSource { @@ -42,7 +42,7 @@ impl SortSource { n_threads, sort_idx, descending, - chunk_offset: get_source_offset(), + chunk_offset: get_source_index(1) as IdxSize, slice, finished: false, } diff --git a/crates/polars-pipe/src/executors/sources/csv.rs b/crates/polars-pipe/src/executors/sources/csv.rs index 5c594f58def9..d760f91ca633 100644 --- a/crates/polars-pipe/src/executors/sources/csv.rs +++ b/crates/polars-pipe/src/executors/sources/csv.rs @@ -7,6 +7,7 @@ use polars_io::csv::read_impl::{BatchedCsvReaderMmap, BatchedCsvReaderRead}; use polars_io::csv::{CsvEncoding, CsvReader}; use polars_plan::global::_set_n_rows_for_scan; use polars_plan::prelude::{CsvParserOptions, FileScanOptions}; +use polars_utils::iter::EnumerateIdxTrait; use super::*; use crate::pipeline::determine_chunk_size; @@ -19,7 +20,6 @@ pub(crate) struct CsvSource { batched_reader: Option, *mut BatchedCsvReaderRead<'static>>>, n_threads: usize, - chunk_index: IdxSize, path: Option, options: Option, file_options: Option, @@ -112,7 +112,6 @@ impl CsvSource { reader: None, batched_reader: None, n_threads: POOL.current_num_threads(), - chunk_index: get_source_offset(), path: Some(path), options: Some(options), file_options: Some(file_options), @@ -164,19 +163,19 @@ impl Source for CsvSource { }; Ok(match batches { None => SourceResult::Finished, - Some(batches) => SourceResult::GotMoreData( - batches + Some(batches) => { + let index = get_source_index(0); + let out = batches .into_iter() - .map(|data| { - let out = DataChunk { - chunk_index: self.chunk_index, - data, - }; - self.chunk_index += 1; - out + .enumerate_u32() + .map(|(i, data)| DataChunk { + chunk_index: (index + i) as IdxSize, + data, }) - .collect(), - ), + .collect::>(); + get_source_index(out.len() as u32); + SourceResult::GotMoreData(out) + }, }) } fn fmt(&self) -> &str { diff --git a/crates/polars-pipe/src/executors/sources/frame.rs b/crates/polars-pipe/src/executors/sources/frame.rs index 4c0d260c41ce..a990751cf45b 100644 --- a/crates/polars-pipe/src/executors/sources/frame.rs +++ b/crates/polars-pipe/src/executors/sources/frame.rs @@ -7,13 +7,12 @@ use polars_core::utils::split_df; use polars_core::POOL; use polars_utils::IdxSize; -use crate::executors::sources::get_source_offset; +use crate::executors::sources::get_source_index; use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult}; pub struct DataFrameSource { dfs: Enumerate>, n_threads: usize, - source_offset: IdxSize, } impl DataFrameSource { @@ -21,23 +20,21 @@ impl DataFrameSource { let n_threads = POOL.current_num_threads(); let dfs = split_df(&mut df, n_threads).unwrap(); let dfs = dfs.into_iter().enumerate(); - Self { - dfs, - n_threads, - source_offset: get_source_offset(), - } + Self { dfs, n_threads } } } impl Source for DataFrameSource { fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult { + let idx_offset = get_source_index(0); let chunks = (&mut self.dfs) .map(|(chunk_index, data)| DataChunk { - chunk_index: chunk_index as IdxSize + self.source_offset, + chunk_index: (chunk_index as u32 + idx_offset) as IdxSize, data, }) .take(self.n_threads) .collect::>(); + get_source_index(chunks.len() as u32); if chunks.is_empty() { Ok(SourceResult::Finished) diff --git a/crates/polars-pipe/src/executors/sources/mod.rs b/crates/polars-pipe/src/executors/sources/mod.rs index 9d159ecfb3b0..1796f39f8ea1 100644 --- a/crates/polars-pipe/src/executors/sources/mod.rs +++ b/crates/polars-pipe/src/executors/sources/mod.rs @@ -24,9 +24,6 @@ use super::*; static CHUNK_INDEX: AtomicU32 = AtomicU32::new(0); -pub(super) fn get_source_offset() -> IdxSize { - // New pipelines are ~1M chunks apart before they have the same count. - // We don't want chunks with the same count from different pipelines are fed - // into the same sink as we cannot determine the order. - (CHUNK_INDEX.fetch_add(1, Ordering::Relaxed).wrapping_shl(20)) as IdxSize +pub(super) fn get_source_index(add: u32) -> u32 { + CHUNK_INDEX.fetch_add(add, Ordering::Relaxed) } diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index aab5f5e758e8..d12791137ca0 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -19,9 +19,10 @@ use polars_io::utils::check_projected_arrow_schema; use polars_io::{is_cloud_url, SerReader}; use polars_plan::logical_plan::FileInfo; use polars_plan::prelude::{FileScanOptions, ParquetOptions}; +use polars_utils::iter::EnumerateIdxTrait; use polars_utils::IdxSize; -use crate::executors::sources::get_source_offset; +use crate::executors::sources::get_source_index; use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult}; use crate::pipeline::determine_chunk_size; @@ -29,7 +30,6 @@ pub struct ParquetSource { batched_readers: VecDeque, n_threads: usize, processed_paths: usize, - chunk_index: IdxSize, iter: Range, paths: Arc<[PathBuf]>, options: ParquetOptions, @@ -210,7 +210,6 @@ impl ParquetSource { let mut source = ParquetSource { batched_readers: VecDeque::new(), n_threads, - chunk_index: get_source_offset(), processed_paths: 0, options, file_options, @@ -290,21 +289,25 @@ impl Source for ParquetSource { return self.get_batches(_context); }, Some(batches) => { - let result = SourceResult::GotMoreData( - batches - .into_iter() - .map(|data| { - // Keep the row limit updated so the next reader will have a correct limit. - if let Some(n_rows) = &mut self.file_options.n_rows { - *n_rows = n_rows.saturating_sub(data.height()) - } - - let chunk_index = self.chunk_index; - self.chunk_index += 1; - DataChunk { chunk_index, data } - }) - .collect(), - ); + let idx_offset = get_source_index(0); + let out = batches + .into_iter() + .enumerate_u32() + .map(|(i, data)| { + // Keep the row limit updated so the next reader will have a correct limit. + if let Some(n_rows) = &mut self.file_options.n_rows { + *n_rows = n_rows.saturating_sub(data.height()) + } + + DataChunk { + chunk_index: (idx_offset + i) as IdxSize, + data, + } + }) + .collect::>(); + get_source_index(out.len() as u32); + + let result = SourceResult::GotMoreData(out); // We are not yet done with this reader. // Ensure it is used in next iteration. self.batched_readers.push_front(reader); From b4682685811a13cf9ed5561f84d6045c7652d3a5 Mon Sep 17 00:00:00 2001 From: ritchie Date: Tue, 23 Jan 2024 11:51:25 +0100 Subject: [PATCH 4/4] elide --- crates/polars-pipe/src/executors/sources/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/polars-pipe/src/executors/sources/mod.rs b/crates/polars-pipe/src/executors/sources/mod.rs index 1796f39f8ea1..8270685f23b6 100644 --- a/crates/polars-pipe/src/executors/sources/mod.rs +++ b/crates/polars-pipe/src/executors/sources/mod.rs @@ -15,7 +15,6 @@ pub(crate) use frame::*; pub(crate) use ipc_one_shot::*; #[cfg(feature = "parquet")] pub(crate) use parquet::*; -use polars_utils::IdxSize; pub(crate) use reproject::*; pub(crate) use union::*;