Skip to content

Commit

Permalink
Update to a newer version of polars
Browse files Browse the repository at this point in the history
  • Loading branch information
Boruch Chalk committed Jun 30, 2024
1 parent 0896630 commit 3b8398b
Show file tree
Hide file tree
Showing 13 changed files with 21 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ strength_reduce = "0.2"
strum_macros = "0.26"
thiserror = "1"
tokio = "1.26"
tokio-util = "0.7.8"
tokio-stream = "0.1.15"
tokio-util = "0.7.8"
unicode-reverse = "1.0.8"
url = "2.4"
uuid = { version = "1.7.0", features = ["v4"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-expr/src/state/execution_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,4 @@ impl Clone for ExecutionState {
stop: self.stop.clone(),
}
}
}
}
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ async fn prune_row_groups(

let mut df = unsafe { DataFrame::new_no_checks(columns) };

materialize_hive_partitions(&mut df, hive_partition_columns, md.num_rows());
materialize_hive_partitions(&mut df, &schema, hive_partition_columns, md.num_rows());
apply_predicate(&mut df, predicate.as_deref(), false).unwrap();

let row_count = df.height();
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ polars-stream = { workspace = true, optional = true }
polars-time = { workspace = true, optional = true }
polars-utils = { workspace = true }

crossbeam-channel = { workspace = true }
ahash = { workspace = true }
bitflags = { workspace = true }
crossbeam-channel = { workspace = true }
glob = { version = "0.3" }
memchr = { workspace = true }
once_cell = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pub use ndjson::*;
#[cfg(feature = "parquet")]
pub use parquet::*;
use polars_core::prelude::*;
use polars_expr::{create_physical_expr, ExpressionConversionState};
use polars_core::POOL;
use polars_expr::{create_physical_expr, ExpressionConversionState};
use polars_io::RowIndex;
use polars_mem_engine::{create_physical_plan, Executor};
use polars_ops::frame::JoinCoalesce;
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-mem-engine/src/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ fn create_physical_plan_impl(
SinkType::Cloud { .. } => {
polars_bail!(InvalidOperation: "cloud sink not supported in standard engine.")
},
SinkType::Batch { .. } => {
SinkType::Batch { .. } => {
polars_bail!(InvalidOperation: "batch sink not supported in the standard engine")
}
},
},
Union { inputs, options } => {
let inputs = inputs
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-pipe/src/executors/sinks/output/batch_sink.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::any::Any;

use polars_core::prelude::*;
use crossbeam_channel::Sender;
use polars_core::prelude::*;

use crate::operators::{
chunks_to_df_unchecked, DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult,
};

#[derive(Clone)]
pub struct BatchSink {
sender: Sender<DataFrame>
sender: Sender<DataFrame>,
}

impl BatchSink {
Expand All @@ -24,7 +24,7 @@ impl Sink for BatchSink {
let result = self.sender.send(df);
match result {
Ok(..) => Ok(SinkResult::CanHaveMoreInput),
Err(..) => Ok(SinkResult::Finished)
Err(..) => Ok(SinkResult::Finished),
}
}

Expand Down
3 changes: 1 addition & 2 deletions crates/polars-pipe/src/executors/sinks/output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod json;
#[cfg(feature = "parquet")]
mod parquet;

pub use batch_sink::*;
#[cfg(feature = "csv")]
pub use csv::*;
#[cfg(feature = "ipc")]
Expand All @@ -23,5 +24,3 @@ pub use ipc::*;
pub use json::*;
#[cfg(feature = "parquet")]
pub use parquet::*;

pub use batch_sink::*;
2 changes: 1 addition & 1 deletion crates/polars-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ polars-parquet = { workspace = true, optional = true }
polars-time = { workspace = true, optional = true }
polars-utils = { workspace = true }

crossbeam-channel = { workspace = true }
ahash = { workspace = true }
arrow = { workspace = true }
bytemuck = { workspace = true }
chrono = { workspace = true, optional = true }
chrono-tz = { workspace = true, optional = true }
ciborium = { workspace = true, optional = true }
crossbeam-channel = { workspace = true }
either = { workspace = true }
futures = { workspace = true, optional = true }
hashbrown = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/ir/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ impl<'a> IRDisplay<'a> {
Sink { input, payload, .. } => {
let name = match payload {
SinkType::Memory => "SINK (memory)",
SinkType::Batch { .. }=> "SINK (batch)",
SinkType::Batch { .. } => "SINK (batch)",
SinkType::File { .. } => "SINK (file)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "SINK (cloud)",
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/ir/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl IR {
ExtContext { .. } => "ext_context",
Sink { payload, .. } => match payload {
SinkType::Memory => "sink (memory)",
SinkType::Batch { .. } => "sink (batch)",
SinkType::Batch { .. } => "sink (batch)",
SinkType::File { .. } => "sink (file)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "sink (cloud)",
Expand Down
12 changes: 5 additions & 7 deletions crates/polars-plan/src/plans/options.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::hash::{Hash, Hasher};
#[cfg(feature = "json")]
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::{hash::{Hash, Hasher}, path::PathBuf};

use crossbeam_channel::{bounded, Receiver, Sender};

use crossbeam_channel::{bounded, Sender};
use polars_core::prelude::*;
use polars_core::utils::SuperTypeOptions;
#[cfg(feature = "csv")]
Expand Down Expand Up @@ -237,8 +236,8 @@ pub struct BatchSender {

impl Default for BatchSender {
fn default() -> Self {
let (sender, receiver) = bounded(1);
Self{ id: 0, sender: sender}
let (sender, _receiver) = bounded(1);
Self { id: 0, sender }
}
}

Expand All @@ -248,15 +247,14 @@ impl PartialEq for BatchSender {
}
}

impl Eq for BatchSender{}
impl Eq for BatchSender {}

impl Hash for BatchSender {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state)
}
}


#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum SinkType {
Expand Down
2 changes: 2 additions & 0 deletions py-polars/tests/unit/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,8 @@ def test_parquet_record_batches_pyarrow_fixed_size_list_16614(tmp_path: Path) ->

assert b["x"].shape[0] == n
assert_frame_equal(b, x)


def test_skip_full_load_of_rgs_using_predicate(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capfd: Any
) -> None:
Expand Down

0 comments on commit 3b8398b

Please sign in to comment.