Commit
[BugFix] Fix the problem that Stream/Routine Load memory statistics are not accurate.

Signed-off-by: stdpain <[email protected]>
stdpain committed Jul 18, 2024
1 parent ca5a6db commit 6b8bbff
Showing 6 changed files with 38 additions and 13 deletions.
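
The change routes every ByteBuffer allocation on the Stream/Routine Load paths through the new ByteBuffer::allocate_with_tracker(), added in be/src/util/byte_buffer.h below. It binds the buffer's shared_ptr to the mem tracker of the allocating thread via a custom deleter, so the bytes are charged to, and later released against, the same load-level tracker. A minimal, self-contained sketch of that pattern follows; the toy names (ToyMemTracker, ToyBuffer, g_current_tracker) are illustrative stand-ins, not the StarRocks MemTracker/CurrentThread APIs.

// Illustrative sketch only (not part of this commit): a simplified version of
// the tracking pattern used by allocate_with_tracker / MemTrackerDeleter.
#include <cstddef>
#include <iostream>
#include <memory>

struct ToyMemTracker {
    size_t consumed = 0;
    void consume(size_t bytes) { consumed += bytes; }
    void release(size_t bytes) { consumed -= bytes; }
};

// Thread-local "current" tracker, analogous to CurrentThread::mem_tracker().
thread_local ToyMemTracker* g_current_tracker = nullptr;

struct ToyBuffer {
    explicit ToyBuffer(size_t cap) : capacity(cap), ptr(new char[cap]) {
        if (g_current_tracker != nullptr) g_current_tracker->consume(capacity);
    }
    ~ToyBuffer() {
        if (g_current_tracker != nullptr) g_current_tracker->release(capacity);
        delete[] ptr;
    }
    size_t capacity;
    char* ptr;
};

// Custom deleter: re-installs the tracker that was current at allocation time
// before destroying the buffer, so the release is attributed to the same
// tracker even if the last shared_ptr reference dies on another thread.
struct ToyTrackerDeleter {
    ToyMemTracker* tracker;
    void operator()(ToyBuffer* buf) const {
        ToyMemTracker* saved = g_current_tracker;
        g_current_tracker = tracker;
        delete buf;
        g_current_tracker = saved;
    }
};

std::shared_ptr<ToyBuffer> allocate_with_tracker(size_t size) {
    return std::shared_ptr<ToyBuffer>(new ToyBuffer(size), ToyTrackerDeleter{g_current_tracker});
}

int main() {
    ToyMemTracker load_tracker;
    g_current_tracker = &load_tracker;
    {
        auto buf = allocate_with_tracker(64 * 1024);
        std::cout << "consumed: " << load_tracker.consumed << "\n"; // 65536
    }
    std::cout << "consumed: " << load_tracker.consumed << "\n"; // 0
    g_current_tracker = nullptr;
    return 0;
}
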
2 changes: 1 addition & 1 deletion be/src/exec/json_scanner.cpp
@@ -672,7 +672,7 @@ Status JsonReader::_read_file_stream() {
if (_file_stream_buffer->capacity < _file_stream_buffer->remaining() + simdjson::SIMDJSON_PADDING) {
// For efficiency reasons, simdjson requires a string with a few bytes (simdjson::SIMDJSON_PADDING) at the end.
// Hence, a re-allocation is needed if the space is not enough.
-auto buf = ByteBuffer::allocate(_file_stream_buffer->remaining() + simdjson::SIMDJSON_PADDING);
+auto buf = ByteBuffer::allocate_with_tracker(_file_stream_buffer->remaining() + simdjson::SIMDJSON_PADDING);
buf->put_bytes(_file_stream_buffer->ptr, _file_stream_buffer->remaining());
buf->flip();
std::swap(buf, _file_stream_buffer);
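
The simdjson padding requirement mentioned in the comment above is why the JSON paths always reserve size + simdjson::SIMDJSON_PADDING bytes. A standalone illustration of that contract (plain simdjson DOM API, not StarRocks code; the sample JSON is made up):

// Illustration only: parse a raw buffer with realloc_if_needed = false, which
// requires simdjson::SIMDJSON_PADDING allocated bytes past the end of the text.
#include <cstring>
#include <iostream>
#include <memory>
#include "simdjson.h"

int main() {
    const char* json = R"({"db":"test","tbl":"t1","rows":2})";
    size_t len = std::strlen(json);

    // Mirror of the allocate(remaining() + simdjson::SIMDJSON_PADDING) pattern above.
    std::unique_ptr<char[]> buf(new char[len + simdjson::SIMDJSON_PADDING]);
    std::memcpy(buf.get(), json, len);

    simdjson::dom::parser parser;
    simdjson::dom::element doc;
    // false: the caller guarantees the padding, so simdjson will not re-allocate.
    auto error = parser.parse(buf.get(), len, false).get(doc);
    if (error) {
        std::cerr << simdjson::error_message(error) << "\n";
        return 1;
    }
    std::cout << doc << "\n"; // prints the parsed document back as JSON
    return 0;
}
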
9 changes: 5 additions & 4 deletions be/src/http/action/stream_load.cpp
@@ -288,7 +288,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct
if (ctx->format == TFileFormatType::FORMAT_JSON) {
// Allocate buffer in advance, since the json payload cannot be parsed in stream mode.
// For efficiency reasons, simdjson requires a string with a few bytes (simdjson::SIMDJSON_PADDING) at the end.
-ctx->buffer = ByteBuffer::allocate(ctx->body_bytes + simdjson::SIMDJSON_PADDING);
+ctx->buffer = ByteBuffer::allocate_with_tracker(ctx->body_bytes + simdjson::SIMDJSON_PADDING);
}
} else {
#ifndef BE_TEST
@@ -356,14 +356,15 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
while ((len = evbuffer_get_length(evbuf)) > 0) {
if (ctx->buffer == nullptr) {
// Initialize buffer.
-ctx->buffer = ByteBuffer::allocate(
+ctx->buffer = ByteBuffer::allocate_with_tracker(
ctx->format == TFileFormatType::FORMAT_JSON ? std::max(len, ctx->kDefaultBufferSize) : len);

} else if (ctx->buffer->remaining() < len) {
if (ctx->format == TFileFormatType::FORMAT_JSON) {
// For json format, we need build a complete json before we push the buffer to the pipe.
// buffer capacity is not enough, so we try to expand the buffer.
-ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(ctx->buffer->pos + len));
+ByteBufferPtr buf =
+ByteBuffer::allocate_with_tracker(BitUtil::RoundUpToPowerOfTwo(ctx->buffer->pos + len));
buf->put_bytes(ctx->buffer->ptr, ctx->buffer->pos);
std::swap(buf, ctx->buffer);

@@ -378,7 +379,7 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
return;
}

-ctx->buffer = ByteBuffer::allocate(std::max(len, ctx->kDefaultBufferSize));
+ctx->buffer = ByteBuffer::allocate_with_tracker(std::max(len, ctx->kDefaultBufferSize));
}
}

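
For context on the JSON expansion branch above: the required size is rounded up to the next power of two, so each expansion at least doubles the capacity and a stream of small chunks triggers only a logarithmic number of re-allocations. A small standalone sketch of that rounding (a generic bit-twiddling version, not the actual BitUtil::RoundUpToPowerOfTwo implementation):

// Sketch of round-up-to-power-of-two growth (illustrative; not BitUtil's code).
#include <cstdint>
#include <iostream>

uint64_t round_up_to_power_of_two(uint64_t v) {
    if (v <= 1) return 1;
    v--;
    v |= v >> 1;
    v |= v >> 2;
    v |= v >> 4;
    v |= v >> 8;
    v |= v >> 16;
    v |= v >> 32;
    return v + 1;
}

int main() {
    // e.g. the buffer already holds 70000 bytes and another 5000 arrive:
    std::cout << round_up_to_power_of_two(70000 + 5000) << "\n"; // 131072
    return 0;
}
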
6 changes: 3 additions & 3 deletions be/src/http/action/transaction_stream_load.cpp
@@ -529,7 +529,7 @@ void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) {
while ((len = evbuffer_get_length(evbuf)) > 0) {
if (ctx->buffer == nullptr) {
// Initialize buffer.
-ctx->buffer = ByteBuffer::allocate(
+ctx->buffer = ByteBuffer::allocate_with_tracker(
ctx->format == TFileFormatType::FORMAT_JSON ? std::max(len, ctx->kDefaultBufferSize) : len);

} else if (ctx->buffer->remaining() < len) {
@@ -544,7 +544,7 @@ void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) {
ctx->status = Status::MemoryLimitExceeded(err_msg);
return;
}
-ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(data_sz));
+ByteBufferPtr buf = ByteBuffer::allocate_with_tracker(BitUtil::RoundUpToPowerOfTwo(data_sz));
buf->put_bytes(ctx->buffer->ptr, ctx->buffer->pos);
std::swap(buf, ctx->buffer);

@@ -559,7 +559,7 @@ void TransactionStreamLoadAction::on_chunk_data(HttpRequest* req) {
return;
}

-ctx->buffer = ByteBuffer::allocate(std::max(len, ctx->kDefaultBufferSize));
+ctx->buffer = ByteBuffer::allocate_with_tracker(std::max(len, ctx->kDefaultBufferSize));
}
}

2 changes: 1 addition & 1 deletion be/src/runtime/routine_load/kafka_consumer_pipe.h
@@ -67,7 +67,7 @@ class KafkaConsumerPipe : public StreamLoadPipe {

Status append_json(const char* data, size_t size, char row_delimiter) {
// For efficiency reasons, simdjson requires a string with a few bytes (simdjson::SIMDJSON_PADDING) at the end.
-auto buf = ByteBuffer::allocate(size + simdjson::SIMDJSON_PADDING);
+auto buf = ByteBuffer::allocate_with_tracker(size + simdjson::SIMDJSON_PADDING);
buf->put_bytes(data, size);
buf->flip();
return append(std::move(buf));
8 changes: 4 additions & 4 deletions be/src/runtime/stream_load/stream_load_pipe.cpp
@@ -78,7 +78,7 @@ Status StreamLoadPipe::append(const char* data, size_t size) {
// need to allocate a new chunk, min chunk is 64k
size_t chunk_size = std::max(_min_chunk_size, size - pos);
chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
-_write_buf = ByteBuffer::allocate(chunk_size);
+_write_buf = ByteBuffer::allocate_with_tracker(chunk_size);
_write_buf->put_bytes(data + pos, size - pos);
return Status::OK();
}
@@ -195,7 +195,7 @@ Status StreamLoadPipe::no_block_read(uint8_t* data, size_t* data_size, bool* eof
// put back the read data to the buf_queue, read the data in the next time
size_t chunk_size = bytes_read;
chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
-ByteBufferPtr write_buf = ByteBuffer::allocate(chunk_size);
+ByteBufferPtr write_buf = ByteBuffer::allocate_with_tracker(chunk_size);
write_buf->put_bytes((char*)data, bytes_read);
write_buf->flip();
// error happens iff pipe is cancelled
@@ -293,7 +293,7 @@ StatusOr<ByteBufferPtr> CompressedStreamLoadPipeReader::read() {
}

if (_decompressed_buffer == nullptr) {
-_decompressed_buffer = ByteBuffer::allocate(buffer_size);
+_decompressed_buffer = ByteBuffer::allocate_with_tracker(buffer_size);
}

ASSIGN_OR_RETURN(auto buf, StreamLoadPipeReader::read());
@@ -316,7 +316,7 @@ StatusOr<ByteBufferPtr> CompressedStreamLoadPipeReader::read() {
while (!stream_end) {
// buffer size grows exponentially
buffer_size = buffer_size < MAX_DECOMPRESS_BUFFER_SIZE ? buffer_size * 2 : MAX_DECOMPRESS_BUFFER_SIZE;
-auto piece = ByteBuffer::allocate(buffer_size);
+auto piece = ByteBuffer::allocate_with_tracker(buffer_size);
RETURN_IF_ERROR(_decompressor->decompress(
reinterpret_cast<uint8_t*>(buf->ptr) + total_bytes_read, buf->remaining() - total_bytes_read,
&bytes_read, reinterpret_cast<uint8_t*>(piece->ptr), piece->capacity, &bytes_written, &stream_end));
24 changes: 24 additions & 0 deletions be/src/util/byte_buffer.h
@@ -40,18 +40,42 @@

#include "common/logging.h"
#include "gutil/strings/fastmem.h"
#include "runtime/current_thread.h"
#include "runtime/mem_tracker.h"

namespace starrocks {

struct ByteBuffer;
using ByteBufferPtr = std::shared_ptr<ByteBuffer>;

struct MemTrackerDeleter {
MemTrackerDeleter(MemTracker* tracker_) : tracker(tracker_) { DCHECK(tracker_ != nullptr); }
MemTracker* tracker;
template <typename T>
void operator()(T* ptr) {
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(tracker);
if (ptr) {
delete ptr;
}
}
};

struct ByteBuffer {
static ByteBufferPtr allocate(size_t size) {
ByteBufferPtr ptr(new ByteBuffer(size));
return ptr;
}

static ByteBufferPtr allocate(size_t size, MemTracker* tracker) {
if (tracker == nullptr) {
return allocate(size);
}
ByteBufferPtr ptr(new ByteBuffer(size), MemTrackerDeleter(tracker));
return ptr;
}

static ByteBufferPtr allocate_with_tracker(size_t size) { return allocate(size, CurrentThread::mem_tracker()); }

static ByteBufferPtr reallocate(const ByteBufferPtr& old_ptr, size_t new_size) {
if (new_size <= old_ptr->capacity) return old_ptr;
