Skip to content

Commit

Permalink
fix: Fix row_index of batched reader (#19465)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Oct 26, 2024
1 parent 48194d0 commit 7e9e784
Showing 1 changed file with 37 additions and 8 deletions.
45 changes: 37 additions & 8 deletions crates/polars-io/src/utils/other.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,43 @@ pub(crate) fn columns_to_projection(
Ok(prj)
}

#[cfg(debug_assertions)]
fn check_offsets(dfs: &[DataFrame]) {
dfs.windows(2).for_each(|s| {
let a = &s[0].get_columns()[0];
let b = &s[1].get_columns()[0];

let prev = a.get(a.len() - 1).unwrap().extract::<usize>().unwrap();
let next = b.get(0).unwrap().extract::<usize>().unwrap();
assert_eq!(prev + 1, next);
})
}

/// Rewrites the first column of every frame in `dfs` so that its values keep
/// counting across frame boundaries, starting at `offset`.
///
/// Because of threading, each frame's count starts either at `0` or already
/// at its correct global position. This pass corrects the former so that the
/// values are monotonically increasing over the whole slice.
///
/// * `dfs` - frames in output order; the first column of each is updated in
///   place (presumably the row-index column; verify against the callers).
/// * `offset` - value the first row of the first non-empty frame should have.
///
/// Empty frames are left untouched and do not advance the running count.
///
/// # Panics
///
/// Panics if the first value of a first column cannot be extracted as
/// `usize`. In debug builds it additionally panics if the resulting frames
/// fail the `check_offsets` contiguity check.
#[cfg(any(feature = "csv", feature = "json"))]
pub(crate) fn update_row_counts2(dfs: &mut [DataFrame], offset: IdxSize) {
    // Global position expected for the first row of the frame being visited.
    let mut previous = offset;

    for df in dfs.iter_mut() {
        if df.is_empty() {
            continue;
        }
        // Read the height before mutably borrowing the columns.
        let n_read = df.height() as IdxSize;

        // SAFETY: NOTE(review): only the values of the first column are
        // replaced by `column + scalar`; this presumably keeps its length
        // unchanged, so the frame's height stays valid. Confirm against the
        // contract of `get_columns_mut`.
        if let Some(s) = unsafe { df.get_columns_mut() }.get_mut(0) {
            if let Ok(v) = s.get(0) {
                // A frame whose count already starts at the expected position
                // must not be shifted a second time.
                if v.extract::<usize>().unwrap() != previous as usize {
                    *s = &*s + previous;
                }
            }
        }
        previous += n_read;
    }

    #[cfg(debug_assertions)]
    {
        check_offsets(dfs)
    }
}

/// Because of threading every row starts from `0` or from `offset`.
Expand All @@ -95,15 +118,21 @@ pub(crate) fn update_row_counts2(dfs: &mut [DataFrame], offset: IdxSize) {
pub(crate) fn update_row_counts3(dfs: &mut [DataFrame], heights: &[IdxSize], offset: IdxSize) {
assert_eq!(dfs.len(), heights.len());
if !dfs.is_empty() {
let mut previous = heights[0] + offset;
for i in 1..dfs.len() {
let mut previous = offset;
for i in 0..dfs.len() {
let df = &mut dfs[i];
let n_read = heights[i];
if df.is_empty() {
continue;
}

if let Some(s) = unsafe { df.get_columns_mut() }.get_mut(0) {
*s = &*s + previous;
if let Ok(v) = s.get(0) {
if v.extract::<usize>().unwrap() != previous as usize {
*s = &*s + previous;
}
}
}

let n_read = heights[i];
previous += n_read;
}
}
Expand Down

0 comments on commit 7e9e784

Please sign in to comment.