Skip to content

Commit

Permalink
[Windows] Use fd_set according to Winsock2 specifics
Browse files Browse the repository at this point in the history
Fixes swiftlang/swift#73532.

On Windows, socket handles in a `fd_set` are not represented as
bit flags as in Berkeley sockets. While we have no `fd_set` dynamic
growth in this implementation, the `FD_SETSIZE` defined as 1024
in `CoreFoundation_Prefix.h` should be enough for majority of tasks.
  • Loading branch information
lxbndr committed May 13, 2024
1 parent 0607287 commit d3871a5
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 6 deletions.
80 changes: 74 additions & 6 deletions CoreFoundation/RunLoop.subproj/CFSocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,30 @@ CF_INLINE int __CFSocketLastError(void) {
}

CF_INLINE CFIndex __CFSocketFdGetSize(CFDataRef fdSet) {
#if TARGET_OS_WIN32
if (CFDataGetLength(fdSet) == 0) {
return 0;
}
return FD_SETSIZE;
#else
return NBBY * CFDataGetLength(fdSet);
#endif
}

CF_INLINE Boolean __CFSocketFdSet(CFSocketNativeHandle sock, CFMutableDataRef fdSet) {
/* returns true if a change occurred, false otherwise */
Boolean retval = false;
if (INVALID_SOCKET != sock && 0 <= sock) {
fd_set *fds;
#if TARGET_OS_WIN32
if (CFDataGetLength(fdSet) == 0) {
CFDataIncreaseLength(fdSet, sizeof(fd_set));
fds = (fd_set *)CFDataGetMutableBytePtr(fdSet);
FD_ZERO(fds);
} else {
fds = (fd_set *)CFDataGetMutableBytePtr(fdSet);
}
#else
CFIndex numFds = NBBY * CFDataGetLength(fdSet);
fd_mask *fds_bits;
if (sock >= numFds) {
Expand All @@ -230,9 +247,11 @@ CF_INLINE Boolean __CFSocketFdSet(CFSocketNativeHandle sock, CFMutableDataRef fd
} else {
fds_bits = (fd_mask *)CFDataGetMutableBytePtr(fdSet);
}
if (!FD_ISSET(sock, (fd_set *)fds_bits)) {
fds = (fd_set *)fds_bits;
#endif
if (!FD_ISSET(sock, fds)) {
retval = true;
FD_SET(sock, (fd_set *)fds_bits);
FD_SET(sock, fds);
}
}
return retval;
Expand Down Expand Up @@ -416,6 +435,15 @@ CF_INLINE Boolean __CFSocketFdClr(CFSocketNativeHandle sock, CFMutableDataRef fd
/* returns true if a change occurred, false otherwise */
Boolean retval = false;
if (INVALID_SOCKET != sock && 0 <= sock) {
#if TARGET_OS_WIN32
if (CFDataGetLength(fdSet) > 0) {
fd_set *fds = (fd_set *)CFDataGetMutableBytePtr(fdSet);
if (FD_ISSET(sock, fds)) {
retval = true;
FD_CLR(sock, fds);
}
}
#else
CFIndex numFds = NBBY * CFDataGetLength(fdSet);
fd_mask *fds_bits;
if (sock < numFds) {
Expand All @@ -425,6 +453,7 @@ CF_INLINE Boolean __CFSocketFdClr(CFSocketNativeHandle sock, CFMutableDataRef fd
FD_CLR(sock, (fd_set *)fds_bits);
}
}
#endif
}
return retval;
}
Expand Down Expand Up @@ -1188,6 +1217,27 @@ static void
clearInvalidFileDescriptors(CFMutableDataRef d)
{
if (d) {
#if TARGET_OS_WIN32
if (CFDataGetLength(d) == 0) {
return;
}

fd_set *fds = (fd_set *)CFDataGetMutableBytePtr(d);
fd_set invalidFds;
FD_ZERO(&invalidFds);
// Gather all invalid sockets into invalidFds set
for (u_int idx = 0; idx < fds->fd_count; idx++) {
SOCKET socket = fds->fd_array[idx];
if (! __CFNativeSocketIsValid(socket)) {
FD_SET(socket, &invalidFds);
}
}
// Remove invalid sockets from source set
for (u_int idx = 0; idx < invalidFds.fd_count; idx++) {
SOCKET socket = invalidFds.fd_array[idx];
FD_CLR(socket, fds);
}
#else
SInt32 count = __CFSocketFdGetSize(d);
fd_set* s = (fd_set*) CFDataGetMutableBytePtr(d);
for (SInt32 idx = 0; idx < count; idx++) {
Expand All @@ -1196,14 +1246,13 @@ clearInvalidFileDescriptors(CFMutableDataRef d)
FD_CLR(idx, s);
}
}
#endif
}
}

static void
manageSelectError()
manageSelectError(SInt32 selectError)
{
SInt32 selectError = __CFSocketLastError();

__CFSOCKETLOG("socket manager received error %ld from select", (long)selectError);

if (EBADF == selectError) {
Expand Down Expand Up @@ -1263,8 +1312,15 @@ static void *__CFSocketManager(void * arg)
SInt32 nrfds, maxnrfds, fdentries = 1;
SInt32 rfds, wfds;
fd_set *exceptfds = NULL;
#if TARGET_OS_WIN32
fd_set *writefds = (fd_set *)CFAllocatorAllocate(kCFAllocatorSystemDefault, sizeof(fd_set), 0);
fd_set *readfds = (fd_set *)CFAllocatorAllocate(kCFAllocatorSystemDefault, sizeof(fd_set), 0);
FD_ZERO(writefds);
FD_ZERO(readfds);
#else
fd_set *writefds = (fd_set *)CFAllocatorAllocate(kCFAllocatorSystemDefault, fdentries * sizeof(fd_mask), 0);
fd_set *readfds = (fd_set *)CFAllocatorAllocate(kCFAllocatorSystemDefault, fdentries * sizeof(fd_mask), 0);
#endif
fd_set *tempfds;
SInt32 idx, cnt;
uint8_t buffer[256];
Expand All @@ -1290,6 +1346,11 @@ static void *__CFSocketManager(void * arg)
free(readBuffer);
free(writeBuffer);
#endif

#if TARGET_OS_WIN32
// This parameter is ignored by `select` from Winsock2 API
maxnrfds = INT_MAX;
#else
rfds = __CFSocketFdGetSize(__CFReadSocketsFds);
wfds = __CFSocketFdGetSize(__CFWriteSocketsFds);
maxnrfds = __CFMax(rfds, wfds);
Expand All @@ -1300,6 +1361,7 @@ static void *__CFSocketManager(void * arg)
}
memset(writefds, 0, fdentries * sizeof(fd_mask));
memset(readfds, 0, fdentries * sizeof(fd_mask));
#endif
CFDataGetBytes(__CFWriteSocketsFds, CFRangeMake(0, CFDataGetLength(__CFWriteSocketsFds)), (UInt8 *)writefds);
CFDataGetBytes(__CFReadSocketsFds, CFRangeMake(0, CFDataGetLength(__CFReadSocketsFds)), (UInt8 *)readfds);

Expand Down Expand Up @@ -1345,7 +1407,13 @@ static void *__CFSocketManager(void * arg)
}
#endif

SInt32 error = 0;
nrfds = select(maxnrfds, readfds, writefds, exceptfds, pTimeout);
if (nrfds < 0) {
// Store error as early as possible, as the code below could
// reset it and make late check unreliable.
error = __CFSocketLastError();
}

#if defined(LOG_CFSOCKET) && defined(DEBUG_POLLING_SELECT)
__CFSOCKETLOG("socket manager woke from select, ret=%ld", (long)nrfds);
Expand Down Expand Up @@ -1434,7 +1502,7 @@ static void *__CFSocketManager(void * arg)
}

if (0 > nrfds) {
manageSelectError();
manageSelectError(error);
continue;
}
if (FD_ISSET(__CFWakeupSocketPair[1], readfds)) {
Expand Down
44 changes: 44 additions & 0 deletions Tests/Foundation/Tests/TestSocketPort.swift
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,55 @@ class TestSocketPort : XCTestCase {
}
}

func testSendingMultipleMessagesRemoteToLocal() throws {
var localPorts = [SocketPort]()
var remotePorts = [SocketPort]()
var delegates = [TestPortDelegateWithBlock]()

let data = Data("I cannot weave".utf8)

for _ in 0..<128 {
let local = try XCTUnwrap(SocketPort(tcpPort: 0))
let tcpPort = try UInt16(XCTUnwrap(tcpOrUdpPort(of: local)))
let remote = try XCTUnwrap(SocketPort(remoteWithTCPPort: tcpPort, host: "localhost"))

let received = expectation(description: "Message received")

let localDelegate = TestPortDelegateWithBlock { message in
XCTAssertEqual(message.components as? [AnyHashable], [data as NSData])
received.fulfill()
}

localPorts.append(local)
remotePorts.append(remote)
delegates.append(localDelegate)

local.setDelegate(localDelegate)
local.schedule(in: .main, forMode: .default)
remote.schedule(in: .main, forMode: .default)
}

withExtendedLifetime(delegates) {
for remote in remotePorts {
let sent = remote.send(before: Date(timeIntervalSinceNow: 5), components: NSMutableArray(array: [data]), from: nil, reserved: 0)
XCTAssertTrue(sent)
}
waitForExpectations(timeout: 5.0)
}

for port in localPorts + remotePorts {
port.setDelegate(nil)
port.remove(from: .main, forMode: .default)
port.invalidate()
}
}

static var allTests: [(String, (TestSocketPort) -> () throws -> Void)] {
return [
("testRemoteSocketPortsAreUniqued", testRemoteSocketPortsAreUniqued),
("testInitPicksATCPPort", testInitPicksATCPPort),
("testSendingOneMessageRemoteToLocal", testSendingOneMessageRemoteToLocal),
("testSendingMultipleMessagesRemoteToLocal", testSendingMultipleMessagesRemoteToLocal),
]
}
}

0 comments on commit d3871a5

Please sign in to comment.