Skip to content

Commit

Permalink
merge with nw_socket_udp changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sbSteveK committed Oct 15, 2024
2 parents e7cd42e + d071082 commit c859bde
Show file tree
Hide file tree
Showing 13 changed files with 817 additions and 305 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,15 @@ jobs:
macos-debug:
runs-on: macos-14 # latest
strategy:
matrix:
eventloop: ["-DAWS_USE_DISPATCH_QUEUE=ON", "-DAWS_USE_DISPATCH_QUEUE=OFF"]
steps:
- name: Build ${{ env.PACKAGE_NAME }} + consumers
run: |
python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder')"
chmod a+x builder
./builder build -p ${{ env.PACKAGE_NAME }} --config Debug
./builder build -p ${{ env.PACKAGE_NAME }} --cmake-extra=${{ matrix.eventloop }} --config Debug
freebsd:
runs-on: ubuntu-22.04 # latest
Expand Down
15 changes: 4 additions & 11 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ elseif (APPLE)

file(GLOB AWS_IO_OS_SRC
"source/bsd/*.c"
"source/posix/pipe.c"
"source/posix/host_resolver.c"
"source/posix/shared_library.c"
"source/posix/*.c"
"source/darwin/darwin_pki_utils.c"
"source/darwin/secure_transport_tls_channel_handler.c"
)
Expand All @@ -137,19 +135,14 @@ elseif (APPLE)

if(AWS_USE_DISPATCH_QUEUE OR IOS)
set(EVENT_LOOP_DEFINES "-DAWS_USE_DISPATCH_QUEUE" )
message("use dispatch queue on Apple")
message("use dispatch queue")
file(GLOB AWS_IO_DISPATCH_QUEUE_SRC
"source/darwin/dispatch_queue_event_loop.c"
"source/darwin/nw_socket.c"
)
list(APPEND AWS_IO_OS_SRC ${AWS_IO_DISPATCH_QUEUE_SRC})
else ()
file(GLOB AWS_KQUEUE_SRC
"source/posix/socket.c"
)
message("use kqueue on Apple")
set(EVENT_LOOP_DEFINES "-DAWS_USE_KQUEUE")
list(APPEND AWS_IO_OS_SRC ${AWS_KQUEUE_SRC})
set(EVENT_LOOP_DEFINES "-DAWS_USE_KQUEUE")
endif()

elseif (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD" OR CMAKE_SYSTEM_NAME STREQUAL "NetBSD" OR CMAKE_SYSTEM_NAME STREQUAL "OpenBSD")
Expand Down Expand Up @@ -258,4 +251,4 @@ if (NOT CMAKE_CROSSCOMPILING)
if (BUILD_TESTING)
add_subdirectory(tests)
endif()
endif()
endif()
1 change: 1 addition & 0 deletions include/aws/io/private/aws_apple_network_framework.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct dispatch_loop {
} synced_data;

bool wakeup_schedule_needed;
bool is_destroying;
};
#endif /* AWS_USE_DISPATCH_QUEUE */

Expand Down
5 changes: 3 additions & 2 deletions include/aws/io/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ struct aws_socket_options {
* This property is used to bind the socket to a particular network interface by name, such as eth0 and ens32.
* If this is empty, the socket will not be bound to any interface and will use OS defaults. If the provided name
* is invalid, `aws_socket_init()` will error out with AWS_IO_SOCKET_INVALID_OPTIONS. This option is only
* supported on Linux, macOS, and platforms that have either SO_BINDTODEVICE or IP_BOUND_IF. It is not supported on
* Windows. `AWS_ERROR_PLATFORM_NOT_SUPPORTED` will be raised on unsupported platforms.
* supported on Linux, macOS(bsd socket), and platforms that have either SO_BINDTODEVICE or IP_BOUND_IF. It is not
* supported on Windows and Apple Network Framework. `AWS_ERROR_PLATFORM_NOT_SUPPORTED` will be raised on
* unsupported platforms.
*/
char network_interface_name[AWS_NETWORK_INTERFACE_NAME_MAX];
};
Expand Down
4 changes: 3 additions & 1 deletion source/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,9 @@ void aws_channel_schedule_task_future(
}

bool aws_channel_thread_is_callers_thread(struct aws_channel *channel) {
return aws_event_loop_thread_is_callers_thread(channel->loop);
if (channel && channel->loop)
return aws_event_loop_thread_is_callers_thread(channel->loop);
return true;
}

