Skip to content

Commit

Permalink
adapt change from "TLS deliver buffer data during shutdown" (#474)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK authored Jul 29, 2024
1 parent 601d778 commit a2fb16c
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 55 deletions.
6 changes: 5 additions & 1 deletion include/aws/http/private/h1_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,16 @@ struct aws_h1_connection {
/* If non-zero, reason to immediately reject new streams. (ex: closing) */
int new_stream_error_code;

/* If true, user has called connection_close() or stream_cancel(),
* but the cross_thread_work_task hasn't processed it yet */
bool shutdown_requested;
int shutdown_requested_error_code;

/* See `cross_thread_work_task` */
bool is_cross_thread_work_task_scheduled : 1;

/* For checking status from outside the event-loop thread. */
bool is_open : 1;

} synced_data;
};

Expand Down
71 changes: 56 additions & 15 deletions source/h1_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ void aws_h1_connection_unlock_synced_data(struct aws_h1_connection *connection)
* - Channel is shutting down in the read direction.
* - Channel is shutting down in the write direction.
* - An error occurs.
* - User wishes to close the connection (this is the only case where the function may run off-thread).
*/
static void s_stop(
struct aws_h1_connection *connection,
Expand All @@ -139,15 +138,14 @@ static void s_stop(
bool schedule_shutdown,
int error_code) {

AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
AWS_ASSERT(stop_reading || stop_writing || schedule_shutdown); /* You are required to stop at least 1 thing */

if (stop_reading) {
AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
connection->thread_data.is_reading_stopped = true;
}

if (stop_writing) {
AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
connection->thread_data.is_writing_stopped = true;
}
{ /* BEGIN CRITICAL SECTION */
Expand All @@ -169,6 +167,11 @@ static void s_stop(
aws_error_name(error_code));

aws_channel_shutdown(connection->base.channel_slot->channel, error_code);
if (stop_reading) {
/* Increase the window size after shutdown starts, to prevent deadlock when data still pending in the TLS
* handler. */
aws_channel_slot_increment_read_window(connection->base.channel_slot, SIZE_MAX);
}
}
}

Expand All @@ -189,14 +192,45 @@ static void s_shutdown_due_to_error(struct aws_h1_connection *connection, int er
s_stop(connection, true /*stop_reading*/, true /*stop_writing*/, true /*schedule_shutdown*/, error_code);
}

/**
* Helper to shutdown the connection from non-channel thread. (User wishes to close the connection)
**/
static void s_shutdown_from_off_thread(struct aws_h1_connection *connection, int error_code) {
bool should_schedule_task = false;
{ /* BEGIN CRITICAL SECTION */
aws_h1_connection_lock_synced_data(connection);
if (!connection->synced_data.is_cross_thread_work_task_scheduled) {
connection->synced_data.is_cross_thread_work_task_scheduled = true;
should_schedule_task = true;
}
if (!connection->synced_data.shutdown_requested) {
connection->synced_data.shutdown_requested = true;
connection->synced_data.shutdown_requested_error_code = error_code;
}
/* Connection has shutdown, new streams should not be allowed. */
connection->synced_data.is_open = false;
connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
aws_h1_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */

if (should_schedule_task) {
AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION, "id=%p: Scheduling connection cross-thread work task.", (void *)&connection->base);
aws_channel_schedule_task_now(connection->base.channel_slot->channel, &connection->cross_thread_work_task);
} else {
AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION,
"id=%p: Connection cross-thread work task was already scheduled",
(void *)&connection->base);
}
}

/**
* Public function for closing connection.
*/
static void s_connection_close(struct aws_http_connection *connection_base) {
struct aws_h1_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h1_connection, base);

/* Don't stop reading/writing immediately, let that happen naturally during the channel shutdown process. */
s_stop(connection, false /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, AWS_ERROR_SUCCESS);
s_shutdown_from_off_thread(connection, AWS_ERROR_SUCCESS);
}

static void s_connection_stop_new_request(struct aws_http_connection *connection_base) {
Expand Down Expand Up @@ -412,8 +446,7 @@ void aws_h1_stream_cancel(struct aws_http_stream *stream, int error_code) {
(void *)stream,
error_code,
aws_error_name(error_code));

s_stop(connection, false /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, error_code);
s_shutdown_from_off_thread(connection, error_code);
}

