Skip to content

Commit

Permalink
Changed memory calculation for TEvFreeItems (merge from main #6625) (#…
Browse files Browse the repository at this point in the history
…6694)

Co-authored-by: mregrock <[email protected]>
Co-authored-by: Egor Kulin <[email protected]>
  • Loading branch information
3 people authored Jul 16, 2024
1 parent 14c79b8 commit dec025f
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions ydb/library/actors/interconnect/event_holder_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace NActors {
TStackVec<THolder<IEventBase>, MaxEvents> Events;
TStackVec<THolder<TEventSerializedData>, MaxEvents> Buffers;
std::shared_ptr<std::atomic<TAtomicBase>> Counter;
ui64 NumBytes = 0;
ui64 NumBytes = sizeof(TEvFreeItems);

~TEvFreeItems() {
if (Counter) {
Expand Down Expand Up @@ -76,6 +76,7 @@ namespace NActors {
auto p = GetPendingEvent();
p->NumBytes += event->EventSerializedSize;
auto& events = p->Events;
p->NumBytes += sizeof(*ev);
events.push_back(std::move(ev));
trim = trim || events.size() >= TEvFreeItems::MaxEvents || p->NumBytes >= MaxBytesPerMessage;
}
Expand All @@ -85,19 +86,24 @@ namespace NActors {
auto p = GetPendingEvent();
p->NumBytes += event->EventSerializedSize;
auto& buffers = p->Buffers;
buffers.emplace_back(event->Buffer.Release());
auto&& bufferReleased = event->Buffer.Release();
p->NumBytes += sizeof(*bufferReleased);
buffers.emplace_back(std::move(bufferReleased));
trim = trim || buffers.size() >= TEvFreeItems::MaxEvents || p->NumBytes >= MaxBytesPerMessage;
}

// free event and trim the cache if its size is exceeded
event->Clear();
Cache.splice(Cache.end(), queue, event);
if (Cache.size() >= FreeQueueTrimThreshold) {
auto& freeQueue = GetPendingEvent()->FreeQueue;
auto p = GetPendingEvent();
auto& freeQueue = p->FreeQueue;
auto it = Cache.begin();
std::advance(it, Cache.size() - MaxFreeQueueItems);
size_t addSize = Cache.size() - MaxFreeQueueItems;
std::advance(it, addSize);
freeQueue.splice(freeQueue.end(), Cache, Cache.begin(), it);
trim = true;
p->NumBytes += (sizeof(TEventHolder) + 5 * sizeof(void*)) * addSize;
trim = trim || p->NumBytes >= MaxBytesPerMessage;
}

// release items if we have hit the limit
Expand Down

0 comments on commit dec025f

Please sign in to comment.