diff --git a/docs/API/API-functions.md b/docs/API/API-functions.md index 74fbc506f..753395af6 100644 --- a/docs/API/API-functions.md +++ b/docs/API/API-functions.md @@ -629,41 +629,109 @@ the listener socket to accept group connections SRTSOCKET srt_accept(SRTSOCKET lsn, struct sockaddr* addr, int* addrlen); ``` -Accepts a pending connection, then creates and returns a new socket or -group ID that handles this connection. The group and socket can be -distinguished by checking the `SRTGROUP_MASK` bit on the returned ID. - -* `lsn`: the listener socket previously configured by [`srt_listen`](#srt_listen) -* `addr`: the IP address and port specification for the remote party +Extracts the first connection request on the queue of pending connections for +the listening socket, `lsn`, then creates and returns a new socket or group ID +that handles this connection. The group and socket can be distinguished by +checking the `SRTGROUP_MASK` bit on the returned ID. Note that by default group +connections will be rejected - this feature can be only enabled on demand (see +below). + +* `lsn`: the listening socket +* `addr`: a location to store the remote IP address and port for the connection * `addrlen`: INPUT: size of `addr` pointed object. OUTPUT: real size of the returned object -**NOTE:** `addr` is allowed to be NULL, in which case it's understood that the -application is not interested in the address from which the connection originated. -Otherwise `addr` should specify an object into which the address will be written, -and `addrlen` must also specify a variable to contain the object size. Note also -that in the case of group connection only the initial connection that -establishes the group connection is returned, together with its address. As -member connections are added or broken within the group, you can obtain this -information through [`srt_group_data`](#srt_group_data) or the data filled by -[`srt_sendmsg2`](#srt_sendmsg) and [`srt_recvmsg2`](#srt_recvmsg2). - -If the `lsn` listener socket is configured for blocking mode -([`SRTO_RCVSYN`](API-socket-options.md#SRTO_RCVSYN) set to true, default), -the call will block until the incoming connection is ready. Otherwise, the -call always returns immediately. The `SRT_EPOLL_IN` epoll event should be -checked on the `lsn` socket prior to calling this function in that case. - -If the pending connection is a group connection (initiated on the peer side by -calling the connection function using a group ID, and permitted on the listener -socket by the [`SRTO_GROUPCONNECT`](API-socket-options.md#SRTO_GROUPCONNECT) -flag), then the value returned is a group ID. This function then creates a new -group, as well as a new socket for this connection, that will be added to the -group. Once the group is created this way, further connections within the same -group, as well as sockets for them, will be created in the background. The -[`SRT_EPOLL_UPDATE`](#SRT_EPOLL_UPDATE) event is raised on the `lsn` socket when -a new background connection is attached to the group, although it's usually for -internal use only. +General requirements for a parameter correctness: + +* `lsn` must be first [bound](#srt_bind) and [listening](#srt_listen) + +* `addr` may be NULL, or otherwise it must be a pointer to an object +that can be treated as an instance of `sockaddr_in` or `sockaddr_in6` + +* `addrlen` should be a pointer to a variable set to the size of the object +specified in `addr`, if `addr` is not NULL. Otherwise it's ignored. + +If `addr` is not NULL, the information about the source IP address and +port of the peer will be written into this object. Note that whichever +type of object is expected here (`sockaddr_in` or `sockaddr_in6`), it +depends on the address type used in the `srt_bind` call for `lsn`. +If unsure in a particular situation, it is recommended that you use +`sockaddr_storage` or `srt::sockaddr_any`. + +If the `lsn` listener socket is in the blocking mode (if +[`SRTO_RCVSYN`](API-socket-options.md#SRTO_RCVSYN) is set to true, +which is default), the call will block until the incoming connection is ready +for extraction. Otherwise, the call always returns immediately, possibly with +failure, if there was no pending connection waiting on the listening socket +`lsn`. + +The listener socket can be checked for any pending connections prior to calling +`srt_accept` by checking the `SRT_EPOLL_ACCEPT` epoll event (which is an alias +to `SRT_EPOLL_IN`). This event might be spurious in certain cases though, for +example, when the connection has been closed by the peer or broken before the +application extracts it. The call to `srt_accept` would then still fail in +such a case. + +In order to allow the listening socket `lsn` to accept a group connection, +the [`SRTO_GROUPCONNECT`](API-socket-options.md#SRTO_GROUPCONNECT) socket option +for the listening socket must be set to 1. Note that single socket connections +can still be reported to that socket. The application can distinguish the socket +and group connection by checking the `SRTGROUP_MASK` bit on the returned +successful value. There are some important differences to single socket +connections: + +1. Accepting a group connection can be done only once per connection, even +though particular member connections can get broken or established while +the group is connected. The actual connection reporter (listener) is a socket, +like before, but once you call `srt_accept` and receive this group ID, it is +the group considered connected, and any member connections of the same group +will be handled in the background. + +2. If a group was extracted from the `srt_accept` call, the address reported in +`addr` parameter is still the address of the connection that has triggered the +group connection extraction. The information about all member links in the +group at the moment can be obtained at any time through +[`srt_group_data`](#srt_group_data) or the data filled by +[`srt_sendmsg2`](#srt_sendmsg2) and [`srt_recvmsg2`](#srt_recvmsg2) +in the [`SRT_MSGCTRL`](#SRT_MSGCTRL) structure. + +3. Listening sockets are not bound to groups anyhow. You can allow multiple +listening sockets to accept group connections and the connection extracted +from the listener, if it is declared to be a group member, will join its +group, no matter which of the listening sockets has received the connection +request. This feature is prone to more tricky rules, however: + + * If you use multiple listener sockets, all of them in blocking mode, + allowed for group connections, and receiving connection requests for + the same group at the moment, and you run one thread per `srt_accept` + call, it is undefined, which of them will extract the group ID + for the connection, but still only one will, while the others will + continue blocking. If you want to use only one thread for accepting + connections from potentially multiple listening sockets in the blocking + mode, you should use [`srt_accept_bond`](#srt_accept_bond) instead. + Note though that this function is actually a wrapper that changes locally + to the nonblocking mode on all these listeners and uses epoll internally. + + * If at the moment multiple listener sockets have received connection + request and you query them all for readiness epoll flags (by calling + an epoll waiting function), all of them will get the `SRT_EPOLL_ACCEPT` + flag set, but still only one of them will return the group ID from the + `srt_accept` call. After this call, from all listener sockets in the + whole application the `SRT_EPOLL_ACCEPT` flag, that was set by the reason + of a pending connection for the same group, will be withdrawn (that is, + it will be cleared if there are no other pending connections). This is + then yet another situation when this flag can be spurious. + +4. If you query a listening socket for epoll flags after the `srt_accept` +function has once returned the group ID, the listening sockets that have +received new member connection requests within that group will report only the +[`SRT_EPOLL_UPDATE`](#SRT_EPOLL_UPDATE) flag. This flag is edge-triggered-only +because there is no operation you can perform in response in order to clear +this flag. This flag is mostly used internally and the application may use it +if it would like to trigger updating the current group information due to +having one newly added member connection. + + | Returns | | |:----------------------------- |:----------------------------------------------------------------------- | @@ -673,7 +741,7 @@ internal use only. | Errors | | |:--------------------------------- |:----------------------------------------------------------------------- | -| [`SRT_EINVPARAM`](#srt_einvparam) | NULL specified as `addrlen`, when `addr` is not NULL | +| [`SRT_EINVPARAM`](#srt_einvparam) | Invalid `addr` or `addrlen` (see requirements in the begininng) | | [`SRT_EINVSOCK`](#srt_einvsock) | `lsn` designates no valid socket ID. | | [`SRT_ENOLISTEN`](#srt_enolisten) | `lsn` is not set up as a listener ([`srt_listen`](#srt_listen) not called). | | [`SRT_EASYNCRCV`](#srt_easyncrcv) | No connection reported so far. This error is reported only in the non-blocking mode | diff --git a/docs/API/API-socket-options.md b/docs/API/API-socket-options.md index a06f8556d..438b90637 100644 --- a/docs/API/API-socket-options.md +++ b/docs/API/API-socket-options.md @@ -505,6 +505,11 @@ allowed must take this into consideration. It's up to the caller of this function to make this distinction and to take appropriate action depending on the type of entity returned. +Note: this flag should be altered **before** calling `srt_listen`. If you do +this after this call, you might have some pending group connections in the +meantime that will be rejected because group connections are not **yet** +allowed on this listener socket. + When this flag is set to 1 on an accepted socket that is passed to the listener callback handler, it means that this socket is created for a group connection and it will become a member of a group. Note that in this case diff --git a/srtcore/api.cpp b/srtcore/api.cpp index bb5dd64fe..0c6df0ac0 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -681,6 +681,17 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, goto ERR_ROLLBACK; } + // Acceptance of the group will have to be done through accepting + // of one of the pending sockets. There can be, however, multiple + // such sockets at a time, some of them might get broken before + // being accepted, and therefore we need to make all sockets ready. + // But then, acceptance of a group may happen only once, so if any + // sockets of the same group were submitted to accept, they must + // be removed from the accept queue at this time. + should_submit_to_accept = g->groupPending(); + + /* XXX remove if no longer informational + // Check if this is the first socket in the group. // If so, give it up to accept, otherwise just do nothing // The client will be informed about the newly added connection at the @@ -696,6 +707,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, break; } } + */ // Update the status in the group so that the next // operation can include the socket in the group operation. @@ -708,11 +720,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, gm->rcvstate = SRT_GST_IDLE; gm->laststatus = SRTS_CONNECTED; - if (!g->m_bConnected) - { - HLOGC(cnlog.Debug, log << "newConnection(GROUP): First socket connected, SETTING GROUP CONNECTED"); - g->m_bConnected = true; - } + g->setGroupConnected(); // XXX PROLBEM!!! These events are subscribed here so that this is done once, lazily, // but groupwise connections could be accepted from multiple listeners for the same group! @@ -797,6 +805,15 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, // acknowledge INTERNAL users waiting for new connections on the listening socket // that are reported when a new socket is connected within an already connected group. m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_UPDATE, true); +#if ENABLE_BONDING + // Note that the code in this current IF branch can only be executed in case + // of group members. Otherwise should_submit_to_accept will be always true. + if (ns->m_GroupOf) + { + HLOGC(gmlog.Debug, log << "GROUP UPDATE $" << ns->m_GroupOf->id() << " per connected socket @" << ns->m_SocketID); + m_EPoll.update_events(ns->m_GroupOf->id(), ns->m_GroupOf->m_sPollID, SRT_EPOLL_UPDATE, true); + } +#endif CGlobEvent::triggerEvent(); } @@ -843,6 +860,68 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, return 1; } +SRT_EPOLL_T srt::CUDTSocket::getListenerEvents() +{ + // You need to check EVERY socket that has been queued + // and verify its internals. With independent socket the + // matter is simple - if it's present, you light up the + // SRT_EPOLL_ACCEPT flag. + +#if !ENABLE_BONDING + ScopedLock accept_lock (m_AcceptLock); + + // Make it simplified here - nonempty container = have acceptable sockets. + // Might make sometimes spurious acceptance, but this can also happen when + // the incoming accepted socket was suddenly broken. + return m_QueuedSockets.empty() ? 0 : int(SRT_EPOLL_ACCEPT); + +#else // Could do #endif here, but the compiler would complain about unreachable code. + + map sockets_copy; + { + ScopedLock accept_lock (m_AcceptLock); + sockets_copy = m_QueuedSockets; + } + return CUDT::uglobal().checkQueuedSocketsEvents(sockets_copy); + +#endif +} + +#if ENABLE_BONDING +int srt::CUDTUnited::checkQueuedSocketsEvents(const map& sockets) +{ + SRT_EPOLL_T flags = 0; + + // But with the member sockets an appropriate check must be + // done first: if this socket belongs to a group that is + // already in the connected state, you should light up the + // SRT_EPOLL_UPDATE flag instead. This flag is only for + // internal informing the waiters on the listening sockets + // that they should re-read the group list and re-check readiness. + + // Now we can do lock once and for all + for (map::const_iterator i = sockets.begin(); i != sockets.end(); ++i) + { + CUDTSocket* s = locateSocket_LOCKED(i->first); + if (!s) + continue; // wiped in the meantime - ignore + + // If this pending socket is a group member, but the group + // to which it belongs is NOT waiting to be accepted, then + // light up the UPDATE event only. Light up ACCEPT only if + // this is a single socket, or this single socket has turned + // the mirror group to be first time available for accept(), + // and this accept() hasn't been done yet. + if (s->m_GroupOf && !s->m_GroupOf->groupPending()) + flags |= SRT_EPOLL_UPDATE; + else + flags |= SRT_EPOLL_ACCEPT; + } + + return flags; +} +#endif + // static forwarder int srt::CUDT::installAcceptHook(SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq) { @@ -1176,12 +1255,16 @@ SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int // it's a theoretically possible scenario if (s->m_GroupOf) { - u = s->m_GroupOf->m_GroupID; - s->core().m_config.iGroupConnect = 1; // should be derived from ls, but make sure - + CUDTGroup* g = s->m_GroupOf; // Mark the beginning of the connection at the moment // when the group ID is returned to the app caller - s->m_GroupOf->m_stats.tsLastSampleTime = steady_clock::now(); + g->m_stats.tsLastSampleTime = steady_clock::now(); + + HLOGC(cnlog.Debug, log << "accept: reporting group $" << g->m_GroupID << " instead of member socket @" << u); + u = g->m_GroupID; + s->core().m_config.iGroupConnect = 1; // should be derived from ls, but make sure + g->m_bPending = false; + CUDT::uglobal().removePendingForGroup(g); } else { @@ -1201,6 +1284,79 @@ SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int return u; } +#if ENABLE_BONDING + +// [[using locked(m_GlobControlLock)]] +void srt::CUDTUnited::removePendingForGroup(const CUDTGroup* g) +{ + // We don't have a list of listener sockets that have ever + // reported a pending connection for a group, so the only + // way to find them is to ride over the list of all sockets... + + list members; + g->getMemberSockets((members)); + + for (sockets_t::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++i) + { + CUDTSocket* s = i->second; + // Check if any of them is a listener socket... + + /* XXX This is left for information only that we are only + interested with listener sockets - with the current + implementation checking it is pointless because the + m_QueuedSockets structure is present in every socket + anyway even if it's not a listener, and only listener + sockets may have this container nonempty. So checking + the container should suffice. + + if (!s->core().m_bListening) + continue; + */ + + if (s->m_QueuedSockets.empty()) + continue; + + // Somehow fortunate for us that it's a set, so we + // can simply check if this allegedly listener socket + // contains any of them. + for (list::iterator m = members.begin(), mx = m; m != members.end(); m = mx) + { + ++mx; + std::map::iterator q = s->m_QueuedSockets.find(*m); + if (q != s->m_QueuedSockets.end()) + { + HLOGC(cnlog.Debug, log << "accept: listener @" << s->m_SocketID + << " had ququed member @" << *m << " -- removed"); + // Found an intersection socket. + // Remove it from the listener queue + s->m_QueuedSockets.erase(q); + + // NOTE ALSO that after this removal the queue may be EMPTY, + // and if so, the listener socket should be no longer ready for accept. + if (s->m_QueuedSockets.empty()) + { + m_EPoll.update_events(s->m_SocketID, s->core().m_sPollID, SRT_EPOLL_ACCEPT, false); + } + + // and remove it also from the members list. + // This can be done safely because we use a SAFE LOOP. + // We can also do it safely because a socket may be + // present in only one listener socket in the whole app. + members.erase(m); + } + } + + // It may happen that the list of members can be + // eventually purged even if we haven't checked every socket. + // If it happens so, quit immediately because there's nothing + // left to do. + if (members.empty()) + return; + } +} + +#endif + int srt::CUDTUnited::connect(SRTSOCKET u, const sockaddr* srcname, const sockaddr* tarname, int namelen) { // Here both srcname and tarname must be specified @@ -2391,14 +2547,19 @@ int srt::CUDTUnited::epoll_add_usock(const int eid, const SRTSOCKET u, const int } #endif - CUDTSocket* s = locateSocket(u); - if (s) + // The call to epoll_add_usock_INTERNAL is expected + // to be called under m_GlobControlLock, so use this lock here, too. { - ret = epoll_add_usock_INTERNAL(eid, s, events); - } - else - { - throw CUDTException(MJ_NOTSUP, MN_SIDINVAL); + ScopedLock cs (m_GlobControlLock); + CUDTSocket* s = locateSocket_LOCKED(u); + if (s) + { + ret = epoll_add_usock_INTERNAL(eid, s, events); + } + else + { + throw CUDTException(MJ_NOTSUP, MN_SIDINVAL); + } } return ret; diff --git a/srtcore/api.h b/srtcore/api.h index 48e7827f8..0e5272ccf 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -171,6 +171,8 @@ class CUDTSocket unsigned int m_uiBackLog; //< maximum number of connections in queue + SRT_EPOLL_T getListenerEvents(); + // XXX A refactoring might be needed here. // There are no reasons found why the socket can't contain a list iterator to a @@ -286,6 +288,12 @@ class CUDTUnited int& w_error, CUDT*& w_acpu); +#if ENABLE_BONDING + SRT_ATTR_REQUIRES(m_GlobControlLock) + int checkQueuedSocketsEvents(const std::map& sockets); + void removePendingForGroup(const CUDTGroup* g); +#endif + int installAcceptHook(const SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq); int installConnectHook(const SRTSOCKET lsn, srt_connect_callback_fn* hook, void* opaq); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index eca2b2069..c31d2a6fc 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -3329,7 +3329,8 @@ SRTSOCKET srt::CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint3 // This can only happen on a listener (it's only called on a site that is // HSD_RESPONDER), so it was a response for a groupwise connection. // Therefore such a group shall always be considered opened. - gp->setOpen(); + // It's also set pending and it stays this way until accepted. + gp->setOpenPending(); HLOGC(gmlog.Debug, log << CONID() << "makeMePeerOf: no group has peer=$" << peergroup << " - creating new mirror group $" @@ -11762,6 +11763,24 @@ void srt::CUDT::addEPoll(const int eid) m_sPollID.insert(eid); leaveCS(uglobal().m_EPoll.m_EPollLock); + if (m_bListening) + { + // A listener socket can only get readiness on SRT_EPOLL_ACCEPT + // (which has the same value as SRT_EPOLL_IN), or sometimes + // also SRT_EPOLL_UPDATE. All interesting fields for that purpose + // are contained in the CUDTSocket class, so redirect there. + SRT_EPOLL_T events = m_parent->getListenerEvents(); + + // Only light up the events that were returned, do nothing if none is ready, + // the "no event" state is the default. + if (events) + uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, events, true); + + // You don't check anything else here - a listener socket can be only + // used for listening and nothing else. + return; + } + if (!stillConnected()) return; diff --git a/srtcore/epoll.cpp b/srtcore/epoll.cpp index 8cd8440c7..e6ef44897 100644 --- a/srtcore/epoll.cpp +++ b/srtcore/epoll.cpp @@ -500,18 +500,23 @@ int srt::CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int6 ScopedLock pg(m_EPollLock); map::iterator p = m_mPolls.find(eid); if (p == m_mPolls.end()) + { + LOGC(ealog.Error, log << "epoll_uwait: E" << eid << " doesn't exist"); throw CUDTException(MJ_NOTSUP, MN_EIDINVAL); + } CEPollDesc& ed = p->second; if (!ed.flags(SRT_EPOLL_ENABLE_EMPTY) && ed.watch_empty()) { // Empty EID is not allowed, report error. + LOGC(ealog.Error, log << "epoll_uwait: E" << eid << " is empty (use SRT_EPOLL_ENABLE_EMPTY to allow)"); throw CUDTException(MJ_NOTSUP, MN_EEMPTY); } if (ed.flags(SRT_EPOLL_ENABLE_OUTPUTCHECK) && (fdsSet == NULL || fdsSize == 0)) { - // Empty EID is not allowed, report error. + // Empty container is not allowed, report error. + LOGC(ealog.Error, log << "epoll_uwait: empty output container with E" << eid << " (use SRT_EPOLL_ENABLE_OUTPUTCHECK to allow)"); throw CUDTException(MJ_NOTSUP, MN_INVAL); } @@ -519,6 +524,7 @@ int srt::CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int6 { // XXX Add error log // uwait should not be used with EIDs subscribed to system sockets + LOGC(ealog.Error, log << "epoll_uwait: E" << eid << " is subscribed to system sckets (not allowed for uwait)"); throw CUDTException(MJ_NOTSUP, MN_INVAL); } @@ -530,11 +536,20 @@ int srt::CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int6 ++total; if (total > fdsSize) + { + HLOGC(ealog.Debug, log << "epoll_uwait: output container size=" << fdsSize << " insufficient to report all sockets"); break; + } fdsSet[pos] = *i; + IF_HEAVY_LOGGING(std::ostringstream out); + IF_HEAVY_LOGGING(out << "epoll_uwait: Notice: fd=" << i->fd << " events="); + IF_HEAVY_LOGGING(PrintEpollEvent(out, i->events, 0)); + + SRT_ATR_UNUSED const bool was_edge = ed.checkEdge(i++); // NOTE: potentially deletes `i` + IF_HEAVY_LOGGING(out << (was_edge ? "(^)" : "")); + HLOGP(ealog.Debug, out.str()); - ed.checkEdge(i++); // NOTE: potentially deletes `i` } if (total) return total; @@ -869,6 +884,12 @@ int srt::CEPoll::update_events(const SRTSOCKET& uid, std::set& eids, const return -1; // still, ignored. } + if (uid == SRT_INVALID_SOCK || uid == 0) + { + LOGC(eilog.Fatal, log << "epoll/update: IPE: invalid 'uid' submitted for update!"); + return -1; + } + int nupdated = 0; vector lost; diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 5601cdeee..7e755da6e 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -274,6 +274,7 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) , m_RcvBaseSeqNo(SRT_SEQNO_NONE) , m_bOpened(false) , m_bConnected(false) + , m_bPending(false) , m_bClosing(false) , m_iLastSchedSeqNo(SRT_SEQNO_NONE) , m_iLastSchedMsgNo(SRT_MSGNO_NONE) @@ -4079,6 +4080,7 @@ void CUDTGroup::setGroupConnected() { if (!m_bConnected) { + HLOGC(cnlog.Debug, log << "GROUP: First socket connected, SETTING GROUP CONNECTED"); // Switch to connected state and give appropriate signal m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_CONNECT, true); m_bConnected = true; @@ -4151,6 +4153,16 @@ void CUDTGroup::updateLatestRcv(CUDTSocket* s) } } +void CUDTGroup::getMemberSockets(std::list& w_ids) const +{ + ScopedLock gl (m_GroupLock); + + for (cgli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi) + { + w_ids.push_back(gi->id); + } +} + void CUDTGroup::activateUpdateEvent(bool still_have_items) { // This function actually reacts on the fact that a socket diff --git a/srtcore/group.h b/srtcore/group.h index 56f1456f8..e73856df1 100644 --- a/srtcore/group.h +++ b/srtcore/group.h @@ -101,6 +101,7 @@ class CUDTGroup typedef std::list group_t; typedef group_t::iterator gli_t; + typedef group_t::const_iterator cgli_t; typedef std::vector< std::pair > sendable_t; struct Sendstate @@ -203,6 +204,11 @@ class CUDTGroup return m_Group.empty(); } + bool groupPending() + { + return m_bPending; + } + void setGroupConnected(); int send(const char* buf, int len, SRT_MSGCTRL& w_mc); @@ -399,7 +405,7 @@ class CUDTGroup void getGroupCount(size_t& w_size, bool& w_still_alive); srt::CUDTUnited& m_Global; - srt::sync::Mutex m_GroupLock; + mutable srt::sync::Mutex m_GroupLock; SRTSOCKET m_GroupID; SRTSOCKET m_PeerGroupID; @@ -428,6 +434,8 @@ class CUDTGroup gli_t begin() { return m_List.begin(); } gli_t end() { return m_List.end(); } + cgli_t begin() const { return m_List.begin(); } + cgli_t end() const { return m_List.end(); } bool empty() { return m_List.empty(); } void push_back(const SocketData& data) { m_List.push_back(data); ++m_SizeCache; } void clear() @@ -661,8 +669,21 @@ class CUDTGroup // from the first delivering socket will be taken as a good deal. sync::atomic m_RcvBaseSeqNo; - bool m_bOpened; // Set to true when at least one link is at least pending - bool m_bConnected; // Set to true on first link confirmed connected + /// True: at least one socket has joined the group in at least pending state + bool m_bOpened; + + /// True: at least one socket is connected, even if pending from the listener + bool m_bConnected; + + /// True: this group was created on the listner side for the first socket + /// that is pending connection, so the group is about to be reported for the + /// srt_accept() call, but the application hasn't retrieved the group yet. + /// Not in use in case of caller-side groups. + // NOTE: using atomic in otder to allow this variable to be changed independently + // on any mutex locks. + sync::atomic m_bPending; + + /// True: the group was requested to close and it should not allow any operations. bool m_bClosing; // There's no simple way of transforming config @@ -736,7 +757,14 @@ class CUDTGroup // Required after the call on newGroup on the listener side. // On the listener side the group is lazily created just before // accepting a new socket and therefore always open. - void setOpen() { m_bOpened = true; } + // However, after creation it will be still waiting for being + // extracted by the application in `srt_accept`, and until then + // it stays as pending. + void setOpenPending() + { + m_bOpened = true; + m_bPending = true; + } std::string CONID() const { @@ -796,6 +824,8 @@ class CUDTGroup void updateLatestRcv(srt::CUDTSocket*); + void getMemberSockets(std::list&) const; + // Property accessors SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRTSOCKET, id, m_GroupID); SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRTSOCKET, peerid, m_PeerGroupID); diff --git a/test/test_bonding.cpp b/test/test_bonding.cpp index 0e48c8a04..1d38e3a41 100644 --- a/test/test_bonding.cpp +++ b/test/test_bonding.cpp @@ -6,8 +6,11 @@ #include "gtest/gtest.h" #include "test_env.h" +#include "apputil.hpp" // Note: declares CreateAddr, but not srt::CreateAddr #include "srt.h" +#include "logging_api.h" +#include "common.h" #include "netinet_any.h" TEST(Bonding, SRTConnectGroup) @@ -376,7 +379,7 @@ TEST(Bonding, Options) #endif int allow = 1; ASSERT_NE(srt_setsockflag(lsn, SRTO_GROUPCONNECT, &allow, sizeof allow), SRT_ERROR); - sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET); + sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET); ASSERT_NE(srt_bind(lsn, sa.get(), sa.size()), SRT_ERROR); ASSERT_NE(srt_listen(lsn, 1), SRT_ERROR); started = true; @@ -413,7 +416,7 @@ TEST(Bonding, Options) } // Now the thread is accepting, so we call the connect. - sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET); + sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET); SRTSOCKET member = srt_connect(grp, sa.get(), sa.size()); // We've released the mutex and signaled the CV, so accept should proceed now. @@ -507,7 +510,7 @@ TEST(Bonding, InitialFailure) int allow = 1; ASSERT_NE(srt_setsockflag(lsn, SRTO_GROUPCONNECT, &allow, sizeof allow), SRT_ERROR); - sockaddr_any sa = CreateAddr("127.0.0.1", 5555, AF_INET); + sockaddr_any sa = srt::CreateAddr("127.0.0.1", 5555, AF_INET); ASSERT_NE(srt_bind(lsn, sa.get(), sa.size()), SRT_ERROR); ASSERT_NE(srt_listen(lsn, 5), SRT_ERROR); @@ -564,3 +567,122 @@ TEST(Bonding, InitialFailure) srt_close(lsn); } +void SetLongSilenceTolerant(const SRTSOCKET s) +{ + int longtime = 100000; + + srt_setsockflag(s, SRTO_CONNTIMEO, &longtime, sizeof longtime); + srt_setsockflag(s, SRTO_PEERIDLETIMEO, &longtime, sizeof longtime); +} + +TEST(Bonding, DeadLinkUpdate) +{ + using namespace std; + using namespace std::chrono; + + srt::TestInit srtinit; + + SRTSOCKET listener = srt_create_socket(); + const SRTSOCKET group = srt_create_group(SRT_GTYPE_BACKUP); + + SetLongSilenceTolerant(listener); + SetLongSilenceTolerant(group); + + srt::sockaddr_any sa(AF_INET); + + inet_pton(AF_INET, "127.0.0.1", sa.get_addr()); + + sa.hport(5555); + + srt_bind(listener, sa.get(), sa.size()); + srt::setopt(listener)[SRTO_GROUPCONNECT] = 1; + srt_listen(listener, 1); + char srcbuf [] = "1234ABCD"; + + thread td = thread([&]() { + cout << "[T] Connecting 1...\n"; + const SRTSOCKET member1 = srt_connect(group, sa.get(), sa.size()); + EXPECT_NE(member1, SRT_INVALID_SOCK); + // Now wait 3s + cout << "[T] Link 1 established. Wait 3s...\n"; + this_thread::sleep_for(seconds(3)); + + cout << "[T] Connecting 2...\n"; + // Make a second connection + const SRTSOCKET member2 = srt_connect(group, sa.get(), sa.size()); + EXPECT_NE(member2, SRT_INVALID_SOCK); + + if (member2 == SRT_INVALID_SOCK || member1 == SRT_INVALID_SOCK) + { + srt_close(member1); + srt_close(member2); + cout << "[T] Test already failed, exitting\n"; + return; + } + + cout << "[T] Link 2 established. Wait 3s...\n"; + // Again wait 3s + this_thread::sleep_for(seconds(3)); + + cout << "[T] Killing link 1...\n"; + // Now close the first connection + srt_close(member1); + + // Now send the data and see if they are received + cout << "[T] Sending: size=" << (sizeof srcbuf) << " Content: '" << srcbuf << "'...\n"; + int nsent = srt_send(group, srcbuf, sizeof srcbuf); + EXPECT_NE(nsent, -1) << "srt_send:" << srt_getlasterror_str(); + + cout << "[T] Wait 3s...\n"; + // Again wait 3s + this_thread::sleep_for(seconds(3)); + + cout << "[T] Killing the group and exitting.\n"; + // And close + srt_close(group); + cout << "[T] exit\n"; + }); + + cout << "Accepting (10s timeout)...\n"; + // Using srt_accept_bond to apply accept timeout + SRTSOCKET lsnra [] = { listener }; + const SRTSOCKET acp = srt_accept_bond(lsnra, 1, 10*1000); + + EXPECT_NE(acp, -1) << "srt_accept:" << srt_getlasterror_str(); + EXPECT_EQ(acp & SRTGROUP_MASK, SRTGROUP_MASK); + + // Close and set up the listener again. + srt_close(listener); + if (acp != SRT_ERROR) + { + listener = srt_create_socket(); + srt_bind(listener, sa.get(), sa.size()); + srt::setopt(listener)[SRTO_GROUPCONNECT] = 1; + srt_listen(listener, 1); + + cout << "Group accepted. Receiving...\n"; + char buf[1316] = ""; + const int nrecv = srt_recv(acp, buf, 1316); + int syserr, err; + err = srt_getlasterror(&syserr); + EXPECT_NE(nrecv, -1) << "srt_recv:" << srt_getlasterror_str(); + + cout << "Received: val=" << nrecv << " Content: '" << buf << "'\n"; + if (nrecv == -1) + { + cout << "ERROR: " << srt_strerror(err, syserr) << endl; + cout << "STATUS: " << srt_logging::SockStatusStr(srt_getsockstate(acp)) << endl; + } + else + { + EXPECT_EQ(strcmp(srcbuf, buf), 0); + } + + cout << "Closing.\n"; + srt_close(acp); + srt_close(listener); + } + + td.join(); +} + diff --git a/test/test_epoll.cpp b/test/test_epoll.cpp index a8a88aa7a..335a0b3ef 100644 --- a/test/test_epoll.cpp +++ b/test/test_epoll.cpp @@ -7,7 +7,7 @@ #include "test_env.h" #include "api.h" #include "epoll.h" - +#include "apputil.hpp" using namespace std; using namespace srt; @@ -544,6 +544,409 @@ TEST(CEPoll, ThreadedUpdate) } } +void testListenerReady(const bool LATE_CALL, size_t nmembers) +{ + bool is_single = true; + bool want_sleep = !TestEnv::me->OptionPresent("nosleep"); + + sockaddr_in sa; + memset(&sa, 0, sizeof sa); + sa.sin_family = AF_INET; + sa.sin_port = htons(5555); + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); + + TestInit init; + + SRTSOCKET server_sock, caller_sock; + server_sock = srt_create_socket(); + + if (nmembers > 0) + { + caller_sock = srt_create_group(SRT_GTYPE_BROADCAST); + int on = 1; + EXPECT_NE(srt_setsockflag(server_sock, SRTO_GROUPCONNECT, &on, sizeof on), SRT_ERROR); + is_single = false; + } + else + { + caller_sock = srt_create_socket(); + nmembers = 1; // Set to 1 so that caller starts at least once. + } + + srt_bind(server_sock, (sockaddr*)& sa, sizeof(sa)); + srt_listen(server_sock, nmembers+1); + + srt::setopt(server_sock)[SRTO_RCVSYN] = false; + + // Ok, the listener socket is ready; now make a call, but + // do not do anything on the listener socket yet. + + std::cout << "Using " << (LATE_CALL ? "LATE" : "EARLY") << " call\n"; + + std::vector> connect_res; + + if (LATE_CALL) + { + // We don't need the caller to be async, it can hang up here. + for (size_t i = 0; i < nmembers; ++i) + { + connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa, i]() { + std::cout << "[T:" << i << "] CALLING\n"; + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + })); + } + + std::cout << "STARTED connecting...\n"; + } + + if (want_sleep) + { + std::cout << "Sleeping 1s...\n"; + this_thread::sleep_for(chrono::milliseconds(1000)); + } + + // What is important is that the accepted socket is now reporting in + // on the listener socket. So let's create an epoll. + + int eid = srt_epoll_create(); + int eid_postcheck = srt_epoll_create(); + + // and add this listener to it + int modes = SRT_EPOLL_IN; + int modes_postcheck = SRT_EPOLL_IN | SRT_EPOLL_UPDATE; + EXPECT_NE(srt_epoll_add_usock(eid, server_sock, &modes), SRT_ERROR); + EXPECT_NE(srt_epoll_add_usock(eid_postcheck, server_sock, &modes_postcheck), SRT_ERROR); + + if (!LATE_CALL) + { + // We don't need the caller to be async, it can hang up here. + for (size_t i = 0; i < nmembers; ++i) + { + connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa, i]() { + std::cout << "[T:" << i << "] CALLING\n"; + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + })); + } + + std::cout << "STARTED connecting...\n"; + } + + std::cout << "Waiting for readiness...\n"; + // And see now if the waiting accepted socket reports it. + SRT_EPOLL_EVENT fdset[1]; + EXPECT_EQ(srt_epoll_uwait(eid, fdset, 1, 5000), 1); + + std::cout << "Accepting...\n"; + sockaddr_in scl; + int sclen = sizeof scl; + SRTSOCKET sock = srt_accept(server_sock, (sockaddr*)& scl, &sclen); + EXPECT_NE(sock, SRT_INVALID_SOCK); + + if (nmembers > 1) + { + std::cout << "With >1 members, check if there's still UPDATE pending\n"; + // Spawn yet another connection within the group, just to get the update + auto extra_call = std::async(std::launch::async, [&caller_sock, &sa]() { + std::cout << "[T:X] CALLING (expected failure)\n"; + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + }); + // For 2+ members, additionally check if there AREN'T any + // further acceptance members, but there are UPDATEs. + EXPECT_EQ(srt_epoll_uwait(eid_postcheck, fdset, 1, 5000), 1); + + // SUBSCRIBED EVENTS: IN, UPDATE. + // expected: UPDATE only. + EXPECT_EQ(SRT_EPOLL_OPT(fdset[0].events), SRT_EPOLL_UPDATE); + SRTSOCKET joined = extra_call.get(); + EXPECT_NE(joined, SRT_INVALID_SOCK); + std::cout << Sprint("Extra joined: @", joined, "\n"); + } + + std::vector gdata; + + if (!is_single) + { + EXPECT_EQ(sock & SRTGROUP_MASK, SRTGROUP_MASK); + // +1 because we have added one more caller to check UPDATE event. + size_t inoutlen = nmembers+1; + gdata.resize(inoutlen); + int groupndata = srt_group_data(sock, gdata.data(), (&inoutlen)); + EXPECT_NE(groupndata, SRT_ERROR); + + std::ostringstream sout; + if (groupndata == SRT_ERROR) + sout << "ERROR: " << srt_getlasterror_str() << " OUTLEN: " << inoutlen << std::endl; + else + { + // Just to display the members + sout << "(Listener) Members: "; + + for (int i = 0; i < groupndata; ++i) + sout << "@" << gdata[i].id << " "; + sout << std::endl; + } + + std::cout << sout.str(); + } + + std::cout << "Joining connector thread(s)\n"; + for (size_t i = 0; i < nmembers; ++i) + { + std::cout << "Join: #" << i << ":\n"; + SRTSOCKET called_socket = connect_res[i].get(); + std::cout << "... " << called_socket << std::endl; + EXPECT_NE(called_socket, SRT_INVALID_SOCK); + } + + if (!is_single) + { + EXPECT_EQ(caller_sock & SRTGROUP_MASK, SRTGROUP_MASK); + // +1 because we have added one more caller to check UPDATE event. + size_t inoutlen = nmembers+1; + gdata.resize(inoutlen); + int groupndata = srt_group_data(caller_sock, gdata.data(), (&inoutlen)); + EXPECT_NE(groupndata, SRT_ERROR); + + std::ostringstream sout; + if (groupndata == SRT_ERROR) + sout << "ERROR: " << srt_getlasterror_str() << " OUTLEN: " << inoutlen << std::endl; + else + { + // Just to display the members + sout << "(Caller) Members: "; + + for (int i = 0; i < groupndata; ++i) + sout << "@" << gdata[i].id << " "; + sout << std::endl; + } + + std::cout << sout.str(); + + if (want_sleep) + { + std::cout << "Sleep for 3 seconds to avoid closing-in-between\n"; + std::this_thread::sleep_for(std::chrono::seconds(3)); + } + } + + std::cout << "Releasing EID resources and all sockets\n"; + + srt_epoll_release(eid); + srt_epoll_release(eid_postcheck); + + srt_close(server_sock); + srt_close(caller_sock); + srt_close(sock); +} + +TEST(CEPoll, EarlyListenerReady) +{ + testListenerReady(false, 0); +} + +TEST(CEPoll, LateListenerReady) +{ + testListenerReady(true, 0); +} + +#if ENABLE_BONDING + +TEST(CEPoll, EarlyGroupListenerReady_1) +{ + testListenerReady(false, 1); +} + +TEST(CEPoll, LateGroupListenerReady_1) +{ + testListenerReady(true, 1); +} + +TEST(CEPoll, EarlyGroupListenerReady_3) +{ + testListenerReady(false, 3); +} + +TEST(CEPoll, LateGroupListenerReady_3) +{ + testListenerReady(true, 3); +} + + +void testMultipleListenerReady(const bool LATE_CALL) +{ + sockaddr_in sa; + memset(&sa, 0, sizeof sa); + sa.sin_family = AF_INET; + sa.sin_port = htons(5555); + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa.sin_addr), 1); + + sockaddr_in sa2; + memset(&sa2, 0, sizeof sa2); + sa2.sin_family = AF_INET; + sa2.sin_port = htons(5556); + ASSERT_EQ(inet_pton(AF_INET, "127.0.0.1", &sa2.sin_addr), 1); + + TestInit init; + + SRTSOCKET server_sock, server_sock2, caller_sock; + server_sock = srt_create_socket(); + server_sock2 = srt_create_socket(); + + caller_sock = srt_create_group(SRT_GTYPE_BROADCAST); + int on = 1; + EXPECT_NE(srt_setsockflag(server_sock, SRTO_GROUPCONNECT, &on, sizeof on), SRT_ERROR); + EXPECT_NE(srt_setsockflag(server_sock2, SRTO_GROUPCONNECT, &on, sizeof on), SRT_ERROR); + + srt_bind(server_sock, (sockaddr*)& sa, sizeof(sa)); + srt_listen(server_sock, 3); + srt::setopt(server_sock)[SRTO_RCVSYN] = false; + + srt_bind(server_sock2, (sockaddr*)& sa2, sizeof(sa2)); + srt_listen(server_sock2, 3); + srt::setopt(server_sock2)[SRTO_RCVSYN] = false; + + // Ok, the listener socket is ready; now make a call, but + // do not do anything on the listener socket yet. + + std::cout << "Using " << (LATE_CALL ? "LATE" : "EARLY") << " call\n"; + + std::vector> connect_res; + + if (LATE_CALL) + { + connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa]() { + this_thread::sleep_for(chrono::milliseconds(1)); + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + })); + + connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa2]() { + this_thread::sleep_for(chrono::milliseconds(1)); + return srt_connect(caller_sock, (sockaddr*)& sa2, sizeof(sa2)); + })); + + + std::cout << "STARTED connecting...\n"; + } + + std::cout << "Sleeping 1s...\n"; + this_thread::sleep_for(chrono::milliseconds(1000)); + + // What is important is that the accepted socket is now reporting in + // on the listener socket. So let's create an epoll. + + int eid = srt_epoll_create(); + int eid_postcheck = srt_epoll_create(); + + // and add this listener to it + int modes = SRT_EPOLL_IN; + int modes_postcheck = SRT_EPOLL_IN | SRT_EPOLL_UPDATE; + EXPECT_NE(srt_epoll_add_usock(eid, server_sock, &modes), SRT_ERROR); + EXPECT_NE(srt_epoll_add_usock(eid, server_sock2, &modes), SRT_ERROR); + EXPECT_NE(srt_epoll_add_usock(eid_postcheck, server_sock, &modes_postcheck), SRT_ERROR); + EXPECT_NE(srt_epoll_add_usock(eid_postcheck, server_sock2, &modes_postcheck), SRT_ERROR); + + if (!LATE_CALL) + { + connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa]() { + this_thread::sleep_for(chrono::milliseconds(1)); + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + })); + + connect_res.push_back(std::async(std::launch::async, [&caller_sock, &sa2]() { + this_thread::sleep_for(chrono::milliseconds(1)); + return srt_connect(caller_sock, (sockaddr*)& sa2, sizeof(sa2)); + })); + + std::cout << "STARTED connecting...\n"; + } + + // Sleep to make sure that the connection process has started. + this_thread::sleep_for(chrono::milliseconds(100)); + + std::cout << "Waiting for readiness on @" << server_sock << " and @" << server_sock2 << "\n"; + // And see now if the waiting accepted socket reports it. + + // This time we should expect that the connection reports in + // on two listener sockets + SRT_EPOLL_EVENT fdset[2] = {}; + std::ostringstream out; + + int nready = srt_epoll_uwait(eid, fdset, 2, 5000); + EXPECT_EQ(nready, 2); + out << "Ready socks:"; + for (int i = 0; i < nready; ++i) + { + out << " @" << fdset[i].fd; + PrintEpollEvent(out, fdset[i].events); + } + out << std::endl; + std::cout << out.str(); + + std::cout << "Accepting...\n"; + sockaddr_in scl; + int sclen = sizeof scl; + + // We choose the SECOND one to extract the group connection. + SRTSOCKET sock = srt_accept(server_sock2, (sockaddr*)& scl, &sclen); + EXPECT_NE(sock, SRT_INVALID_SOCK); + + // Make sure this time that the accepted connection is a group. + EXPECT_EQ(sock & SRTGROUP_MASK, SRTGROUP_MASK); + + std::cout << "Check if there's still UPDATE pending\n"; + // Spawn yet another connection within the group, just to get the update + auto extra_call = std::async(std::launch::async, [&caller_sock, &sa]() { + return srt_connect(caller_sock, (sockaddr*)& sa, sizeof(sa)); + }); + // For 2+ members, additionally check if there AREN'T any + // further acceptance members, but there are UPDATEs. + // Note that if this was done AFTER accepting, the UPDATE would + // be only set one one socket. + nready = srt_epoll_uwait(eid_postcheck, fdset, 1, 5000); + EXPECT_EQ(nready, 1); + + std::cout << "Ready socks:"; + for (int i = 0; i < nready; ++i) + { + std::cout << " @" << fdset[i].fd; + PrintEpollEvent(std::cout, fdset[i].events); + } + std::cout << std::endl; + + // SUBSCRIBED EVENTS: IN, UPDATE. + // expected: UPDATE only. + EXPECT_EQ(SRT_EPOLL_OPT(fdset[0].events), SRT_EPOLL_UPDATE); + EXPECT_NE(extra_call.get(), SRT_INVALID_SOCK); + + std::cout << "Joining connector thread(s)\n"; + for (size_t i = 0; i < connect_res.size(); ++i) + { + EXPECT_NE(connect_res[i].get(), SRT_INVALID_SOCK); + } + + srt_epoll_release(eid); + srt_epoll_release(eid_postcheck); + + srt_close(server_sock); + srt_close(server_sock2); + srt_close(caller_sock); + srt_close(sock); +} + +TEST(CEPoll, EarlyGroupMultiListenerReady) +{ + testMultipleListenerReady(false); +} + +TEST(CEPoll, LateGroupMultiListenerReady) +{ + testMultipleListenerReady(true); +} + + + +#endif + class TestEPoll: public srt::Test {