Skip to content

Commit

Permalink
multi: multi_wait improvements
Browse files Browse the repository at this point in the history
 - only call `multi_getsock()` once for all transfers
 - realloc pollset array on demand
 - fold repeated sockets

Closes curl#13150
  • Loading branch information
icing authored and bagder committed Apr 25, 2024
1 parent 303bb87 commit 2d2c27e
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 60 deletions.
129 changes: 73 additions & 56 deletions lib/multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -1289,6 +1289,29 @@ static void reset_socket_fdwrite(curl_socket_t s)
}
#endif

static CURLMcode ufds_increase(struct pollfd **pfds, unsigned int *pfds_len,
unsigned int inc, bool *is_malloced)
{
struct pollfd *new_fds, *old_fds = *pfds;
unsigned int new_len = *pfds_len + inc;

new_fds = calloc(new_len, sizeof(struct pollfd));
if(!new_fds) {
if(*is_malloced)
free(old_fds);
*pfds = NULL;
*pfds_len = 0;
return CURLM_OUT_OF_MEMORY;
}
memcpy(new_fds, old_fds, (*pfds_len) * sizeof(struct pollfd));
if(*is_malloced)
free(old_fds);
*pfds = new_fds;
*pfds_len = new_len;
*is_malloced = TRUE;
return CURLM_OK;
}

#define NUM_POLLS_ON_STACK 10

static CURLMcode multi_wait(struct Curl_multi *multi,
Expand All @@ -1302,12 +1325,12 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
struct Curl_easy *data;
struct easy_pollset ps;
size_t i;
unsigned int nfds = 0;
unsigned int curlfds;
long timeout_internal;
int retcode = 0;
struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK];
struct pollfd *ufds = &a_few_on_stack[0];
unsigned int ufds_len = NUM_POLLS_ON_STACK;
unsigned int nfds = 0, curl_nfds = 0; /* how many ufds are in use */
bool ufds_malloc = FALSE;
#ifdef USE_WINSOCK
WSANETWORKEVENTS wsa_events;
Expand All @@ -1326,84 +1349,68 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
if(timeout_ms < 0)
return CURLM_BAD_FUNCTION_ARGUMENT;

/* Count up how many fds we have from the multi handle */
memset(&ps, 0, sizeof(ps));
for(data = multi->easyp; data; data = data->next) {
multi_getsock(data, &ps);
nfds += ps.num;
}

/* If the internally desired timeout is actually shorter than requested from
the outside, then use the shorter time! But only if the internal timer
is actually larger than -1! */
(void)multi_timeout(multi, &timeout_internal);
if((timeout_internal >= 0) && (timeout_internal < (long)timeout_ms))
timeout_ms = (int)timeout_internal;

curlfds = nfds; /* number of internal file descriptors */
nfds += extra_nfds; /* add the externally provided ones */

#ifdef ENABLE_WAKEUP
#ifdef USE_WINSOCK
if(use_wakeup) {
#else
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
#endif
++nfds;
}
#endif

if(nfds > NUM_POLLS_ON_STACK) {
/* 'nfds' is a 32 bit value and 'struct pollfd' is typically 8 bytes
big, so at 2^29 sockets this value might wrap. When a process gets
the capability to actually handle over 500 million sockets this
calculation needs an integer overflow check. */
ufds = malloc(nfds * sizeof(struct pollfd));
if(!ufds)
return CURLM_OUT_OF_MEMORY;
ufds_malloc = TRUE;
}
nfds = 0;
memset(ufds, 0, ufds_len * sizeof(struct pollfd));
memset(&ps, 0, sizeof(ps));

/* only do the second loop if we found descriptors in the first stage run
above */

