From d071082bfd3519cb01c408a93d57afda2ffb72e8 Mon Sep 17 00:00:00 2001 From: Vera Xia Date: Tue, 15 Oct 2024 09:01:41 -0700 Subject: [PATCH] [WIP]nw_socket protect socket data with lock (#685) --- .github/workflows/ci.yml | 5 +- CMakeLists.txt | 15 +- include/aws/io/private/dispatch_queue.h | 1 + include/aws/io/socket.h | 5 +- source/channel.c | 4 +- source/channel_bootstrap.c | 2 +- source/darwin/dispatch_queue_event_loop.c | 37 +- source/darwin/nw_socket.c | 726 ++++++++++++++-------- tests/CMakeLists.txt | 12 +- tests/channel_test.c | 3 + tests/socket_handler_test.c | 7 + tests/socket_test.c | 304 ++++++++- tests/tls_handler_test.c | 3 + 13 files changed, 817 insertions(+), 307 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 32ea9c615..986685b5c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -179,12 +179,15 @@ jobs: macos-debug: runs-on: macos-14 # latest + strategy: + matrix: + eventloop: ["-DAWS_USE_DISPATCH_QUEUE=ON", "-DAWS_USE_DISPATCH_QUEUE=OFF"] steps: - name: Build ${{ env.PACKAGE_NAME }} + consumers run: | python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder')" chmod a+x builder - ./builder build -p ${{ env.PACKAGE_NAME }} --config Debug + ./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=${{ matrix.eventloop }} --config Debug freebsd: runs-on: ubuntu-22.04 # latest diff --git a/CMakeLists.txt b/CMakeLists.txt index e36bfcddd..3dfaf1afd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -115,9 +115,7 @@ elseif (APPLE) file(GLOB AWS_IO_OS_SRC "source/bsd/*.c" - "source/posix/pipe.c" - "source/posix/host_resolver.c" - "source/posix/shared_library.c" + "source/posix/*.c" "source/darwin/darwin_pki_utils.c" "source/darwin/secure_transport_tls_channel_handler.c" ) @@ -137,19 +135,14 @@ elseif (APPLE) if(AWS_USE_DISPATCH_QUEUE OR IOS) set(EVENT_LOOP_DEFINES 
"-DAWS_USE_DISPATCH_QUEUE" ) - message("use dispatch queue on Apple") + message("use dispatch queue") file(GLOB AWS_IO_DISPATCH_QUEUE_SRC "source/darwin/dispatch_queue_event_loop.c" "source/darwin/nw_socket.c" ) list(APPEND AWS_IO_OS_SRC ${AWS_IO_DISPATCH_QUEUE_SRC}) else () - file(GLOB AWS_KQUEUE_SRC - "source/posix/socket.c" - ) - message("use kqueue on Apple") - set(EVENT_LOOP_DEFINES "-DAWS_USE_KQUEUE") - list(APPEND AWS_IO_OS_SRC ${AWS_KQUEUE_SRC}) + set(EVENT_LOOP_DEFINES "-DAWS_USE_KQUEUE") endif() elseif (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD" OR CMAKE_SYSTEM_NAME STREQUAL "NetBSD" OR CMAKE_SYSTEM_NAME STREQUAL "OpenBSD") @@ -258,4 +251,4 @@ if (NOT CMAKE_CROSSCOMPILING) if (BUILD_TESTING) add_subdirectory(tests) endif() -endif() \ No newline at end of file +endif() diff --git a/include/aws/io/private/dispatch_queue.h b/include/aws/io/private/dispatch_queue.h index 76175a041..a38d8de4f 100644 --- a/include/aws/io/private/dispatch_queue.h +++ b/include/aws/io/private/dispatch_queue.h @@ -57,6 +57,7 @@ struct dispatch_loop { } synced_data; bool wakeup_schedule_needed; + bool is_destroying; }; #endif /* #ifndef AWS_IO_PRIVATE_DISPATCH_QUEUE_H */ diff --git a/include/aws/io/socket.h b/include/aws/io/socket.h index 45b67d8ee..9a2e672ef 100644 --- a/include/aws/io/socket.h +++ b/include/aws/io/socket.h @@ -53,8 +53,9 @@ struct aws_socket_options { * This property is used to bind the socket to a particular network interface by name, such as eth0 and ens32. * If this is empty, the socket will not be bound to any interface and will use OS defaults. If the provided name * is invalid, `aws_socket_init()` will error out with AWS_IO_SOCKET_INVALID_OPTIONS. This option is only - * supported on Linux, macOS, and platforms that have either SO_BINDTODEVICE or IP_BOUND_IF. It is not supported on - * Windows. `AWS_ERROR_PLATFORM_NOT_SUPPORTED` will be raised on unsupported platforms. 
+ * supported on Linux, macOS(bsd socket), and platforms that have either SO_BINDTODEVICE or IP_BOUND_IF. It is not + * supported on Windows and Apple Network Framework. `AWS_ERROR_PLATFORM_NOT_SUPPORTED` will be raised on + * unsupported platforms. */ char network_interface_name[AWS_NETWORK_INTERFACE_NAME_MAX]; }; diff --git a/source/channel.c b/source/channel.c index 55903fc1d..698b9e962 100644 --- a/source/channel.c +++ b/source/channel.c @@ -645,7 +645,9 @@ void aws_channel_schedule_task_future( } bool aws_channel_thread_is_callers_thread(struct aws_channel *channel) { - return aws_event_loop_thread_is_callers_thread(channel->loop); + if (channel && channel->loop) + return aws_event_loop_thread_is_callers_thread(channel->loop); + return true; } static void s_update_channel_slot_message_overheads(struct aws_channel *channel) { diff --git a/source/channel_bootstrap.c b/source/channel_bootstrap.c index 7969ba57b..6fb2a9461 100644 --- a/source/channel_bootstrap.c +++ b/source/channel_bootstrap.c @@ -642,7 +642,7 @@ static void s_attempt_connection(struct aws_task *task, void *arg, enum aws_task goto task_cancelled; } - struct aws_socket *outgoing_socket = aws_mem_acquire(allocator, sizeof(struct aws_socket)); + struct aws_socket *outgoing_socket = aws_mem_calloc(allocator, 1, sizeof(struct aws_socket)); if (aws_socket_init(outgoing_socket, allocator, &task_data->options)) { goto socket_init_failed; } diff --git a/source/darwin/dispatch_queue_event_loop.c b/source/darwin/dispatch_queue_event_loop.c index 237f60eb6..273532eb6 100644 --- a/source/darwin/dispatch_queue_event_loop.c +++ b/source/darwin/dispatch_queue_event_loop.c @@ -54,6 +54,7 @@ struct scheduled_service_entry { uint64_t timestamp; struct aws_linked_list_node node; struct aws_event_loop *loop; // might eventually need to be ref-counted for cleanup? 
+ bool cancel; }; struct scheduled_service_entry *scheduled_service_entry_new(struct aws_event_loop *loop, uint64_t timestamp) { @@ -77,6 +78,7 @@ void scheduled_service_entry_destroy(struct scheduled_service_entry *entry) { aws_ref_count_release(&dispatch_loop->ref_count); aws_mem_release(entry->allocator, entry); + entry = NULL; } // checks to see if another scheduled iteration already exists that will either @@ -99,14 +101,13 @@ static void s_dispatch_event_loop_destroy(void *context) { struct aws_event_loop *event_loop = context; struct dispatch_loop *dispatch_loop = event_loop->impl_data; - AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroy Dispatch Queue Event Loop.", (void *)event_loop); - aws_mutex_clean_up(&dispatch_loop->synced_data.lock); aws_string_destroy(dispatch_loop->dispatch_queue_id); aws_mem_release(dispatch_loop->allocator, dispatch_loop); aws_event_loop_clean_up_base(event_loop); aws_mem_release(event_loop->alloc, event_loop); + AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroyed Dispatch Queue Event Loop.", (void *)event_loop); aws_thread_decrement_unjoined_count(); } @@ -198,9 +199,13 @@ struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options( static void s_destroy(struct aws_event_loop *event_loop) { AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying Dispatch Queue Event Loop", (void *)event_loop); - struct dispatch_loop *dispatch_loop = event_loop->impl_data; + if (dispatch_loop->is_destroying) { + return; + } + dispatch_loop->is_destroying = true; + /* make sure the loop is running so we can schedule a last task. 
*/ s_run(event_loop); @@ -225,14 +230,16 @@ static void s_destroy(struct aws_event_loop *event_loop) { task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED); } - while (!aws_linked_list_empty(&dispatch_loop->synced_data.scheduling_state.scheduled_services)) { - struct aws_linked_list_node *node = - aws_linked_list_pop_front(&dispatch_loop->synced_data.scheduling_state.scheduled_services); - struct scheduled_service_entry *entry = AWS_CONTAINER_OF(node, struct scheduled_service_entry, node); - scheduled_service_entry_destroy(entry); - } - aws_mutex_lock(&dispatch_loop->synced_data.lock); + // The entry in the scheduled_services are all pushed to dispatch loop as the function context. + // Apple does not allow NULL context here, do not destroy the entry until the block run. + struct aws_linked_list_node *iter = NULL; + for (iter = aws_linked_list_begin(&dispatch_loop->synced_data.scheduling_state.scheduled_services); + iter != aws_linked_list_end(&dispatch_loop->synced_data.scheduling_state.scheduled_services); + iter = aws_linked_list_next(iter)) { + struct scheduled_service_entry *entry = AWS_CONTAINER_OF(iter, struct scheduled_service_entry, node); + entry->cancel = true; + } dispatch_loop->synced_data.suspended = true; dispatch_loop->synced_data.is_executing = false; aws_mutex_unlock(&dispatch_loop->synced_data.lock); @@ -337,19 +344,23 @@ void end_iteration(struct scheduled_service_entry *entry) { } } - aws_mutex_unlock(&loop->synced_data.lock); scheduled_service_entry_destroy(entry); + aws_mutex_unlock(&loop->synced_data.lock); } // this function is what gets scheduled and executed by the Dispatch Queue API void run_iteration(void *context) { struct scheduled_service_entry *entry = context; struct aws_event_loop *event_loop = entry->loop; - if (event_loop == NULL) - return; struct dispatch_loop *dispatch_loop = event_loop->impl_data; + AWS_ASSERT(event_loop && dispatch_loop); + if (entry->cancel) { + scheduled_service_entry_destroy(entry); + return; + } if 
(!begin_iteration(entry)) { + scheduled_service_entry_destroy(entry); return; } diff --git a/source/darwin/nw_socket.c b/source/darwin/nw_socket.c index 781be26cc..4e8d1d5d5 100644 --- a/source/darwin/nw_socket.c +++ b/source/darwin/nw_socket.c @@ -76,30 +76,32 @@ enum socket_state { CLOSED, }; -struct nw_socket_listener_args { +struct nw_listener_connection_args { int error_code; struct aws_allocator *allocator; - struct aws_socket *socket; - struct aws_socket *new_socket; + struct nw_socket *nw_socket; + nw_connection_t new_connection; void *user_data; }; struct nw_socket_timeout_args { struct aws_task task; struct aws_allocator *allocator; - struct aws_socket *socket; + struct nw_socket *nw_socket; + bool connection_succeed; }; -struct nw_socket_readable_args { +struct nw_socket_scheduled_task_args { int error_code; struct aws_allocator *allocator; - struct aws_socket *socket; + struct nw_socket *nw_socket; + dispatch_data_t data; }; struct nw_socket_written_args { int error_code; struct aws_allocator *allocator; - struct aws_socket *socket; + struct nw_socket *nw_socket; aws_socket_on_write_completed_fn *written_fn; void *user_data; size_t bytes_written; @@ -107,7 +109,7 @@ struct nw_socket_written_args { struct nw_socket_cancel_task_args { struct aws_allocator *allocator; - struct aws_socket *socket; + struct nw_socket *nw_socket; struct aws_task *task_to_cancel; }; @@ -128,6 +130,12 @@ struct nw_socket { struct nw_socket_timeout_args *timeout_args; aws_socket_on_connection_result_fn *on_connection_result_fn; void *connect_accept_user_data; + + struct { + struct aws_mutex lock; + struct aws_event_loop *event_loop; + struct aws_socket *base_socket; + } synced_data; }; struct socket_address { @@ -176,7 +184,7 @@ static int s_setup_socket_params(struct nw_socket *nw_socket, const struct aws_s } } else if (options->type == AWS_SOCKET_DGRAM) { nw_socket->socket_options_to_params = - nw_parameters_create_secure_udp(NW_PARAMETERS_DEFAULT_CONFIGURATION, 
NW_PARAMETERS_DEFAULT_CONFIGURATION); + nw_parameters_create_secure_udp(NW_PARAMETERS_DISABLE_PROTOCOL, NW_PARAMETERS_DEFAULT_CONFIGURATION); } if (!nw_socket->socket_options_to_params) { @@ -243,6 +251,8 @@ static struct aws_socket_vtable s_vtable = { .socket_is_open_fn = s_socket_is_open_fn, }; +static void s_schedule_next_read(struct nw_socket *socket); + static void s_socket_cleanup_fn(struct aws_socket *socket) { if (!socket->impl) { /* protect from double clean */ @@ -257,9 +267,10 @@ static void s_socket_cleanup_fn(struct aws_socket *socket) { struct nw_socket *nw_socket = socket->impl; - // The cleanup of nw_connection_t will be handled in the nw_socket destroy + // The cleanup of nw_connection_t will be handled in the s_socket_impl_destroy + nw_socket->synced_data.base_socket = NULL; aws_ref_count_release(&nw_socket->ref_count); - + socket->impl = NULL; AWS_ZERO_STRUCT(*socket); } @@ -307,11 +318,9 @@ static void s_socket_impl_destroy(void *sock_ptr) { nw_socket->nw_listener = NULL; } - if (nw_socket->timeout_args) { - aws_mem_release(nw_socket->allocator, nw_socket->timeout_args); - } - + aws_mutex_clean_up(&nw_socket->synced_data.lock); aws_mem_release(nw_socket->allocator, nw_socket); + nw_socket = NULL; } @@ -319,6 +328,26 @@ int aws_socket_init(struct aws_socket *socket, struct aws_allocator *alloc, cons AWS_ASSERT(options); AWS_ZERO_STRUCT(*socket); + // Network Interface is not supported with Apple Network Framework yet + size_t network_interface_length = 0; + if (aws_secure_strlen(options->network_interface_name, AWS_NETWORK_INTERFACE_NAME_MAX, &network_interface_length)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: network_interface_name max length must be %d length and NULL terminated", + (void *)socket, + socket->io_handle.data.fd, + AWS_NETWORK_INTERFACE_NAME_MAX); + return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS); + } + if (network_interface_length != 0) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p fd=%d: 
network_interface_name is not supported on this platform.", + (void *)socket, + socket->io_handle.data.fd); + return aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED); + } + struct nw_socket *nw_socket = aws_mem_calloc(alloc, 1, sizeof(struct nw_socket)); nw_socket->allocator = alloc; @@ -329,6 +358,11 @@ int aws_socket_init(struct aws_socket *socket, struct aws_allocator *alloc, cons socket->vtable = &s_vtable; socket->event_loop_style = AWS_EVENT_LOOP_STYLE_COMPLETION_PORT_BASED; + aws_mutex_init(&nw_socket->synced_data.lock); + aws_mutex_lock(&nw_socket->synced_data.lock); + nw_socket->synced_data.base_socket = socket; + aws_mutex_unlock(&nw_socket->synced_data.lock); + if (s_setup_socket_params(nw_socket, options)) { aws_mem_release(alloc, nw_socket); return AWS_OP_ERR; @@ -352,186 +386,338 @@ static void s_handle_socket_timeout(struct aws_task *task, void *args, aws_task_ (void)task; (void)status; - if (status == AWS_TASK_STATUS_CANCELED) { - // We will clean up the task and args on socket destory. 
- return; - } struct nw_socket_timeout_args *timeout_args = args; + struct nw_socket *nw_socket = timeout_args->nw_socket; AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "task_id=%p: timeout task triggered, evaluating timeouts.", (void *)task); + + aws_mutex_lock(&nw_socket->synced_data.lock); + struct aws_socket *socket = nw_socket->synced_data.base_socket; /* successful connection will have nulled out timeout_args->socket */ - if (timeout_args->socket) { + if (!timeout_args->connection_succeed && socket) { AWS_LOGF_ERROR( AWS_LS_IO_SOCKET, "id=%p handle=%p: timed out, shutting down.", - (void *)timeout_args->socket, - timeout_args->socket->io_handle.data.handle); + (void *)socket, + (void *)nw_socket->nw_connection); - timeout_args->socket->state = TIMEDOUT; + socket->state = TIMEDOUT; int error_code = AWS_IO_SOCKET_TIMEOUT; - struct nw_socket *socket_impl = timeout_args->socket->impl; - + if (status != AWS_TASK_STATUS_RUN_READY) { + error_code = AWS_IO_EVENT_LOOP_SHUTDOWN; + } aws_raise_error(error_code); - struct aws_socket *socket = timeout_args->socket; + + // Must set timeout_args to NULL to avoid double cancel. Clean up the timeout task + aws_mem_release(nw_socket->allocator, nw_socket->timeout_args); + nw_socket->timeout_args = NULL; aws_socket_close(socket); - socket_impl->on_connection_result_fn(socket, error_code, socket_impl->connect_accept_user_data); + nw_socket->on_connection_result_fn(socket, error_code, nw_socket->connect_accept_user_data); + } else { // else we simply clean up the timeout args + aws_mem_release(nw_socket->allocator, nw_socket->timeout_args); + nw_socket->timeout_args = NULL; } + + aws_mutex_unlock(&nw_socket->synced_data.lock); + aws_ref_count_release(&nw_socket->ref_count); + + // No need to release task, as task lives on timeout_args here. } static void s_process_readable_task(struct aws_task *task, void *arg, enum aws_task_status status) { - // TODO: WAHT IF THE TASK IS CANCELED??? 
(void)status; - struct nw_socket_readable_args *args = arg; + struct nw_socket_scheduled_task_args *readable_args = arg; + struct nw_socket *nw_socket = readable_args->nw_socket; + + if (status != AWS_TASK_STATUS_CANCELED) { + aws_mutex_lock(&nw_socket->synced_data.lock); + struct aws_socket *socket = nw_socket->synced_data.base_socket; + if (readable_args->error_code == AWS_IO_SOCKET_CLOSED) { + aws_socket_close(socket); + } - struct nw_socket *nw_socket = args->socket->impl; - if (nw_socket->on_readable) - nw_socket->on_readable(args->socket, args->error_code, nw_socket->on_readable_user_data); + if (socket && nw_socket->on_readable) { + if (readable_args->data) { + struct read_queue_node *node = aws_mem_calloc(nw_socket->allocator, 1, sizeof(struct read_queue_node)); + node->allocator = nw_socket->allocator; + node->received_data = readable_args->data; + aws_linked_list_push_back(&nw_socket->read_queue, &node->node); + } + nw_socket->on_readable(socket, readable_args->error_code, nw_socket->on_readable_user_data); + } + aws_mutex_unlock(&nw_socket->synced_data.lock); + } - aws_mem_release(args->allocator, task); - aws_mem_release(args->allocator, args); + aws_ref_count_release(&nw_socket->ref_count); + aws_mem_release(readable_args->allocator, task); + aws_mem_release(readable_args->allocator, readable_args); } -static void s_schedule_on_readable(struct aws_socket *socket, int error_code) { - struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task)); - ; - struct nw_socket_readable_args *args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_readable_args)); +static void s_schedule_on_readable(struct nw_socket *nw_socket, int error_code, dispatch_data_t data) { + + aws_mutex_lock(&nw_socket->synced_data.lock); + struct aws_socket *socket = nw_socket->synced_data.base_socket; + if (socket && nw_socket->synced_data.event_loop) { + struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task)); - 
args->socket = socket; - args->allocator = socket->allocator; - args->error_code = error_code; + struct nw_socket_scheduled_task_args *args = + aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_scheduled_task_args)); - aws_task_init(task, s_process_readable_task, args, "readableTask"); + args->nw_socket = nw_socket; + args->allocator = nw_socket->allocator; + args->error_code = error_code; - aws_event_loop_schedule_task_now(socket->event_loop, task); + if (data) { + dispatch_retain(data); + args->data = data; + } + aws_ref_count_acquire(&nw_socket->ref_count); + + aws_task_init(task, s_process_readable_task, args, "readableTask"); + + aws_event_loop_schedule_task_now(nw_socket->synced_data.event_loop, task); + } + aws_mutex_unlock(&nw_socket->synced_data.lock); } static void s_process_connection_success_task(struct aws_task *task, void *arg, enum aws_task_status status) { - // TODO: WAHT IF THE TASK IS CANCELED??? - (void)status; - struct nw_socket_readable_args *args = arg; - struct nw_socket *nw_socket = args->socket->impl; - if (nw_socket->on_connection_result_fn) - nw_socket->on_connection_result_fn(args->socket, args->error_code, nw_socket->connect_accept_user_data); + struct nw_socket_scheduled_task_args *task_args = arg; + struct nw_socket *nw_socket = task_args->nw_socket; - aws_mem_release(args->allocator, task); - aws_mem_release(args->allocator, args); + if (status != AWS_TASK_STATUS_CANCELED) { + aws_mutex_lock(&nw_socket->synced_data.lock); + struct aws_socket *socket = nw_socket->synced_data.base_socket; + if (socket && nw_socket->on_connection_result_fn) + nw_socket->on_connection_result_fn(socket, task_args->error_code, nw_socket->connect_accept_user_data); + aws_mutex_unlock(&nw_socket->synced_data.lock); + } + + aws_ref_count_release(&nw_socket->ref_count); + aws_mem_release(task_args->allocator, task); + aws_mem_release(task_args->allocator, task_args); } -static void s_schedule_on_connection_success(struct aws_socket *socket, int 
error_code) { +static void s_schedule_on_connection_success(struct nw_socket *nw_socket, int error_code) { + + aws_mutex_lock(&nw_socket->synced_data.lock); + struct aws_socket *socket = nw_socket->synced_data.base_socket; + if (socket && nw_socket->synced_data.event_loop) { + struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task)); - struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task)); - ; - struct nw_socket_readable_args *args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_readable_args)); + struct nw_socket_scheduled_task_args *args = + aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_scheduled_task_args)); - args->socket = socket; - args->allocator = socket->allocator; - args->error_code = error_code; + args->nw_socket = nw_socket; + args->allocator = socket->allocator; + args->error_code = error_code; + aws_ref_count_acquire(&nw_socket->ref_count); + aws_task_init(task, s_process_connection_success_task, args, "connectionSuccessTask"); + aws_event_loop_schedule_task_now(nw_socket->synced_data.event_loop, task); + } - aws_task_init(task, s_process_connection_success_task, args, "connectionSuccessTask"); - aws_event_loop_schedule_task_now(socket->event_loop, task); + aws_mutex_unlock(&nw_socket->synced_data.lock); } static void s_process_listener_success_task(struct aws_task *task, void *args, enum aws_task_status status) { - // TODO: WAHT IF THE TASK IS CANCELED??? 
- (void)status; - struct nw_socket_listener_args *listener_args = args; - - if (listener_args->socket->accept_result_fn) - listener_args->socket->accept_result_fn( - listener_args->socket, listener_args->error_code, listener_args->new_socket, listener_args->user_data); + struct nw_listener_connection_args *task_args = args; + struct aws_allocator *allocator = task_args->allocator; + struct nw_socket *listener_nw_socket = task_args->nw_socket; + + if (status == AWS_TASK_STATUS_RUN_READY) { + + if (listener_nw_socket) { + aws_mutex_lock(&listener_nw_socket->synced_data.lock); + struct aws_socket *listener = listener_nw_socket->synced_data.base_socket; + + struct aws_socket *new_socket = aws_mem_calloc(allocator, 1, sizeof(struct aws_socket)); + struct aws_socket_options options = listener->options; + int error = aws_socket_init(new_socket, allocator, &options); + if (error) { + aws_mem_release(allocator, new_socket); + nw_release(task_args->new_connection); + if (listener->accept_result_fn) { + listener->accept_result_fn(listener, task_args->error_code, NULL, task_args->user_data); + } + } else { + new_socket->io_handle.data.handle = task_args->new_connection; + + new_socket->io_handle.set_queue = s_client_set_dispatch_queue; + new_socket->io_handle.clear_queue = s_client_clear_dispatch_queue; + + nw_endpoint_t endpoint = nw_connection_copy_endpoint(task_args->new_connection); + const char *hostname = nw_endpoint_get_hostname(endpoint); + uint16_t port = nw_endpoint_get_port(endpoint); + + if (hostname != NULL) { + size_t hostname_len = strlen(hostname); + size_t buffer_size = AWS_ARRAY_SIZE(new_socket->remote_endpoint.address); + size_t to_copy = aws_min_size(hostname_len, buffer_size); + memcpy(new_socket->remote_endpoint.address, hostname, to_copy); + new_socket->remote_endpoint.port = port; + } + nw_release(endpoint); + + // Setup socket state to start read/write operations. 
+ new_socket->state = CONNECTED_READ | CONNECTED_WRITE; + struct nw_socket *new_nw_socket = new_socket->impl; + new_nw_socket->nw_connection = task_args->new_connection; + new_nw_socket->setup_run = true; + new_nw_socket->currently_connected = true; + + AWS_LOGF_DEBUG( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: incoming connection", + (void *)listener, + listener->io_handle.data.handle); + + AWS_LOGF_INFO( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: connected to %s:%d, incoming handle %p", + (void *)listener, + listener->io_handle.data.handle, + new_socket->remote_endpoint.address, + new_socket->remote_endpoint.port, + new_socket->io_handle.data.handle); + + if (listener->accept_result_fn) { + listener->accept_result_fn(listener, task_args->error_code, new_socket, task_args->user_data); + } else // The connection is not sent to user, clean it up. The nw_connection should be released in + // socket clean up. + { + aws_socket_clean_up(new_socket); + } + } + aws_mutex_unlock(&listener_nw_socket->synced_data.lock); + } + } else { + // If the task is not scheduled, release the connection. 
+ nw_release(task_args->new_connection); + } - aws_mem_release(listener_args->allocator, task); - aws_mem_release(listener_args->allocator, listener_args); + aws_ref_count_release(&listener_nw_socket->ref_count); + aws_mem_release(task_args->allocator, task); + aws_mem_release(task_args->allocator, task_args); } static void s_schedule_on_listener_success( - struct aws_socket *socket, + struct nw_socket *nw_socket, int error_code, - struct aws_socket *new_socket, + nw_connection_t new_connection, void *user_data) { - struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task)); - ; - struct nw_socket_listener_args *args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_readable_args)); + aws_mutex_lock(&nw_socket->synced_data.lock); + if (nw_socket->synced_data.base_socket && nw_socket->synced_data.event_loop) { + struct aws_task *task = aws_mem_calloc(nw_socket->allocator, 1, sizeof(struct aws_task)); + + struct nw_listener_connection_args *args = + aws_mem_calloc(nw_socket->allocator, 1, sizeof(struct nw_listener_connection_args)); + + args->nw_socket = nw_socket; + args->allocator = nw_socket->allocator; + args->error_code = error_code; + args->new_connection = new_connection; + args->user_data = user_data; - args->socket = socket; - args->allocator = socket->allocator; - args->error_code = error_code; - args->new_socket = new_socket; - args->user_data = user_data; + aws_ref_count_acquire(&nw_socket->ref_count); + nw_retain(new_connection); - aws_task_init(task, s_process_listener_success_task, args, "listenerSuccessTask"); - aws_event_loop_schedule_task_now(socket->event_loop, task); + aws_task_init(task, s_process_listener_success_task, args, "listenerSuccessTask"); + aws_event_loop_schedule_task_now(nw_socket->synced_data.event_loop, task); + } + aws_mutex_unlock(&nw_socket->synced_data.lock); } static void s_process_cancel_task(struct aws_task *task, void *arg, enum aws_task_status status) { - // TODO: WAHT IF THE TASK IS 
CANCELED??? (void)status; - struct nw_socket_cancel_task_args *args = arg; + struct nw_socket_cancel_task_args *cancel_args = arg; + struct nw_socket *nw_socket = cancel_args->nw_socket; - if (status == AWS_TASK_STATUS_RUN_READY) - aws_event_loop_cancel_task(args->socket->event_loop, args->task_to_cancel); + // The task is proceed in socket event loop. The event loop had to be avaliable. + AWS_ASSERT(nw_socket->synced_data.event_loop); - aws_mem_release(args->allocator, task); - aws_mem_release(args->allocator, args); + if (status == AWS_TASK_STATUS_RUN_READY) { + aws_event_loop_cancel_task(nw_socket->synced_data.event_loop, cancel_args->task_to_cancel); + } + + aws_ref_count_release(&nw_socket->ref_count); + aws_mem_release(cancel_args->allocator, task); + aws_mem_release(cancel_args->allocator, cancel_args); } // As cancel task has to run on the same thread & we dont have control on dispatch queue thread, // we always schedule the cancel task on event loop -static void s_schedule_cancel_task(struct aws_socket *socket, struct aws_task *task_to_cancel) { - - struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task)); - ; - struct nw_socket_cancel_task_args *args = - aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_cancel_task_args)); - - args->socket = socket; - args->allocator = socket->allocator; - args->task_to_cancel = task_to_cancel; +static void s_schedule_cancel_task(struct nw_socket *nw_socket, struct aws_task *task_to_cancel) { + + aws_mutex_lock(&nw_socket->synced_data.lock); + if (nw_socket->synced_data.event_loop) { + struct aws_task *task = aws_mem_calloc(nw_socket->allocator, 1, sizeof(struct aws_task)); + struct nw_socket_cancel_task_args *args = + aws_mem_calloc(nw_socket->allocator, 1, sizeof(struct nw_socket_cancel_task_args)); + args->nw_socket = nw_socket; + args->allocator = nw_socket->allocator; + args->task_to_cancel = task_to_cancel; + aws_ref_count_acquire(&nw_socket->ref_count); + aws_task_init(task, 
s_process_cancel_task, args, "cancelTaskTask"); + // TODO DEBUG: + AWS_LOGF_DEBUG( + AWS_LS_COMMON_TASK_SCHEDULER, "id=%p: Schedule cancel %s task", (void *)task_to_cancel, task->type_tag); + aws_event_loop_schedule_task_now(nw_socket->synced_data.event_loop, task); + } - aws_task_init(task, s_process_cancel_task, args, "cancelTaskTask"); - aws_event_loop_schedule_task_now(socket->event_loop, task); + aws_mutex_unlock(&nw_socket->synced_data.lock); } -static void s_process_write_task(struct aws_task *task, void *arg, enum aws_task_status status) { - // TODO: WAHT IF THE TASK IS CANCELED??? +static void s_process_write_task(struct aws_task *task, void *args, enum aws_task_status status) { - (void)status; - struct nw_socket_written_args *args = arg; + struct nw_socket_written_args *task_args = args; + struct aws_allocator *allocator = task_args->allocator; + struct nw_socket *nw_socket = task_args->nw_socket; - if (args->written_fn) - args->written_fn(args->socket, args->error_code, args->bytes_written, args->user_data); + if (status != AWS_TASK_STATUS_CANCELED) { + aws_mutex_lock(&nw_socket->synced_data.lock); + struct aws_socket *socket = nw_socket->synced_data.base_socket; + if (task_args->written_fn) + task_args->written_fn(socket, task_args->error_code, task_args->bytes_written, task_args->user_data); + aws_mutex_unlock(&nw_socket->synced_data.lock); + } - aws_mem_release(args->allocator, task); - aws_mem_release(args->allocator, args); + aws_ref_count_release(&nw_socket->ref_count); + aws_mem_release(allocator, task); + aws_mem_release(allocator, task_args); } static void s_schedule_write_fn( - struct aws_socket *socket, + struct nw_socket *nw_socket, int error_code, size_t bytes_written, void *user_data, aws_socket_on_write_completed_fn *written_fn) { - struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task)); - ; - struct nw_socket_written_args *args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_written_args)); - 
- args->socket = socket; - args->allocator = socket->allocator; - args->error_code = error_code; - args->written_fn = written_fn; - args->user_data = user_data; - args->bytes_written = bytes_written; + aws_mutex_lock(&nw_socket->synced_data.lock); + if (nw_socket->synced_data.event_loop) { + struct aws_task *task = aws_mem_calloc(nw_socket->allocator, 1, sizeof(struct aws_task)); + + struct nw_socket_written_args *args = + aws_mem_calloc(nw_socket->allocator, 1, sizeof(struct nw_socket_written_args)); + + args->nw_socket = nw_socket; + args->allocator = nw_socket->allocator; + args->error_code = error_code; + args->written_fn = written_fn; + args->user_data = user_data; + args->bytes_written = bytes_written; + aws_ref_count_acquire(&nw_socket->ref_count); + aws_task_init(task, s_process_write_task, args, "writtenTask"); + aws_event_loop_schedule_task_now(nw_socket->synced_data.event_loop, task); + } - aws_task_init(task, s_process_write_task, args, "writtenTask"); - aws_event_loop_schedule_task_now(socket->event_loop, task); + aws_mutex_unlock(&nw_socket->synced_data.lock); } static int s_socket_connect_fn( @@ -558,7 +744,8 @@ static int s_socket_connect_fn( return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); } } else { /* UDP socket */ - /* UDP sockets jump to CONNECT_READ if bind is called first */ + // Though UDP is a connectionless transport, but the network framework uses a connection based abstraction on + // top of the UDP layer. We should always do an "connect" action for Apple Network Framework. 
if (socket->state != CONNECTED_READ && socket->state != INIT) { return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE); } @@ -575,12 +762,12 @@ static int s_socket_connect_fn( int pton_err = 1; if (socket->options.domain == AWS_SOCKET_IPV4) { pton_err = inet_pton(AF_INET, remote_endpoint->address, &address.sock_addr_types.addr_in.sin_addr); - address.sock_addr_types.addr_in.sin_port = htons(remote_endpoint->port); + address.sock_addr_types.addr_in.sin_port = htons((uint16_t)remote_endpoint->port); address.sock_addr_types.addr_in.sin_family = AF_INET; address.sock_addr_types.addr_in.sin_len = sizeof(struct sockaddr_in); } else if (socket->options.domain == AWS_SOCKET_IPV6) { pton_err = inet_pton(AF_INET6, remote_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr); - address.sock_addr_types.addr_in6.sin6_port = htons(remote_endpoint->port); + address.sock_addr_types.addr_in6.sin6_port = htons((uint16_t)remote_endpoint->port); address.sock_addr_types.addr_in6.sin6_family = AF_INET6; address.sock_addr_types.addr_in6.sin6_len = sizeof(struct sockaddr_in6); } else if (socket->options.domain == AWS_SOCKET_LOCAL) { @@ -651,9 +838,11 @@ static int s_socket_connect_fn( nw_socket->on_connection_result_fn = on_connection_result; nw_socket->connect_accept_user_data = user_data; + + AWS_ASSERT(socket->options.connect_timeout_ms); nw_socket->timeout_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_timeout_args)); - nw_socket->timeout_args->socket = socket; + nw_socket->timeout_args->nw_socket = nw_socket; nw_socket->timeout_args->allocator = socket->allocator; aws_task_init( @@ -662,6 +851,10 @@ static int s_socket_connect_fn( nw_socket->timeout_args, "NWSocketConnectionTimeoutTask"); + aws_mutex_lock(&nw_socket->synced_data.lock); + nw_socket->synced_data.event_loop = event_loop; + aws_mutex_unlock(&nw_socket->synced_data.lock); + /* set a handler for socket state changes. 
This is where we find out if the connection timed out, was successful, * was disconnected etc .... */ nw_connection_set_state_changed_handler( @@ -697,11 +890,15 @@ static int s_socket_connect_fn( socket->local_endpoint.address, port); // Cancel the connection timeout task - s_schedule_cancel_task(socket, &nw_socket->timeout_args->task); + if (nw_socket->timeout_args) { + nw_socket->timeout_args->connection_succeed = true; + s_schedule_cancel_task(nw_socket, &nw_socket->timeout_args->task); + } socket->state = CONNECTED_WRITE | CONNECTED_READ; nw_socket->setup_run = true; aws_ref_count_acquire(&nw_socket->ref_count); - s_schedule_on_connection_success(socket, AWS_OP_SUCCESS); + s_schedule_on_connection_success(nw_socket, AWS_OP_SUCCESS); + s_schedule_next_read(nw_socket); aws_ref_count_release(&nw_socket->ref_count); } else if (error) { @@ -714,44 +911,42 @@ static int s_socket_connect_fn( socket->io_handle.data.handle, error_code); // Cancel the connection timeout task - s_schedule_cancel_task(socket, &nw_socket->timeout_args->task); - /* we don't let this thing do DNS or TLS. Everything had better be a posix error. 
*/ - // AWS_ASSERT(nw_error_get_error_domain(error) == nw_error_domain_posix); - // DEBUG WIP we do in fact allow this to do TLS + if (nw_socket->timeout_args) { + nw_socket->timeout_args->connection_succeed = true; + s_schedule_cancel_task(nw_socket, &nw_socket->timeout_args->task); + } error_code = s_determine_socket_error(error_code); nw_socket->last_error = error_code; aws_raise_error(error_code); socket->state = ERROR; - aws_ref_count_acquire(&nw_socket->ref_count); if (!nw_socket->setup_run) { - s_schedule_on_connection_success(socket, error_code); + s_schedule_on_connection_success(nw_socket, error_code); nw_socket->setup_run = true; } else if (socket->readable_fn) { - s_schedule_on_readable(socket, nw_socket->last_error); + s_schedule_on_readable(nw_socket, nw_socket->last_error, NULL); } - - aws_ref_count_release(&nw_socket->ref_count); } else if (state == nw_connection_state_cancelled || state == nw_connection_state_failed) { /* this should only hit when the socket was closed by not us. 
Note, * we uninstall this handler right before calling close on the socket so this shouldn't * get hit unless it was triggered remotely */ // Cancel the connection timeout task - s_schedule_cancel_task(socket, &nw_socket->timeout_args->task); + if (nw_socket->timeout_args) { + nw_socket->timeout_args->connection_succeed = true; + s_schedule_cancel_task(nw_socket, &nw_socket->timeout_args->task); + } AWS_LOGF_DEBUG( AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed remotely.", (void *)socket, socket->io_handle.data.handle); socket->state = CLOSED; - aws_ref_count_acquire(&nw_socket->ref_count); aws_raise_error(AWS_IO_SOCKET_CLOSED); if (!nw_socket->setup_run) { - s_schedule_on_connection_success(socket, AWS_IO_SOCKET_CLOSED); + s_schedule_on_connection_success(nw_socket, AWS_IO_SOCKET_CLOSED); nw_socket->setup_run = true; } else if (socket->readable_fn) { - s_schedule_on_readable(socket, AWS_IO_SOCKET_CLOSED); + s_schedule_on_readable(nw_socket, AWS_IO_SOCKET_CLOSED, NULL); } - aws_ref_count_release(&nw_socket->ref_count); } else if (state == nw_connection_state_waiting) { AWS_LOGF_DEBUG( AWS_LS_IO_SOCKET, @@ -790,6 +985,8 @@ static int s_socket_connect_fn( socket->io_handle.data.handle, (unsigned long long)timeout); nw_socket->timeout_args->task.timestamp = timeout; + // Acquire a nwsocket for the timeout task + aws_ref_count_acquire(&nw_socket->ref_count); aws_event_loop_schedule_task_future(event_loop, &nw_socket->timeout_args->task, timeout); return AWS_OP_SUCCESS; @@ -816,12 +1013,12 @@ static int s_socket_bind_fn(struct aws_socket *socket, const struct aws_socket_e int pton_err = 1; if (socket->options.domain == AWS_SOCKET_IPV4) { pton_err = inet_pton(AF_INET, local_endpoint->address, &address.sock_addr_types.addr_in.sin_addr); - address.sock_addr_types.addr_in.sin_port = htons(local_endpoint->port); + address.sock_addr_types.addr_in.sin_port = htons((uint16_t)local_endpoint->port); address.sock_addr_types.addr_in.sin_family = AF_INET; 
address.sock_addr_types.addr_in.sin_len = sizeof(struct sockaddr_in); } else if (socket->options.domain == AWS_SOCKET_IPV6) { pton_err = inet_pton(AF_INET6, local_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr); - address.sock_addr_types.addr_in6.sin6_port = htons(local_endpoint->port); + address.sock_addr_types.addr_in6.sin6_port = htons((uint16_t)local_endpoint->port); address.sock_addr_types.addr_in6.sin6_family = AF_INET6; address.sock_addr_types.addr_in6.sin6_len = sizeof(struct sockaddr_in6); } else if (socket->options.domain == AWS_SOCKET_LOCAL) { @@ -853,9 +1050,7 @@ static int s_socket_bind_fn(struct aws_socket *socket, const struct aws_socket_e nw_parameters_set_local_endpoint(nw_socket->socket_options_to_params, endpoint); nw_release(endpoint); - // DEBUG WIP: - // Though UDP is a connectionless transport, but the network framework uses a connection based abstraction on top of the UDP layer. - // We should always do an abatract "connection" action for Apple Network Framework + // Apple network framework requires connection besides bind. 
socket->state = BOUND; AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p: successfully bound", (void *)socket); @@ -904,6 +1099,24 @@ static int s_socket_listen_fn(struct aws_socket *socket, int backlog_size) { return AWS_OP_SUCCESS; } +static void s_process_set_listener_endpoint_task(struct aws_task *task, void *arg, enum aws_task_status status) { + struct nw_socket_scheduled_task_args *readable_args = arg; + struct nw_socket *nw_socket = readable_args->nw_socket; + + aws_mutex_lock(&nw_socket->synced_data.lock); + struct aws_socket *aws_socket = nw_socket->synced_data.base_socket; + if (aws_socket && status == AWS_TASK_STATUS_RUN_READY) { + if (nw_socket->is_listener) { + aws_socket->local_endpoint.port = nw_listener_get_port(nw_socket->nw_listener); + } + } + aws_mutex_unlock(&nw_socket->synced_data.lock); + + aws_ref_count_release(&nw_socket->ref_count); + aws_mem_release(readable_args->allocator, task); + aws_mem_release(readable_args->allocator, arg); +} + static int s_socket_start_accept_fn( struct aws_socket *socket, struct aws_event_loop *accept_loop, @@ -935,7 +1148,11 @@ static int s_socket_start_accept_fn( socket->event_loop = accept_loop; socket->accept_result_fn = on_accept_result; socket->connect_accept_user_data = user_data; - struct aws_allocator *allocator = socket->allocator; + + struct nw_socket *nw_socket = socket->impl; + aws_mutex_lock(&nw_socket->synced_data.lock); + nw_socket->synced_data.event_loop = accept_loop; + aws_mutex_unlock(&nw_socket->synced_data.lock); nw_listener_set_state_changed_handler( socket->io_handle.data.handle, ^(nw_listener_state_t state, nw_error_t error) { @@ -966,7 +1183,21 @@ static int s_socket_start_accept_fn( AWS_LS_IO_SOCKET, "id=%p handle=%p: lisnter on port ready ", (void *)socket, - socket->io_handle.data.handle); + (void *)nw_socket->nw_connection); + + struct aws_task *task = aws_mem_calloc(socket->allocator, 1, sizeof(struct aws_task)); + + struct nw_socket_scheduled_task_args *args = + 
aws_mem_calloc(socket->allocator, 1, sizeof(struct nw_socket_scheduled_task_args)); + + args->nw_socket = nw_socket; + args->allocator = nw_socket->allocator; + // acquire ref count for the task + aws_ref_count_acquire(&nw_socket->ref_count); + + aws_task_init(task, s_process_set_listener_endpoint_task, args, "listenerSuccessTask"); + aws_event_loop_schedule_task_now(socket->event_loop, task); + } else if (state == nw_listener_state_cancelled) { AWS_LOGF_DEBUG( AWS_LS_IO_SOCKET, @@ -977,58 +1208,7 @@ static int s_socket_start_accept_fn( }); nw_listener_set_new_connection_handler(socket->io_handle.data.handle, ^(nw_connection_t connection) { - /* invoked upon an incoming connection. In BSD/Posix land this is the result of an - * accept() call. */ - - AWS_LOGF_DEBUG( - AWS_LS_IO_SOCKET, "id=%p handle=%p: incoming connection", (void *)socket, socket->io_handle.data.handle); - - struct aws_socket *new_socket = aws_mem_calloc(allocator, 1, sizeof(struct aws_socket)); - - struct aws_socket_options options = socket->options; - int error = aws_socket_init(new_socket, allocator, &options); - if(error) - { - aws_mem_release(allocator, new_socket); - s_schedule_on_listener_success(socket, aws_last_error(), NULL, user_data); - return; - } - new_socket->state = CONNECTED_READ | CONNECTED_WRITE; - new_socket->io_handle.data.handle = connection; - // The connection would be released in socket destroy. 
- nw_retain(connection); - new_socket->io_handle.set_queue = s_client_set_dispatch_queue; - new_socket->io_handle.clear_queue = s_client_clear_dispatch_queue; - - nw_endpoint_t endpoint = nw_connection_copy_endpoint(connection); - const char *hostname = nw_endpoint_get_hostname(endpoint); - uint16_t port = nw_endpoint_get_port(endpoint); - - if (hostname != NULL) { - size_t hostname_len = strlen(hostname); - size_t buffer_size = AWS_ARRAY_SIZE(new_socket->remote_endpoint.address); - size_t to_copy = aws_min_size(hostname_len, buffer_size); - memcpy(new_socket->remote_endpoint.address, hostname, to_copy); - new_socket->remote_endpoint.port = port; - } - nw_release(endpoint); - - // Setup socket state to start read/write operations. - new_socket->state = CONNECTED_READ | CONNECTED_WRITE; - struct nw_socket *new_nw_socket = new_socket->impl; - new_nw_socket->nw_connection = connection; - new_nw_socket->setup_run = true; - new_nw_socket->currently_connected = true; - - AWS_LOGF_INFO( - AWS_LS_IO_SOCKET, - "id=%p handle=%p: connected to %s:%d, incoming handle %p", - (void *)socket, - socket->io_handle.data.handle, - new_socket->remote_endpoint.address, - new_socket->remote_endpoint.port, - new_socket->io_handle.data.handle); - s_schedule_on_listener_success(socket, AWS_OP_SUCCESS, new_socket, user_data); + s_schedule_on_listener_success(nw_socket, AWS_OP_SUCCESS, connection, user_data); }); nw_listener_start(socket->io_handle.data.handle); return AWS_OP_SUCCESS; @@ -1059,6 +1239,13 @@ static int s_socket_close_fn(struct aws_socket *socket) { struct nw_socket *nw_socket = socket->impl; AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p handle=%p: closing", (void *)socket, socket->io_handle.data.handle); + // The timeout_args only setup for connected client connections. 
+    if (!nw_socket->is_listener && nw_socket->timeout_args && nw_socket->currently_connected) {
+        // if the timeout task has not triggered yet, cancel it and clean up
+        nw_socket->timeout_args->connection_succeed = true;
+        s_schedule_cancel_task(nw_socket, &nw_socket->timeout_args->task);
+    }
+
     /* disable the handlers. We already know it closed and don't need pointless use-after-free event/async hell*/
     if (nw_socket->is_listener) {
         nw_listener_set_state_changed_handler(socket->io_handle.data.handle, NULL);
@@ -1069,6 +1256,7 @@ static int s_socket_close_fn(struct aws_socket *socket) {
         nw_connection_cancel(socket->io_handle.data.handle);
     }
     nw_socket->currently_connected = false;
+    socket->state = CLOSED;
 
     return AWS_OP_SUCCESS;
 }
@@ -1076,7 +1264,10 @@ static int s_socket_close_fn(struct aws_socket *socket) {
 static int s_socket_shutdown_dir_fn(struct aws_socket *socket, enum aws_channel_direction dir) {
     // DEBUG WIP does this need implementation?
     (void)dir;
-    return s_socket_close_fn(socket);
+    AWS_ASSERT(true);
+    AWS_LOGF_ERROR(
+        AWS_LS_IO_SOCKET, "id=%p: shutdown by direction is not supported for Apple network framework.", (void *)socket);
+    return aws_raise_error(AWS_IO_SOCKET_INVALID_OPERATION_FOR_TYPE);
 }
 
 static int s_socket_set_options_fn(struct aws_socket *socket, const struct aws_socket_options *options) {
@@ -1118,10 +1309,17 @@ static int s_socket_assign_to_event_loop_fn(struct aws_socket *socket, struct aw
         socket->io_handle.data.handle,
         (void *)event_loop);
     socket->event_loop = event_loop;
+
+    struct nw_socket *nw_socket = socket->impl;
+    aws_mutex_lock(&nw_socket->synced_data.lock);
+    nw_socket->synced_data.event_loop = event_loop;
+
     if (!aws_event_loop_connect_handle_to_completion_port(event_loop, &socket->io_handle)) {
         nw_connection_start(socket->io_handle.data.handle);
+        aws_mutex_unlock(&nw_socket->synced_data.lock);
         return AWS_OP_SUCCESS;
     }
+    aws_mutex_unlock(&nw_socket->synced_data.lock);
     return AWS_OP_ERR;
 }
 
@@ -1131,18 +1329,15 @@ static int 
s_socket_assign_to_event_loop_fn(struct aws_socket *socket, struct aw /* sockets need to emulate edge-triggering. When we find that we've read all of our buffers or we preemptively know * we're going to want more notifications, we schedule a read. That read, upon occuring gets queued into an internal * buffer to then be vended upon a call to aws_socket_read() */ -static void s_schedule_next_read(struct aws_socket *socket) { - struct nw_socket *nw_socket = socket->impl; - - struct aws_allocator *allocator = socket->allocator; - struct aws_linked_list *list = &nw_socket->read_queue; +static void s_schedule_next_read(struct nw_socket *nw_socket) { + struct aws_socket *socket = nw_socket->synced_data.base_socket; if (!(socket->state & CONNECTED_READ)) { AWS_LOGF_ERROR( AWS_LS_IO_SOCKET, "id=%p handle=%p: cannot read to because it is not connected", - (void *)socket, - socket->io_handle.data.handle); + (void *)nw_socket, + (void *)nw_socket->nw_connection); aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); return; } @@ -1157,31 +1352,37 @@ static void s_schedule_next_read(struct aws_socket *socket) { UINT32_MAX, ^(dispatch_data_t data, nw_content_context_t context, bool is_complete, nw_error_t error) { (void)context; - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, "id=%p handle=%p: read cb invoked", (void *)socket, socket->io_handle.data.handle); if (!nw_socket->currently_connected) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", (void *)socket, socket->io_handle.data.handle); aws_raise_error(AWS_IO_SOCKET_CLOSED); } else if (!error || nw_error_get_error_code(error) == 0) { if (data) { - struct read_queue_node *node = aws_mem_calloc(allocator, 1, sizeof(struct read_queue_node)); - node->allocator = allocator; - node->received_data = data; - dispatch_retain(data); - aws_linked_list_push_back(list, &node->node); + // struct read_queue_node *node = aws_mem_calloc(allocator, 1, sizeof(struct read_queue_node)); + // node->allocator = allocator; + // 
node->received_data = data;
+                // aws_linked_list_push_back(list, &node->node);
                 AWS_LOGF_TRACE(
                     AWS_LS_IO_SOCKET,
                     "id=%p handle=%p: queued read buffer of size %d",
-                    (void *)socket,
-                    socket->io_handle.data.handle,
+                    (void *)nw_socket,
+                    (void *)nw_socket->nw_connection,
                     (int)dispatch_data_get_size(data));
-                s_schedule_on_readable(socket, AWS_ERROR_SUCCESS);
+                s_schedule_on_readable(nw_socket, AWS_ERROR_SUCCESS, data);
             }
             if (!is_complete) {
-                s_schedule_next_read(socket);
+                s_schedule_next_read(nw_socket);
+            } else {
+                if (socket->options.type != AWS_SOCKET_DGRAM) {
+                    // the message is complete, so close the socket
+                    AWS_LOGF_TRACE(
+                        AWS_LS_IO_SOCKET,
+                        "id=%p handle=%p: complete, hang up ",
+                        (void *)socket,
+                        socket->io_handle.data.handle);
+                    aws_raise_error(AWS_IO_SOCKET_CLOSED);
+                    s_schedule_on_readable(nw_socket, AWS_IO_SOCKET_CLOSED, NULL);
+                }
             }
         } else {
             int error_code = s_determine_socket_error(nw_error_get_error_code(error));
@@ -1193,8 +1394,7 @@ static void s_schedule_next_read(struct aws_socket *socket) {
                 (void *)socket,
                 socket->io_handle.data.handle,
                 error_code);
-
-            s_schedule_on_readable(socket, error_code);
+            s_schedule_on_readable(nw_socket, error_code, NULL);
         }
         aws_ref_count_release(&nw_socket->ref_count);
     });
@@ -1206,13 +1406,23 @@ static int s_socket_subscribe_to_readable_events_fn(
     void *user_data) {
     struct nw_socket *nw_socket = socket->impl;
 
+    socket->readable_user_data = user_data;
+    socket->readable_fn = on_readable;
+
+    // nw_socket is ref counted. It is possible that the aws_socket object
+    // is released while nw_socket is still alive and processing events.
+    // Store the function on nw_socket to avoid bad access after the
+    // aws_socket is released.
+    // DEBUG: test to check if this could be removed...?
     nw_socket->on_readable = on_readable;
     nw_socket->on_readable_user_data = user_data;
 
-    s_schedule_next_read(socket);
+    s_schedule_next_read(nw_socket);
 
     return AWS_OP_SUCCESS;
 }
 
+// WARNING: This function should never lock!!!! 
aws_socket_read() should always called on event loop thread, +// which means we already acquire a necessary lock there. static int s_socket_read_fn(struct aws_socket *socket, struct aws_byte_buf *read_buffer, size_t *amount_read) { struct nw_socket *nw_socket = socket->impl; @@ -1228,15 +1438,6 @@ static int s_socket_read_fn(struct aws_socket *socket, struct aws_byte_buf *read return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY); } - if (!(socket->state & CONNECTED_READ)) { - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p handle=%p: cannot read because it is not connected", - (void *)socket, - socket->io_handle.data.handle); - return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED); - } - __block size_t max_to_read = read_buffer->capacity - read_buffer->len; /* if empty, schedule a read and return WOULD_BLOCK */ @@ -1246,8 +1447,18 @@ static int s_socket_read_fn(struct aws_socket *socket, struct aws_byte_buf *read "id=%p handle=%p: read queue is empty, scheduling another read", (void *)socket, socket->io_handle.data.handle); + + if (!(socket->state & CONNECTED_READ)) { + AWS_LOGF_ERROR( + AWS_LS_IO_SOCKET, + "id=%p handle=%p: socket is not connected to read.", + (void *)socket, + socket->io_handle.data.handle); + return aws_raise_error(AWS_IO_SOCKET_CLOSED); + } + if (!nw_socket->read_queued) { - s_schedule_next_read(socket); + s_schedule_next_read(nw_socket); nw_socket->read_queued = true; } *amount_read = 0; @@ -1262,25 +1473,26 @@ static int s_socket_read_fn(struct aws_socket *socket, struct aws_byte_buf *read struct aws_linked_list_node *node = aws_linked_list_front(&nw_socket->read_queue); struct read_queue_node *read_node = AWS_CONTAINER_OF(node, struct read_queue_node, node); + AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p, [DEBUG READ DATA] start processing node ", (void *)node); bool read_completed = dispatch_data_apply( read_node->received_data, (dispatch_data_applier_t) ^ (dispatch_data_t region, size_t offset, const void *buffer, size_t size) { + 
(void)region; (void)offset; size_t to_copy = aws_min_size(max_to_read, size - read_node->current_offset); - aws_byte_buf_write(read_buffer, (const uint8_t *)buffer, to_copy); - if (to_copy < size) { - dispatch_retain(region); - read_node->current_offset = size - to_copy; - return false; - } - + aws_byte_buf_write(read_buffer, (const uint8_t *)buffer + read_node->current_offset, to_copy); max_to_read -= to_copy; *amount_read += to_copy; - read_node->current_offset = 0; - return true; + read_node->current_offset += to_copy; + if (read_node->current_offset == size) { + read_node->current_offset = 0; + return true; + } + return false; }); if (read_completed) { + AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p, [DEBUG READ DATA] clean up the node", (void *)node); aws_linked_list_remove(node); s_clean_up_read_queue_node(read_node); } @@ -1294,7 +1506,7 @@ static int s_socket_read_fn(struct aws_socket *socket, struct aws_byte_buf *read } /* keep replacing buffers */ - s_schedule_next_read(socket); + s_schedule_next_read(nw_socket); return AWS_OP_SUCCESS; } @@ -1317,34 +1529,23 @@ static int s_socket_write_fn( } struct nw_socket *nw_socket = socket->impl; - aws_ref_count_acquire(&nw_socket->ref_count); AWS_ASSERT(written_fn); dispatch_data_t data = dispatch_data_create(cursor->ptr, cursor->len, NULL, DISPATCH_DATA_DESTRUCTOR_DEFAULT); + aws_ref_count_acquire(&nw_socket->ref_count); + nw_connection_send( socket->io_handle.data.handle, data, _nw_content_context_default_message, true, ^(nw_error_t error) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, - "id=%p handle=%p: processing write requests, called from aws_socket_write", - (void *)socket, - socket->io_handle.data.handle); - if (!nw_socket->currently_connected) { - AWS_LOGF_TRACE( - AWS_LS_IO_SOCKET, "id=%p handle=%p: socket closed", (void *)socket, socket->io_handle.data.handle); - // As the socket is not open, we no longer have access to the event loop to schedule tasks - // directly execute the written callback instead of scheduling 
a task. - written_fn(socket, 0, 0, user_data); + // As the socket is closed, we dont put the callback on event loop to schedule tasks. + // Directly execute the written callback instead of scheduling a task. At this moment, + // we no longer has access to socket either. + + s_schedule_write_fn(nw_socket, 0, 0, user_data, written_fn); goto nw_socket_release; } - AWS_LOGF_ERROR( - AWS_LS_IO_SOCKET, - "id=%p handle=%p: DEBUG:: callback writing message: %p", - (void *)socket, - socket->io_handle.data.handle, - user_data); int error_code = !error || nw_error_get_error_code(error) == 0 ? AWS_OP_SUCCESS : s_determine_socket_error(nw_error_get_error_code(error)); @@ -1367,7 +1568,7 @@ static int s_socket_write_fn( (void *)socket, socket->io_handle.data.handle, (int)written_size); - s_schedule_write_fn(socket, error_code, !error_code ? written_size : 0, user_data, written_fn); + s_schedule_write_fn(nw_socket, error_code, !error_code ? written_size : 0, user_data, written_fn); nw_socket_release: aws_ref_count_release(&nw_socket->ref_count); }); @@ -1398,6 +1599,7 @@ void aws_socket_endpoint_init_local_address_for_test(struct aws_socket_endpoint AWS_FATAL_ASSERT(aws_uuid_to_str(&uuid, &uuid_buf) == AWS_OP_SUCCESS); snprintf(endpoint->address, sizeof(endpoint->address), "testsock" PRInSTR ".local", AWS_BYTE_BUF_PRI(uuid_buf)); } + int aws_socket_get_bound_address(const struct aws_socket *socket, struct aws_socket_endpoint *out_address) { if (socket->local_endpoint.address[0] == 0) { AWS_LOGF_ERROR( diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c4db357ec..7c0b89682 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -62,7 +62,6 @@ add_net_test_case(tcp_socket_communication) add_net_test_case(udp_socket_communication) add_net_test_case(test_socket_with_bind_to_interface) add_net_test_case(test_socket_with_bind_to_invalid_interface) -add_test_case(udp_bind_connect_communication) add_net_test_case(connect_timeout) 
add_net_test_case(connect_timeout_cancelation) @@ -74,17 +73,24 @@ endif() add_test_case(outgoing_local_sock_errors) add_test_case(outgoing_tcp_sock_error) add_test_case(incoming_tcp_sock_errors) -add_test_case(incoming_duplicate_tcp_bind_errors) add_net_test_case(bind_on_zero_port_tcp_ipv4) add_net_test_case(bind_on_zero_port_udp_ipv4) add_test_case(incoming_udp_sock_errors) -add_test_case(wrong_thread_read_write_fails) add_net_test_case(cleanup_before_connect_or_timeout_doesnt_explode) add_test_case(cleanup_in_accept_doesnt_explode) add_test_case(cleanup_in_write_cb_doesnt_explode) add_test_case(sock_write_cb_is_async) add_test_case(socket_validate_port) +if(NOT AWS_USE_DISPATCH_QUEUE) +# The read/write will always run a different thread for Apple Network Framework +add_test_case(wrong_thread_read_write_fails) +# Apple Network Framework does not support bind+connect +add_test_case(udp_bind_connect_communication) +add_test_case(incoming_duplicate_tcp_bind_errors) +endif() + + if(WIN32) add_test_case(local_socket_pipe_connected_race) endif() diff --git a/tests/channel_test.c b/tests/channel_test.c index 9a730a351..e63bc514b 100644 --- a/tests/channel_test.c +++ b/tests/channel_test.c @@ -837,6 +837,9 @@ static int s_test_channel_connect_some_hosts_timeout(struct aws_allocator *alloc aws_io_library_clean_up(); + // DEBUG WIP, sleep to wait for reference release + aws_thread_current_sleep(1000000000); + return 0; } diff --git a/tests/socket_handler_test.c b/tests/socket_handler_test.c index 85c7c7c39..25ad338a1 100644 --- a/tests/socket_handler_test.c +++ b/tests/socket_handler_test.c @@ -366,6 +366,13 @@ static int s_local_server_tester_init( /* find out which port the socket is bound to */ ASSERT_SUCCESS(aws_socket_get_bound_address(tester->listener, &tester->endpoint)); + // Apple Dispatch Queue requires a listener to be ready before it can get the assigned port. We wait until the + // port is back. Not sure if there is a better way to work around it. 
Probably start_listener need a callback + // function to process the updated the endpoint and host. + while (tester->endpoint.port == 0 && tester->socket_options.domain != AWS_SOCKET_LOCAL) { + ASSERT_SUCCESS(aws_socket_get_bound_address(tester->listener, &tester->endpoint)); + } + return AWS_OP_SUCCESS; } diff --git a/tests/socket_test.c b/tests/socket_test.c index 5b4000855..2dbd6902b 100644 --- a/tests/socket_test.c +++ b/tests/socket_test.c @@ -29,6 +29,9 @@ static bool s_use_dispatch_queue = true; static bool s_use_dispatch_queue = false; #endif +#define NANOS_PER_SEC ((uint64_t)AWS_TIMESTAMP_NANOS) +#define TIMEOUT (10 * NANOS_PER_SEC) + struct local_listener_args { struct aws_socket *incoming; struct aws_mutex *mutex; @@ -401,7 +404,186 @@ static int s_test_socket_ex( aws_event_loop_destroy(event_loop); // DEBUG WIP, sleep to wait for reference release - aws_thread_current_sleep(3000000000); + aws_thread_current_sleep(1000000000); + + return 0; +} + +static int s_test_socket_udp_dispatch_queue( + struct aws_allocator *allocator, + struct aws_socket_options *options, + struct aws_socket_endpoint *endpoint) { + struct aws_event_loop *event_loop = aws_event_loop_new_default(allocator, aws_high_res_clock_get_ticks); + + ASSERT_NOT_NULL(event_loop, "Event loop creation failed with error: %s", aws_error_debug_str(aws_last_error())); + ASSERT_SUCCESS(aws_event_loop_run(event_loop)); + + struct aws_mutex mutex = AWS_MUTEX_INIT; + struct aws_condition_variable condition_variable = AWS_CONDITION_VARIABLE_INIT; + + struct local_listener_args listener_args = { + .mutex = &mutex, + .condition_variable = &condition_variable, + .incoming = NULL, + .incoming_invoked = false, + .error_invoked = false, + }; + + struct aws_socket listener; + ASSERT_SUCCESS(aws_socket_init(&listener, allocator, options)); + + ASSERT_SUCCESS(aws_socket_bind(&listener, endpoint)); + + struct aws_socket_endpoint bound_endpoint; + ASSERT_SUCCESS(aws_socket_get_bound_address(&listener, 
&bound_endpoint)); + ASSERT_INT_EQUALS(endpoint->port, bound_endpoint.port); + ASSERT_STR_EQUALS(endpoint->address, bound_endpoint.address); + + ASSERT_SUCCESS(aws_socket_listen(&listener, 1024)); + ASSERT_SUCCESS(aws_socket_start_accept(&listener, event_loop, s_local_listener_incoming, &listener_args)); + // DEBUG WIP, sleep to wait for reference release + aws_thread_current_sleep(1000000000); + + struct local_outgoing_args outgoing_args = { + .mutex = &mutex, .condition_variable = &condition_variable, .connect_invoked = false, .error_invoked = false}; + + struct aws_socket outgoing; + ASSERT_SUCCESS(aws_socket_init(&outgoing, allocator, options)); + ASSERT_SUCCESS(aws_socket_connect(&outgoing, endpoint, event_loop, s_local_outgoing_connection, &outgoing_args)); + + ASSERT_SUCCESS(aws_mutex_lock(&mutex)); + ASSERT_SUCCESS(aws_condition_variable_wait_pred( + &condition_variable, &mutex, s_connection_completed_predicate, &outgoing_args)); + ASSERT_SUCCESS(aws_mutex_unlock(&mutex)); + + ASSERT_SUCCESS(aws_socket_assign_to_event_loop(&listener, event_loop)); + aws_socket_subscribe_to_readable_events(&outgoing, s_on_readable, NULL); + + /* now test the read and write across the connection. 
*/ + const char read_data[] = "I'm a little teapot"; + char write_data[sizeof(read_data)] = {0}; + + struct aws_byte_buf read_buffer = aws_byte_buf_from_array((const uint8_t *)read_data, sizeof(read_data)); + struct aws_byte_buf write_buffer = aws_byte_buf_from_array((const uint8_t *)write_data, sizeof(write_data)); + write_buffer.len = 0; + + struct aws_byte_cursor read_cursor = aws_byte_cursor_from_buf(&read_buffer); + + struct socket_io_args io_args = { + .socket = &outgoing, + .to_write = &read_cursor, + .to_read = &read_buffer, + .read_data = &write_buffer, + .mutex = &mutex, + .amount_read = 0, + .amount_written = 0, + .error_code = 0, + .condition_variable = AWS_CONDITION_VARIABLE_INIT, + .close_completed = false, + }; + + struct aws_task write_task = { + .fn = s_write_task, + .arg = &io_args, + }; + + aws_event_loop_schedule_task_now(event_loop, &write_task); + ASSERT_SUCCESS(aws_mutex_lock(&mutex)); + aws_condition_variable_wait_pred(&io_args.condition_variable, &mutex, s_write_completed_predicate, &io_args); + ASSERT_SUCCESS(aws_mutex_unlock(&mutex)); + ASSERT_INT_EQUALS(AWS_OP_SUCCESS, io_args.error_code); + + if (listener.options.type == AWS_SOCKET_STREAM || s_use_dispatch_queue) { + ASSERT_SUCCESS(aws_mutex_lock(&mutex)); + ASSERT_SUCCESS( + aws_condition_variable_wait_pred(&condition_variable, &mutex, s_incoming_predicate, &listener_args)); + ASSERT_SUCCESS(aws_mutex_unlock(&mutex)); + } + + ASSERT_TRUE(listener_args.incoming_invoked); + ASSERT_FALSE(listener_args.error_invoked); + struct aws_socket *server_sock = listener_args.incoming; + ASSERT_TRUE(outgoing_args.connect_invoked); + ASSERT_FALSE(outgoing_args.error_invoked); + ASSERT_INT_EQUALS(options->domain, listener_args.incoming->options.domain); + ASSERT_INT_EQUALS(options->type, listener_args.incoming->options.type); + ASSERT_SUCCESS(aws_socket_assign_to_event_loop(server_sock, event_loop)); + + aws_socket_subscribe_to_readable_events(server_sock, s_on_readable, NULL); + + io_args.socket = 
server_sock; + struct aws_task read_task = { + .fn = s_read_task, + .arg = &io_args, + }; + + aws_event_loop_schedule_task_now(event_loop, &read_task); + ASSERT_SUCCESS(aws_mutex_lock(&mutex)); + aws_condition_variable_wait_pred(&io_args.condition_variable, &mutex, s_read_task_predicate, &io_args); + ASSERT_SUCCESS(aws_mutex_unlock(&mutex)); + ASSERT_INT_EQUALS(AWS_OP_SUCCESS, io_args.error_code); + ASSERT_BIN_ARRAYS_EQUALS(read_buffer.buffer, read_buffer.len, write_buffer.buffer, write_buffer.len); + + memset((void *)write_data, 0, sizeof(write_data)); + write_buffer.len = 0; + + io_args.error_code = 0; + io_args.amount_read = 0; + io_args.amount_written = 0; + io_args.socket = server_sock; + aws_event_loop_schedule_task_now(event_loop, &write_task); + ASSERT_SUCCESS(aws_mutex_lock(&mutex)); + aws_condition_variable_wait_pred(&io_args.condition_variable, &mutex, s_write_completed_predicate, &io_args); + ASSERT_SUCCESS(aws_mutex_unlock(&mutex)); + ASSERT_INT_EQUALS(AWS_OP_SUCCESS, io_args.error_code); + + io_args.socket = &outgoing; + aws_event_loop_schedule_task_now(event_loop, &read_task); + ASSERT_SUCCESS(aws_mutex_lock(&mutex)); + aws_condition_variable_wait_pred(&io_args.condition_variable, &mutex, s_read_task_predicate, &io_args); + ASSERT_SUCCESS(aws_mutex_unlock(&mutex)); + ASSERT_INT_EQUALS(AWS_OP_SUCCESS, io_args.error_code); + ASSERT_BIN_ARRAYS_EQUALS(read_buffer.buffer, read_buffer.len, write_buffer.buffer, write_buffer.len); + + struct aws_task close_task = { + .fn = s_socket_close_task, + .arg = &io_args, + }; + + if (listener_args.incoming) { + io_args.socket = listener_args.incoming; + io_args.close_completed = false; + aws_event_loop_schedule_task_now(event_loop, &close_task); + ASSERT_SUCCESS(aws_mutex_lock(&mutex)); + aws_condition_variable_wait_pred(&io_args.condition_variable, &mutex, s_close_completed_predicate, &io_args); + ASSERT_SUCCESS(aws_mutex_unlock(&mutex)); + + aws_socket_clean_up(listener_args.incoming); + aws_mem_release(allocator, 
listener_args.incoming); + } + + io_args.socket = &outgoing; + io_args.close_completed = false; + aws_event_loop_schedule_task_now(event_loop, &close_task); + ASSERT_SUCCESS(aws_mutex_lock(&mutex)); + aws_condition_variable_wait_pred(&io_args.condition_variable, &mutex, s_close_completed_predicate, &io_args); + ASSERT_SUCCESS(aws_mutex_unlock(&mutex)); + + aws_socket_clean_up(&outgoing); + + io_args.socket = &listener; + io_args.close_completed = false; + aws_event_loop_schedule_task_now(event_loop, &close_task); + ASSERT_SUCCESS(aws_mutex_lock(&mutex)); + aws_condition_variable_wait_pred(&io_args.condition_variable, &mutex, s_close_completed_predicate, &io_args); + ASSERT_SUCCESS(aws_mutex_unlock(&mutex)); + + aws_socket_clean_up(&listener); + + aws_event_loop_destroy(event_loop); + + // DEBUG WIP, sleep to wait for reference release + aws_thread_current_sleep(5000000000); return 0; } @@ -411,7 +593,10 @@ static int s_test_socket( struct aws_socket_options *options, struct aws_socket_endpoint *endpoint) { - return s_test_socket_ex(allocator, options, NULL, endpoint); + if (s_use_dispatch_queue && options->type == AWS_SOCKET_DGRAM) + return s_test_socket_udp_dispatch_queue(allocator, options, endpoint); + else + return s_test_socket_ex(allocator, options, NULL, endpoint); } static int s_test_local_socket_communication(struct aws_allocator *allocator, void *ctx) { @@ -457,7 +642,7 @@ static int s_test_socket_with_bind_to_interface(struct aws_allocator *allocator, (void)ctx; struct aws_socket_options options; AWS_ZERO_STRUCT(options); - options.connect_timeout_ms = 3000; + options.connect_timeout_ms = 30000; options.keepalive = true; options.keep_alive_interval_sec = 1000; options.keep_alive_timeout_sec = 60000; @@ -470,7 +655,8 @@ static int s_test_socket_with_bind_to_interface(struct aws_allocator *allocator, #endif struct aws_socket_endpoint endpoint = {.address = "127.0.0.1", .port = 8128}; if (s_test_socket(allocator, &options, &endpoint)) { -#if 
!defined(AWS_OS_APPLE) && !defined(AWS_OS_LINUX) +#if !defined(AWS_OS_LINUX) + // On Apple, nw_socket currently not support network_interface_name if (aws_last_error() == AWS_ERROR_PLATFORM_NOT_SUPPORTED) { return AWS_OP_SKIP; } @@ -508,7 +694,7 @@ static int s_test_socket_with_bind_to_invalid_interface(struct aws_allocator *al options.domain = AWS_SOCKET_IPV4; strncpy(options.network_interface_name, "invalid", AWS_NETWORK_INTERFACE_NAME_MAX); struct aws_socket outgoing; -#if defined(AWS_OS_APPLE) || defined(AWS_OS_LINUX) +#if (defined(AWS_OS_APPLE) && defined(AWS_USE_KQUEUE)) || defined(AWS_OS_LINUX) ASSERT_ERROR(AWS_IO_SOCKET_INVALID_OPTIONS, aws_socket_init(&outgoing, allocator, &options)); #else ASSERT_ERROR(AWS_ERROR_PLATFORM_NOT_SUPPORTED, aws_socket_init(&outgoing, allocator, &options)); @@ -817,6 +1003,12 @@ static void s_null_sock_connection(struct aws_socket *socket, int error_code, vo aws_mutex_unlock(&error_args->mutex); } +static bool s_outgoing_local_error_predicate(void *args) { + struct error_test_args *test_args = (struct error_test_args *)args; + + return test_args->error_code != 0; +} + static int s_test_outgoing_local_sock_errors(struct aws_allocator *allocator, void *ctx) { (void)ctx; @@ -842,9 +1034,20 @@ static int s_test_outgoing_local_sock_errors(struct aws_allocator *allocator, vo struct aws_socket outgoing; ASSERT_SUCCESS(aws_socket_init(&outgoing, allocator, &options)); - ASSERT_FAILS(aws_socket_connect(&outgoing, &endpoint, event_loop, s_null_sock_connection, &args)); - ASSERT_TRUE( - aws_last_error() == AWS_IO_SOCKET_CONNECTION_REFUSED || aws_last_error() == AWS_ERROR_FILE_INVALID_PATH); + int socket_connect_result = aws_socket_connect(&outgoing, &endpoint, event_loop, s_null_sock_connection, &args); + // As Apple network framework has a async API design, we would not get the error back on connect + if (!s_use_dispatch_queue) { + ASSERT_FAILS(socket_connect_result); + ASSERT_TRUE( + aws_last_error() == AWS_IO_SOCKET_CONNECTION_REFUSED 
|| aws_last_error() == AWS_ERROR_FILE_INVALID_PATH); + } else { + ASSERT_SUCCESS(aws_mutex_lock(&args.mutex)); + ASSERT_SUCCESS(aws_condition_variable_wait_pred( + &args.condition_variable, &args.mutex, s_outgoing_local_error_predicate, &args)); + ASSERT_SUCCESS(aws_mutex_unlock(&args.mutex)); + ASSERT_TRUE( + args.error_code == AWS_IO_SOCKET_CONNECTION_REFUSED || args.error_code == AWS_ERROR_FILE_INVALID_PATH); + } aws_socket_clean_up(&outgoing); aws_event_loop_destroy(event_loop); @@ -982,6 +1185,37 @@ static int s_test_incoming_duplicate_tcp_bind_errors(struct aws_allocator *alloc AWS_TEST_CASE(incoming_duplicate_tcp_bind_errors, s_test_incoming_duplicate_tcp_bind_errors) +struct nw_socket_bind_args { + struct aws_socket *incoming; + struct aws_socket *listener; + struct aws_mutex *mutex; + struct aws_condition_variable *condition_variable; + bool incoming_invoked; + bool error_invoked; +}; + +static void s_local_listener_incoming_destroy_listener_bind( + struct aws_socket *socket, + int error_code, + struct aws_socket *new_socket, + void *user_data) { + (void)socket; + struct nw_socket_bind_args *listener_args = (struct nw_socket_bind_args *)user_data; + aws_mutex_lock(listener_args->mutex); + + if (!error_code) { + listener_args->incoming = new_socket; + listener_args->incoming_invoked = true; + } else { + listener_args->error_invoked = true; + } + if (new_socket) + aws_socket_clean_up(new_socket); + aws_condition_variable_notify_one(listener_args->condition_variable); + aws_mutex_unlock(listener_args->mutex); +} + +// TODO: Setup bind zero port test for dispatch queue /* Ensure that binding to port 0 results in OS assigning a port */ static int s_test_bind_on_zero_port( struct aws_allocator *allocator, @@ -1016,10 +1250,33 @@ static int s_test_bind_on_zero_port( ASSERT_SUCCESS(aws_socket_get_bound_address(&incoming, &local_address1)); - if (sock_type != AWS_SOCKET_DGRAM) { + if (s_use_dispatch_queue) { + struct aws_mutex mutex = AWS_MUTEX_INIT; + struct 
aws_condition_variable condition_variable = AWS_CONDITION_VARIABLE_INIT; + + struct nw_socket_bind_args listener_args = { + .incoming = NULL, + .listener = &incoming, + .incoming_invoked = false, + .error_invoked = false, + .mutex = &mutex, + .condition_variable = &condition_variable, + }; ASSERT_SUCCESS(aws_socket_listen(&incoming, 1024)); - } + ASSERT_SUCCESS(aws_socket_start_accept( + &incoming, event_loop, s_local_listener_incoming_destroy_listener_bind, &listener_args)); + // Apple Dispatch Queue requires a listener to be ready before it can get the assigned port. We wait until the + // port is back. + while (local_address1.port == 0) { + ASSERT_SUCCESS(aws_socket_get_bound_address(&incoming, &local_address1)); + } + + } else { + if (sock_type != AWS_SOCKET_DGRAM) { + ASSERT_SUCCESS(aws_socket_listen(&incoming, 1024)); + } + } ASSERT_TRUE(local_address1.port > 0); ASSERT_STR_EQUALS(address, local_address1.address); @@ -1360,6 +1617,9 @@ static int s_cleanup_in_accept_doesnt_explode(struct aws_allocator *allocator, v aws_socket_clean_up(&outgoing); aws_event_loop_destroy(event_loop); + // DEBUG WIP, sleep to wait for reference release + aws_thread_current_sleep(1000000000); + return 0; } AWS_TEST_CASE(cleanup_in_accept_doesnt_explode, s_cleanup_in_accept_doesnt_explode) @@ -1502,6 +1762,9 @@ static int s_cleanup_in_write_cb_doesnt_explode(struct aws_allocator *allocator, aws_socket_clean_up(&listener); aws_event_loop_destroy(event_loop); + // DEBUG WIP, sleep to wait for reference release + aws_thread_current_sleep(1000000000); + return 0; } AWS_TEST_CASE(cleanup_in_write_cb_doesnt_explode, s_cleanup_in_write_cb_doesnt_explode) @@ -1515,13 +1778,14 @@ enum async_role { ASYNC_ROLE_COUNT }; -static struct { +static struct async_test_args { struct aws_allocator *allocator; struct aws_event_loop *event_loop; struct aws_socket *write_socket; struct aws_socket *read_socket; bool currently_writing; enum async_role next_expected_callback; + int read_error; struct 
aws_mutex *mutex; struct aws_condition_variable *condition_variable; @@ -1547,7 +1811,12 @@ static void s_async_read_task(struct aws_task *task, void *args, enum aws_task_s buf.len = 0; if (aws_socket_read(g_async_tester.read_socket, &buf, &amount_read)) { /* reschedule task to try reading more later */ - if (AWS_IO_READ_WOULD_BLOCK == aws_last_error()) { + /* + * For Apple Network Framework (dispatch queue), the read error would not directly returned from + * aws_socket_read, but from the callback, therefore, we validate the g_async_tester.read_error + * returned from the callback + */ + if (!g_async_tester.read_error && AWS_IO_READ_WOULD_BLOCK == aws_last_error()) { aws_event_loop_schedule_task_now(g_async_tester.event_loop, task); break; } @@ -1633,6 +1902,15 @@ static void s_async_write_task(struct aws_task *task, void *args, enum aws_task_ g_async_tester.currently_writing = false; } +static void s_on_readable_return(struct aws_socket *socket, int error_code, void *user_data) { + (void)socket; + (void)error_code; + struct async_test_args *async_tester = user_data; + if (error_code) { + async_tester->read_error = error_code; + } +} + /** * aws_socket_write()'s completion callback MUST fire asynchronously. 
* Otherwise, we can get multiple write() calls in the same callstack, which @@ -1699,7 +1977,7 @@ static int s_sock_write_cb_is_async(struct aws_allocator *allocator, void *ctx) ASSERT_INT_EQUALS(options.type, listener_args.incoming->options.type); ASSERT_SUCCESS(aws_socket_assign_to_event_loop(server_sock, event_loop)); - aws_socket_subscribe_to_readable_events(server_sock, s_on_readable, NULL); + aws_socket_subscribe_to_readable_events(server_sock, s_on_readable_return, &g_async_tester); aws_socket_subscribe_to_readable_events(&outgoing, s_on_readable, NULL); /* set up g_async_tester */ diff --git a/tests/tls_handler_test.c b/tests/tls_handler_test.c index 0b0f5c88c..3007a3164 100644 --- a/tests/tls_handler_test.c +++ b/tests/tls_handler_test.c @@ -196,6 +196,9 @@ static int s_tls_common_tester_clean_up(struct tls_common_tester *tester) { aws_condition_variable_clean_up(&tester->condition_variable); aws_mutex_clean_up(&tester->mutex); + // DEBUG WIP, sleep to wait for reference release + aws_thread_current_sleep(1000000000); + return AWS_OP_SUCCESS; }