target_partitions execution option is ignored when the input has 1 partition #12611

Closed
palaska opened this issue Sep 25, 2024 · 4 comments
Labels
bug Something isn't working

Comments

@palaska
Contributor

palaska commented Sep 25, 2024

Describe the bug

The target_partitions execution option is ignored when the input has a single partition. The condition introduced here is the root cause.

To Reproduce

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::functions_aggregate::sum::sum;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::prelude::col;
use datafusion::test_util::scan_empty;

#[tokio::main]
async fn main() {
    // Ask for up to 4 partitions.
    let config = SessionConfig::new().with_target_partitions(4);
    let ctx = Arc::new(SessionContext::new_with_config(config));
    let session_state = ctx.state();

    let schema = Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::UInt64, false),
    ]);

    // Empty table scan with a single input partition,
    // aggregated as SUM(b) GROUP BY a.
    let logical_plan = scan_empty(None, &schema, Some(vec![0, 1]))
        .unwrap()
        .aggregate(vec![col("a")], vec![sum(col("b"))])
        .unwrap()
        .build()
        .unwrap();

    let optimized_plan = session_state.optimize(&logical_plan).unwrap();

    let plan = session_state
        .create_physical_plan(&optimized_plan)
        .await
        .unwrap();

    // Print the physical plan to inspect its partitioning.
    println!(
        "{}",
        DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
    );
}

The code above produces a physical plan with a single output partition, and the plan doesn't contain a RepartitionExec.

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::functions_aggregate::sum::sum;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::prelude::col;
use datafusion::test_util::scan_empty_with_partitions;

#[tokio::main]
async fn main() {
    // Ask for up to 4 partitions.
    let config = SessionConfig::new().with_target_partitions(4);
    let ctx = Arc::new(SessionContext::new_with_config(config));
    let session_state = ctx.state();

    let schema = Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::UInt64, false),
    ]);

    // setting the number of input partitions to 2 instead of 1
    let logical_plan = scan_empty_with_partitions(None, &schema, Some(vec![0, 1]), 2)
        .unwrap()
        .aggregate(vec![col("a")], vec![sum(col("b"))])
        .unwrap()
        .build()
        .unwrap();

    let optimized_plan = session_state.optimize(&logical_plan).unwrap();

    let plan = session_state
        .create_physical_plan(&optimized_plan)
        .await
        .unwrap();

    // Print the physical plan to inspect its partitioning.
    println!(
        "{}",
        DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
    );
}

When the number of input partitions is set to a value greater than 1, it works as expected (multi_partitions becomes true here).
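To make the reported behavior concrete, here is a toy model in plain Rust of the decision described above. It is a hypothetical sketch, not DataFusion's planner code: plans_repartition and output_partitions are invented names that simply encode "only repartition when the input already has more than one partition".

```rust
/// Toy model (hypothetical, not DataFusion source) of the reported behavior:
/// a repartition step is planned only when the input already has more than
/// one partition, so target_partitions has no effect on a 1-partition input.
fn plans_repartition(input_partitions: usize, target_partitions: usize) -> bool {
    // Plays the role of the `multi_partitions` flag mentioned in the issue.
    let multi_partitions = input_partitions > 1;
    multi_partitions && target_partitions > 1
}

/// Number of output partitions the toy planner would produce.
fn output_partitions(input_partitions: usize, target_partitions: usize) -> usize {
    if plans_repartition(input_partitions, target_partitions) {
        target_partitions
    } else {
        input_partitions
    }
}

fn main() {
    // The two reproductions above: 1 vs 2 input partitions, target = 4.
    for input in [1, 2] {
        println!(
            "input={input} target=4 -> repartition={} output={}",
            plans_repartition(input, 4),
            output_partitions(input, 4)
        );
    }
}
```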

Expected behavior

No response

Additional context

No response

@palaska palaska added the bug Something isn't working label Sep 25, 2024
@akurmustafa
Contributor

In DataFusion, the target_partitions setting doesn't necessarily increase the partition count. If DataFusion decides that executing the query in a single partition is better for performance, it will do so even if target_partitions is larger than 1. Do you think parallelism would improve performance for this query? If so, we should definitely increase the partition count for it. What are your thoughts on this?

In short, setting target_partitions to a value larger than 1 doesn't necessarily increase the number of partitions in DataFusion.
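A minimal sketch of what "a target, not a guarantee" can mean in practice, assuming a hypothetical benefit rule: fan out to target_partitions only when the estimated input is larger than one batch, and treat unknown sizes as worth parallelizing. The function effective_partitions and its row-count heuristic are assumptions for illustration, not DataFusion's actual optimizer logic.

```rust
/// Hypothetical sketch: target_partitions treated as a hint that the planner
/// may decline. The benefit rule below is invented for illustration.
fn effective_partitions(
    input_partitions: usize,
    target_partitions: usize,
    estimated_rows: Option<usize>,
    batch_size: usize,
) -> usize {
    // Assume parallelism only pays off when there is more than one batch of
    // work; unknown statistics are treated as "worth it".
    let beneficial = estimated_rows.map_or(true, |rows| rows > batch_size);
    if beneficial && target_partitions > input_partitions {
        target_partitions
    } else {
        // Keep whatever partitioning the input already has.
        input_partitions
    }
}

fn main() {
    // An empty table, as in the reproduction: the hint is not followed.
    println!("{}", effective_partitions(1, 4, Some(0), 8192));
    // Unknown size: fan out to the target.
    println!("{}", effective_partitions(1, 4, None, 8192));
}
```

With an empty table like the one in the reproduction, a rule of this kind keeps the single input partition even though target_partitions is 4.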

@palaska
Contributor Author

palaska commented Sep 27, 2024

Thanks for the explanation! I agree that optimizing for performance makes sense, as long as it doesn't compromise guarantees or hurt system predictability. In Ballista, a "task" is generated for each partition, and changing this behavior has caused some unit test assertions to fail. However, I don't think this is a major issue for Ballista. I’m not familiar with how this flag is being used in other systems, but @alamb might have some insights to share.
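The Ballista point can be illustrated with a toy scheduler that creates one task per output partition. Task and tasks_for_stage are hypothetical names, not Ballista's API. The sketch shows why a test that assumes the task count equals target_partitions breaks once the planner chooses fewer partitions, and one less brittle alternative: derive the expectation from the partition count the plan actually has.

```rust
/// Hypothetical task descriptor (not Ballista's real type).
struct Task {
    stage_id: usize,
    partition: usize,
}

/// Toy scheduler rule: one task per output partition of a stage.
fn tasks_for_stage(stage_id: usize, output_partitions: usize) -> Vec<Task> {
    (0..output_partitions)
        .map(|partition| Task { stage_id, partition })
        .collect()
}

fn main() {
    let target_partitions = 4;
    // What the planner actually produced for the single-partition input.
    let planned_partitions = 1;
    let tasks = tasks_for_stage(0, planned_partitions);

    // Brittle: this would fail here, because the target was not reached.
    // assert_eq!(tasks.len(), target_partitions);

    // Less brittle: tie the expectation to the plan that was produced.
    assert_eq!(tasks.len(), planned_partitions);

    for task in &tasks {
        println!(
            "stage {} partition {} (target was {})",
            task.stage_id, task.partition, target_partitions
        );
    }
}
```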

@alamb
Contributor

alamb commented Sep 29, 2024

I think the reason it is called "target_partitions" is that it is not a guarantee but rather a target used for performance optimizations, as @akurmustafa mentions.

If you need more than one partition, you can always modify the plan or set the required input partitions.
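As a rough illustration of "modify the plan", the sketch below uses a hypothetical toy plan tree (Plan, ensure_min_partitions and describe are invented names) and wraps any plan that produces too few partitions in a repartition node. In DataFusion itself the corresponding operator is RepartitionExec, for example with Partitioning::RoundRobinBatch; the toy only shows the shape of the transformation, not that API.

```rust
/// Toy physical-plan tree (hypothetical types, not DataFusion's).
enum Plan {
    Scan { partitions: usize },
    Aggregate { input: Box<Plan> },
    Repartition { input: Box<Plan>, partitions: usize },
}

fn output_partitions(plan: &Plan) -> usize {
    match plan {
        Plan::Scan { partitions } => *partitions,
        // The toy aggregate preserves its input's partitioning.
        Plan::Aggregate { input } => output_partitions(input),
        Plan::Repartition { partitions, .. } => *partitions,
    }
}

/// Wraps `plan` in a repartition node if it produces fewer than `min`
/// partitions; otherwise returns it unchanged.
fn ensure_min_partitions(plan: Plan, min: usize) -> Plan {
    if output_partitions(&plan) >= min {
        plan
    } else {
        Plan::Repartition { input: Box::new(plan), partitions: min }
    }
}

/// One-line rendering of the tree, parent first.
fn describe(plan: &Plan) -> String {
    match plan {
        Plan::Scan { partitions } => format!("Scan({partitions})"),
        Plan::Aggregate { input } => format!("Aggregate <- {}", describe(input)),
        Plan::Repartition { input, partitions } => {
            format!("Repartition({partitions}) <- {}", describe(input))
        }
    }
}

fn main() {
    // Mirrors the reproduction: an aggregate over a single-partition scan.
    let plan = Plan::Aggregate {
        input: Box::new(Plan::Scan { partitions: 1 }),
    };
    let plan = ensure_min_partitions(plan, 4);
    println!("{} -> {} partition(s)", describe(&plan), output_partitions(&plan));
}
```

Placing the repartition on top of the aggregate, as the toy does, only changes the output partition count; to parallelize the aggregation itself, the repartition would have to sit below it, between the scan and the aggregate.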

@palaska
Contributor Author

palaska commented Sep 29, 2024

Thanks for the clarifications, closing this one.

@palaska palaska closed this as completed Sep 29, 2024