struct aws_http_stream *s_make_request(
Expand Down Expand Up @@ -495,10 +528,17 @@ static void s_cross_thread_work_task(struct aws_channel_task *channel_task, void
bool has_new_client_streams = !aws_linked_list_empty(&connection->synced_data.new_client_stream_list);
aws_linked_list_move_all_back(
&connection->thread_data.stream_list, &connection->synced_data.new_client_stream_list);
bool shutdown_requested = connection->synced_data.shutdown_requested;
int shutdown_error = connection->synced_data.shutdown_requested_error_code;
connection->synced_data.shutdown_requested = false;
connection->synced_data.shutdown_requested_error_code = 0;

aws_h1_connection_unlock_synced_data(connection);
/* END CRITICAL SECTION */

if (shutdown_requested) {
s_stop(connection, true /*stop_reading*/, true /*stop_writing*/, true /*schedule_shutdown*/, shutdown_error);
}
/* Kick off outgoing-stream task if necessary */
if (has_new_client_streams) {
aws_h1_connection_try_write_outgoing_stream(connection);
Expand Down Expand Up @@ -785,13 +825,8 @@ static void s_http_stream_response_first_byte_timeout_task(
(void *)connection_base,
response_first_byte_timeout_ms);

/* Don't stop reading/writing immediately, let that happen naturally during the channel shutdown process. */
s_stop(
connection,
false /*stop_reading*/,
false /*stop_writing*/,
true /*schedule_shutdown*/,
AWS_ERROR_HTTP_RESPONSE_FIRST_BYTE_TIMEOUT);
/* Shutdown the connection. */
s_shutdown_due_to_error(connection, AWS_ERROR_HTTP_RESPONSE_FIRST_BYTE_TIMEOUT);
}

static void s_set_outgoing_message_done(struct aws_h1_stream *stream) {
Expand Down Expand Up @@ -1804,6 +1839,12 @@ static int s_handler_process_read_message(

AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION, "id=%p: Incoming message of size %zu.", (void *)&connection->base, message_size);
if (connection->thread_data.is_reading_stopped) {
/* Read has stopped, ignore the data, shutdown the channel incase it has not started yet. */
aws_mem_release(message->allocator, message); /* Release the message as we return success. */
s_shutdown_due_to_error(connection, AWS_ERROR_HTTP_CONNECTION_CLOSED);
return AWS_OP_SUCCESS;
}

/* Shrink connection window by amount of data received. See comments at variable's
* declaration site on why we use this instead of the official `aws_channel_slot.window_size`. */
Expand Down
5 changes: 5 additions & 0 deletions source/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -989,8 +989,13 @@ static void s_shutdown_channel_task(struct aws_channel_task *task, void *arg, en

s_unlock_synced_data(websocket);
/* END CRITICAL SECTION */
websocket->thread_data.is_reading_stopped = true;
websocket->thread_data.is_writing_stopped = true;

aws_channel_shutdown(websocket->channel_slot->channel, error_code);
/* Increase the window size after shutdown starts, to prevent deadlock when data still pending in the upstream
* handler. */
aws_channel_slot_increment_read_window(websocket->channel_slot, SIZE_MAX);
}

/* Tell the channel to shut down. It is safe to call this multiple times.
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ add_test_case(strutil_is_http_pseudo_header_name)

add_net_test_case(tls_download_medium_file_h1)
add_net_test_case(tls_download_medium_file_h2)
add_net_test_case(test_tls_download_shutdown_with_window_size_0)

add_test_case(websocket_decoder_sanity_check)
add_test_case(websocket_decoder_simplest_frame)
Expand Down
3 changes: 0 additions & 3 deletions tests/test_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,6 @@ AWS_TEST_CASE(connection_setup_shutdown, s_test_connection_setup_shutdown);
static int s_test_connection_setup_shutdown_tls(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

#ifdef __APPLE__ /* Something is wrong with APPLE */
return AWS_OP_SUCCESS;
#endif
struct tester_options options = {
.alloc = allocator,
.tls = true,
Expand Down
Loading

0 comments on commit a2fb16c

Please sign in to comment.