Skip to content

Commit

Permalink
catch corner conditions
Browse files Browse the repository at this point in the history
mark connection inactive on zombie start, not on zombie end.
existing requests will continue to use it, but new requests will
go to a different connection.

don't send retry if writes are blocked.
  • Loading branch information
alandekok committed Dec 10, 2024
1 parent af714ab commit 2882e68
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 24 deletions.
90 changes: 67 additions & 23 deletions src/modules/rlm_radius2/bio.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ static void protocol_error_reply(bio_request_t *u, bio_result_t *r, bio_handle_

static unlang_action_t mod_resume(rlm_rcode_t *p_result, module_ctx_t const *mctx, UNUSED request_t *request);
static void mod_signal(module_ctx_t const *mctx, UNUSED request_t *request, fr_signal_t action);

static void mod_write(request_t *request, trunk_request_t *treq, bio_handle_t *h);

#ifndef NDEBUG
/** Log additional information about a tracking entry
Expand Down Expand Up @@ -1182,7 +1182,6 @@ static void zombie_timeout(fr_event_list_t *el, fr_time_t now, void *uctx)
* Don't use this connection, and re-queue all of its
* requests onto other connections.
*/
trunk_connection_signal_inactive(tconn);
(void) trunk_connection_requests_requeue(tconn, TRUNK_REQUEST_STATE_ALL, 0, false);

/*
Expand Down Expand Up @@ -1257,7 +1256,12 @@ static bool check_for_zombie(fr_event_list_t *el, trunk_connection_t *tconn, fr_
if (h->inst->synchronous && fr_time_gt(last_sent, fr_time_wrap(0)) &&
(fr_time_lt(fr_time_add(last_sent, h->inst->response_window), now))) return false;

/*
* Stop using it for new requests.
*/
WARN("%s - Entering Zombie state - connection %s", h->module_name, h->fd_info->name);
trunk_connection_signal_inactive(tconn);

if (h->inst->status_check) {
h->status_checking = true;

Expand All @@ -1283,6 +1287,7 @@ static bool check_for_zombie(fr_event_list_t *el, trunk_connection_t *tconn, fr_
return true;
}


/** Handle retries.
*
*/
Expand All @@ -1297,16 +1302,16 @@ static void mod_retry(module_ctx_t const *mctx, request_t *request, fr_retry_t c

bio_request_t *u = talloc_get_type_abort(treq->preq, bio_request_t);

bio_handle_t *h;

fr_assert(request == treq->request);
fr_assert(treq->preq); /* Must still have a protocol request */

switch (retry->state) {

case FR_RETRY_CONTINUE:
u->retry = *retry;

switch (treq->state) {

case TRUNK_REQUEST_STATE_INIT:
case TRUNK_REQUEST_STATE_UNASSIGNED:
fr_assert(0);
Expand All @@ -1326,8 +1331,16 @@ static void mod_retry(module_ctx_t const *mctx, request_t *request, fr_retry_t c

case TRUNK_REQUEST_STATE_SENT:
fr_assert(tconn);

h = talloc_get_type_abort(tconn->conn->h, bio_handle_t);

if (h->fd_info->write_blocked) {
RDEBUG("IO is blocked - suppressing retransmission");
return;
}

r->is_retry = true;
trunk_request_requeue(treq);
mod_write(request, treq, h);
return;

case TRUNK_REQUEST_STATE_REAPABLE:
Expand Down Expand Up @@ -1364,20 +1377,12 @@ static void mod_retry(module_ctx_t const *mctx, request_t *request, fr_retry_t c
check_for_zombie(unlang_interpret_event_list(request), tconn, now, retry->start);
}


CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function*/
static void request_mux(UNUSED fr_event_list_t *el,
trunk_connection_t *tconn, connection_t *conn, UNUSED void *uctx)
{
bio_handle_t *h = talloc_get_type_abort(conn->h, bio_handle_t);
rlm_radius_t const *inst = h->inst;
trunk_request_t *treq;
bio_request_t *u;
request_t *request;
char const *action;
uint8_t const *packet;
size_t packet_len;
ssize_t slen;

if (unlikely(trunk_connection_pop_request(&treq, tconn) < 0)) return;

Expand All @@ -1386,16 +1391,27 @@ static void request_mux(UNUSED fr_event_list_t *el,
*/
if (!treq) return;

mod_write(treq->request, treq, h);
}

static void mod_write(request_t *request, trunk_request_t *treq, bio_handle_t *h)
{
rlm_radius_t const *inst = h->inst;
bio_request_t *u;
char const *action;
uint8_t const *packet;
size_t packet_len;
ssize_t slen;

fr_assert((treq->state == TRUNK_REQUEST_STATE_PENDING) ||
(treq->state == TRUNK_REQUEST_STATE_PARTIAL));

request = treq->request;
u = talloc_get_type_abort(treq->preq, bio_request_t);

fr_assert(!u->status_check);

/*
* Send partial packets first.
* If it's a partial packet, then write the partial bit.
*/
if (u->partial) {
fr_assert(u->partial < u->packet_len);
Expand Down Expand Up @@ -1470,16 +1486,21 @@ static void request_mux(UNUSED fr_event_list_t *el,
* Temporary conditions
*/
switch (errno) {
/*
* The BIO code should catch EAGAIN, EWOULDBLOCK, EINTR,
* and return "0 bytes written".
*/
#if defined(EWOULDBLOCK) && (EWOULDBLOCK != EAGAIN)
case EWOULDBLOCK: /* No outbound packet buffers, maybe? */
#endif
case EAGAIN: /* No outbound packet buffers, maybe? */
case EINTR: /* Interrupted by signal */

case ENOBUFS: /* No outbound packet buffers, maybe? */
case ENOMEM: /* malloc failure in kernel? */
RWARN("%s - Failed sending data over connection %s: %s",
h->module_name, h->fd_info->name, fr_syserror(errno));
trunk_request_requeue(treq);
trunk_request_signal_fail(treq);
break;

/*
Expand All @@ -1498,7 +1519,7 @@ static void request_mux(UNUSED fr_event_list_t *el,
default:
ERROR("%s - Failed sending data over connection %s: %s",
h->module_name, h->fd_info->name, fr_syserror(errno));
trunk_connection_signal_reconnect(tconn, CONNECTION_FAILED);
trunk_connection_signal_reconnect(treq->tconn, CONNECTION_FAILED);
break;
}

Expand All @@ -1514,6 +1535,7 @@ static void request_mux(UNUSED fr_event_list_t *el,
RWARN("%s - Failed sending data over connection %s: sent zero bytes",
h->module_name, h->fd_info->name);
trunk_request_requeue(treq);
return;
}

packet_len += slen;
Expand All @@ -1524,26 +1546,39 @@ static void request_mux(UNUSED fr_event_list_t *el,
}

/*
* Don't print anything more for replicated requests.
* For retransmissions.
*/
u->partial = 0;

/*
* Don't print anything extra for replication.
*/
if (inst->replicate) {
bio_result_t *r = talloc_get_type_abort(treq->rctx, bio_result_t);

r->rcode = RLM_MODULE_OK;
trunk_request_signal_complete(treq);
} else {
trunk_request_signal_sent(treq);
return;
}

/*
* Tell the admin what's going on
* On first packet, signal it as sent, and update stats.
*
* Later packets are just retransmissions to the BIO, and don't need to involve
* the trunk code.
*/
if (u->retry.count == 1) {
action = inst->originate ? "Originated" : "Proxied";
h->last_sent = u->retry.start;
if (fr_time_lteq(h->first_sent, h->last_idle)) h->first_sent = h->last_sent;

trunk_request_signal_sent(treq);

action = inst->originate ? "Originated" : "Proxied";

} else {
/*
* We don't signal the trunk that it's been sent, it was already senty
*/
action = "Retransmitted";
}

Expand Down Expand Up @@ -2034,6 +2069,7 @@ static void mod_signal(module_ctx_t const *mctx, UNUSED request_t *request, fr_s

bio_thread_t *t = talloc_get_type_abort(module_thread(mctx->mi)->data, bio_thread_t);
bio_result_t *r = talloc_get_type_abort(mctx->rctx, bio_result_t);
bio_handle_t *h;

/*
* We received a duplicate packet, but we're not doing
Expand Down Expand Up @@ -2080,6 +2116,14 @@ static void mod_signal(module_ctx_t const *mctx, UNUSED request_t *request, fr_s
*/
if (!t->inst->synchronous) return;

h = r->treq->tconn->conn->h;

if (h->fd_info->write_blocked) {
RDEBUG("IO is blocked - suppressing retransmission");
return;
}
r->is_retry = true;

/*
* We are synchronous, retransmit the current
* request on the same connection.
Expand All @@ -2088,7 +2132,7 @@ static void mod_signal(module_ctx_t const *mctx, UNUSED request_t *request, fr_s
* connection is dead, then a callback will move
* this request to a new connection.
*/
trunk_request_requeue(r->treq);
mod_write(request, r->treq, h);
return;

default:
Expand Down
8 changes: 7 additions & 1 deletion src/modules/rlm_radius2/rlm_radius.c
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,13 @@ static int mod_instantiate(module_inst_ctx_t const *mctx)
/*
* Replication is write-only, and append by default.
*/
if (inst->replicate) inst->fd_config.flags = O_WRONLY | O_APPEND;
if (inst->replicate) {
inst->fd_config.flags = O_WRONLY | O_APPEND;

} else if (inst->fd_config.filename) {
cf_log_err(conf, "When using an output 'filename', you MUST set 'replicate = true');
return -1;
}

if (fr_bio_fd_check_config(&inst->fd_config) < 0) {
cf_log_perr(conf, "Invalid configuration");
Expand Down

0 comments on commit 2882e68

Please sign in to comment.