-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sketch for aggregation intermediate results blocked management #11943
base: main
Are you sure you want to change the base?
Sketch for aggregation intermediate results blocked management #11943
Conversation
fdb1789
to
f087efe
Compare
Thank you @Rachelint -- I hope to look at this more carefully tomorrow |
The benchmark after impl blocked version And after we impl blocked version for all
|
What is the difference between blocked approach and Emit::First with block size? At the end, there are only AllBlocks and FirstBlocks? |
} | ||
} | ||
|
||
// emit reduces the memory usage. Ignore Err from update_memory_reservation. Even if it is | ||
// over the target memory size after emission, we can emit again rather than returning Err. | ||
let _ = self.update_memory_reservation(); | ||
let batch = RecordBatch::try_new(schema, output)?; | ||
Ok(batch) | ||
let batches = outputs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let batches = outputs | |
outputs | |
.into_iter() | |
.map(|o| { | |
RecordBatch::try_new(Arc::clone(&schema), o) | |
.map_err(|e| DataFusionError::ArrowError(e, None)) | |
}) | |
.collect::<Result<VecDeque<_>>>() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Such codes may be stale now.
.into_iter() | ||
.map(|o| { | ||
RecordBatch::try_new(Arc::clone(&schema), o) | ||
.map_err(|e| DataFusionError::ArrowError(e, None)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use macro for error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Such codes may be stale now.
@@ -353,7 +355,7 @@ pub(crate) struct GroupedHashAggregateStream { | |||
|
|||
/// scratch space for the current input [`RecordBatch`] being | |||
/// processed. Reused across batches here to avoid reallocations | |||
current_group_indices: Vec<usize>, | |||
current_group_indices: Vec<u64>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the reason to use u64 instead of usize
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the reason to use u64 instead of usize
I think the clearer u64 is needed when we make the blocked impls,
we need to split the group_idx to two parts:
- high 32bits used to represent the
block id
- low 32bits used to represent the
block offset
So for reusing the same current_group_indices
buffer in both flat
and blocked
mode, I modify all related group_idx
to u64.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another alternative might be to ensure that the block size is aligned across the aggegators and group values -- that way there would be no stitching arrays together into batches during emission
@jayzhan211 Yes, still not any detailed blocked impls now, and just make benchmark for ensuring the sketch will not decrease the performance. I guess the blocked apporach may be not related much to Actually the blocked approach's main traget is that it manages the data block by block in the |
I think I'm not so familiar with the Emit::First and there is no block implementation done yet. Could we emit every block size of values we have? Something like Emit::First(block size). We have fn emit_early_if_necessary(&mut self) -> Result<()> {
if self.group_values.len() >= self.batch_size
&& matches!(self.group_ordering, GroupOrdering::None)
&& matches!(self.mode, AggregateMode::Partial)
&& self.update_memory_reservation().is_err()
{
let n = self.group_values.len() / self.batch_size * self.batch_size;
let batch = self.emit(EmitTo::First(n), false)?;
self.exec_state = ExecutionState::ProducingOutput(batch);
}
Ok(())
} If we emit every block size we accumulated, is it something similar to the block approach? If not, what is the difference? Upd: One difference I can think of is that in block approach, we have all the accumulated values, and we can optimize it based on all the values we have, while in Emit::First mode, we early emit partial values, therefore, we loss the change if we want to do optimization based on all the values 🤔 ? |
Ok, I think I got it now, if we constantly But
And in others, we need to poll to end, then
And in such cases, blocked approach may be effecient for both memory and cpu as stated above? |
I think maybe we should keep them both, the The When will the blocked mode be enabled maybe can see in: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
THank you @Rachelint -- I took a look at this PR and here is some feedback:
-
I think it is important to spend time actually showing this approach makes some queries faster (e.g. we should try and update one accumulator and one implementation of groups to show it makes a difference)
-
I think it is important to actually chunk saving the intermediate state (e.g. in a
Vec<...>
rather than...
to realize the benefit of this chunked approach -
Thank you for working on this. Very cool
@@ -123,7 +151,7 @@ pub trait GroupsAccumulator: Send { | |||
/// future use. The group_indices on subsequent calls to | |||
/// `update_batch` or `merge_batch` will be shifted down by | |||
/// `n`. See [`EmitTo::First`] for more details. | |||
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>; | |||
fn evaluate(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would help to document what expectations are on the Vec of array refs
/// Emit all groups managed by blocks | ||
AllBlocks, | ||
/// Emit only the first `n` group blocks, | ||
/// similar as `First`, but used in blocked `GroupValues` and `GroupAccumulator`. | ||
/// | ||
/// For example, `n=3`, `block size=4`, finally 12 groups will be returned. | ||
FirstBlocks(usize), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than having two parallel emission modes for blocked output, I wonder if we could have some sort of "take" mode whose semantics did not shift the existing values down
For example, what if we introduced a notion of "block" across the group keys and and aggregators
pub enum EmitTo {
/// Same
All,
/// Same
First(usize),
/// Takes the N'th block of rows from this accumulator
/// it is an error to take the same batch twice or to emit `All` or `First`
/// after any TakeBatch(usize)
TakeBlock(usize)
}
And then we would for example, make sure the group values and aggregators all saved data using blocks of 100K rows
Then to emit 1M rows, the accumulators would emit like
EmitTo::TakeBlock(0)
EmitTo::TakeBlock(1)
EmitTo::TakeBlock(2)
...
EmitTo::TakeBlock(9)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may be a great idea for reducing code changes(we dont need to refactor the returned emit value for T to Vec).
But with Emit::TakeBlock(idx)
, seems we need to record the current block id in outer, maybe a bit complicated?
we just define the Emit::CurrentBlock
, and use the iterator approach to impl AllBlocks
and FirstBlocks
defined now?
If this makes sense, I can try switch the sketch to this way.
pub enum EmitTo {
/// Same
All,
/// Same
First(usize),
/// Return the current block of rows from this accumulator
/// We should always use this emit mode in blocked mode accumulator.
CurrentBlock,
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use the iterator approach to impl AllBlocks and FirstBlocks defined now
Not sure how does this work, but it looks like a neat idea. If we apply the same idea to "element" (First and All), and consider it as a specialized case with block_size = 1, I think we could end up a pretty nice abstraction. Probably we just need EmitTo::Block(block_size)
🤔 However, it is too far way from now. 😆
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use the iterator approach to impl AllBlocks and FirstBlocks defined now
Not sure how does this work, but it looks like a neat idea. If we apply the same idea to "element" (First and All), and consider it as a specialized case with block_size = 1, I think we could end up a pretty nice abstraction. Probably we just need
EmitTo::Block(block_size)
🤔 However, it is too far way from now. 😆
🤔 Yes, other emit mode can indeed seen as a case with specialized blocke size in the iterator approach. But considered about performance, it is better to let batch_size == block_size
.
After introduce the iterator approach, just 200+ codes to finished the sketch, compared to the stale version sketch with 600+.
The main work is just to add a stream state ExecutionState::ProducingBlocks(blocks)
.
https://github.com/Rachelint/arrow-datafusion/blob/d79d912d1677549c825cafc405911973ace0df46/datafusion/physical-plan/src/aggregates/row_hash.rs#L728
Maybe it can show how the blocked optimzation works.
@@ -68,11 +70,21 @@ where | |||
fn update_batch( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to realize the benefit of this blocked implementation I think you will need to change the state of the accumulators so that instead of a single large buffer
/// values per group
values: BooleanBufferBuilder,
The state is held in chunks like
/// blocks of values per group
values: Vec<BooleanBufferBuilder>
(or possibly this to support taking them out individually)
/// blocks of values per group, None when taken
values: Vec<Option<BooleanBufferBuilder>>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I agree.
This sketch originally want to support some combinations like:
- Single GroupValues + single GroupAccumulator
- Blocked GroupValues + single GroupAccumulator
- Blocked GroupValues + blocked GroupAccumulator
But after considering, it may just make the codes so complicated, and maybe can't have obvious improvement in Blocked GroupValues + single GroupAccumulator
mode (constantly slice
call still exist, or some even more expansive operations will be introduced if we impl it without slice
).
@@ -353,7 +355,7 @@ pub(crate) struct GroupedHashAggregateStream { | |||
|
|||
/// scratch space for the current input [`RecordBatch`] being | |||
/// processed. Reused across batches here to avoid reallocations | |||
current_group_indices: Vec<usize>, | |||
current_group_indices: Vec<u64>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another alternative might be to ensure that the block size is aligned across the aggegators and group values -- that way there would be no stitching arrays together into batches during emission
I still don't understand this Also thanks for pushing this forward, I think this approach is promising for performance |
@2010YOUY01 make sense, it seems But the For example:
Such a groups movement will obviously lead to much cpu cost... Actually we can remove the For example, in But I think if we impl like this, it will be so confused, and for making it clear, I introduce the two new blocked emission mode |
Thanks, I have finished a blocked style |
@2010YOUY01 After checking the codes about memory contorl, I think I got it.
They all serve for the spilling. And the logic may be like this:
|
Thanks, now I figured out the high-level idea of spilling in aggregation and how However there exists other code that does early emit in aggregation, and I'm still trying to figure out how they work, do you have any pointer for that? I'm guessing it's used in streaming aggregation or some pushed-down limits datafusion/datafusion/physical-plan/src/aggregates/row_hash.rs Lines 605 to 611 in 482ef45
|
Yes, you are right, there are two early emission cases, one is for spilling mentioned above, and another here is about streaming. |
076d88e
to
ab92626
Compare
872192b
to
ef91012
Compare
128b5ff
to
5d2ac01
Compare
2a1be53
to
318c650
Compare
Hi @alamb main comments #11943 (review) for this pr have been fixed, minding have a quick look? It would be appreciated. The detail about main progress:
Things planned in next prs:
// current
fn update_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()>;
// new
fn update_batch(
&mut self,
values: &[ArrayRef],
group_indices: &[BlockedGroupIndex],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()>;
|
Sorry for the delay -- I am back now full time and will review this PR over the next few days |
Marking as draft as I don't think this is waiting on review and I am trying to keep the review backlog under control |
Which issue does this PR close?
Part of #11931 , part of #7065
Rationale for this change
Detail can see #11931
Are these changes tested?
By exist tests.
Are there any user-facing changes?
Two functions are added to
GroupValues
andGroupAccumulator
trait.