diff --git a/src/arch/eventloop_posix.cpp b/src/arch/eventloop_posix.cpp new file mode 100644 index 0000000..623dfe3 --- /dev/null +++ b/src/arch/eventloop_posix.cpp @@ -0,0 +1,559 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * Copyright 2021 (c) Fraunhofer IOSB (Author: Julius Pfrommer) + * Copyright 2021 (c) Fraunhofer IOSB (Author: Jan Hermes) + */ + +#include "eventloop_posix.h" +//#include "open62541/plugin/eventloop.h" + +/*********/ +/* Timer */ +/*********/ + +static UA_DateTime +UA_EventLoopPOSIX_nextCyclicTime(UA_EventLoop *public_el) { + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el; + return UA_Timer_nextRepeatedTime(&el->timer); +} + +static UA_StatusCode +UA_EventLoopPOSIX_addTimedCallback(UA_EventLoop *public_el, + UA_Callback callback, + void *application, void *data, + UA_DateTime date, + UA_UInt64 *callbackId) { + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el; + return UA_Timer_addTimedCallback(&el->timer, callback, application, + data, date, callbackId); +} + +static UA_StatusCode +UA_EventLoopPOSIX_addCyclicCallback(UA_EventLoop *public_el, + UA_Callback cb, + void *application, void *data, + UA_Double interval_ms, + UA_DateTime *baseTime, + UA_TimerPolicy timerPolicy, + UA_UInt64 *callbackId) { + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el; + return UA_Timer_addRepeatedCallback(&el->timer, cb, application, + data, interval_ms, baseTime, + timerPolicy, callbackId); +} + +static UA_StatusCode +UA_EventLoopPOSIX_modifyCyclicCallback(UA_EventLoop *public_el, + UA_UInt64 callbackId, + UA_Double interval_ms, + UA_DateTime *baseTime, + UA_TimerPolicy timerPolicy) { + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el; + return UA_Timer_changeRepeatedCallback(&el->timer, callbackId, + interval_ms, baseTime, + timerPolicy); +} + +static void 
+UA_EventLoopPOSIX_removeCyclicCallback(UA_EventLoop *public_el, + UA_UInt64 callbackId) { + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el; + UA_Timer_removeCallback(&el->timer, callbackId); +} + +static void +UA_EventLoopPOSIX_addDelayedCallback(UA_EventLoop *public_el, + UA_DelayedCallback *dc) { + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el; + UA_LOCK(&el->elMutex); + dc->next = el->delayedCallbacks; + el->delayedCallbacks = dc; + UA_UNLOCK(&el->elMutex); +} + +static void +UA_EventLoopPOSIX_removeDelayedCallback(UA_EventLoop *public_el, + UA_DelayedCallback *dc) { + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el; + UA_LOCK(&el->elMutex); + UA_DelayedCallback **prev = &el->delayedCallbacks; + while(*prev) { + if(*prev == dc) { + *prev = (*prev)->next; + UA_UNLOCK(&el->elMutex); + return; + } + prev = &(*prev)->next; + } + UA_UNLOCK(&el->elMutex); +} + +/* Process and then free registered delayed callbacks */ +static void +processDelayed(UA_EventLoopPOSIX *el) { + UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, + "Process delayed callbacks"); + + UA_LOCK_ASSERT(&el->elMutex, 1); + + /* First empty the linked list in the el. So a delayed callback can add + * (itself) to the list. New entries are then processed during the next + * iteration. */ + UA_DelayedCallback *dc = el->delayedCallbacks, *next = NULL; + el->delayedCallbacks = NULL; + + for(; dc; dc = next) { + next = dc->next; + /* Delayed Callbacks might have no callback set. We don't return a + * StatusCode during "add" and don't validate. So test here. 
*/ + if(!dc->callback) + continue; + UA_UNLOCK(&el->elMutex); + dc->callback(dc->application, dc->context); + UA_LOCK(&el->elMutex); + } +} + +/***********************/ +/* EventLoop Lifecycle */ +/***********************/ + +static UA_StatusCode +UA_EventLoopPOSIX_start(UA_EventLoopPOSIX *el) { + UA_LOCK(&el->elMutex); + + if(el->eventLoop.state != UA_EVENTLOOPSTATE_FRESH && + el->eventLoop.state != UA_EVENTLOOPSTATE_STOPPED) { + UA_UNLOCK(&el->elMutex); + return UA_STATUSCODE_BADINTERNALERROR; + } + + UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, + "Starting the EventLoop"); + +#ifdef UA_HAVE_EPOLL + el->epollfd = epoll_create1(0); + if(el->epollfd == -1) { + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "Eventloop\t| Could not create the epoll socket (%s)", + errno_str)); + UA_UNLOCK(&el->elMutex); + return UA_STATUSCODE_BADINTERNALERROR; + } +#endif + + UA_StatusCode res = UA_STATUSCODE_GOOD; + UA_EventSource *es = el->eventLoop.eventSources; + while(es) { + UA_UNLOCK(&el->elMutex); + res |= es->start(es); + UA_LOCK(&el->elMutex); + es = es->next; + } + + /* Dirty-write the state that is const "from the outside" */ + *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state = + UA_EVENTLOOPSTATE_STARTED; + + UA_UNLOCK(&el->elMutex); + return res; +} + +static void +checkClosed(UA_EventLoopPOSIX *el) { + UA_LOCK_ASSERT(&el->elMutex, 1); + + UA_EventSource *es = el->eventLoop.eventSources; + while(es) { + if(es->state != UA_EVENTSOURCESTATE_STOPPED) + return; + es = es->next; + } + + /* Not closed until all delayed callbacks are processed */ + if(el->delayedCallbacks != NULL) + return; + + /* Dirty-write the state that is const "from the outside" */ + *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state = + UA_EVENTLOOPSTATE_STOPPED; + + /* Close the epoll/IOCP socket once all EventSources have shut down */ +#ifdef UA_HAVE_EPOLL + close(el->epollfd); +#endif + + UA_LOG_INFO(el->eventLoop.logger, 
UA_LOGCATEGORY_EVENTLOOP, + "The EventLoop has stopped"); +} + +static void +UA_EventLoopPOSIX_stop(UA_EventLoopPOSIX *el) { + UA_LOCK(&el->elMutex); + + if(el->eventLoop.state != UA_EVENTLOOPSTATE_STARTED) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, + "The EventLoop is not running, cannot be stopped"); + UA_UNLOCK(&el->elMutex); + return; + } + + UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, + "Stopping the EventLoop"); + + /* Set to STOPPING to prevent "normal use" */ + *(UA_EventLoopState*)(uintptr_t)&el->eventLoop.state = + UA_EVENTLOOPSTATE_STOPPING; + + /* Stop all event sources (asynchronous) */ + UA_EventSource *es = el->eventLoop.eventSources; + for(; es; es = es->next) { + if(es->state == UA_EVENTSOURCESTATE_STARTING || + es->state == UA_EVENTSOURCESTATE_STARTED) { + UA_UNLOCK(&el->elMutex); + es->stop(es); + UA_LOCK(&el->elMutex); + } + } + + /* Set to STOPPED if all EventSources are STOPPED */ + checkClosed(el); + + UA_UNLOCK(&el->elMutex); +} + +static UA_StatusCode +UA_EventLoopPOSIX_run(UA_EventLoopPOSIX *el, UA_UInt32 timeout) { + UA_LOCK(&el->elMutex); + + if(el->executing) { + UA_LOG_ERROR(el->eventLoop.logger, + UA_LOGCATEGORY_EVENTLOOP, + "Cannot run EventLoop from the run method itself"); + UA_UNLOCK(&el->elMutex); + return UA_STATUSCODE_BADINTERNALERROR; + } + + el->executing = true; + + if(el->eventLoop.state == UA_EVENTLOOPSTATE_FRESH || + el->eventLoop.state == UA_EVENTLOOPSTATE_STOPPED) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, + "Cannot iterate a stopped EventLoop"); + el->executing = false; + UA_UNLOCK(&el->elMutex); + return UA_STATUSCODE_BADINTERNALERROR; + } + + UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, + "Iterate the EventLoop"); + + /* Process cyclic callbacks */ + UA_DateTime dateBefore = + el->eventLoop.dateTime_nowMonotonic(&el->eventLoop); + + UA_UNLOCK(&el->elMutex); + UA_DateTime dateNext = UA_Timer_process(&el->timer, dateBefore); + 
UA_LOCK(&el->elMutex); + + /* Process delayed callbacks here: + * - Removes closed sockets already here instead of polling them again. + * - The timeout for polling is selected to be ready in time for the next + * cyclic callback. So we want to do little work between the timeout + * running out and executing the due cyclic callbacks. */ + processDelayed(el); + + /* A delayed callback could create another delayed callback (or re-add + * itself). In that case we don't want to wait (indefinitely) for an event + * to happen. Process queued events but don't sleep. Then process the + * delayed callbacks in the next iteration. */ + if(el->delayedCallbacks != NULL) + timeout = 0; + + /* Compute the remaining time */ + UA_DateTime maxDate = dateBefore + (timeout * UA_DATETIME_MSEC); + if(dateNext > maxDate) + dateNext = maxDate; + UA_DateTime listenTimeout = + dateNext - el->eventLoop.dateTime_nowMonotonic(&el->eventLoop); + if(listenTimeout < 0) + listenTimeout = 0; + + /* Listen on the active file-descriptors (sockets) from the + * ConnectionManagers */ + UA_StatusCode rv = UA_EventLoopPOSIX_pollFDs(el, listenTimeout); + + /* Check if the last EventSource was successfully stopped */ + if(el->eventLoop.state == UA_EVENTLOOPSTATE_STOPPING) + checkClosed(el); + + el->executing = false; + UA_UNLOCK(&el->elMutex); + return rv; +} + +/*****************************/ +/* Registering Event Sources */ +/*****************************/ + +static UA_StatusCode +UA_EventLoopPOSIX_registerEventSource(UA_EventLoopPOSIX *el, + UA_EventSource *es) { + UA_LOCK(&el->elMutex); + + /* Already registered? 
*/ + if(es->state != UA_EVENTSOURCESTATE_FRESH) { + UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "Cannot register the EventSource \"%.*s\": " + "already registered", + (int)es->name.length, (char*)es->name.data); + UA_UNLOCK(&el->elMutex); + return UA_STATUSCODE_BADINTERNALERROR; + } + + /* Add to linked list */ + es->next = el->eventLoop.eventSources; + el->eventLoop.eventSources = es; + + es->eventLoop = &el->eventLoop; + es->state = UA_EVENTSOURCESTATE_STOPPED; + + /* Start if the entire EventLoop is started */ + UA_StatusCode res = UA_STATUSCODE_GOOD; + if(el->eventLoop.state == UA_EVENTLOOPSTATE_STARTED) + res = es->start(es); + + UA_UNLOCK(&el->elMutex); + return res; +} + +static UA_StatusCode +UA_EventLoopPOSIX_deregisterEventSource(UA_EventLoopPOSIX *el, + UA_EventSource *es) { + UA_LOCK(&el->elMutex); + + if(es->state != UA_EVENTSOURCESTATE_STOPPED) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, + "Cannot deregister the EventSource %.*s: " + "Has to be stopped first", + (int)es->name.length, es->name.data); + UA_UNLOCK(&el->elMutex); + return UA_STATUSCODE_BADINTERNALERROR; + } + + /* Remove from the linked list */ + UA_EventSource **s = &el->eventLoop.eventSources; + while(*s) { + if(*s == es) { + *s = es->next; + break; + } + s = &(*s)->next; + } + + /* Set the state to non-registered */ + es->state = UA_EVENTSOURCESTATE_FRESH; + + UA_UNLOCK(&el->elMutex); + return UA_STATUSCODE_GOOD; +} + +/***************/ +/* Time Domain */ +/***************/ + +/* No special synchronization with an external source, just use the globally + * defined functions. 
*/ + +static UA_DateTime +UA_EventLoopPOSIX_DateTime_now(UA_EventLoop *el) { + return UA_DateTime_now(); +} + +static UA_DateTime +UA_EventLoopPOSIX_DateTime_nowMonotonic(UA_EventLoop *el) { + return UA_DateTime_nowMonotonic(); +} + +static UA_Int64 +UA_EventLoopPOSIX_DateTime_localTimeUtcOffset(UA_EventLoop *el) { + return UA_DateTime_localTimeUtcOffset(); +} + +/*************************/ +/* Initialize and Delete */ +/*************************/ + +static UA_StatusCode +UA_EventLoopPOSIX_free(UA_EventLoopPOSIX *el) { + UA_LOCK(&el->elMutex); + + /* Check if the EventLoop can be deleted */ + if(el->eventLoop.state != UA_EVENTLOOPSTATE_STOPPED && + el->eventLoop.state != UA_EVENTLOOPSTATE_FRESH) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, + "Cannot delete a running EventLoop"); + UA_UNLOCK(&el->elMutex); + return UA_STATUSCODE_BADINTERNALERROR; + } + + /* Deregister and delete all the EventSources */ + while(el->eventLoop.eventSources) { + UA_EventSource *es = el->eventLoop.eventSources; + UA_UNLOCK(&el->elMutex); + UA_EventLoopPOSIX_deregisterEventSource(el, es); + UA_LOCK(&el->elMutex); + es->free(es); + } + + /* Remove the repeated timed callbacks */ + UA_Timer_clear(&el->timer); + + /* Process remaining delayed callbacks */ + processDelayed(el); + +#ifdef _WIN32 + /* Stop the Windows networking subsystem */ + WSACleanup(); +#endif + + /* Clean up */ + UA_UNLOCK(&el->elMutex); + UA_LOCK_DESTROY(&el->elMutex); + UA_free(el); + return UA_STATUSCODE_GOOD; +} + +UA_EventLoop * +UA_EventLoop_new_POSIX(const UA_Logger *logger) { + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*) + UA_calloc(1, sizeof(UA_EventLoopPOSIX)); + if(!el) + return NULL; + + UA_LOCK_INIT(&el->elMutex); + UA_Timer_init(&el->timer); + +#ifdef _WIN32 + /* Start the WSA networking subsystem on Windows */ + WSADATA wsaData; + WSAStartup(MAKEWORD(2, 2), &wsaData); +#endif + + /* Set the public EventLoop content */ + el->eventLoop.logger = logger; + + el->eventLoop.start = 
(UA_StatusCode (*)(UA_EventLoop*))UA_EventLoopPOSIX_start; + el->eventLoop.stop = (void (*)(UA_EventLoop*))UA_EventLoopPOSIX_stop; + el->eventLoop.run = (UA_StatusCode (*)(UA_EventLoop*, UA_UInt32))UA_EventLoopPOSIX_run; + el->eventLoop.free = (UA_StatusCode (*)(UA_EventLoop*))UA_EventLoopPOSIX_free; + + el->eventLoop.dateTime_now = UA_EventLoopPOSIX_DateTime_now; + el->eventLoop.dateTime_nowMonotonic = + UA_EventLoopPOSIX_DateTime_nowMonotonic; + el->eventLoop.dateTime_localTimeUtcOffset = + UA_EventLoopPOSIX_DateTime_localTimeUtcOffset; + + el->eventLoop.nextCyclicTime = UA_EventLoopPOSIX_nextCyclicTime; + el->eventLoop.addCyclicCallback = UA_EventLoopPOSIX_addCyclicCallback; + el->eventLoop.modifyCyclicCallback = UA_EventLoopPOSIX_modifyCyclicCallback; + el->eventLoop.removeCyclicCallback = UA_EventLoopPOSIX_removeCyclicCallback; + el->eventLoop.addTimedCallback = UA_EventLoopPOSIX_addTimedCallback; + el->eventLoop.addDelayedCallback = UA_EventLoopPOSIX_addDelayedCallback; + el->eventLoop.removeDelayedCallback = UA_EventLoopPOSIX_removeDelayedCallback; + + el->eventLoop.registerEventSource = + (UA_StatusCode (*)(UA_EventLoop*, UA_EventSource*)) + UA_EventLoopPOSIX_registerEventSource; + el->eventLoop.deregisterEventSource = + (UA_StatusCode (*)(UA_EventLoop*, UA_EventSource*)) + UA_EventLoopPOSIX_deregisterEventSource; + + return &el->eventLoop; +} + +/* Reusable EventSource functionality */ + +UA_StatusCode +UA_EventLoopPOSIX_allocNetworkBuffer(UA_ConnectionManager *cm, + uintptr_t connectionId, + UA_ByteString *buf, + size_t bufSize) { + return UA_ByteString_allocBuffer(buf, bufSize); +} + +void +UA_EventLoopPOSIX_freeNetworkBuffer(UA_ConnectionManager *cm, + uintptr_t connectionId, + UA_ByteString *buf) { + UA_ByteString_clear(buf); +} + +UA_StatusCode +UA_EventLoopPOSIX_allocateRXBuffer(UA_POSIXConnectionManager *pcm) { + UA_UInt32 rxBufSize = 2u << 16; /* The default is 128 KiB (2u << 16 = 131072) */ + const UA_UInt32 *configRxBufSize = (const UA_UInt32 *) + 
UA_KeyValueMap_getScalar(&pcm->cm.eventSource.params, + UA_QUALIFIEDNAME(0, "recv-bufsize"), + &UA_TYPES[UA_TYPES_UINT32]); + if(configRxBufSize) + rxBufSize = *configRxBufSize; + if(pcm->rxBuffer.length != rxBufSize) { + UA_ByteString_clear(&pcm->rxBuffer); + return UA_ByteString_allocBuffer(&pcm->rxBuffer, rxBufSize); + } + return UA_STATUSCODE_GOOD; +} + +/* Socket Handling */ + +enum ZIP_CMP +cmpFD(const UA_FD *a, const UA_FD *b) { + if(*a == *b) + return ZIP_CMP_EQ; + return (*a < *b) ? ZIP_CMP_LESS : ZIP_CMP_MORE; +} + +UA_StatusCode +UA_EventLoopPOSIX_setNonBlocking(UA_FD sockfd) { +#ifndef _WIN32 + int opts = fcntl(sockfd, F_GETFL); + if(opts < 0 || fcntl(sockfd, F_SETFL, opts | O_NONBLOCK) < 0) + return UA_STATUSCODE_BADINTERNALERROR; +#else + u_long iMode = 1; + if(ioctlsocket(sockfd, FIONBIO, &iMode) != NO_ERROR) + return UA_STATUSCODE_BADINTERNALERROR; +#endif + return UA_STATUSCODE_GOOD; +} + +UA_StatusCode +UA_EventLoopPOSIX_setNoSigPipe(UA_FD sockfd) { +#ifdef SO_NOSIGPIPE + int val = 1; + int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)); + if(res < 0) + return UA_STATUSCODE_BADINTERNALERROR; +#endif + return UA_STATUSCODE_GOOD; +} + +UA_StatusCode +UA_EventLoopPOSIX_setReusable(UA_FD sockfd) { + int enableReuseVal = 1; +#ifndef _WIN32 + int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, + (const char*)&enableReuseVal, sizeof(enableReuseVal)); + res |= UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, + (const char*)&enableReuseVal, sizeof(enableReuseVal)); + return (res == 0) ? UA_STATUSCODE_GOOD : UA_STATUSCODE_BADINTERNALERROR; +#else + int res = UA_setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, + (const char*)&enableReuseVal, sizeof(enableReuseVal)); + return (res == 0) ? 
UA_STATUSCODE_GOOD : UA_STATUSCODE_BADINTERNALERROR; +#endif +} diff --git a/src/arch/eventloop_posix.h b/src/arch/eventloop_posix.h new file mode 100644 index 0000000..bbb8ea7 --- /dev/null +++ b/src/arch/eventloop_posix.h @@ -0,0 +1,178 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * Copyright 2021 (c) Fraunhofer IOSB (Author: Julius Pfrommer) + * Copyright 2021 (c) Fraunhofer IOSB (Author: Jan Hermes) + */ + +#ifndef UA_EVENTLOOP_POSIX_H_ +#define UA_EVENTLOOP_POSIX_H_ + +#include "../open62541.h" + +#if defined(UA_ARCHITECTURE_POSIX) || defined(UA_ARCHITECTURE_WIN32) + +/* Include architecture-specific definitions */ +#if defined(UA_ARCHITECTURE_WIN32) +#include "win32/ua_architecture.h" +#elif defined(UA_ARCHITECTURE_POSIX) +#include "posix/ua_architecture.h" +#endif + +#include "common/ua_timer.h" +#include "../deps/mp_printf.h" +#include "../deps/open62541_queue.h" + +/* epoll_pwait returns bogus data with the tc compiler */ +#if defined(__linux__) && !defined(__TINYC__) +# define UA_HAVE_EPOLL +# include <sys/epoll.h> +#endif + +#define UA_MAXBACKLOG 100 +#define UA_MAXHOSTNAME_LENGTH 256 +#define UA_MAXPORTSTR_LENGTH 6 + +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +#ifndef MSG_DONTWAIT +#define MSG_DONTWAIT 0 +#endif + +_UA_BEGIN_DECLS + +/* POSIX events are based on sockets / file descriptors. The EventSources can + * register their fd in the EventLoop so that they are considered by the + * EventLoop dropping into "poll" to wait for events. 
*/ + +/* TODO: Move the macro-forest from /arch//ua_architecture.h */ +#define UA_FD UA_SOCKET +#define UA_INVALID_FD UA_INVALID_SOCKET + +struct UA_RegisteredFD; +typedef struct UA_RegisteredFD UA_RegisteredFD; + +/* Bitmask to be used for the UA_FDCallback event argument */ +#define UA_FDEVENT_IN 1 +#define UA_FDEVENT_OUT 2 +#define UA_FDEVENT_ERR 4 + +typedef void (*UA_FDCallback)(UA_EventSource *es, UA_RegisteredFD *rfd, short event); + +struct UA_RegisteredFD { + UA_DelayedCallback dc; /* Used for async closing. Must be the first member + * because the rfd is freed by the delayed callback + * mechanism. */ + + ZIP_ENTRY(UA_RegisteredFD) zipPointers; /* Register FD in the EventSource */ + UA_FD fd; + short listenEvents; /* UA_FDEVENT_IN | UA_FDEVENT_OUT*/ + + UA_EventSource *es; /* Backpointer to the EventSource */ + UA_FDCallback eventSourceCB; +}; + +enum ZIP_CMP cmpFD(const UA_FD *a, const UA_FD *b); +typedef ZIP_HEAD(UA_FDTree, UA_RegisteredFD) UA_FDTree; +ZIP_FUNCTIONS(UA_FDTree, UA_RegisteredFD, zipPointers, UA_FD, fd, cmpFD) + +/* All ConnectionManager in the POSIX EventLoop can be cast to + * UA_ConnectionManagerPOSIX. They carry a sorted tree of their open + * sockets/file-descriptors. */ +typedef struct { + UA_ConnectionManager cm; + + /* Reused receive buffer. 
The size is configured via + * the recv-bufsize parameter.*/ + UA_ByteString rxBuffer; + + /* Sorted tree of the FDs */ + size_t fdsSize; + UA_FDTree fds; +} UA_POSIXConnectionManager; + +typedef struct { + UA_EventLoop eventLoop; + + /* Timer */ + UA_Timer timer; + + /* Linked List of Delayed Callbacks */ + UA_DelayedCallback *delayedCallbacks; + + /* Flag determining whether the eventloop is currently within the + * "run" method */ + UA_Boolean executing; + +#if defined(UA_HAVE_EPOLL) + UA_FD epollfd; +#else + UA_RegisteredFD **fds; + size_t fdsSize; +#endif + +#if UA_MULTITHREADING >= 100 + UA_Lock elMutex; +#endif +} UA_EventLoopPOSIX; + +/* + * The following functions differ between epoll and normal select + */ + +/* Register to start receiving events */ +UA_StatusCode +UA_EventLoopPOSIX_registerFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd); + +/* Modify the events that the fd listens on */ +UA_StatusCode +UA_EventLoopPOSIX_modifyFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd); + +/* Deregister but do not close the fd. No further events are received. */ +void +UA_EventLoopPOSIX_deregisterFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd); + +UA_StatusCode +UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout); + +/* Helper functions between EventSources */ + +UA_StatusCode +UA_EventLoopPOSIX_allocateRXBuffer(UA_POSIXConnectionManager *pcm); + +UA_StatusCode +UA_EventLoopPOSIX_allocNetworkBuffer(UA_ConnectionManager *cm, + uintptr_t connectionId, + UA_ByteString *buf, + size_t bufSize); + +void +UA_EventLoopPOSIX_freeNetworkBuffer(UA_ConnectionManager *cm, + uintptr_t connectionId, + UA_ByteString *buf); + +/* + * Helper functions to be used across protocols + */ + +/* Set the socket non-blocking. If the listen-socket is nonblocking, incoming + * connections inherit this state. 
*/ +UA_StatusCode +UA_EventLoopPOSIX_setNonBlocking(UA_FD sockfd); + +/* Don't have the socket create interrupt signals */ +UA_StatusCode +UA_EventLoopPOSIX_setNoSigPipe(UA_FD sockfd); + +/* Enables sharing of the same listening address on different sockets */ +UA_StatusCode +UA_EventLoopPOSIX_setReusable(UA_FD sockfd); + +_UA_END_DECLS + +#endif /* defined(UA_ARCHITECTURE_POSIX) || defined(UA_ARCHITECTURE_WIN32) */ + +#endif /* UA_EVENTLOOP_POSIX_H_ */ diff --git a/src/arch/eventloop_posix_tcp.cpp b/src/arch/eventloop_posix_tcp.cpp new file mode 100644 index 0000000..ef0165f --- /dev/null +++ b/src/arch/eventloop_posix_tcp.cpp @@ -0,0 +1,1065 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * Copyright 2021-2022 (c) Fraunhofer IOSB (Author: Julius Pfrommer) + * Copyright 2021 (c) Fraunhofer IOSB (Author: Jan Hermes) + */ + +#include "../open62541.h" +#include "eventloop_posix.h" +#include "eventloop_common.h" + +#include "posix/mbed_tcp.h" + +/* Configuration parameters */ +#define TCP_PARAMETERSSIZE 6 +#define TCP_PARAMINDEX_RECVBUF 0 +#define TCP_PARAMINDEX_ADDR 1 +#define TCP_PARAMINDEX_PORT 2 +#define TCP_PARAMINDEX_LISTEN 3 +#define TCP_PARAMINDEX_VALIDATE 4 +#define TCP_PARAMINDEX_REUSE 5 + +static UA_KeyValueRestriction TCPConfigParameters[TCP_PARAMETERSSIZE] = { + {{0, UA_STRING_STATIC("recv-bufsize")}, &UA_TYPES[UA_TYPES_UINT32], false, true, false}, + {{0, UA_STRING_STATIC("address")}, &UA_TYPES[UA_TYPES_STRING], false, true, true}, + {{0, UA_STRING_STATIC("port")}, &UA_TYPES[UA_TYPES_UINT16], true, true, false}, + {{0, UA_STRING_STATIC("listen")}, &UA_TYPES[UA_TYPES_BOOLEAN], false, true, false}, + {{0, UA_STRING_STATIC("validate")}, &UA_TYPES[UA_TYPES_BOOLEAN], false, true, false}, + {{0, UA_STRING_STATIC("reuse")}, &UA_TYPES[UA_TYPES_BOOLEAN], false, true, false} +}; + +typedef struct { + 
UA_RegisteredFD rfd; + + UA_ConnectionManager_connectionCallback applicationCB; + void *application; + void *context; +} TCP_FD; + +static void +TCP_shutdown(UA_ConnectionManager *cm, TCP_FD *conn); + +/* Do not merge packets on the socket (disable Nagle's algorithm) */ +static UA_StatusCode +TCP_setNoNagle(UA_FD sockfd) { + int val = 1; + int res = UA_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)); + if(res < 0) + return UA_STATUSCODE_BADINTERNALERROR; + return UA_STATUSCODE_GOOD; +} + +/* Test if the ConnectionManager can be stopped */ +static void +TCP_checkStopped(UA_POSIXConnectionManager *pcm) { + UA_LOCK_ASSERT(&((UA_EventLoopPOSIX*)pcm->cm.eventSource.eventLoop)->elMutex, 1); + + if(pcm->fdsSize == 0 && + pcm->cm.eventSource.state == UA_EVENTSOURCESTATE_STOPPING) { + UA_LOG_DEBUG(pcm->cm.eventSource.eventLoop->logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| All sockets closed, the EventLoop has stopped"); + pcm->cm.eventSource.state = UA_EVENTSOURCESTATE_STOPPED; + } +} + +static void +TCP_delayedClose(void *application, void *context) { + UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)application; + UA_ConnectionManager *cm = &pcm->cm; + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop; + TCP_FD *conn = (TCP_FD*)context; + + UA_LOCK(&el->elMutex); + + UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, + "TCP %u\t| Delayed closing of the connection", + (unsigned)conn->rfd.fd); + + /* Deregister from the EventLoop */ + UA_EventLoopPOSIX_deregisterFD(el, &conn->rfd); + + /* Deregister internally */ + ZIP_REMOVE(UA_FDTree, &pcm->fds, &conn->rfd); + UA_assert(pcm->fdsSize > 0); + pcm->fdsSize--; + + /* Signal closing to the application */ + UA_UNLOCK(&el->elMutex); + conn->applicationCB(cm, (uintptr_t)conn->rfd.fd, + conn->application, &conn->context, + UA_CONNECTIONSTATE_CLOSING, + &UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); + UA_LOCK(&el->elMutex); + + /* Close the socket */ + int ret = 
UA_close(conn->rfd.fd); + if(ret == 0) { + UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Socket closed", (unsigned)conn->rfd.fd); + } else { + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Could not close the socket (%s)", + (unsigned)conn->rfd.fd, errno_str)); + } + + UA_free(conn); + + /* Check if this was the last connection for a closing ConnectionManager */ + TCP_checkStopped(pcm); + + UA_UNLOCK(&el->elMutex); +} + +static int +getSockError(TCP_FD *conn) { + int error = 0; +#ifndef _WIN32 + socklen_t errlen = sizeof(int); + int err = getsockopt(conn->rfd.fd, SOL_SOCKET, SO_ERROR, &error, &errlen); +#else + int errlen = (int)sizeof(int); + int err = getsockopt((SOCKET)conn->rfd.fd, SOL_SOCKET, SO_ERROR, + (char*)&error, &errlen); +#endif + return (err == 0) ? error : err; +} + +/* Gets called when a connection socket opens, receives data or closes */ +static void +TCP_connectionSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, + short event) { + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop; + UA_LOCK_ASSERT(&el->elMutex, 1); + + UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Activity on the socket", + (unsigned)conn->rfd.fd); + + /* Error. The connection has closed. */ + if(event == UA_FDEVENT_ERR) { + UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| The connection closes with error %i", + (unsigned)conn->rfd.fd, getSockError(conn)); + TCP_shutdown(cm, conn); + return; + } + + /* Write-Event, a new connection has opened. But some errors come as an + * out-event. For example if the remote side could not be reached to + * initiate the connection. So we check manually for error conditions on + * the socket. 
*/ + if(event == UA_FDEVENT_OUT) { + int error = getSockError(conn); + if(error != 0) { + UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| The connection closes with error %i", + (unsigned)conn->rfd.fd, error); + TCP_shutdown(cm, conn); + return; + } + + UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Opening a new connection", + (unsigned)conn->rfd.fd); + + /* Now we are interested in read-events. */ + conn->rfd.listenEvents = UA_FDEVENT_IN; + UA_EventLoopPOSIX_modifyFD(el, &conn->rfd); + + /* A new socket has opened. Signal it to the application. */ + UA_UNLOCK(&el->elMutex); + conn->applicationCB(cm, (uintptr_t)conn->rfd.fd, + conn->application, &conn->context, + UA_CONNECTIONSTATE_ESTABLISHED, + &UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); + UA_LOCK(&el->elMutex); + return; + } + + UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Allocate receive buffer", + (unsigned)conn->rfd.fd); + + /* Use the already allocated receive-buffer */ + UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; + UA_ByteString response = pcm->rxBuffer; + + /* Receive */ +#ifndef _WIN32 + ssize_t ret = UA_recv(conn->rfd.fd, (char*)response.data, + response.length, MSG_DONTWAIT); +#else + int ret = UA_recv(conn->rfd.fd, (char*)response.data, + response.length, MSG_DONTWAIT); +#endif + + /* Receive has failed */ + if(ret <= 0) { + if(UA_ERRNO == UA_INTERRUPTED || + UA_ERRNO == UA_WOULDBLOCK || + UA_ERRNO == UA_AGAIN) + return; /* Temporary error on an non-blocking socket */ + + /* Orderly shutdown of the socket */ + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| recv signaled the socket was shutdown (%s)", + (unsigned)conn->rfd.fd, errno_str)); + TCP_shutdown(cm, conn); + return; + } + + UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Received message of size %u", + (unsigned)conn->rfd.fd, (unsigned)ret); + + /* Callback to the 
application layer */ + response.length = (size_t)ret; /* Set the length of the received buffer */ + UA_UNLOCK(&el->elMutex); + conn->applicationCB(cm, (uintptr_t)conn->rfd.fd, + conn->application, &conn->context, + UA_CONNECTIONSTATE_ESTABLISHED, + &UA_KEYVALUEMAP_NULL, response); + UA_LOCK(&el->elMutex); +} + +/* Gets called when a new connection opens or if the listenSocket is closed */ +static void +TCP_listenSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, short event) { + UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop; + UA_LOCK_ASSERT(&el->elMutex, 1); + + UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Callback on server socket", + (unsigned)conn->rfd.fd); + + /* Try to accept a new connection */ + struct sockaddr_storage remote; + socklen_t remote_size = sizeof(remote); + UA_FD newsockfd = accept(conn->rfd.fd, (struct sockaddr*)&remote, &remote_size); + if(newsockfd == UA_INVALID_FD) { + /* Temporary error -- retry */ + if(UA_ERRNO == UA_INTERRUPTED) + return; + + /* Close the listen socket */ + if(cm->eventSource.state != UA_EVENTSOURCESTATE_STOPPING) { + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Error %s, closing the server socket", + (unsigned)conn->rfd.fd, errno_str)); + } + + TCP_shutdown(cm, conn); + return; + } + + /* Log the name of the remote host */ + char hoststr[UA_MAXHOSTNAME_LENGTH]; + int get_res = UA_getnameinfo((struct sockaddr *)&remote, sizeof(remote), + hoststr, sizeof(hoststr), + NULL, 0, NI_NUMERICHOST); + if(get_res != 0) { + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_WARNING(cm->eventSource.eventLoop->logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| getnameinfo(...) 
could not resolve the " + "hostname (%s)", (unsigned)conn->rfd.fd, errno_str)); + } + UA_LOG_INFO(cm->eventSource.eventLoop->logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Connection opened from \"%s\" via the server socket %u", + (unsigned)newsockfd, hoststr, (unsigned)conn->rfd.fd); + + /* Configure the new socket */ + UA_StatusCode res = UA_STATUSCODE_GOOD; + /* res |= UA_EventLoopPOSIX_setNonBlocking(newsockfd); Inherited from the listen-socket */ + res |= UA_EventLoopPOSIX_setNoSigPipe(newsockfd); /* Supress interrupts from the socket */ + res |= TCP_setNoNagle(newsockfd); /* Disable Nagle's algorithm */ + if(res != UA_STATUSCODE_GOOD) { + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_WARNING(cm->eventSource.eventLoop->logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Error seeting the TCP options (%s)", + (unsigned)newsockfd, errno_str)); + /* Close the new socket */ + UA_close(newsockfd); + return; + } + + /* Allocate the UA_RegisteredFD */ + TCP_FD *newConn = (TCP_FD*)UA_calloc(1, sizeof(TCP_FD)); + if(!newConn) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Error allocating memory for the socket", + (unsigned)newsockfd); + UA_close(newsockfd); + return; + } + + newConn->rfd.fd = newsockfd; + newConn->rfd.listenEvents = UA_FDEVENT_IN; + newConn->rfd.es = &cm->eventSource; + newConn->rfd.eventSourceCB = (UA_FDCallback)TCP_connectionSocketCallback; + newConn->applicationCB = conn->applicationCB; + newConn->application = conn->application; + newConn->context = conn->context; + + /* Register in the EventLoop. Signal to the user if registering failed. 
*/ + res = UA_EventLoopPOSIX_registerFD(el, &newConn->rfd); + if(res != UA_STATUSCODE_GOOD) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Error registering the socket", + (unsigned)newsockfd); + UA_free(newConn); + UA_close(newsockfd); + return; + } + + /* Register internally in the EventSource */ + ZIP_INSERT(UA_FDTree, &pcm->fds, &newConn->rfd); + pcm->fdsSize++; + + /* Forward the remote hostname to the application */ + UA_KeyValuePair kvp; + kvp.key = UA_QUALIFIEDNAME(0, "remote-address"); + UA_String hostName = UA_STRING(hoststr); + UA_Variant_setScalar(&kvp.value, &hostName, &UA_TYPES[UA_TYPES_STRING]); + + UA_KeyValueMap kvm; + kvm.mapSize = 1; + kvm.map = &kvp; + + /* The socket has opened. Signal it to the application. */ + UA_UNLOCK(&el->elMutex); + newConn->applicationCB(cm, (uintptr_t)newsockfd, + newConn->application, &newConn->context, + UA_CONNECTIONSTATE_ESTABLISHED, + &kvm, UA_BYTESTRING_NULL); + UA_LOCK(&el->elMutex); +} + +static UA_StatusCode +TCP_registerListenSocket(UA_POSIXConnectionManager *pcm, struct addrinfo *ai, + const char *hostname, UA_UInt16 port, + void *application, void *context, + UA_ConnectionManager_connectionCallback connectionCallback, + UA_Boolean validate, UA_Boolean reuseaddr) { + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)pcm->cm.eventSource.eventLoop; + UA_LOCK_ASSERT(&el->elMutex, 1); + + /* Translate INADDR_ANY to IPv4/IPv6 address */ + char addrstr[UA_MAXHOSTNAME_LENGTH]; + int get_res = UA_getnameinfo(ai->ai_addr, ai->ai_addrlen, + addrstr, sizeof(addrstr), NULL, 0, 0); + if(get_res != 0) { + get_res = UA_getnameinfo(ai->ai_addr, ai->ai_addrlen, + addrstr, sizeof(addrstr), + NULL, 0, NI_NUMERICHOST); + if(get_res != 0) { + addrstr[0] = 0; + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| getnameinfo(...) 
could not resolve the " + "hostname (%s)", errno_str)); + } + } + + /* Create the server socket */ + UA_FD listenSocket = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if(listenSocket == UA_INVALID_FD) { + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Error opening the listen socket for " + "\"%s\" on port %u (%s)", + (unsigned)listenSocket, addrstr, port, errno_str)); + return UA_STATUSCODE_BADINTERNALERROR; + } + + /* If the INADDR_ANY is used, use the local hostname */ + char hoststr[UA_MAXHOSTNAME_LENGTH]; + if(hostname) { + UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Creating listen socket for \"%s\" on port %u", + (unsigned)listenSocket, hostname, port); + } else { + gethostname(hoststr, UA_MAXHOSTNAME_LENGTH); + hostname = hoststr; + UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Creating listen socket for \"%s\" " + "(with local hostname \"%s\") on port %u", + (unsigned)listenSocket, addrstr, hostname, port); + } + + /* Some Linux distributions have net.ipv6.bindv6only not activated. So + * sockets can double-bind to IPv4 and IPv6. This leads to problems. Use + * AF_INET6 sockets only for IPv6. */ + int optval = 1; +#if UA_IPV6 + if(ai->ai_family == AF_INET6 && + UA_setsockopt(listenSocket, IPPROTO_IPV6, IPV6_V6ONLY, + (const char*)&optval, sizeof(optval)) == -1) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Could not set an IPv6 socket to IPv6 only", + (unsigned)listenSocket); + UA_close(listenSocket); + return UA_STATUSCODE_BADINTERNALERROR; + } +#endif + + /* Allow rebinding to the IP/port combination. Eg. to restart the server. 
*/ + if(reuseaddr && + UA_EventLoopPOSIX_setReusable(listenSocket) != UA_STATUSCODE_GOOD) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Could not make the socket addr reusable", + (unsigned)listenSocket); + UA_close(listenSocket); + return UA_STATUSCODE_BADINTERNALERROR; + } + + /* Set the socket non-blocking */ + if(UA_EventLoopPOSIX_setNonBlocking(listenSocket) != UA_STATUSCODE_GOOD) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Could not set the socket non-blocking", + (unsigned)listenSocket); + UA_close(listenSocket); + return UA_STATUSCODE_BADINTERNALERROR; + } + + /* Supress interrupts from the socket */ + if(UA_EventLoopPOSIX_setNoSigPipe(listenSocket) != UA_STATUSCODE_GOOD) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Could not disable SIGPIPE", + (unsigned)listenSocket); + UA_close(listenSocket); + return UA_STATUSCODE_BADINTERNALERROR; + } + + /* Bind socket to address */ + int ret = bind(listenSocket, ai->ai_addr, (socklen_t)ai->ai_addrlen); + if(ret < 0) { + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Error binding the socket to the address %s (%s)", + (unsigned)listenSocket, hostname, errno_str)); + UA_close(listenSocket); + return UA_STATUSCODE_BADINTERNALERROR; + } + + /* Only validate, don't actually start listening */ + if(validate) { + UA_close(listenSocket); + return UA_STATUSCODE_GOOD; + } + + /* Start listening */ + if(listen(listenSocket, UA_MAXBACKLOG) < 0) { + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Error listening on the socket (%s)", + (unsigned)listenSocket, errno_str)); + UA_close(listenSocket); + return UA_STATUSCODE_BADINTERNALERROR; + } + + /* Allocate the connection */ + TCP_FD *newConn = (TCP_FD*)UA_calloc(1, sizeof(TCP_FD)); + if(!newConn) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| 
Error allocating memory for the socket", + (unsigned)listenSocket); + UA_close(listenSocket); + return UA_STATUSCODE_BADINTERNALERROR; + } + + newConn->rfd.fd = listenSocket; + newConn->rfd.listenEvents = UA_FDEVENT_IN; + newConn->rfd.es = &pcm->cm.eventSource; + newConn->rfd.eventSourceCB = (UA_FDCallback)TCP_listenSocketCallback; + newConn->applicationCB = connectionCallback; + newConn->application = application; + newConn->context = context; + + /* Register in the EventLoop */ + UA_StatusCode res = UA_EventLoopPOSIX_registerFD(el, &newConn->rfd); + if(res != UA_STATUSCODE_GOOD) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Error registering the socket", + (unsigned)listenSocket); + UA_free(newConn); + UA_close(listenSocket); + return res; + } + + /* Register internally */ + ZIP_INSERT(UA_FDTree, &pcm->fds, &newConn->rfd); + pcm->fdsSize++; + + /* Set up the callback parameters */ + UA_String listenAddress = UA_STRING((char*)(uintptr_t)hostname); + UA_KeyValuePair params[2]; + params[0].key = UA_QUALIFIEDNAME(0, "listen-address"); + UA_Variant_setScalar(¶ms[0].value, &listenAddress, &UA_TYPES[UA_TYPES_STRING]); + params[1].key = UA_QUALIFIEDNAME(0, "listen-port"); + UA_Variant_setScalar(¶ms[1].value, &port, &UA_TYPES[UA_TYPES_UINT16]); + UA_KeyValueMap paramMap = {2, params}; + + /* Announce the listen-socket in the application */ + UA_UNLOCK(&el->elMutex); + connectionCallback(&pcm->cm, (uintptr_t)listenSocket, + application, &newConn->context, + UA_CONNECTIONSTATE_ESTABLISHED, + ¶mMap, UA_BYTESTRING_NULL); + UA_LOCK(&el->elMutex); + + return UA_STATUSCODE_GOOD; +} + +static UA_StatusCode +TCP_registerListenSockets(UA_POSIXConnectionManager *pcm, const char *hostname, + UA_UInt16 port, void *application, void *context, + UA_ConnectionManager_connectionCallback connectionCallback, + UA_Boolean validate, UA_Boolean reuseaddr) { + UA_LOCK_ASSERT(&((UA_EventLoopPOSIX*)pcm->cm.eventSource.eventLoop)->elMutex, 1); + + /* Create a string 
for the port */ + char portstr[6]; + mp_snprintf(portstr, sizeof(portstr), "%d", port); + + /* Get all the interface and IPv4/6 combinations for the configured hostname */ + struct addrinfo hints, *res; + memset(&hints, 0, sizeof hints); +#if UA_IPV6 + hints.ai_family = AF_UNSPEC; /* Allow IPv4 and IPv6 */ +#else + hints.ai_family = AF_INET; /* IPv4 only */ +#endif + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + hints.ai_flags = AI_PASSIVE; + + int retcode = getaddrinfo(hostname, portstr, &hints, &res); + if(retcode != 0) { +#ifdef _WIN32 + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_WARNING(pcm->cm.eventSource.eventLoop->logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| Lookup for \"%s\" on port %u failed (%s)", + hostname, port, errno_str)); +#else + UA_LOG_WARNING(pcm->cm.eventSource.eventLoop->logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| Lookup for \"%s\" on port %u failed (%s)", + hostname, port, gai_strerror(retcode)); +#endif + return UA_STATUSCODE_BADINTERNALERROR; + } + + /* Add listen sockets. Aggregate the results to see if at least one + * listen-socket was established. 
*/ + UA_StatusCode total_result = UA_INT32_MAX; + struct addrinfo *ai = res; + while(ai) { + total_result &= TCP_registerListenSocket(pcm, ai, hostname, port, application, context, + connectionCallback, validate, reuseaddr); + ai = ai->ai_next; + } + freeaddrinfo(res); + + return total_result; +} + +/* Close the connection via a delayed callback */ +static void +TCP_shutdown(UA_ConnectionManager *cm, TCP_FD *conn) { + /* Already closing - nothing to do */ + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop; + UA_LOCK_ASSERT(&el->elMutex, 1); + + if(conn->rfd.dc.callback) { + UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Cannot shutdown - already triggered", + (unsigned)conn->rfd.fd); + return; + } + + /* Shutdown the socket to cancel the current select/epoll */ + shutdown(conn->rfd.fd, UA_SHUT_RDWR); + + UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Shutdown triggered", + (unsigned)conn->rfd.fd); + + /* Add to the delayed callback list. Will be cleaned up in the next + * iteration. */ + UA_DelayedCallback *dc = &conn->rfd.dc; + dc->callback = TCP_delayedClose; + dc->application = cm; + dc->context = conn; + + /* Don't use the "public" el->addDelayedCallback. It takes a lock. 
*/ + dc->next = el->delayedCallbacks; + el->delayedCallbacks = dc; +} + +static UA_StatusCode +TCP_shutdownConnection(UA_ConnectionManager *cm, uintptr_t connectionId) { + UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX *)cm->eventSource.eventLoop; + UA_LOCK(&el->elMutex); + + UA_FD fd = (UA_FD)connectionId; + TCP_FD *conn = (TCP_FD*)ZIP_FIND(UA_FDTree, &pcm->fds, &fd); + if(!conn) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| Cannot close TCP connection %u - not found", + (unsigned)connectionId); + UA_UNLOCK(&el->elMutex); + return UA_STATUSCODE_BADNOTFOUND; + } + + TCP_shutdown(cm, conn); + + UA_UNLOCK(&el->elMutex); + return UA_STATUSCODE_GOOD; +} + +static UA_StatusCode +TCP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, + const UA_KeyValueMap *params, UA_ByteString *buf) { + /* Don't have a lock and don't take a lock. As the connectionId is the fd, + * no need to to a lookup and access internal data strucures. */ + UA_LOCK_ASSERT(&((UA_EventLoopPOSIX*)cm->eventSource.eventLoop)->elMutex, 0); + + /* Prevent OS signals when sending to a closed socket */ + int flags = MSG_NOSIGNAL; + + struct pollfd tmp_poll_fd; + tmp_poll_fd.fd = (UA_FD)connectionId; + tmp_poll_fd.events = UA_POLLOUT; + + /* Send the full buffer. This may require several calls to send */ + size_t nWritten = 0; + do { + ssize_t n = 0; + do { + UA_LOG_DEBUG(cm->eventSource.eventLoop->logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Attempting to send", (unsigned)connectionId); + size_t bytes_to_send = buf->length - nWritten; + n = UA_send((UA_FD)connectionId, + (const char*)buf->data + nWritten, + bytes_to_send, flags); + if(n < 0) { + /* An error we cannot recover from? 
*/ + if(UA_ERRNO != UA_INTERRUPTED && UA_ERRNO != UA_WOULDBLOCK && + UA_ERRNO != UA_AGAIN) + goto shutdown; + + /* Poll for the socket resources to become available and retry + * (blocking) */ + int poll_ret; + do { + poll_ret = UA_poll(&tmp_poll_fd, 1, 100); + if(poll_ret < 0 && UA_ERRNO != UA_INTERRUPTED) + goto shutdown; + } while(poll_ret <= 0); + } + } while(n < 0); + nWritten += (size_t)n; + } while(nWritten < buf->length); + + /* Free the buffer */ + UA_ByteString_clear(buf); + return UA_STATUSCODE_GOOD; + + shutdown: + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_ERROR(cm->eventSource.eventLoop->logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Send failed with error %s", + (unsigned)connectionId, errno_str)); + TCP_shutdownConnection(cm, connectionId); + UA_ByteString_clear(buf); + return UA_STATUSCODE_BADCONNECTIONCLOSED; +} + +/* Create a listen-socket that waits for incoming connections */ +static UA_StatusCode +TCP_openPassiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *params, + void *application, void *context, + UA_ConnectionManager_connectionCallback connectionCallback, + UA_Boolean validate) { + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)pcm->cm.eventSource.eventLoop; + UA_LOCK_ASSERT(&el->elMutex, 1); + + /* Get the port parameter */ + const UA_UInt16 *port = (const UA_UInt16*) + UA_KeyValueMap_getScalar(params, TCPConfigParameters[TCP_PARAMINDEX_PORT].name, + &UA_TYPES[UA_TYPES_UINT16]); + UA_assert(port); /* existence is checked before */ + + /* Get the address parameter */ + const UA_Variant *addrs = + UA_KeyValueMap_get(params, TCPConfigParameters[TCP_PARAMINDEX_ADDR].name); + size_t addrsSize = 0; + if(addrs) { + UA_assert(addrs->type == &UA_TYPES[UA_TYPES_STRING]); + if(UA_Variant_isScalar(addrs)) + addrsSize = 1; + else + addrsSize = addrs->arrayLength; + } + + /* Get the reuseaddr parameter */ + UA_Boolean reuseaddr = false; + const UA_Boolean *reuseaddrTmp = (const UA_Boolean*) + UA_KeyValueMap_getScalar(params, 
TCPConfigParameters[TCP_PARAMINDEX_REUSE].name, + &UA_TYPES[UA_TYPES_BOOLEAN]); + if(reuseaddrTmp) + reuseaddr = *reuseaddrTmp; + +#ifdef UA_ENABLE_ALLOW_REUSEADDR + reuseaddr = true; +#endif + + /* Undefined or empty addresses array -> listen on all interfaces */ + if(addrsSize == 0) { + UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| Listening on all interfaces"); + return TCP_registerListenSockets(pcm, NULL, *port, application, + context, connectionCallback, validate, reuseaddr); + } + + /* Iterate over the configured hostnames */ + UA_String *hostStrings = (UA_String*)addrs->data; + UA_StatusCode retval = UA_STATUSCODE_BADINTERNALERROR; + for(size_t i = 0; i < addrsSize; i++) { + char hostname[512]; + if(hostStrings[i].length >= sizeof(hostname)) + continue; + memcpy(hostname, hostStrings[i].data, hostStrings->length); + hostname[hostStrings->length] = '\0'; + if(TCP_registerListenSockets(pcm, hostname, *port, application, + context, connectionCallback, validate, reuseaddr) == UA_STATUSCODE_GOOD) + retval = UA_STATUSCODE_GOOD; + } + return retval; +} + +/* Open a TCP connection to a remote host */ +static UA_StatusCode +TCP_openActiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *params, + void *application, void *context, + UA_ConnectionManager_connectionCallback connectionCallback, + UA_Boolean validate) { + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)pcm->cm.eventSource.eventLoop; + UA_LOCK_ASSERT(&el->elMutex, 1); + + /* Get the connection parameters */ + char hostname[UA_MAXHOSTNAME_LENGTH]; + char portStr[UA_MAXPORTSTR_LENGTH]; + + /* Prepare the port parameter as a string */ + const UA_UInt16 *port = (const UA_UInt16*) + UA_KeyValueMap_getScalar(params, TCPConfigParameters[TCP_PARAMINDEX_PORT].name, + &UA_TYPES[UA_TYPES_UINT16]); + UA_assert(port); /* existence is checked before */ + mp_snprintf(portStr, UA_MAXPORTSTR_LENGTH, "%d", *port); + + /* Prepare the hostname string */ + const UA_String *addr = (const 
UA_String*) + UA_KeyValueMap_getScalar(params, TCPConfigParameters[TCP_PARAMINDEX_ADDR].name, + &UA_TYPES[UA_TYPES_STRING]); + if(!addr) { + UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| Open TCP Connection: No hostname defined, aborting"); + return UA_STATUSCODE_BADINTERNALERROR; + } + if(addr->length >= UA_MAXHOSTNAME_LENGTH) { + UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, + "TCP\t| Open TCP Connection: Hostname too long, aborting"); + return UA_STATUSCODE_BADINTERNALERROR; + } + strncpy(hostname, (const char*)addr->data, addr->length); + hostname[addr->length] = 0; + + UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| Open a connection to \"%s\" on port %s", hostname, portStr); + + /* Create the socket description from the connectString + * TODO: Make this non-blocking */ + struct addrinfo hints, *info; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + int error = getaddrinfo(hostname, portStr, &hints, &info); + if(error != 0) { +#ifdef _WIN32 + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| Lookup of %s failed (%s)", + hostname, errno_str)); +#else + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| Lookup of %s failed (%s)", + hostname, gai_strerror(error)); +#endif + return UA_STATUSCODE_BADINTERNALERROR; + } + + /* Create a socket */ + UA_FD newSock = socket(info->ai_family, info->ai_socktype, info->ai_protocol); + if(newSock == UA_INVALID_FD) { + freeaddrinfo(info); + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| Could not create socket to connect to %s (%s)", + hostname, errno_str)); + return UA_STATUSCODE_BADDISCONNECT; + } + + /* Set the socket options */ + UA_StatusCode res = UA_STATUSCODE_GOOD; + res |= UA_EventLoopPOSIX_setNonBlocking(newSock); + res |= UA_EventLoopPOSIX_setNoSigPipe(newSock); + res |= 
TCP_setNoNagle(newSock); + if(res != UA_STATUSCODE_GOOD) { + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| Could not set socket options: %s", errno_str)); + freeaddrinfo(info); + UA_close(newSock); + return res; + } + + /* Only validate, don't actually open the connection */ + if(validate) { + freeaddrinfo(info); + UA_close(newSock); + return UA_STATUSCODE_GOOD; + } + + /* Non-blocking connect */ + error = UA_connect(newSock, info->ai_addr); + freeaddrinfo(info); + if(error != 0 && + UA_ERRNO != UA_INPROGRESS && + UA_ERRNO != UA_WOULDBLOCK) { + UA_LOG_SOCKET_ERRNO_WRAP( + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| Connecting the socket to %s failed (%s)", + hostname, errno_str)); + UA_close(newSock); + return UA_STATUSCODE_BADDISCONNECT; + } + + /* Allocate the UA_RegisteredFD */ + TCP_FD *newConn = (TCP_FD*)UA_calloc(1, sizeof(TCP_FD)); + if(!newConn) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Error allocating memory for the socket", + (unsigned)newSock); + UA_close(newSock); + return UA_STATUSCODE_BADOUTOFMEMORY; + } + + newConn->rfd.fd = newSock; + newConn->rfd.es = &pcm->cm.eventSource; + newConn->rfd.eventSourceCB = (UA_FDCallback)TCP_connectionSocketCallback; + newConn->rfd.listenEvents = UA_FDEVENT_OUT; /* Switched to _IN once the + * connection is open */ + newConn->applicationCB = connectionCallback; + newConn->application = application; + newConn->context = context; + + /* Register the fd to trigger when output is possible (the connection is open) */ + res = UA_EventLoopPOSIX_registerFD(el, &newConn->rfd); + if(res != UA_STATUSCODE_GOOD) { + UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| Registering the socket to connect to %s failed", hostname); + UA_close(newSock); + UA_free(newConn); + return res; + } + + /* Register internally in the EventSource */ + ZIP_INSERT(UA_FDTree, &pcm->fds, &newConn->rfd); + 
pcm->fdsSize++; + + UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Opening a connection to \"%s\" on port %s", + (unsigned)newSock, hostname, portStr); + + /* Signal the new connection to the application as asynchonously opening */ + UA_UNLOCK(&el->elMutex); + connectionCallback(&pcm->cm, (uintptr_t)newSock, + application, &newConn->context, + UA_CONNECTIONSTATE_OPENING, &UA_KEYVALUEMAP_NULL, + UA_BYTESTRING_NULL); + UA_LOCK(&el->elMutex); + + return UA_STATUSCODE_GOOD; +} + +static UA_StatusCode +TCP_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, + void *application, void *context, + UA_ConnectionManager_connectionCallback connectionCallback) { + UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop; + UA_LOCK(&el->elMutex); + + if(cm->eventSource.state != UA_EVENTSOURCESTATE_STARTED) { + UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| Cannot open a connection for a " + "ConnectionManager that is not started"); + UA_UNLOCK(&el->elMutex); + return UA_STATUSCODE_BADINTERNALERROR; + } + + /* Check the parameters */ + UA_StatusCode res = + UA_KeyValueRestriction_validate(el->eventLoop.logger, "TCP", + &TCPConfigParameters[1], + TCP_PARAMETERSSIZE-1, params); + if(res != UA_STATUSCODE_GOOD) { + UA_UNLOCK(&el->elMutex); + return res; + } + + /* Only validate the parameters? */ + UA_Boolean validate = false; + const UA_Boolean *validateParam = (const UA_Boolean*) + UA_KeyValueMap_getScalar(params, + TCPConfigParameters[TCP_PARAMINDEX_VALIDATE].name, + &UA_TYPES[UA_TYPES_BOOLEAN]); + if(validateParam) + validate = *validateParam; + + /* Listen or active connection? 
*/ + UA_Boolean listen = false; + const UA_Boolean *listenParam = (const UA_Boolean*) + UA_KeyValueMap_getScalar(params, + TCPConfigParameters[TCP_PARAMINDEX_LISTEN].name, + &UA_TYPES[UA_TYPES_BOOLEAN]); + if(listenParam) + listen = *listenParam; + + if(listen) { + res = TCP_openPassiveConnection(pcm, params, application, context, + connectionCallback, validate); + } else { + res = TCP_openActiveConnection(pcm, params, application, context, + connectionCallback, validate); + } + + UA_UNLOCK(&el->elMutex); + return res; +} + +static UA_StatusCode +TCP_eventSourceStart(UA_ConnectionManager *cm) { + UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop; + if(!el) + return UA_STATUSCODE_BADINTERNALERROR; + + UA_LOCK(&el->elMutex); + + /* Check the state */ + if(cm->eventSource.state != UA_EVENTSOURCESTATE_STOPPED) { + UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| To start the ConnectionManager, it has to be " + "registered in an EventLoop and not started yet"); + UA_UNLOCK(&el->elMutex); + return UA_STATUSCODE_BADINTERNALERROR; + } + + /* Check the parameters */ + UA_StatusCode res = + UA_KeyValueRestriction_validate(el->eventLoop.logger, "TCP", + TCPConfigParameters, 1, + &cm->eventSource.params); + if(res != UA_STATUSCODE_GOOD) + goto finish; + + /* Allocate the rx buffer */ + res = UA_EventLoopPOSIX_allocateRXBuffer(pcm); + if(res != UA_STATUSCODE_GOOD) + goto finish; + + /* Set the EventSource to the started state */ + cm->eventSource.state = UA_EVENTSOURCESTATE_STARTED; + + finish: + UA_UNLOCK(&el->elMutex); + return res; +} + +static void * +TCP_shutdownCB(void *application, UA_RegisteredFD *rfd) { + UA_ConnectionManager *cm = (UA_ConnectionManager*)application; + TCP_shutdown(cm, (TCP_FD*)rfd); + return NULL; +} + +static void +TCP_eventSourceStop(UA_ConnectionManager *cm) { + UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; + 
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop; + (void)el; + + UA_LOCK(&el->elMutex); + + UA_LOG_INFO(cm->eventSource.eventLoop->logger, UA_LOGCATEGORY_NETWORK, + "TCP\t| Shutting down the ConnectionManager"); + + /* Prevent new connections to open */ + cm->eventSource.state = UA_EVENTSOURCESTATE_STOPPING; + + /* Shutdown all existing connection */ + ZIP_ITER(UA_FDTree, &pcm->fds, TCP_shutdownCB, cm); + + /* All sockets closed? Otherwise iterate some more. */ + TCP_checkStopped(pcm); + + UA_UNLOCK(&el->elMutex); +} + +static UA_StatusCode +TCP_eventSourceDelete(UA_ConnectionManager *cm) { + UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; + if(cm->eventSource.state >= UA_EVENTSOURCESTATE_STARTING) { + UA_LOG_ERROR(cm->eventSource.eventLoop->logger, UA_LOGCATEGORY_EVENTLOOP, + "TCP\t| The EventSource must be stopped before it can be deleted"); + return UA_STATUSCODE_BADINTERNALERROR; + } + + UA_ByteString_clear(&pcm->rxBuffer); + UA_KeyValueMap_clear(&cm->eventSource.params); + UA_String_clear(&cm->eventSource.name); + UA_free(cm); + + return UA_STATUSCODE_GOOD; +} + +static const char *tcpName = "tcp"; + +UA_ConnectionManager * +UA_ConnectionManager_new_POSIX_TCP(const UA_String eventSourceName) { + UA_POSIXConnectionManager *cm = (UA_POSIXConnectionManager*) + UA_calloc(1, sizeof(UA_POSIXConnectionManager)); + if(!cm) + return NULL; + + cm->cm.eventSource.eventSourceType = UA_EVENTSOURCETYPE_CONNECTIONMANAGER; + UA_String_copy(&eventSourceName, &cm->cm.eventSource.name); + cm->cm.eventSource.start = (UA_StatusCode (*)(UA_EventSource *))TCP_eventSourceStart; + cm->cm.eventSource.stop = (void (*)(UA_EventSource *))TCP_eventSourceStop; + cm->cm.eventSource.free = (UA_StatusCode (*)(UA_EventSource *))TCP_eventSourceDelete; + cm->cm.protocol = UA_STRING((char*)(uintptr_t)tcpName); + cm->cm.openConnection = TCP_openConnection; + cm->cm.allocNetworkBuffer = UA_EventLoopPOSIX_allocNetworkBuffer; + cm->cm.freeNetworkBuffer = 
UA_EventLoopPOSIX_freeNetworkBuffer; + cm->cm.sendWithConnection = TCP_sendWithConnection; + cm->cm.closeConnection = TCP_shutdownConnection; + return &cm->cm; +} diff --git a/src/arch/posix/mbed_tcp.cpp b/src/arch/posix/mbed_tcp.cpp new file mode 100644 index 0000000..458efb1 --- /dev/null +++ b/src/arch/posix/mbed_tcp.cpp @@ -0,0 +1,74 @@ +#include "mbed_tcp.h" + +int mbed_send(UA_FD fd, const void * data, size_t size, int ignored) +{ + TCPSocket * sock = (TCPSocket *)fd; + nsapi_value_or_error_t const rc = sock->send(data, static_cast(size)); + return rc; +} + +int mbed_recv(UA_FD fd, void * data, nsapi_size_t size, int ignored) +{ + TCPSocket * sock = (TCPSocket *)fd; + nsapi_value_or_error_t const rc = sock->recv(data, size); + + if (rc == NSAPI_ERROR_OK) { + printf("mbed_recv: rc = NSAPI_ERROR_OK"); + } + else if (rc > 0) { + printf("mbed_recv: got %d on %x, data: %x, size: %d\n", rc, fd, (uint32_t)data, size); + } + else { + printf("mbed_recv error: rc = %d", rc); + } + + return rc; +} + +int mbed_close(UA_FD fd) +{ + TCPSocket * sock = (TCPSocket *)fd; + nsapi_error_t const rc = sock->close(); + return rc; +} + +int mbed_connect(UA_FD fd, SocketAddress* addr) +{ + TCPSocket * sock = (TCPSocket *)fd; + nsapi_error_t const rc = sock->connect(*addr); + return rc; +} + +int mbed_setsockopt(UA_FD fd, int level, int optname, const void *optval, unsigned optlen) +{ + TCPSocket * sock = (TCPSocket *)fd; + nsapi_error_t const rc = sock->setsockopt(level, optname, optval, optlen); + return rc; +} + +int mbed_getsockopt(UA_FD fd, int level, int optname, void *optval, unsigned *optlen) +{ + TCPSocket * sock = (TCPSocket *)fd; + nsapi_error_t const rc = sock->getsockopt(level, optname, optval, optlen); + return rc; +} + +extern rtos::EventFlags _events; +void event(Socket * s) +{ + _events.set((uint32_t)s); + //printf("Processing event: %x\n", (uint32_t)s); +} + +TCPSocket * socket(int /* family */, int /* type */, int /* proto */) +{ + TCPSocket * tcp_socket = new 
TCPSocket(); + tcp_socket->sigio(mbed::callback(event, tcp_socket)); + tcp_socket->open(NetworkInterface::get_default_instance()); + return tcp_socket; +} + +void freeaddrinfo(void * /* c */) +{ + /* Not implemented. */ +} diff --git a/src/arch/posix/mbed_tcp.h b/src/arch/posix/mbed_tcp.h new file mode 100644 index 0000000..9c7d818 --- /dev/null +++ b/src/arch/posix/mbed_tcp.h @@ -0,0 +1,15 @@ +#pragma once + +#include "../eventloop_posix.h" + +#include "mbed.h" + +int mbed_send(UA_FD fd, const void * data, size_t size, int ignored); +int mbed_recv(UA_FD fd, void * data, nsapi_size_t size, int ignored); +int mbed_close(UA_FD fd); +int mbed_connect(UA_FD fd, SocketAddress* addr); +int mbed_setsockopt(UA_FD fd, int level, int optname, const void *optval, unsigned optlen); +int mbed_getsockopt(UA_FD fd, int level, int optname, void *optval, unsigned *optlen); + +TCPSocket * socket(int family, int type, int proto); +void freeaddrinfo(void * c); diff --git a/src/arch/posix/ua_architecture.h b/src/arch/posix/ua_architecture.h new file mode 100644 index 0000000..bf0835e --- /dev/null +++ b/src/arch/posix/ua_architecture.h @@ -0,0 +1,60 @@ +/* This work is licensed under a Creative Commons CCZero 1.0 Universal License. + * See http://creativecommons.org/publicdomain/zero/1.0/ for more information. 
+ * + * Copyright 2016-2017 (c) Julius Pfrommer, Fraunhofer IOSB + * Copyright 2017 (c) Stefan Profanter, fortiss GmbH + */ + +#ifndef PLUGINS_ARCH_POSIX_UA_ARCHITECTURE_H_ +#define PLUGINS_ARCH_POSIX_UA_ARCHITECTURE_H_ + +#include "../../open62541.h" + +#ifdef UA_ARCHITECTURE_POSIX + +_UA_BEGIN_DECLS + +#define UA_IPV6 1 +#define UA_SOCKET uintptr_t +#define UA_INVALID_SOCKET -1 +#define UA_ERRNO errno +#define UA_INTERRUPTED EINTR +#define UA_AGAIN EAGAIN /* the same as wouldblock on nearly every system */ +#define UA_INPROGRESS EINPROGRESS +#define UA_WOULDBLOCK EWOULDBLOCK +#define UA_POLLIN POLLIN +#define UA_POLLOUT POLLOUT +#define UA_SHUT_RDWR SHUT_RDWR + +#define UA_getnameinfo(sa, salen, host, hostlen, serv, servlen, flags) \ + getnameinfo(sa, salen, host, hostlen, serv, servlen, flags) +#define UA_poll poll +#define UA_send mbed_send +#define UA_recv mbed_recv +#define UA_close mbed_close +#define UA_connect mbed_connect +#define UA_getsockopt mbed_getsockopt +#define UA_setsockopt mbed_setsockopt +#define UA_inet_pton inet_pton +#if UA_IPV6 +# define UA_if_nametoindex if_nametoindex +#endif + +#define UA_clean_errno(STR_FUN) (errno == 0 ? 
(char*) "None" : (STR_FUN)(errno)) + +#define UA_LOG_SOCKET_ERRNO_WRAP(LOG) { \ + char *errno_str = UA_clean_errno(strerror); \ + LOG; \ + errno = 0; \ +} +#define UA_LOG_SOCKET_ERRNO_GAI_WRAP(LOG) { \ + const char *errno_str = UA_clean_errno(gai_strerror); \ + LOG; \ + errno = 0; \ +} + +_UA_END_DECLS + +#endif /* UA_ARCHITECTURE_POSIX */ + +#endif /* PLUGINS_ARCH_POSIX_UA_ARCHITECTURE_H_ */ diff --git a/src/arch/mbed/ua_clock_mbed.cpp b/src/arch/posix/ua_clock.cpp similarity index 100% rename from src/arch/mbed/ua_clock_mbed.cpp rename to src/arch/posix/ua_clock.cpp diff --git a/src/arch/mbed/ua_eventloop_mbed.cpp b/src/arch/posix/ua_eventloop_mbed.cpp.bak similarity index 99% rename from src/arch/mbed/ua_eventloop_mbed.cpp rename to src/arch/posix/ua_eventloop_mbed.cpp.bak index 7208a6a..015beb0 100644 --- a/src/arch/mbed/ua_eventloop_mbed.cpp +++ b/src/arch/posix/ua_eventloop_mbed.cpp.bak @@ -6,7 +6,7 @@ * Copyright 2021 (c) Fraunhofer IOSB (Author: Jan Hermes) */ -#include "ua_eventloop_mbed.h" +#include "ua_eventloop_mbed.h.bak" #include extern "C" int clock_gettime(clockid_t clk_id, struct timespec *tp); diff --git a/src/arch/mbed/ua_eventloop_mbed.h b/src/arch/posix/ua_eventloop_mbed.h.bak similarity index 100% rename from src/arch/mbed/ua_eventloop_mbed.h rename to src/arch/posix/ua_eventloop_mbed.h.bak diff --git a/src/arch/mbed/ua_eventloop_mbed_fakefd.cpp b/src/arch/posix/ua_eventloop_mbed_fakefd.cpp.bak similarity index 99% rename from src/arch/mbed/ua_eventloop_mbed_fakefd.cpp rename to src/arch/posix/ua_eventloop_mbed_fakefd.cpp.bak index 9ad5f0b..df73a56 100644 --- a/src/arch/mbed/ua_eventloop_mbed_fakefd.cpp +++ b/src/arch/posix/ua_eventloop_mbed_fakefd.cpp.bak @@ -5,7 +5,7 @@ * Copyright 2021 (c) Fraunhofer IOSB (Author: Julius Pfrommer) */ -#include "ua_eventloop_mbed.h" +#include "ua_eventloop_mbed.h.bak" #if !defined(UA_HAVE_EPOLL) diff --git a/src/arch/mbed/ua_mbed_tcp.cpp b/src/arch/posix/ua_mbed_tcp.cpp.bak similarity index 99% rename from 
src/arch/mbed/ua_mbed_tcp.cpp rename to src/arch/posix/ua_mbed_tcp.cpp.bak index 0f59fcf..ddaa220 100644 --- a/src/arch/mbed/ua_mbed_tcp.cpp +++ b/src/arch/posix/ua_mbed_tcp.cpp.bak @@ -12,7 +12,7 @@ extern "C" { #include "unistd.h" -#include "ua_eventloop_mbed.h" +#include "ua_eventloop_mbed.h.bak" #include "../../deps/mp_printf.h" } diff --git a/src/deps/open62541_queue.h b/src/deps/open62541_queue.h new file mode 100644 index 0000000..ab20b6a --- /dev/null +++ b/src/deps/open62541_queue.h @@ -0,0 +1,648 @@ +/* $OpenBSD: queue.h,v 1.38 2013/07/03 15:05:21 fgsch Exp $ */ +/* $NetBSD: queue.h,v 1.11 1996/05/16 05:17:14 mycroft Exp $ */ + +/* + * Copyright (c) 1991, 1993 + * The Regents of the University of California. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * @(#)queue.h 8.5 (Berkeley) 8/20/94 + */ + +#ifndef _SYS_QUEUE_H_ +#define _SYS_QUEUE_H_ + +/* + * This file defines five types of data structures: singly-linked lists, + * lists, simple queues, tail queues, and circular queues. + * + * + * A singly-linked list is headed by a single forward pointer. The elements + * are singly linked for minimum space and pointer manipulation overhead at + * the expense of O(n) removal for arbitrary elements. New elements can be + * added to the list after an existing element or at the head of the list. + * Elements being removed from the head of the list should use the explicit + * macro for this purpose for optimum efficiency. A singly-linked list may + * only be traversed in the forward direction. Singly-linked lists are ideal + * for applications with large datasets and few or no removals or for + * implementing a LIFO queue. + * + * A list is headed by a single forward pointer (or an array of forward + * pointers for a hash table header). The elements are doubly linked + * so that an arbitrary element can be removed without a need to + * traverse the list. New elements can be added to the list before + * or after an existing element or at the head of the list. A list + * may only be traversed in the forward direction. + * + * A simple queue is headed by a pair of pointers, one the head of the + * list and the other to the tail of the list. 
The elements are singly + * linked to save space, so elements can only be removed from the + * head of the list. New elements can be added to the list before or after + * an existing element, at the head of the list, or at the end of the + * list. A simple queue may only be traversed in the forward direction. + * + * A tail queue is headed by a pair of pointers, one to the head of the + * list and the other to the tail of the list. The elements are doubly + * linked so that an arbitrary element can be removed without a need to + * traverse the list. New elements can be added to the list before or + * after an existing element, at the head of the list, or at the end of + * the list. A tail queue may be traversed in either direction. + * + * A circle queue is headed by a pair of pointers, one to the head of the + * list and the other to the tail of the list. The elements are doubly + * linked so that an arbitrary element can be removed without a need to + * traverse the list. New elements can be added to the list before or after + * an existing element, at the head of the list, or at the end of the list. + * A circle queue may be traversed in either direction, but has a more + * complex end of list detection. + * + * For details on the use of these macros, see the queue(3) manual page. + */ + +#if defined(QUEUE_MACRO_DEBUG) || (defined(_KERNEL) && defined(DIAGNOSTIC)) +#define _Q_INVALIDATE(a) (a) = ((void *)-1) +#else +#define _Q_INVALIDATE(a) +#endif + +/* + * Singly-linked List definitions. + */ +#define SLIST_HEAD(name, type) \ +struct name { \ + struct type *slh_first; /* first element */ \ +} + +#define SLIST_HEAD_INITIALIZER(head) \ + { NULL } + +#define SLIST_ENTRY(type) \ +struct { \ + struct type *sle_next; /* next element */ \ +} + +/* + * Singly-linked List access methods. 
+ */ +#define SLIST_FIRST(head) ((head)->slh_first) +#define SLIST_END(head) NULL +#define SLIST_EMPTY(head) (SLIST_FIRST(head) == SLIST_END(head)) +#define SLIST_NEXT(elm, field) ((elm)->field.sle_next) + +#define SLIST_FOREACH(var, head, field) \ + for((var) = SLIST_FIRST(head); \ + (var) != SLIST_END(head); \ + (var) = SLIST_NEXT(var, field)) + +#define SLIST_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = SLIST_FIRST(head); \ + (var) && ((tvar) = SLIST_NEXT(var, field), 1); \ + (var) = (tvar)) + +/* + * Singly-linked List functions. + */ +#define SLIST_INIT(head) do { \ + SLIST_FIRST(head) = SLIST_END(head); \ +} while(0) + +#define SLIST_INSERT_AFTER(slistelm, elm, field) do { \ + (elm)->field.sle_next = (slistelm)->field.sle_next; \ + (slistelm)->field.sle_next = (elm); \ +} while (0) + +#define SLIST_INSERT_HEAD(head, elm, field) do { \ + (elm)->field.sle_next = (head)->slh_first; \ + (head)->slh_first = (elm); \ +} while (0) + +#define SLIST_REMOVE_AFTER(elm, field) do { \ + (elm)->field.sle_next = (elm)->field.sle_next->field.sle_next; \ +} while (0) + +#define SLIST_REMOVE_HEAD(head, field) do { \ + (head)->slh_first = (head)->slh_first->field.sle_next; \ +} while (0) + +#define SLIST_REMOVE(head, elm, type, field) do { \ + if ((head)->slh_first == (elm)) { \ + SLIST_REMOVE_HEAD((head), field); \ + } else { \ + struct type *curelm = (head)->slh_first; \ + \ + while (curelm->field.sle_next != (elm)) \ + curelm = curelm->field.sle_next; \ + curelm->field.sle_next = \ + curelm->field.sle_next->field.sle_next; \ + _Q_INVALIDATE((elm)->field.sle_next); \ + } \ +} while (0) + +/* + * List definitions. 
+ */ +#define LIST_HEAD(name, type) \ +struct name { \ + struct type *lh_first; /* first element */ \ +} + +#define LIST_HEAD_INITIALIZER(head) \ + { NULL } + +#define LIST_ENTRY(type) \ +struct { \ + struct type *le_next; /* next element */ \ + struct type **le_prev; /* address of previous next element */ \ +} + +/* + * List access methods + */ +#define LIST_FIRST(head) ((head)->lh_first) +#define LIST_END(head) NULL +#define LIST_EMPTY(head) (LIST_FIRST(head) == LIST_END(head)) +#define LIST_NEXT(elm, field) ((elm)->field.le_next) + +#define LIST_FOREACH(var, head, field) \ + for((var) = LIST_FIRST(head); \ + (var)!= LIST_END(head); \ + (var) = LIST_NEXT(var, field)) + +#define LIST_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = LIST_FIRST(head); \ + (var) && ((tvar) = LIST_NEXT(var, field), 1); \ + (var) = (tvar)) + +/* + * List functions. + */ +#define LIST_INIT(head) do { \ + LIST_FIRST(head) = LIST_END(head); \ +} while (0) + +#define LIST_INSERT_AFTER(listelm, elm, field) do { \ + if (((elm)->field.le_next = (listelm)->field.le_next) != NULL) \ + (listelm)->field.le_next->field.le_prev = \ + &(elm)->field.le_next; \ + (listelm)->field.le_next = (elm); \ + (elm)->field.le_prev = &(listelm)->field.le_next; \ +} while (0) + +#define LIST_INSERT_BEFORE(listelm, elm, field) do { \ + (elm)->field.le_prev = (listelm)->field.le_prev; \ + (elm)->field.le_next = (listelm); \ + *(listelm)->field.le_prev = (elm); \ + (listelm)->field.le_prev = &(elm)->field.le_next; \ +} while (0) + +#define LIST_INSERT_HEAD(head, elm, field) do { \ + if (((elm)->field.le_next = (head)->lh_first) != NULL) \ + (head)->lh_first->field.le_prev = &(elm)->field.le_next;\ + (head)->lh_first = (elm); \ + (elm)->field.le_prev = &(head)->lh_first; \ +} while (0) + +#define LIST_REMOVE(elm, field) do { \ + if ((elm)->field.le_next != NULL) \ + (elm)->field.le_next->field.le_prev = \ + (elm)->field.le_prev; \ + *(elm)->field.le_prev = (elm)->field.le_next; \ + 
_Q_INVALIDATE((elm)->field.le_prev); \ + _Q_INVALIDATE((elm)->field.le_next); \ +} while (0) + +#define LIST_REPLACE(elm, elm2, field) do { \ + if (((elm2)->field.le_next = (elm)->field.le_next) != NULL) \ + (elm2)->field.le_next->field.le_prev = \ + &(elm2)->field.le_next; \ + (elm2)->field.le_prev = (elm)->field.le_prev; \ + *(elm2)->field.le_prev = (elm2); \ + _Q_INVALIDATE((elm)->field.le_prev); \ + _Q_INVALIDATE((elm)->field.le_next); \ +} while (0) + +/* + * Simple queue definitions. + */ +#define SIMPLEQ_HEAD(name, type) \ +struct name { \ + struct type *sqh_first; /* first element */ \ + struct type **sqh_last; /* addr of last next element */ \ +} + +#define SIMPLEQ_HEAD_INITIALIZER(head) \ + { NULL, &(head).sqh_first } + +#define SIMPLEQ_ENTRY(type) \ +struct { \ + struct type *sqe_next; /* next element */ \ +} + +/* + * Simple queue access methods. + */ +#define SIMPLEQ_FIRST(head) ((head)->sqh_first) +#define SIMPLEQ_END(head) NULL +#define SIMPLEQ_EMPTY(head) (SIMPLEQ_FIRST(head) == SIMPLEQ_END(head)) +#define SIMPLEQ_NEXT(elm, field) ((elm)->field.sqe_next) + +#define SIMPLEQ_FOREACH(var, head, field) \ + for((var) = SIMPLEQ_FIRST(head); \ + (var) != SIMPLEQ_END(head); \ + (var) = SIMPLEQ_NEXT(var, field)) + +#define SIMPLEQ_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = SIMPLEQ_FIRST(head); \ + (var) && ((tvar) = SIMPLEQ_NEXT(var, field), 1); \ + (var) = (tvar)) + +/* + * Simple queue functions. 
+ */ +#define SIMPLEQ_INIT(head) do { \ + (head)->sqh_first = NULL; \ + (head)->sqh_last = &(head)->sqh_first; \ +} while (0) + +#define SIMPLEQ_INSERT_HEAD(head, elm, field) do { \ + if (((elm)->field.sqe_next = (head)->sqh_first) == NULL) \ + (head)->sqh_last = &(elm)->field.sqe_next; \ + (head)->sqh_first = (elm); \ +} while (0) + +#define SIMPLEQ_INSERT_TAIL(head, elm, field) do { \ + (elm)->field.sqe_next = NULL; \ + *(head)->sqh_last = (elm); \ + (head)->sqh_last = &(elm)->field.sqe_next; \ +} while (0) + +#define SIMPLEQ_INSERT_AFTER(head, listelm, elm, field) do { \ + if (((elm)->field.sqe_next = (listelm)->field.sqe_next) == NULL)\ + (head)->sqh_last = &(elm)->field.sqe_next; \ + (listelm)->field.sqe_next = (elm); \ +} while (0) + +#define SIMPLEQ_REMOVE_HEAD(head, field) do { \ + if (((head)->sqh_first = (head)->sqh_first->field.sqe_next) == NULL) \ + (head)->sqh_last = &(head)->sqh_first; \ +} while (0) + +#define SIMPLEQ_REMOVE_AFTER(head, elm, field) do { \ + if (((elm)->field.sqe_next = (elm)->field.sqe_next->field.sqe_next) \ + == NULL) \ + (head)->sqh_last = &(elm)->field.sqe_next; \ +} while (0) + +/* + * XOR Simple queue definitions. + */ +#define XSIMPLEQ_HEAD(name, type) \ +struct name { \ + struct type *sqx_first; /* first element */ \ + struct type **sqx_last; /* addr of last next element */ \ + unsigned long sqx_cookie; \ +} + +#define XSIMPLEQ_ENTRY(type) \ +struct { \ + struct type *sqx_next; /* next element */ \ +} + +/* + * XOR Simple queue access methods. 
+ */ +#define XSIMPLEQ_XOR(head, ptr) ((__typeof(ptr))((head)->sqx_cookie ^ \ + (unsigned long)(ptr))) +#define XSIMPLEQ_FIRST(head) XSIMPLEQ_XOR(head, ((head)->sqx_first)) +#define XSIMPLEQ_END(head) NULL +#define XSIMPLEQ_EMPTY(head) (XSIMPLEQ_FIRST(head) == XSIMPLEQ_END(head)) +#define XSIMPLEQ_NEXT(head, elm, field) XSIMPLEQ_XOR(head, ((elm)->field.sqx_next)) + + +#define XSIMPLEQ_FOREACH(var, head, field) \ + for ((var) = XSIMPLEQ_FIRST(head); \ + (var) != XSIMPLEQ_END(head); \ + (var) = XSIMPLEQ_NEXT(head, var, field)) + +#define XSIMPLEQ_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = XSIMPLEQ_FIRST(head); \ + (var) && ((tvar) = XSIMPLEQ_NEXT(head, var, field), 1); \ + (var) = (tvar)) + +/* + * XOR Simple queue functions. + */ +#define XSIMPLEQ_INIT(head) do { \ + arc4random_buf(&(head)->sqx_cookie, sizeof((head)->sqx_cookie)); \ + (head)->sqx_first = XSIMPLEQ_XOR(head, NULL); \ + (head)->sqx_last = XSIMPLEQ_XOR(head, &(head)->sqx_first); \ +} while (0) + +#define XSIMPLEQ_INSERT_HEAD(head, elm, field) do { \ + if (((elm)->field.sqx_next = (head)->sqx_first) == \ + XSIMPLEQ_XOR(head, NULL)) \ + (head)->sqx_last = XSIMPLEQ_XOR(head, &(elm)->field.sqx_next); \ + (head)->sqx_first = XSIMPLEQ_XOR(head, (elm)); \ +} while (0) + +#define XSIMPLEQ_INSERT_TAIL(head, elm, field) do { \ + (elm)->field.sqx_next = XSIMPLEQ_XOR(head, NULL); \ + *(XSIMPLEQ_XOR(head, (head)->sqx_last)) = XSIMPLEQ_XOR(head, (elm)); \ + (head)->sqx_last = XSIMPLEQ_XOR(head, &(elm)->field.sqx_next); \ +} while (0) + +#define XSIMPLEQ_INSERT_AFTER(head, listelm, elm, field) do { \ + if (((elm)->field.sqx_next = (listelm)->field.sqx_next) == \ + XSIMPLEQ_XOR(head, NULL)) \ + (head)->sqx_last = XSIMPLEQ_XOR(head, &(elm)->field.sqx_next); \ + (listelm)->field.sqx_next = XSIMPLEQ_XOR(head, (elm)); \ +} while (0) + +#define XSIMPLEQ_REMOVE_HEAD(head, field) do { \ + if (((head)->sqx_first = XSIMPLEQ_XOR(head, \ + (head)->sqx_first)->field.sqx_next) == XSIMPLEQ_XOR(head, NULL)) \ + 
(head)->sqx_last = XSIMPLEQ_XOR(head, &(head)->sqx_first); \ +} while (0) + +#define XSIMPLEQ_REMOVE_AFTER(head, elm, field) do { \ + if (((elm)->field.sqx_next = XSIMPLEQ_XOR(head, \ + (elm)->field.sqx_next)->field.sqx_next) \ + == XSIMPLEQ_XOR(head, NULL)) \ + (head)->sqx_last = \ + XSIMPLEQ_XOR(head, &(elm)->field.sqx_next); \ +} while (0) + + +/* + * Tail queue definitions. + */ +#define TAILQ_HEAD(name, type) \ +struct name { \ + struct type *tqh_first; /* first element */ \ + struct type **tqh_last; /* addr of last next element */ \ +} + +#define TAILQ_HEAD_INITIALIZER(head) \ + { NULL, &(head).tqh_first } + +#define TAILQ_ENTRY(type) \ +struct { \ + struct type *tqe_next; /* next element */ \ + struct type **tqe_prev; /* address of previous next element */ \ +} + +/* + * tail queue access methods + */ +#define TAILQ_FIRST(head) ((head)->tqh_first) +#define TAILQ_END(head) NULL +#define TAILQ_NEXT(elm, field) ((elm)->field.tqe_next) +#define TAILQ_LAST(head, headname) \ + (*(((struct headname *)((head)->tqh_last))->tqh_last)) +/* XXX */ +#define TAILQ_PREV(elm, headname, field) \ + (*(((struct headname *)((elm)->field.tqe_prev))->tqh_last)) +#define TAILQ_EMPTY(head) \ + (TAILQ_FIRST(head) == TAILQ_END(head)) + +#define TAILQ_FOREACH(var, head, field) \ + for((var) = TAILQ_FIRST(head); \ + (var) != TAILQ_END(head); \ + (var) = TAILQ_NEXT(var, field)) + +#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = TAILQ_FIRST(head); \ + (var) != TAILQ_END(head) && \ + ((tvar) = TAILQ_NEXT(var, field), 1); \ + (var) = (tvar)) + + +#define TAILQ_FOREACH_REVERSE(var, head, headname, field) \ + for((var) = TAILQ_LAST(head, headname); \ + (var) != TAILQ_END(head); \ + (var) = TAILQ_PREV(var, headname, field)) + +#define TAILQ_FOREACH_REVERSE_SAFE(var, head, headname, field, tvar) \ + for ((var) = TAILQ_LAST(head, headname); \ + (var) != TAILQ_END(head) && \ + ((tvar) = TAILQ_PREV(var, headname, field), 1); \ + (var) = (tvar)) + +/* + * Tail queue functions. 
+ */ +#define TAILQ_INIT(head) do { \ + (head)->tqh_first = NULL; \ + (head)->tqh_last = &(head)->tqh_first; \ +} while (0) + +#define TAILQ_INSERT_HEAD(head, elm, field) do { \ + if (((elm)->field.tqe_next = (head)->tqh_first) != NULL) \ + (head)->tqh_first->field.tqe_prev = \ + &(elm)->field.tqe_next; \ + else \ + (head)->tqh_last = &(elm)->field.tqe_next; \ + (head)->tqh_first = (elm); \ + (elm)->field.tqe_prev = &(head)->tqh_first; \ +} while (0) + +#define TAILQ_INSERT_TAIL(head, elm, field) do { \ + (elm)->field.tqe_next = NULL; \ + (elm)->field.tqe_prev = (head)->tqh_last; \ + *(head)->tqh_last = (elm); \ + (head)->tqh_last = &(elm)->field.tqe_next; \ +} while (0) + +#define TAILQ_INSERT_AFTER(head, listelm, elm, field) do { \ + if (((elm)->field.tqe_next = (listelm)->field.tqe_next) != NULL)\ + (elm)->field.tqe_next->field.tqe_prev = \ + &(elm)->field.tqe_next; \ + else \ + (head)->tqh_last = &(elm)->field.tqe_next; \ + (listelm)->field.tqe_next = (elm); \ + (elm)->field.tqe_prev = &(listelm)->field.tqe_next; \ +} while (0) + +#define TAILQ_INSERT_BEFORE(listelm, elm, field) do { \ + (elm)->field.tqe_prev = (listelm)->field.tqe_prev; \ + (elm)->field.tqe_next = (listelm); \ + *(listelm)->field.tqe_prev = (elm); \ + (listelm)->field.tqe_prev = &(elm)->field.tqe_next; \ +} while (0) + +#define TAILQ_REMOVE(head, elm, field) do { \ + if (((elm)->field.tqe_next) != NULL) \ + (elm)->field.tqe_next->field.tqe_prev = \ + (elm)->field.tqe_prev; \ + else \ + (head)->tqh_last = (elm)->field.tqe_prev; \ + *(elm)->field.tqe_prev = (elm)->field.tqe_next; \ + _Q_INVALIDATE((elm)->field.tqe_prev); \ + _Q_INVALIDATE((elm)->field.tqe_next); \ +} while (0) + +#define TAILQ_REPLACE(head, elm, elm2, field) do { \ + if (((elm2)->field.tqe_next = (elm)->field.tqe_next) != NULL) \ + (elm2)->field.tqe_next->field.tqe_prev = \ + &(elm2)->field.tqe_next; \ + else \ + (head)->tqh_last = &(elm2)->field.tqe_next; \ + (elm2)->field.tqe_prev = (elm)->field.tqe_prev; \ + 
*(elm2)->field.tqe_prev = (elm2); \ + _Q_INVALIDATE((elm)->field.tqe_prev); \ + _Q_INVALIDATE((elm)->field.tqe_next); \ +} while (0) + +/* + * Circular queue definitions. + */ +#define CIRCLEQ_HEAD(name, type) \ +struct name { \ + struct type *cqh_first; /* first element */ \ + struct type *cqh_last; /* last element */ \ +} + +#define CIRCLEQ_HEAD_INITIALIZER(head) \ + { CIRCLEQ_END(&head), CIRCLEQ_END(&head) } + +#define CIRCLEQ_ENTRY(type) \ +struct { \ + struct type *cqe_next; /* next element */ \ + struct type *cqe_prev; /* previous element */ \ +} + +/* + * Circular queue access methods + */ +#define CIRCLEQ_FIRST(head) ((head)->cqh_first) +#define CIRCLEQ_LAST(head) ((head)->cqh_last) +#define CIRCLEQ_END(head) ((void *)(head)) +#define CIRCLEQ_NEXT(elm, field) ((elm)->field.cqe_next) +#define CIRCLEQ_PREV(elm, field) ((elm)->field.cqe_prev) +#define CIRCLEQ_EMPTY(head) \ + (CIRCLEQ_FIRST(head) == CIRCLEQ_END(head)) + +#define CIRCLEQ_FOREACH(var, head, field) \ + for((var) = CIRCLEQ_FIRST(head); \ + (var) != CIRCLEQ_END(head); \ + (var) = CIRCLEQ_NEXT(var, field)) + +#define CIRCLEQ_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = CIRCLEQ_FIRST(head); \ + (var) != CIRCLEQ_END(head) && \ + ((tvar) = CIRCLEQ_NEXT(var, field), 1); \ + (var) = (tvar)) + +#define CIRCLEQ_FOREACH_REVERSE(var, head, field) \ + for((var) = CIRCLEQ_LAST(head); \ + (var) != CIRCLEQ_END(head); \ + (var) = CIRCLEQ_PREV(var, field)) + +#define CIRCLEQ_FOREACH_REVERSE_SAFE(var, head, headname, field, tvar) \ + for ((var) = CIRCLEQ_LAST(head, headname); \ + (var) != CIRCLEQ_END(head) && \ + ((tvar) = CIRCLEQ_PREV(var, headname, field), 1); \ + (var) = (tvar)) + +/* + * Circular queue functions. 
+ */ +#define CIRCLEQ_INIT(head) do { \ + (head)->cqh_first = CIRCLEQ_END(head); \ + (head)->cqh_last = CIRCLEQ_END(head); \ +} while (0) + +#define CIRCLEQ_INSERT_AFTER(head, listelm, elm, field) do { \ + (elm)->field.cqe_next = (listelm)->field.cqe_next; \ + (elm)->field.cqe_prev = (listelm); \ + if ((listelm)->field.cqe_next == CIRCLEQ_END(head)) \ + (head)->cqh_last = (elm); \ + else \ + (listelm)->field.cqe_next->field.cqe_prev = (elm); \ + (listelm)->field.cqe_next = (elm); \ +} while (0) + +#define CIRCLEQ_INSERT_BEFORE(head, listelm, elm, field) do { \ + (elm)->field.cqe_next = (listelm); \ + (elm)->field.cqe_prev = (listelm)->field.cqe_prev; \ + if ((listelm)->field.cqe_prev == CIRCLEQ_END(head)) \ + (head)->cqh_first = (elm); \ + else \ + (listelm)->field.cqe_prev->field.cqe_next = (elm); \ + (listelm)->field.cqe_prev = (elm); \ +} while (0) + +#define CIRCLEQ_INSERT_HEAD(head, elm, field) do { \ + (elm)->field.cqe_next = (head)->cqh_first; \ + (elm)->field.cqe_prev = CIRCLEQ_END(head); \ + if ((head)->cqh_last == CIRCLEQ_END(head)) \ + (head)->cqh_last = (elm); \ + else \ + (head)->cqh_first->field.cqe_prev = (elm); \ + (head)->cqh_first = (elm); \ +} while (0) + +#define CIRCLEQ_INSERT_TAIL(head, elm, field) do { \ + (elm)->field.cqe_next = CIRCLEQ_END(head); \ + (elm)->field.cqe_prev = (head)->cqh_last; \ + if ((head)->cqh_first == CIRCLEQ_END(head)) \ + (head)->cqh_first = (elm); \ + else \ + (head)->cqh_last->field.cqe_next = (elm); \ + (head)->cqh_last = (elm); \ +} while (0) + +#define CIRCLEQ_REMOVE(head, elm, field) do { \ + if ((elm)->field.cqe_next == CIRCLEQ_END(head)) \ + (head)->cqh_last = (elm)->field.cqe_prev; \ + else \ + (elm)->field.cqe_next->field.cqe_prev = \ + (elm)->field.cqe_prev; \ + if ((elm)->field.cqe_prev == CIRCLEQ_END(head)) \ + (head)->cqh_first = (elm)->field.cqe_next; \ + else \ + (elm)->field.cqe_prev->field.cqe_next = \ + (elm)->field.cqe_next; \ + _Q_INVALIDATE((elm)->field.cqe_prev); \ + 
_Q_INVALIDATE((elm)->field.cqe_next); \ +} while (0) + +#define CIRCLEQ_REPLACE(head, elm, elm2, field) do { \ + if (((elm2)->field.cqe_next = (elm)->field.cqe_next) == \ + CIRCLEQ_END(head)) \ + (head)->cqh_last = (elm2); \ + else \ + (elm2)->field.cqe_next->field.cqe_prev = (elm2); \ + if (((elm2)->field.cqe_prev = (elm)->field.cqe_prev) == \ + CIRCLEQ_END(head)) \ + (head)->cqh_first = (elm2); \ + else \ + (elm2)->field.cqe_prev->field.cqe_next = (elm2); \ + _Q_INVALIDATE((elm)->field.cqe_prev); \ + _Q_INVALIDATE((elm)->field.cqe_next); \ +} while (0) + +#endif /* !_SYS_QUEUE_H_ */