if(curlfds) {
/* Add the curl handles to our pollfds first */
for(data = multi->easyp; data; data = data->next) {
multi_getsock(data, &ps);
/* Add the curl handles to our pollfds first */
for(data = multi->easyp; data; data = data->next) {
multi_getsock(data, &ps);

for(i = 0; i < ps.num; i++) {
struct pollfd *ufd = &ufds[nfds++];
for(i = 0; i < ps.num; i++) {
short events = 0;
#ifdef USE_WINSOCK
long mask = 0;
long mask = 0;
#endif
ufd->fd = ps.sockets[i];
ufd->events = 0;
if(ps.actions[i] & CURL_POLL_IN) {
if(ps.actions[i] & CURL_POLL_IN) {
#ifdef USE_WINSOCK
mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
#endif
ufd->events |= POLLIN;
}
if(ps.actions[i] & CURL_POLL_OUT) {
events |= POLLIN;
}
if(ps.actions[i] & CURL_POLL_OUT) {
#ifdef USE_WINSOCK
mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
reset_socket_fdwrite(ps.sockets[i]);
mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
reset_socket_fdwrite(ps.sockets[i]);
#endif
ufd->events |= POLLOUT;
events |= POLLOUT;
}
if(events) {
if(nfds && ps.sockets[i] == ufds[nfds-1].fd) {
ufds[nfds-1].events |= events;
}
else {
if(nfds >= ufds_len) {
if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
return CURLM_OUT_OF_MEMORY;
}
DEBUGASSERT(nfds < ufds_len);
ufds[nfds].fd = ps.sockets[i];
ufds[nfds].events = events;
++nfds;
}
}
#ifdef USE_WINSOCK
if(mask) {
if(WSAEventSelect(ps.sockets[i], multi->wsa_event, mask) != 0) {
if(ufds_malloc)
free(ufds);
return CURLM_INTERNAL_ERROR;
}
#endif
}
#endif
}
}

curl_nfds = nfds; /* what curl internally used in ufds */

/* Add external file descriptions from poll-like struct curl_waitfd */
for(i = 0; i < extra_nfds; i++) {
#ifdef USE_WINSOCK
Expand All @@ -1422,6 +1429,11 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
return CURLM_INTERNAL_ERROR;
}
#endif
if(nfds >= ufds_len) {
if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
return CURLM_OUT_OF_MEMORY;
}
DEBUGASSERT(nfds < ufds_len);
ufds[nfds].fd = extra_fds[i].fd;
ufds[nfds].events = 0;
if(extra_fds[i].events & CURL_WAIT_POLLIN)
Expand All @@ -1436,6 +1448,11 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
#ifdef ENABLE_WAKEUP
#ifndef USE_WINSOCK
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
if(nfds >= ufds_len) {
if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
return CURLM_OUT_OF_MEMORY;
}
DEBUGASSERT(nfds < ufds_len);
ufds[nfds].fd = multi->wakeup_pair[0];
ufds[nfds].events = POLLIN;
++nfds;
Expand Down Expand Up @@ -1475,7 +1492,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
struct, the bit values of the actual underlying poll() implementation
may not be the same as the ones in the public libcurl API! */
for(i = 0; i < extra_nfds; i++) {
unsigned r = ufds[curlfds + i].revents;
unsigned r = ufds[curl_nfds + i].revents;
unsigned short mask = 0;
#ifdef USE_WINSOCK
curl_socket_t s = extra_fds[i].fd;
Expand Down Expand Up @@ -1508,7 +1525,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
#ifdef USE_WINSOCK
/* Count up all our own sockets that had activity,
and remove them from the event. */
if(curlfds) {
if(curl_nfds) {

for(data = multi->easyp; data; data = data->next) {
multi_getsock(data, &ps);
Expand All @@ -1529,7 +1546,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
#else
#ifdef ENABLE_WAKEUP
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
if(ufds[curlfds + extra_nfds].revents & POLLIN) {
if(ufds[curl_nfds + extra_nfds].revents & POLLIN) {
char buf[64];
ssize_t nread;
while(1) {
Expand Down
17 changes: 13 additions & 4 deletions tests/http/scorecard.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@ def __init__(self, env: Env,
nghttpx: Optional[Nghttpx],
caddy: Optional[Caddy],
verbose: int,
curl_verbose: int):
curl_verbose: int,
download_parallel: int = 0):
self.verbose = verbose
self.env = env
self.httpd = httpd
self.nghttpx = nghttpx
self.caddy = caddy
self._silent_curl = not curl_verbose
self._download_parallel = download_parallel

def info(self, msg):
if self.verbose > 0:
Expand Down Expand Up @@ -138,6 +140,7 @@ def transfer_single(self, url: str, proto: str, count: int):
return {
'count': count,
'samples': sample_size,
'max-parallel': 1,
'speed': mean(samples) if len(samples) else -1,
'errors': errors,
'stats': RunProfile.AverageStats(profiles),
Expand All @@ -164,6 +167,7 @@ def transfer_serial(self, url: str, proto: str, count: int):
return {
'count': count,
'samples': sample_size,
'max-parallel': 1,
'speed': mean(samples) if len(samples) else -1,
'errors': errors,
'stats': RunProfile.AverageStats(profiles),
Expand All @@ -174,6 +178,7 @@ def transfer_parallel(self, url: str, proto: str, count: int):
samples = []
errors = []
profiles = []
max_parallel = self._download_parallel if self._download_parallel > 0 else count
url = f'{url}?[0-{count - 1}]'
self.info(f'parallel...')
for i in range(sample_size):
Expand All @@ -182,7 +187,7 @@ def transfer_parallel(self, url: str, proto: str, count: int):
with_headers=False,
with_profile=True,
extra_args=['--parallel',
'--parallel-max', str(count)])
'--parallel-max', str(max_parallel)])
err = self._check_downloads(r, count)
if err:
errors.append(err)
Expand All @@ -193,6 +198,7 @@ def transfer_parallel(self, url: str, proto: str, count: int):
return {
'count': count,
'samples': sample_size,
'max-parallel': max_parallel,
'speed': mean(samples) if len(samples) else -1,
'errors': errors,
'stats': RunProfile.AverageStats(profiles),
Expand Down Expand Up @@ -436,7 +442,7 @@ def print_score(self, score):
for mkey, mval in server_score[sskey].items():
if mkey not in measures:
measures.append(mkey)
m_names[mkey] = f'{mkey}({mval["count"]}x)'
m_names[mkey] = f'{mkey}({mval["count"]}x{mval["max-parallel"]})'

print('Downloads')
print(f' {"Server":<8} {"Size":>8}', end='')
Expand Down Expand Up @@ -543,6 +549,8 @@ def main():
default=None, help="evaluate download size")
parser.add_argument("--download-count", action='store', type=int,
default=50, help="perform that many downloads")
parser.add_argument("--download-parallel", action='store', type=int,
default=0, help="perform that many downloads in parallel (default all)")
parser.add_argument("-r", "--requests", action='store_true',
default=False, help="evaluate requests")
parser.add_argument("--request-count", action='store', type=int,
Expand Down Expand Up @@ -607,7 +615,8 @@ def main():
assert caddy.start()

card = ScoreCard(env=env, httpd=httpd, nghttpx=nghttpx, caddy=caddy,
verbose=args.verbose, curl_verbose=args.curl_verbose)
verbose=args.verbose, curl_verbose=args.curl_verbose,
download_parallel=args.download_parallel)
score = card.score_proto(proto=protocol,
handshakes=handshakes,
downloads=downloads,
Expand Down

0 comments on commit 2d2c27e

Please sign in to comment.