Fix record batch memory size double counting #13377
Conversation
This PR does indeed fix #10511 😀. I just tested the branch, and the code that crashes in main works perfectly here.
// Count all children `ArrayData` recursively
for child in array_data.child_data() {
    count_array_data_memory_size(child, counted_buffers, total_size);
}
Does it make sense to use `#[recursive]` to protect from cases with large nested data types?
I've learned something new today. Maybe apache/datafusion-sqlparser-rs#984 can be fixed with this attribute.
But this attribute comes with a performance overhead 🤔 https://docs.rs/recursive/latest/recursive/
I think stack overflow will happen after 10s of layers of recursion, which is likely for expressions, but I am not sure arrays can also have such deep nesting.
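For context, a minimal sketch of how the attribute from the `recursive` crate linked above is applied (the `visit` walker here is invented for illustration, not code from this PR). The attribute transparently grows the stack when it runs low, at the cost of a check on each call:

```rust
// Sketch only: a recursive walk over nested `ArrayData`, guarded against
// stack overflow by the `recursive` crate's attribute macro.
#[recursive::recursive]
fn visit(data: &arrow::array::ArrayData, depth: &mut usize) {
    *depth += 1;
    for child in data.child_data() {
        // Deep nesting no longer risks overflowing the thread's stack.
        visit(child, depth);
    }
}
```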
Yes, I agree we don't need to annotate all recursive function calls -- only the ones that will become very large/deep.
    counted_buffers: &mut HashSet<NonNull<u8>>,
    total_size: &mut usize,
) {
    // Count memory usage for `array_data`
Nit, but you can probably add the size of `array_data.data_type` itself.
I think this approach also misses several other pieces of metadata memory (like the data type and buffer pointers); they will be included in the more comprehensive fix on the arrow side.
For memory counting in a large memory consumer, some inaccuracy is acceptable as long as the major consumption is counted. However, I agree this should be better documented.
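For readers following along, here is a condensed sketch of the approach, reconstructed from the hunks quoted in this review (simplified; the null-bitmap handling is my assumption of how the PR does it, so see the actual diff for the exact code). Buffers are keyed by their starting address, so a `Buffer` shared by several arrays is counted only once:

```rust
use std::collections::HashSet;
use std::ptr::NonNull;

use arrow::array::ArrayData;
use arrow::record_batch::RecordBatch;

pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
    // Track buffers we have already seen, keyed by data pointer, since the
    // same `Buffer` may back multiple arrays in the batch.
    let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();
    let mut total_size = 0;

    for array in batch.columns() {
        let array_data = array.to_data();
        count_array_data_memory_size(&array_data, &mut counted_buffers, &mut total_size);
    }
    total_size
}

fn count_array_data_memory_size(
    array_data: &ArrayData,
    counted_buffers: &mut HashSet<NonNull<u8>>,
    total_size: &mut usize,
) {
    // Count each distinct data buffer once.
    for buffer in array_data.buffers() {
        if counted_buffers.insert(buffer.data_ptr()) {
            *total_size += buffer.capacity();
        } // else: this buffer's memory was already counted
    }

    // The validity (null) bitmap is a buffer too.
    if let Some(nulls) = array_data.nulls() {
        let buffer = nulls.inner().inner();
        if counted_buffers.insert(buffer.data_ptr()) {
            *total_size += buffer.capacity();
        }
    }

    // Count all children `ArrayData` recursively.
    for child in array_data.child_data() {
        count_array_data_memory_size(child, counted_buffers, total_size);
    }
}
```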
/// buffer memory size if multiple arrays within the batch are sharing the same
/// `Buffer`. This method provides temporary fix until the issue is resolved:
/// <https://github.com/apache/arrow-rs/issues/6439>
pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
In TopK, `RecordBatchStore` still uses `get_array_memory_size`; do you think we should switch to `get_record_batch_memory_size` there as well?
I think they should all be changed; however, after changing them in `TopK`, some existing test cases might be tricky to fix, and more end-to-end tests should be added. So I plan to do it incrementally.
Cool -- can you possibly file a ticket to track any work that you know about? I can help file it / with the explanation as well
    let size = get_record_batch_memory_size(&batch);
    assert_eq!(size, 8320);
}
}
Agreed.
Also, I believe there are some tools that can automatically do similar checks (mutate the code and make sure some test case fails; if none does, there is a gap in test coverage), like https://mutants.rs/. We can investigate how to integrate them into the project 😄
    .unwrap();

let size = get_record_batch_memory_size(&batch);
assert_eq!(size, 60);
My only concern with this PR is that the result of `get_record_batch_memory_size` differs from `get_array_memory_size`. For example, here `batch.get_array_memory_size()` would return 252 instead of 60.
This could be dangerous because the project would end up with two different methods of calculating memory sizes. I can imagine a scenario in the future where we reserve memory based on one calculation method and shrink it using the result from the other. While the difference may not be large each time, over many repetitions or a large dataset it could behave almost like a memory leak (but without actual memory), making debugging very challenging...
Should we completely switch to the new method, blocking usage of the old one? Or should we try to make the two numbers match closely?
This is a great point. I also feel that this manual memory accounting is complex and error-prone. We’d better change all of it. (Maybe also use some RAII in the implementation, instead of manually growing and shrinking memory usage as we’re doing right now.)
Finding a way to automatically update the memory accounting is certainly a good idea in my mind. As we have mentioned, I think the most important thing will be to find a way to account for arrow buffers completely. Then we can work it into DataFusion.
Thanks @2010YOUY01 -- will look at this later today or tomorrow
// Merge operation needs extra memory to do row conversion, so make the
// memory limit larger.
let mem_limit = partition_size * 2;
This could be introduced as a DataFusion parameter so the user can configure the memory allocation headroom, as sketched below. I have a feeling the memory needed is data dependent, varying with the data types and data being processed.
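A sketch of what that could look like (hypothetical; `merge_memory_factor` is a name invented here, not an existing DataFusion option, and `partition_size` is reused from the hunk above):

```rust
// Hypothetical, user-configurable knob replacing the hard-coded `* 2`;
// the right value likely depends on the data types being processed.
fn merge_mem_limit(partition_size: usize, merge_memory_factor: usize) -> usize {
    // Merge needs extra memory for row conversion, so scale the partition
    // size by the configured headroom factor.
    partition_size * merge_memory_factor
}

// Equivalent to the current hard-coded behavior:
let mem_limit = merge_mem_limit(partition_size, 2);
```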
Really love the documentation, so no need to go through the code. One thing to mention: how fast is this method? I believe it will be called frequently.
This is a very good point. I think when doing the same fix on the arrow side, we should cache the result inside.
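For illustration, one way such caching could look (a sketch only, not the actual arrow-rs plan; `SizedBatch` is a name invented here, and it reuses this PR's `get_record_batch_memory_size`):

```rust
use std::sync::OnceLock;

use arrow::record_batch::RecordBatch;

struct SizedBatch {
    batch: RecordBatch,
    cached_size: OnceLock<usize>,
}

impl SizedBatch {
    /// Compute the deduplicated memory size once, then serve it from cache,
    /// so frequent callers don't pay for the buffer walk every time.
    fn memory_size(&self) -> usize {
        *self
            .cached_size
            .get_or_init(|| get_record_batch_memory_size(&self.batch))
    }
}
```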
Thanks @2010YOUY01 and @blaginin -- this PR makes a lot of sense to me.
Filing follow-on tickets would be a good idea in my mind.
@@ -109,10 +111,80 @@ pub fn spill_record_batch_by_size(
    Ok(())
}

/// Calculate total used memory of this batch.
💯 for this comment
let slice1 = original.slice(0, 3);
let slice2 = original.slice(2, 3);

let batch =
💯
    .unwrap();

let size = get_record_batch_memory_size(&batch);
// The size should only count the shared buffer once
It would be good in my mind to change this test so that, rather than testing a hard-coded size, it computed the size of a single slice and verified that it is the same; that way the test would check the actual invariant (that the sizes are equal) rather than relying on keeping the two values in sync.
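A sketch of what that invariant-based assertion could look like (hypothetical, reusing `original` and `batch` from the hunk above, and assuming `get_record_batch_memory_size` counts only deduplicated buffer memory):

```rust
// A batch of two slices sharing one buffer should report the same size as a
// batch holding a single slice, since the shared buffer is counted once in
// both cases.
let single_batch = RecordBatch::try_new(
    Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
    vec![Arc::new(original.slice(0, 3)) as ArrayRef],
)
.unwrap();

assert_eq!(
    get_record_batch_memory_size(&batch),
    get_record_batch_memory_size(&single_batch)
);
```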
Thank you all for the feedback! I've updated the following:
lgtm thanks @2010YOUY01
Which issue does this PR close?
First step to fix #13089
Rationale for this change
Now `record_batch.get_array_memory_size()` will overestimate memory usage: if multiple arrays point to the same underlying buffer, it is counted repeatedly. A more detailed explanation can be found in this PR's comments.
This function is used by spilling execution to estimate physical memory usage, and this overestimation has caused many bugs in memory-limited sort/aggregation/join. For example, if there is a `RecordBatch` with 10 columns, and all 10 columns share the same `Buffer`, then `record_batch.get_array_memory_size()` will return a 10X estimate, making memory-limited queries fail quite easily.
I believe #13089 is caused by this issue, and likely #9417 #10511 #12136 #11390 as well.
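As an illustration, here is a self-contained sketch mirroring the slice test added in this PR (the column names are illustrative, and the printed byte count depends on the arrow version):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    // Two slices over one underlying data buffer.
    let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let slice1 = original.slice(0, 3);
    let slice2 = original.slice(2, 3);

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let columns: Vec<ArrayRef> = vec![Arc::new(slice1), Arc::new(slice2)];
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    // Both columns point at the same `Buffer`, but the per-array sum counts
    // it once per column, roughly doubling the reported footprint here.
    println!("{}", batch.get_array_memory_size());
}
```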
What changes are included in this PR?
Introduced a new `get_record_batch_memory_size()` to avoid double counting, by using an internal `HashSet` to recognize reused buffers.
While @waynexia is working on a comprehensive solution in `arrow-rs` (apache/arrow-rs#6439), I think it's useful to introduce this temporary fix in DataFusion because:
- Many tests use `record_batch.get_array_memory_size()` with memory overcounting, so it's non-trivial to fix all of them at once (manual memory tracking is tricky; when I was trying to make one external aggregate query run, it took me a while to figure out why one test case failed after a change).
- This lets us incrementally replace `record_batch.get_array_memory_size()` and add regression tests for memory-limited query bugs. After we have a fix in arrow, the temporary fix function can be deprecated and replaced with the original one more easily.
Are these changes tested?
Yes
Are there any user-facing changes?
No