Skip to content

Commit

Permalink
Fix TSL + docs
Browse files Browse the repository at this point in the history
  • Loading branch information
boazsegev committed Dec 2, 2024
1 parent ac4298b commit 364111d
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 74 deletions.
43 changes: 0 additions & 43 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -485,49 +485,6 @@ These options can be used, for example, like so:

More possible compile time options can be found in the [facil.io documentation](http://facil.io).

## Evented oriented design with extra safety

Iodine is an evented server, similar in its architecture to `nginx` and `puma`. It's different than the simple "thread-per-client" design that is often taught when we begin to learn about network programming.
By leveraging `epoll` (on Linux) and `kqueue` (on BSD), iodine can listen to multiple network events on multiple sockets using a single thread.
All these events go into a task queue, together with the application events and any user generated tasks, such as ones scheduled by [`Iodine.run`](http://www.rubydoc.info/github/boazsegev/iodine/Iodine#run-class_method).
In pseudo-code, this might look like this
```ruby
QUEUE = Queue.new
def server_cycle
if(QUEUE.empty?)
QUEUE << get_next_32_socket_events # these events schedule the proper user code to run
end
QUEUE << server_cycle
end
def run_server
while ((event = QUEUE.pop))
event.shift.call(*event)
end
end
```
In pure Ruby (without using C extensions or Java), it's possible to do the same by using `select`... and although `select` has some issues, it could work well for lighter loads.

The server events are fairly fast and fragmented (longer code is fragmented across multiple events), so one thread is enough to run the server including it's static file service and everything...
...but single threaded mode should probably be avoided.
It's very common that the application's code will run slower and require external resources (i.e., databases, a custom pub/sub service, etc'). This slow code could "starve" the server, which is patiently waiting to run it's short tasks on the same thread.
The thread pool is there to help slow user code.
The slower your application code, the more threads you will need to keep the server running in a responsive manner (note that responsiveness and speed aren't always the same).

To make a thread pool easier and safer to use, iodine makes sure that no connection task / callback is called concurrently for the same connection.

For example, a is a WebSocket connection is already busy in it's `on_message` callback, no other messages will be forwarded to the callback until the current callback returns.
## Free, as in freedom (BYO beer)

Iodine is **free** and **open source**, so why not take it out for a spin?
Expand Down
5 changes: 4 additions & 1 deletion ext/iodine/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
begin
require 'openssl'
if(OpenSSL::VERSION.to_i > 2)
puts "* Detected OpenSSL version > 3 (#{OpenSSL::VERSION}), setting the HAVE_OPENSSL flag."
puts "* Detected OpenSSL version >= 3 (#{OpenSSL::VERSION}), setting the HAVE_OPENSSL flag."
$defs << "-DHAVE_OPENSSL"
else
puts "* Detected OpenSSL with incompatible version (#{OpenSSL::VERSION})."
end
rescue LoadError
puts "* Couldn't find OpenSSL - skipping!"
end
end

Expand Down
131 changes: 101 additions & 30 deletions ext/iodine/fio-stl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35015,6 +35015,7 @@ struct fio_io_async_s {
fio_queue_s *q;
uint32_t count;
fio_queue_s queue;
fio_timer_queue_s timers;
FIO_LIST_NODE node;
};

Expand Down Expand Up @@ -35048,6 +35049,30 @@ SFUNC void fio_io_async_attach(fio_io_async_s *q, uint32_t threads);
/** Pushes a task to an IO Async Queue (macro helper). */
#define fio_io_async(q_, ...) fio_queue_push((q_)->q, __VA_ARGS__)

/** Schedules a timer bound task for the async queue (`fio_timer_schedule`). */
SFUNC void fio_io_async_every(fio_io_async_s *q, fio_timer_schedule_args_s);

/**
* Schedules a timer bound task, for the async queue, see `fio_timer_schedule`.
*
* Possible "named arguments" (fio_timer_schedule_args_s members) include:
*
* * The timer function. If it returns a non-zero value, the timer stops:
* int (*fn)(void *, void *)
* * Opaque user data:
* void *udata1
* * Opaque user data:
* void *udata2
* * Called when the timer is done (finished):
* void (*on_stop)(void *, void *)
* * Timer interval, in milliseconds:
* uint32_t every
* * The number of times the timer should be performed. -1 == infinity:
* int32_t repetitions
*/
#define fio_io_async_every(async, ...) \
fio_io_async_every(async, (fio_timer_schedule_args_s){__VA_ARGS__})

/* *****************************************************************************
IO API Finish
***************************************************************************** */
Expand All @@ -35069,6 +35094,7 @@ Copyright and License: see header file (000 copyright.h) or top of file
(defined(FIO_EXTERN_COMPLETE) || !defined(FIO_EXTERN))
#define H___FIO_IO_TYPES___H

/** I would love to use fio_time_mono, but using time_real enables logging. */
#define FIO___IO_GET_TIME_MILLI() fio_time2milli(fio_time_real())

/* *****************************************************************************
Expand Down Expand Up @@ -35715,7 +35741,8 @@ SFUNC void fio_io_write2 FIO_NOOP(fio_io_s *io, fio_io_write_args_s args) {
args.dealloc);
} else if ((unsigned)(args.fd + 1) > 1) {
packet = fio_stream_pack_fd((int)args.fd, args.len, args.offset, args.copy);
}
} else /* fio_io_write2 called without data */
goto do_nothing;
if (!packet)
goto error;
if ((io->flags & FIO___IO_FLAG_CLOSE))
Expand All @@ -35724,14 +35751,17 @@ SFUNC void fio_io_write2 FIO_NOOP(fio_io_s *io, fio_io_write_args_s args) {
return;

error: /* note: `dealloc` already called by the `fio_stream` error handler. */
FIO_LOG_ERROR("couldn't create %zu bytes long user-packet for IO %p (%d)",
args.len,
(void *)io,
(io ? io->fd : -1));
FIO_LOG_ERROR(
"(%d) couldn't create %zu bytes long user-packet for IO %p (%d)",
fio_io_pid(),
args.len,
(void *)io,
(io ? io->fd : -1));
return;

write_called_after_close:
FIO_LOG_DEBUG2("`write` called after `close` was called for IO.");
FIO_LOG_DEBUG2("(%d) `write` called after `close` was called for IO.",
fio_io_pid());
{
union {
void *ptr;
Expand All @@ -35743,6 +35773,7 @@ SFUNC void fio_io_write2 FIO_NOOP(fio_io_s *io, fio_io_write_args_s args) {

io_error_null:
FIO_LOG_ERROR("(%d) `fio_write2` called for invalid IO (NULL)", FIO___IO.pid);
do_nothing:
if (args.dealloc) {
union {
void *ptr;
Expand Down Expand Up @@ -36585,6 +36616,7 @@ SFUNC void fio_io_async_attach(fio_io_async_s *q, uint32_t threads) {
.q = fio_io_queue(),
.count = threads,
.queue = FIO_QUEUE_STATIC_INIT(q->queue),
.timers = FIO_TIMER_QUEUE_INIT,
.node = FIO_LIST_INIT(q->node),
};
FIO_LIST_PUSH(&FIO___IO.async, &q->node);
Expand All @@ -36594,6 +36626,14 @@ SFUNC void fio_io_async_attach(fio_io_async_s *q, uint32_t threads) {
fio___io_async_start(q);
}

void fio_io_async_every___(void); /* IDE Mark */
/** Schedules a timer bound task for the async queue (`fio_timer_schedule`). */
SFUNC void fio_io_async_every FIO_NOOP(fio_io_async_s *q,
fio_timer_schedule_args_s a) {
a.start_at = FIO___IO.tick;
fio_timer_schedule FIO_NOOP(&q->timers, a);
}

/* *****************************************************************************
Managing data after a fork
***************************************************************************** */
Expand Down Expand Up @@ -36691,6 +36731,9 @@ FIO_SFUNC void fio___io_tick(int timeout) {
}
FIO___IO.tick = FIO___IO_GET_TIME_MILLI();
fio_timer_push2queue(&FIO___IO.queue, &FIO___IO.timer, FIO___IO.tick);
FIO_LIST_EACH(fio_io_async_s, node, &FIO___IO.async, a) {
fio_timer_push2queue(a->q, &a->timers, FIO___IO.tick);
}
for (size_t i = 0; i < 2048; ++i)
if (fio_queue_perform(&FIO___IO.queue))
break;
Expand Down Expand Up @@ -37339,7 +37382,7 @@ IO Reactor Finish

Copyright and License: see header file (000 copyright.h) or top of file
***************************************************************************** */
#if defined(H___FIO_SERVER___H) && \
#if defined(H___FIO_IO___H) && \
(HAVE_OPENSSL || __has_include("openssl/ssl.h")) && \
!defined(H___FIO_OPENSSL___H) && !defined(FIO___RECURSIVE_INCLUDE)
#define H___FIO_OPENSSL___H 1
Expand Down Expand Up @@ -37753,7 +37796,8 @@ FIO_LEAK_COUNTER_DEF(fio___SSL)

/** called once the IO was attached and the TLS object was set. */
FIO_SFUNC void fio___openssl_start(fio_io_s *io) {
fio___openssl_context_s *ctx_parent = (fio___openssl_context_s *)fio_tls(io);
fio___openssl_context_s *ctx_parent =
(fio___openssl_context_s *)fio_io_tls(io);
FIO_ASSERT_DEBUG(ctx_parent, "OpenSSL Context missing!");

SSL *ssl = SSL_new(ctx_parent->ctx);
Expand All @@ -37764,7 +37808,7 @@ FIO_SFUNC void fio___openssl_start(fio_io_s *io) {
FIO_LOG_DDEBUG2("(%d) allocated new TLS context for %p.",
(int)fio_thread_getpid(),
(void *)io);
BIO *bio = BIO_new_socket(fio_fd(io), 0);
BIO *bio = BIO_new_socket(fio_io_fd(io), 0);
SSL_set_bio(ssl, bio, bio);
SSL_set_ex_data(ssl, 0, (void *)io);
if (SSL_is_server(ssl))
Expand Down Expand Up @@ -37811,7 +37855,7 @@ static void fio___openssl_free_context_task(void *tls_ctx, void *ignr_) {

/** Builds a local TLS context out of the fio_io_tls_s object. */
static void fio___openssl_free_context(void *tls_ctx) {
fio_srv_defer(fio___openssl_free_context_task, tls_ctx, NULL);
fio_io_defer(fio___openssl_free_context_task, tls_ctx, NULL);
}
/* *****************************************************************************
IO Functions Structure
Expand All @@ -37834,7 +37878,7 @@ SFUNC fio_io_functions_s fio_openssl_io_functions(void) {
FIO_CONSTRUCTOR(fio___openssl_setup_default) {
static fio_io_functions_s FIO___OPENSSL_IO_FUNCS;
FIO___OPENSSL_IO_FUNCS = fio_openssl_io_functions();
fio_io_tls_default_io_functions(&FIO___OPENSSL_IO_FUNCS);
fio_io_tls_default_functions(&FIO___OPENSSL_IO_FUNCS);
#ifdef SIGPIPE
fio_signal_monitor(SIGPIPE, NULL, NULL); /* avoid OpenSSL issue... */
#endif
Expand Down Expand Up @@ -40837,7 +40881,7 @@ struct fio_http_controller_s {
void (*on_destroyed)(fio_http_s *h);
/** Informs the controller that request / response headers must be sent. */
void (*send_headers)(fio_http_s *h);
/** called by the HTTP handle for each body chunk (or to finish a response. */
/** called by the HTTP handle for each body chunk, or to finish a response. */
void (*write_body)(fio_http_s *h, fio_http_write_args_s args);
/** called once a request / response had finished */
void (*on_finish)(fio_http_s *h);
Expand Down Expand Up @@ -42498,10 +42542,8 @@ FIO_SFUNC int fio____http_write_start(fio_http_s *h,

FIO_SFUNC int fio____http_write_cont(fio_http_s *h,
fio_http_write_args_s *args) {
if (args->buf || args->fd) {
h->controller->write_body(h, *args);
h->sent += args->len;
}
h->controller->write_body(h, *args);
h->sent += args->len;
if (args->finish) {
h->state |= FIO_HTTP_STATE_FINISHED;
h->writer = (h->state & FIO_HTTP_STATE_UPGRADED)
Expand Down Expand Up @@ -46162,13 +46204,15 @@ FIO_SFUNC void fio___http_controller_http1_write_body(
goto no_write_err;
if (fio_http_is_streaming(h))
goto stream_chunk;
if (c->state.http.buf.len && args.buf && args.len) {
fio_string_write(&c->state.http.buf,
FIO_STRING_REALLOC,
(char *)args.buf + args.offset,
args.len);
if (args.dealloc)
args.dealloc((void *)args.buf);
if (c->state.http.buf.len) {
if (args.buf && args.len) {
fio_string_write(&c->state.http.buf,
FIO_STRING_REALLOC,
(char *)args.buf + args.offset,
args.len);
if (args.dealloc)
args.dealloc((void *)args.buf);
}
fio_io_write2(c->io,
.buf = (void *)c->state.http.buf.buf,
.len = c->state.http.buf.len,
Expand All @@ -46188,14 +46232,40 @@ FIO_SFUNC void fio___http_controller_http1_write_body(

stream_chunk:
if (args.len) { /* print chunk header */
char buf[24];
fio_str_info_s i = FIO_STR_INFO3(buf, 0, 24);
fio_string_write_hex(&i, NULL, args.len);
fio_string_write(&i, NULL, "\r\n", 2);
fio_io_write2(c->io, .buf = (void *)i.buf, .len = i.len, .copy = 1);
if (c->state.http.buf.len) {
fio_io_write2(c->io,
.buf = (void *)c->state.http.buf.buf,
.len = c->state.http.buf.len,
.dealloc = FIO_STRING_FREE);
fio_string_write2(&c->state.http.buf,
FIO_STRING_REALLOC,
FIO_STRING_WRITE_HEX(args.len),
FIO_STRING_WRITE_STR2("\r\n", 2));
fio_io_write2(c->io,
.buf = (void *)c->state.http.buf.buf,
.len = c->state.http.buf.len,
.dealloc = FIO_STRING_FREE);
c->state.http.buf = FIO_STR_INFO0;
} else {
char buf[24];
fio_str_info_s i = FIO_STR_INFO3(buf, 0, 24);
fio_string_write_hex(&i, NULL, args.len);
fio_string_write(&i, NULL, "\r\n", 2);
fio_io_write2(c->io, .buf = (void *)i.buf, .len = i.len, .copy = 1);
}
} else {
FIO_LOG_ERROR("HTTP1 streaming requires a correctly pre-determined "
"length per chunk.");
if (c->state.http.buf.len) {
fio_io_write2(c->io,
.buf = (void *)c->state.http.buf.buf,
.len = c->state.http.buf.len,
.dealloc = FIO_STRING_FREE);
c->state.http.buf = FIO_STR_INFO0;
}
if (args.buf || (uint32_t)(args.fd + 1) > 0U)
FIO_LOG_ERROR("HTTP1 streaming requires a correctly pre-determined "
"length per chunk.");
else
goto no_write_err;
}
fio_io_write2(c->io,
.buf = (void *)args.buf,
Expand All @@ -46210,6 +46280,7 @@ FIO_SFUNC void fio___http_controller_http1_write_body(
fio_io_write2(c->io, .buf = trailer.buf, .len = trailer.len, .copy = 1);
}
return;

no_write_err:
if (args.buf) {
if (args.dealloc)
Expand Down
10 changes: 10 additions & 0 deletions ext/iodine/iodine_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -2649,6 +2649,16 @@ Listen to incoming TCP/IP Connections
***************************************************************************** */

static void iodine_tcp_on_stop(fio_io_protocol_s *p, void *udata) {
/* TODO! call on_close */
// VALUE connection = rb_obj_alloc(iodine_rb_IODINE_CONNECTION);
// STORE.hold(connection);
// iodine_connection_s *c = iodine_connection_ptr(m);
// c->store[IODINE_CONNECTION_STORE_handler] = (VALUE)udata;
// c->io = NULL;
// c->http = NULL;
// iodine_ruby_call_outside((VALUE)udata, IODINE_CLOSE_ID, 1, &connection);
// STORE.release(connection);

STORE.release((VALUE)udata);
FIO_MEM_FREE(p, sizeof(*p));
}
Expand Down

0 comments on commit 364111d

Please sign in to comment.