static void s_update_channel_slot_message_overheads(struct aws_channel *channel) {
Expand Down
2 changes: 1 addition & 1 deletion source/channel_bootstrap.c
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ static void s_attempt_connection(struct aws_task *task, void *arg, enum aws_task
goto task_cancelled;
}

struct aws_socket *outgoing_socket = aws_mem_acquire(allocator, sizeof(struct aws_socket));
struct aws_socket *outgoing_socket = aws_mem_calloc(allocator, 1, sizeof(struct aws_socket));
if (aws_socket_init(outgoing_socket, allocator, &task_data->options)) {
goto socket_init_failed;
}
Expand Down
37 changes: 24 additions & 13 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct scheduled_service_entry {
uint64_t timestamp;
struct aws_linked_list_node node;
struct aws_event_loop *loop; // might eventually need to be ref-counted for cleanup?
bool cancel;
};

struct scheduled_service_entry *scheduled_service_entry_new(struct aws_event_loop *loop, uint64_t timestamp) {
Expand All @@ -78,6 +79,7 @@ void scheduled_service_entry_destroy(struct scheduled_service_entry *entry) {
aws_ref_count_release(&dispatch_loop->ref_count);

aws_mem_release(entry->allocator, entry);
entry = NULL;
}

// checks to see if another scheduled iteration already exists that will either
Expand All @@ -100,14 +102,13 @@ static void s_dispatch_event_loop_destroy(void *context) {
struct aws_event_loop *event_loop = context;
struct dispatch_loop *dispatch_loop = event_loop->impl_data;

AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroy Dispatch Queue Event Loop.", (void *)event_loop);

aws_mutex_clean_up(&dispatch_loop->synced_data.lock);
aws_string_destroy(dispatch_loop->dispatch_queue_id);
aws_mem_release(dispatch_loop->allocator, dispatch_loop);
aws_event_loop_clean_up_base(event_loop);
aws_mem_release(event_loop->alloc, event_loop);

AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroyed Dispatch Queue Event Loop.", (void *)event_loop);
aws_thread_decrement_unjoined_count();
}

Expand Down Expand Up @@ -199,9 +200,13 @@ struct aws_event_loop *aws_event_loop_new_dispatch_queue_with_options(

static void s_destroy(struct aws_event_loop *event_loop) {
AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying Dispatch Queue Event Loop", (void *)event_loop);

struct dispatch_loop *dispatch_loop = event_loop->impl_data;

if (dispatch_loop->is_destroying) {
return;
}
dispatch_loop->is_destroying = true;

/* make sure the loop is running so we can schedule a last task. */
s_run(event_loop);

Expand All @@ -226,14 +231,16 @@ static void s_destroy(struct aws_event_loop *event_loop) {
task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
}

while (!aws_linked_list_empty(&dispatch_loop->synced_data.scheduling_state.scheduled_services)) {
struct aws_linked_list_node *node =
aws_linked_list_pop_front(&dispatch_loop->synced_data.scheduling_state.scheduled_services);
struct scheduled_service_entry *entry = AWS_CONTAINER_OF(node, struct scheduled_service_entry, node);
scheduled_service_entry_destroy(entry);
}

aws_mutex_lock(&dispatch_loop->synced_data.lock);
// The entry in the scheduled_services are all pushed to dispatch loop as the function context.
// Apple does not allow NULL context here, do not destroy the entry until the block run.
struct aws_linked_list_node *iter = NULL;
for (iter = aws_linked_list_begin(&dispatch_loop->synced_data.scheduling_state.scheduled_services);
iter != aws_linked_list_end(&dispatch_loop->synced_data.scheduling_state.scheduled_services);
iter = aws_linked_list_next(iter)) {
struct scheduled_service_entry *entry = AWS_CONTAINER_OF(iter, struct scheduled_service_entry, node);
entry->cancel = true;
}
dispatch_loop->synced_data.suspended = true;
dispatch_loop->synced_data.is_executing = false;
aws_mutex_unlock(&dispatch_loop->synced_data.lock);
Expand Down Expand Up @@ -338,19 +345,23 @@ void end_iteration(struct scheduled_service_entry *entry) {
}
}

aws_mutex_unlock(&loop->synced_data.lock);
scheduled_service_entry_destroy(entry);
aws_mutex_unlock(&loop->synced_data.lock);
}

// this function is what gets scheduled and executed by the Dispatch Queue API
void run_iteration(void *context) {
struct scheduled_service_entry *entry = context;
struct aws_event_loop *event_loop = entry->loop;
if (event_loop == NULL)
return;
struct dispatch_loop *dispatch_loop = event_loop->impl_data;
AWS_ASSERT(event_loop && dispatch_loop);
if (entry->cancel) {
scheduled_service_entry_destroy(entry);
return;
}

if (!begin_iteration(entry)) {
scheduled_service_entry_destroy(entry);
return;
}

Expand Down
Loading

0 comments on commit c859bde

Please sign in to comment.