Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add decrease interface for aggregation #9737

Open
wants to merge 29 commits into
base: master
Choose a base branch
from

Conversation

xzhangxian1008
Copy link
Contributor

@xzhangxian1008 xzhangxian1008 commented Dec 20, 2024

What problem does this PR solve?

Issue Number: ref #7376

Problem Summary:

What is changed and how it works?

In order to support aggregation in window function, we need to add some interfaces for aggregation. In this pr, we add `decrease` interface for aggregation.

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Side effects

  • Performance regression: Consumes more CPU
  • Performance regression: Consumes more Memory
  • Breaking backward compatibility

Documentation

  • Affects user behaviors
  • Contains syntax changes
  • Contains variable changes
  • Contains experimental features
  • Changes MySQL compatibility

Release note

None

@ti-chi-bot ti-chi-bot bot added the release-note-none Denotes a PR that doesn't merit a release note. label Dec 20, 2024
Copy link
Contributor

ti-chi-bot bot commented Dec 20, 2024

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign searise for approval. For more information see the Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Dec 20, 2024
@xzhangxian1008
Copy link
Contributor Author

/cc @windtalker @guo-shaoge

{
this->data(place).sum -= static_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
}
--this->data(place).count;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add assert(this->data(place).count >= 0)?

if constexpr (is_add)
nested_func->add(nested_state, nested, i, arena);
else
nested_func->decrease(nested_state, nested, i, arena);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just call nested_func->addOrDecrease<is_add>()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just call nested_func->addOrDecrease<is_add>()?

Because addOrDecrease<>() is not an interface in IAggregateFunction.

if constexpr (is_add)
this->nested_function->add(this->nestedPlace(place), &nested_column, row_num, arena);
else
this->nested_function->decrease(this->nestedPlace(place), &nested_column, row_num, arena);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

Because addOrDecrease<>() is not an interface in IAggregateFunction.

@@ -64,6 +64,16 @@ class AggregateFunctionMerge final : public IAggregateFunctionHelper<AggregateFu
nested_func->merge(place, static_cast<const ColumnAggregateFunction &>(*columns[0]).getData()[row_num], arena);
}

void decrease(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decrease is the same as add?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decrease is the same as add?

AggregateFunctionMerge is impossible to be used in window agg. It's just a placeholder to pass the compilation. I will delete it.

if constexpr (is_add)
this->nested_function->add(this->nestedPlace(place), &nested_column, row_num, arena);
else
this->nested_function->decrease(this->nestedPlace(place), &nested_column, row_num, arena);
}
}
}
else
{
this->setFlag(place, 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flag actually has its meanings, you should maintain the flag for decrease

* It can only be allocated and destroyed.
* MemoryTracker is not used. AlignedBuffer is intended for small pieces of memory.
*/
class AlignedBuffer : private boost::noncopyable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this class is not used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this class is not used?

It's used in test in this pr. And will also be used in further pr.

@@ -43,9 +45,18 @@ struct SingleValueDataFixed
= false; /// We need to remember if at least one value has been passed. This is necessary for AggregateFunctionIf.
T value;

// It's only used in window aggregation
mutable std::deque<T> * saved_values;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest to implement window_min/window_max function which support decrease.

if constexpr (IsDecimal<T>)
this->data(place).sum -= static_cast<const ColumnDecimal<T> &>(*columns[0]).getData()[row_num];
else
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove { }?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove { }?

okk


void AlignedBuffer::alloc(size_t size, size_t alignment)
{
void * new_buf;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If alloc() is called two times without calling daealloc() or reset(), there will be memory leak.
Maybe we can add some check to avoid it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If alloc() is called two times without calling daealloc() or reset(), there will be memory leak. Maybe we can add some check to avoid it?

It's impossible to call alloc() two times. Because it's a private function and only be used in constructor and reset().

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
release-note-none Denotes a PR that doesn't merit a release note. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants