Skip to content

Commit

Permalink
ruby: Implement a buffer pool
Browse files Browse the repository at this point in the history
The trilogy client eagerly allocates a 32kiB buffer, and grows
it as needed. It's never freed nor shrunk until the connection
is closed. Since by default the MySQL `max_allowed_packet` is
16MiB, long-lived connections will progressively grow to that
size.

For basic usage it's not a big deal, but some applications may
have dozens if not hundreds of connections that are mostly idle.

A common case being multi-tenant applications with horizontal
sharding. In such cases you only ever query one database but
have open connections to many databases.

This situation might lead to a lot of memory retained by trilogy
connections and never really released, looking very much like a
memory leak.

This can be reproduced with a simple script:

```ruby
require 'trilogy'
connection_pool = []

50.times do
  t = Trilogy.new(database: "test")
  t.query("select '#{"a" * 16_000_000}' as a")
  connection_pool << t
end

puts "#{`ps -o rss= -p #{$$}`} kiB"
```

```
$ ruby /tmp/trilogy-leak.rb
927120 kiB
```

If we instead take over the buffer lifetime management, we
can implement some pooling for the buffers and limit the total
number of buffers to the number of connections that are actually
in use concurrently.

The same reproduction script with the current branch:

```
$ ruby -Ilib:ext /tmp/trilogy-leak.rb
108144 kiB
```
  • Loading branch information
byroot committed Dec 14, 2024
1 parent 857c167 commit c080567
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 12 deletions.
232 changes: 229 additions & 3 deletions contrib/ruby/ext/trilogy-ruby/cext.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,161 @@

#include "trilogy-ruby.h"

// One idle packet buffer parked in the pool: the allocation and its size.
typedef struct _buffer_pool_entry_struct {
size_t cap; // capacity of `buff` in bytes, copied from trilogy_buffer_t.cap on check-in
uint8_t *buff; // the allocation itself; released with xfree() when the pool is freed
} buffer_pool_entry;

// Pool of idle packet buffers, used as a stack: buffer_checkin() appends at
// entries[len] and buffer_checkout() takes entries[len - 1].
typedef struct _buffer_pool_struct {
size_t capa; // number of slots allocated in `entries`; 0 until the first check-in
size_t len; // number of buffers currently stored, always <= capa
buffer_pool_entry *entries; // slot array, allocated lazily by buffer_checkin()
} buffer_pool;

// buffer_pool_max_size caps how many idle buffers a pool retains: once a pool
// holds that many, buffer_checkin() frees the buffer instead of storing it.
// It defaults to 8 and is exposed to Ruby as Trilogy.buffer_pool_size /
// Trilogy.buffer_pool_size=.
#ifdef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY
#include <ruby/atomic.h>
// Pools are per-Ractor in this build but the limit is a single global, so it
// is declared rb_atomic_t and written with RUBY_ATOMIC_SET.
static rb_atomic_t buffer_pool_max_size = 8;
#else
static unsigned int buffer_pool_max_size = 8;
// The single pool object used when Ractor-local storage is unavailable.
// Qnil means "no pool"; buffer_pool_free() resets it to Qnil.
static VALUE _global_buffer_pool = Qnil;
#endif

static void buffer_pool_free(void *data)
{
#ifndef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY
_global_buffer_pool = Qnil;
#endif

buffer_pool *pool = (buffer_pool *)data;
if (pool->capa) {
for (size_t index = 0; index < pool->len; index++) {
xfree(pool->entries[index].buff);
}
xfree(pool->entries);
}
xfree(pool);
}

// dsize callback for buffer_pool_type.
//
// Returns the pool's footprint in bytes: the struct, the allocated slot array
// (capa slots, whether used or not) and the capacity of every stored buffer.
static size_t buffer_pool_memsize(const void *data)
{
    const buffer_pool *pool = data;
    size_t total = sizeof(*pool) + pool->capa * sizeof(*pool->entries);

    // Without a slot array there are no stored buffers to account for.
    if (pool->capa == 0) {
        return total;
    }

    for (size_t remaining = pool->len; remaining > 0; remaining--) {
        total += pool->entries[remaining - 1].cap;
    }

    return total;
}

// TypedData description of the Ruby object that owns a buffer_pool; used by
// create_rb_buffer_pool() to wrap the struct and by get_buffer_pool() to
// unwrap it.
static const rb_data_type_t buffer_pool_type = {
.wrap_struct_name = "trilogy/buffer_pool",
.function = {
// No mark function: the pool only points at raw C allocations, never at Ruby objects.
.dmark = NULL,
.dfree = buffer_pool_free,
.dsize = buffer_pool_memsize,
},
.flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
};

// Allocates a new, empty buffer_pool wrapped in a class-less (Qfalse) typed
// data object and returns that object.
//
// NOTE(review): the rest of the file treats capa == 0 / len == 0 as "empty",
// which relies on TypedData_Make_Struct handing back zeroed memory — confirm
// against the Ruby C API documentation.
static VALUE create_rb_buffer_pool(void)
{
    buffer_pool *fresh_pool;
    VALUE wrapper = TypedData_Make_Struct(Qfalse, buffer_pool, &buffer_pool_type, fresh_pool);

    return wrapper;
}

#ifdef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY
#include <ruby/ractor.h>
static rb_ractor_local_key_t buffer_pool_key;

// Returns the current Ractor's buffer pool object.
//
// create: when true, a missing pool is created and stored in Ractor-local
//         storage; when false, a missing pool is reported as Qnil.
//
// Returns the pool object, or Qnil when there is none and `create` is false.
static VALUE get_rb_buffer_pool(bool create)
{
    VALUE pool;

    if (rb_ractor_local_storage_value_lookup(buffer_pool_key, &pool)) {
        return pool;
    }

    // A failed lookup is not guaranteed to have stored anything in `pool`, so
    // it must not be returned as-is: callers test the result with NIL_P().
    if (!create) {
        return Qnil;
    }

    pool = create_rb_buffer_pool();
    rb_ractor_local_storage_value_set(buffer_pool_key, pool);
    return pool;
}
#else
// Returns the process-wide buffer pool object.
//
// create: when true, a missing pool is created and remembered in
//         _global_buffer_pool; when false, a missing pool is reported as Qnil.
//
// Returns the pool object, or Qnil when there is none and `create` is false.
//
// NOTE(review): nothing visible in this section registers _global_buffer_pool
// with the GC (e.g. rb_gc_register_address), so the pool object may be
// collected between uses; buffer_pool_free() resets the global to Qnil when
// that happens. Confirm whether that is intended.
static VALUE get_rb_buffer_pool(bool create)
{
    if (NIL_P(_global_buffer_pool) && create) {
        _global_buffer_pool = create_rb_buffer_pool();
    }
    return _global_buffer_pool;
}
#endif

// Unwraps the buffer pool object into its C struct.
//
// create: forwarded to get_rb_buffer_pool(); decides whether a missing pool
//         is created on demand.
//
// Returns the buffer_pool, or NULL when get_rb_buffer_pool() reported Qnil.
static inline buffer_pool *get_buffer_pool(bool create)
{
    VALUE wrapper = get_rb_buffer_pool(create);
    buffer_pool *unwrapped = NULL;

    if (!NIL_P(wrapper)) {
        TypedData_Get_Struct(wrapper, buffer_pool, &buffer_pool_type, unwrapped);
    }

    return unwrapped;
}

// Gives `buffer` backing storage, reusing a pooled allocation when one exists.
//
// buffer:           receives `buff` and `cap`; other fields are left untouched.
// initial_capacity: size in bytes of the allocation made when the pool is
//                   empty. A reused buffer keeps whatever capacity it had
//                   when it was checked in.
static void buffer_checkout(trilogy_buffer_t *buffer, size_t initial_capacity)
{
    buffer_pool *pool = get_buffer_pool(true);

    if (pool->len == 0) {
        // Nothing pooled: fall back to a fresh allocation.
        buffer->buff = RB_ALLOC_N(uint8_t, initial_capacity);
        buffer->cap = initial_capacity;
        return;
    }

    // Pop the most recently checked-in buffer.
    pool->len -= 1;
    const buffer_pool_entry *top = &pool->entries[pool->len];
    buffer->buff = top->buff;
    buffer->cap = top->cap;
}

// Hands `buffer`'s allocation back to the pool, or frees it when the pool
// already holds buffer_pool_max_size buffers.
//
// In both cases `buffer->buff` is set to NULL and `buffer->cap` to 0, so the
// caller no longer owns the memory.
//
// Returns true when the allocation was stored in the pool, false when it was
// freed.
static bool buffer_checkin(trilogy_buffer_t *buffer)
{
    buffer_pool *pool = get_buffer_pool(true);

    if (pool->len >= buffer_pool_max_size) {
        xfree(buffer->buff);
        buffer->buff = NULL;
        buffer->cap = 0;
        return false;
    }

    if (!pool->capa) {
        const size_t initial_capa = 16;
        pool->entries = RB_ALLOC_N(buffer_pool_entry, initial_capa);
        pool->capa = initial_capa;
    } else if (pool->len >= pool->capa) {
        // Publish the new capacity only after the reallocation succeeded:
        // RB_REALLOC_N raises on failure, and a capa larger than the real
        // array would let a later check-in write past the end of `entries`.
        const size_t grown_capa = pool->capa * 2;
        RB_REALLOC_N(pool->entries, buffer_pool_entry, grown_capa);
        pool->capa = grown_capa;
    }

    buffer_pool_entry *slot = &pool->entries[pool->len];
    slot->buff = buffer->buff;
    slot->cap = buffer->cap;
    pool->len++;

    buffer->buff = NULL;
    buffer->cap = 0;

    return true;
}

// Like buffer_checkin(), but never creates the pool object: when no pool
// exists the buffer is simply freed.
//
// `buffer->buff` is NULL and `buffer->cap` is 0 on return.
// Returns true when the allocation was stored in the pool, false when it was
// freed.
static bool buffer_checkin_no_alloc(trilogy_buffer_t *buffer)
{
    if (get_buffer_pool(false) == NULL) {
        // The pool was freed, we're likely during Ruby shutdown
        xfree(buffer->buff);
        buffer->buff = NULL;
        buffer->cap = 0;
        return false;
    }

    return buffer_checkin(buffer);
}

VALUE Trilogy_CastError;
static VALUE Trilogy_BaseConnectionError, Trilogy_ProtocolError, Trilogy_SSLError, Trilogy_QueryError,
Trilogy_ConnectionClosedError,
Expand All @@ -34,6 +189,20 @@ struct trilogy_ctx {
VALUE encoding;
};

// Makes sure the connection has a packet buffer, checking one out with
// TRILOGY_DEFAULT_BUF_SIZE as the initial capacity when it has none.
// A connection that already holds a buffer is left untouched.
static void rb_trilogy_acquire_buffer(struct trilogy_ctx *ctx)
{
    if (ctx->conn.packet_buffer.buff != NULL) {
        return;
    }

    buffer_checkout(&ctx->conn.packet_buffer, TRILOGY_DEFAULT_BUF_SIZE);
}

// Hands the connection's packet buffer back through buffer_checkin().
// Does nothing when the connection currently holds no buffer.
static void rb_trilogy_release_buffer(struct trilogy_ctx *ctx)
{
    if (ctx->conn.packet_buffer.buff == NULL) {
        return;
    }

    buffer_checkin(&ctx->conn.packet_buffer);
}

static void mark_trilogy(void *ptr)
{
struct trilogy_ctx *ctx = ptr;
Expand All @@ -43,6 +212,11 @@ static void mark_trilogy(void *ptr)
static void free_trilogy(void *ptr)
{
struct trilogy_ctx *ctx = ptr;

if (ctx->conn.packet_buffer.buff) {
buffer_checkin_no_alloc(&ctx->conn.packet_buffer);
}

trilogy_free(&ctx->conn);
xfree(ptr);
}
Expand Down Expand Up @@ -116,6 +290,8 @@ static void handle_trilogy_error(struct trilogy_ctx *ctx, int rc, const char *ms
VALUE rbmsg = rb_vsprintf(msg, args);
va_end(args);

rb_trilogy_release_buffer(ctx);

if (!trilogy_error_recoverable_p(rc)) {
if (ctx->conn.socket != NULL) {
// trilogy_sock_shutdown may affect errno
Expand Down Expand Up @@ -178,7 +354,7 @@ static VALUE allocate_trilogy(VALUE klass)

ctx->query_flags = TRILOGY_FLAGS_DEFAULT;

if (trilogy_init(&ctx->conn) < 0) {
if (trilogy_init_no_buffer(&ctx->conn) < 0) {
VALUE rbmsg = rb_str_new("trilogy_init", 13);
trilogy_syserr_fail_str(errno, rbmsg);
}
Expand Down Expand Up @@ -602,6 +778,8 @@ static VALUE rb_trilogy_connect(VALUE self, VALUE encoding, VALUE charset, VALUE
connopt.tls_max_version = NUM2INT(val);
}

rb_trilogy_acquire_buffer(ctx);

int rc = try_connect(ctx, &handshake, &connopt);
if (rc != TRILOGY_OK) {
if (connopt.path) {
Expand All @@ -617,6 +795,8 @@ static VALUE rb_trilogy_connect(VALUE self, VALUE encoding, VALUE charset, VALUE

authenticate(ctx, &handshake, connopt.ssl_mode);

rb_trilogy_release_buffer(ctx);

return Qnil;
}

Expand All @@ -626,6 +806,8 @@ static VALUE rb_trilogy_change_db(VALUE self, VALUE database)

StringValue(database);

rb_trilogy_acquire_buffer(ctx);

int rc = trilogy_change_db_send(&ctx->conn, RSTRING_PTR(database), RSTRING_LEN(database));

if (rc == TRILOGY_AGAIN) {
Expand Down Expand Up @@ -653,13 +835,17 @@ static VALUE rb_trilogy_change_db(VALUE self, VALUE database)
}
}

rb_trilogy_release_buffer(ctx);

return Qtrue;
}

static VALUE rb_trilogy_set_server_option(VALUE self, VALUE option)
{
struct trilogy_ctx *ctx = get_open_ctx(self);

rb_trilogy_acquire_buffer(ctx);

int rc = trilogy_set_option_send(&ctx->conn, NUM2INT(option));

if (rc == TRILOGY_AGAIN) {
Expand Down Expand Up @@ -687,6 +873,8 @@ static VALUE rb_trilogy_set_server_option(VALUE self, VALUE option)
}
}

rb_trilogy_release_buffer(ctx);

return Qtrue;
}

Expand Down Expand Up @@ -892,6 +1080,10 @@ static VALUE execute_read_query_response(struct trilogy_ctx *ctx)
handle_trilogy_error(ctx, args.rc, args.msg);
}

if (!(ctx->conn.server_status & TRILOGY_SERVER_STATUS_MORE_RESULTS_EXISTS)) {
rb_trilogy_release_buffer(ctx);
}

return result;
}

Expand Down Expand Up @@ -924,6 +1116,8 @@ static VALUE rb_trilogy_query(VALUE self, VALUE query)
StringValue(query);
query = rb_str_export_to_enc(query, rb_to_encoding(ctx->encoding));

rb_trilogy_acquire_buffer(ctx);

int rc = trilogy_query_send(&ctx->conn, RSTRING_PTR(query), RSTRING_LEN(query));

if (rc == TRILOGY_AGAIN) {
Expand All @@ -941,6 +1135,8 @@ static VALUE rb_trilogy_ping(VALUE self)
{
struct trilogy_ctx *ctx = get_open_ctx(self);

rb_trilogy_acquire_buffer(ctx);

int rc = trilogy_ping_send(&ctx->conn);

if (rc == TRILOGY_AGAIN) {
Expand Down Expand Up @@ -968,6 +1164,7 @@ static VALUE rb_trilogy_ping(VALUE self)
}
}

rb_trilogy_release_buffer(ctx);
return Qtrue;
}

Expand All @@ -985,13 +1182,19 @@ static VALUE rb_trilogy_escape(VALUE self, VALUE str)
const char *escaped_str;
size_t escaped_len;

rb_trilogy_acquire_buffer(ctx);

int rc = trilogy_escape(&ctx->conn, RSTRING_PTR(str), RSTRING_LEN(str), &escaped_str, &escaped_len);

if (rc < 0) {
handle_trilogy_error(ctx, rc, "trilogy_escape");
}

return rb_enc_str_new(escaped_str, escaped_len, str_enc);
VALUE escaped_string = rb_enc_str_new(escaped_str, escaped_len, str_enc);

rb_trilogy_release_buffer(ctx);

return escaped_string;
}

static VALUE rb_trilogy_close(VALUE self)
Expand All @@ -1002,6 +1205,8 @@ static VALUE rb_trilogy_close(VALUE self)
return Qnil;
}

rb_trilogy_acquire_buffer(ctx);

int rc = trilogy_close_send(&ctx->conn);

if (rc == TRILOGY_AGAIN) {
Expand All @@ -1027,6 +1232,8 @@ static VALUE rb_trilogy_close(VALUE self)
// we must clear any SSL errors left in the queue from a read/write.
ERR_clear_error();

rb_trilogy_release_buffer(ctx);

trilogy_free(&ctx->conn);

return Qnil;
Expand Down Expand Up @@ -1132,15 +1339,34 @@ static VALUE rb_trilogy_server_status(VALUE self) { return LONG2FIX(get_open_ctx

// Returns the `server_version` C string of the open connection behind `self`
// as a new Ruby String.
static VALUE rb_trilogy_server_version(VALUE self)
{
    struct trilogy_ctx *ctx = get_open_ctx(self);

    return rb_str_new_cstr(ctx->server_version);
}

// Trilogy.buffer_pool_size: returns the current value of buffer_pool_max_size
// as a Ruby Integer. `klass` is unused.
static VALUE rb_trilogy_buffer_pool_size(VALUE klass)
{
    VALUE current_limit = UINT2NUM(buffer_pool_max_size);

    return current_limit;
}

// Trilogy.buffer_pool_size=: stores `size`, converted with NUM2UINT, in
// buffer_pool_max_size. The Ractor-aware build writes it with
// RUBY_ATOMIC_SET. `klass` is unused.
//
// Returns `size` unchanged.
static VALUE rb_trilogy_buffer_pool_size_set(VALUE klass, VALUE size)
{
    const unsigned int requested_limit = NUM2UINT(size);

#ifdef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY
    RUBY_ATOMIC_SET(buffer_pool_max_size, requested_limit);
#else
    buffer_pool_max_size = requested_limit;
#endif

    return size;
}

RUBY_FUNC_EXPORTED void Init_cext(void)
{
#ifdef HAVE_RB_EXT_RACTOR_SAFE
#ifdef HAVE_RB_RACTOR_LOCAL_STORAGE_VALUE_NEWKEY
rb_ext_ractor_safe(true);
buffer_pool_key = rb_ractor_local_storage_value_newkey();
#endif

VALUE Trilogy = rb_const_get(rb_cObject, rb_intern("Trilogy"));
rb_define_alloc_func(Trilogy, allocate_trilogy);

rb_define_singleton_method(Trilogy, "buffer_pool_size", rb_trilogy_buffer_pool_size, 0);
rb_define_singleton_method(Trilogy, "buffer_pool_size=", rb_trilogy_buffer_pool_size_set, 1);

rb_define_private_method(Trilogy, "_connect", rb_trilogy_connect, 3);
rb_define_method(Trilogy, "change_db", rb_trilogy_change_db, 1);
rb_define_alias(Trilogy, "select_db", "change_db");
Expand Down
2 changes: 1 addition & 1 deletion contrib/ruby/ext/trilogy-ruby/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
have_library("crypto", "CRYPTO_malloc")
have_library("ssl", "SSL_new")
have_func("rb_interned_str", "ruby.h")

have_func("rb_ractor_local_storage_value_newkey", "ruby.h")
create_makefile "trilogy/cext"
8 changes: 8 additions & 0 deletions contrib/ruby/test/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1131,4 +1131,12 @@ def test_error_classes_exclusively_match_subclasses
assert_operator SystemCallError, :===, klass.new
assert_operator Trilogy::ConnectionError, :===, klass.new
end

# The pool size defaults to 8 and reads back whatever the setter stored.
def test_buffer_pool_size_can_be_configured
  default_size = 8
  custom_size = 4

  assert_equal default_size, Trilogy.buffer_pool_size

  Trilogy.buffer_pool_size = custom_size
  assert_equal custom_size, Trilogy.buffer_pool_size
ensure
  # Restore the default so other tests are unaffected.
  Trilogy.buffer_pool_size = 8
end
end
Loading

0 comments on commit c080567

Please sign in to comment.