From 2d2c27e5a319665fdfbe75422dac55ff3a2218b6 Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Wed, 20 Mar 2024 08:08:43 +0100 Subject: [PATCH] multi: multi_wait improvements - only call `multi_getsock()` once for all transfers - realloc pollset array on demand - fold repeated sockets Closes #13150 --- lib/multi.c | 129 +++++++++++++++++++++++----------------- tests/http/scorecard.py | 17 ++++-- 2 files changed, 86 insertions(+), 60 deletions(-) diff --git a/lib/multi.c b/lib/multi.c index 7e7590d60f8b..ea7961e34a7a 100644 --- a/lib/multi.c +++ b/lib/multi.c @@ -1289,6 +1289,29 @@ static void reset_socket_fdwrite(curl_socket_t s) } #endif +static CURLMcode ufds_increase(struct pollfd **pfds, unsigned int *pfds_len, + unsigned int inc, bool *is_malloced) +{ + struct pollfd *new_fds, *old_fds = *pfds; + unsigned int new_len = *pfds_len + inc; + + new_fds = calloc(new_len, sizeof(struct pollfd)); + if(!new_fds) { + if(*is_malloced) + free(old_fds); + *pfds = NULL; + *pfds_len = 0; + return CURLM_OUT_OF_MEMORY; + } + memcpy(new_fds, old_fds, (*pfds_len) * sizeof(struct pollfd)); + if(*is_malloced) + free(old_fds); + *pfds = new_fds; + *pfds_len = new_len; + *is_malloced = TRUE; + return CURLM_OK; +} + #define NUM_POLLS_ON_STACK 10 static CURLMcode multi_wait(struct Curl_multi *multi, @@ -1302,12 +1325,12 @@ static CURLMcode multi_wait(struct Curl_multi *multi, struct Curl_easy *data; struct easy_pollset ps; size_t i; - unsigned int nfds = 0; - unsigned int curlfds; long timeout_internal; int retcode = 0; struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK]; struct pollfd *ufds = &a_few_on_stack[0]; + unsigned int ufds_len = NUM_POLLS_ON_STACK; + unsigned int nfds = 0, curl_nfds = 0; /* how many ufds are in use */ bool ufds_malloc = FALSE; #ifdef USE_WINSOCK WSANETWORKEVENTS wsa_events; @@ -1326,13 +1349,6 @@ static CURLMcode multi_wait(struct Curl_multi *multi, if(timeout_ms < 0) return CURLM_BAD_FUNCTION_ARGUMENT; - /* Count up how many fds we have from the multi 
handle */ - memset(&ps, 0, sizeof(ps)); - for(data = multi->easyp; data; data = data->next) { - multi_getsock(data, &ps); - nfds += ps.num; - } - /* If the internally desired timeout is actually shorter than requested from the outside, then use the shorter time! But only if the internal timer is actually larger than -1! */ @@ -1340,70 +1356,61 @@ static CURLMcode multi_wait(struct Curl_multi *multi, if((timeout_internal >= 0) && (timeout_internal < (long)timeout_ms)) timeout_ms = (int)timeout_internal; - curlfds = nfds; /* number of internal file descriptors */ - nfds += extra_nfds; /* add the externally provided ones */ - -#ifdef ENABLE_WAKEUP -#ifdef USE_WINSOCK - if(use_wakeup) { -#else - if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { -#endif - ++nfds; - } -#endif - - if(nfds > NUM_POLLS_ON_STACK) { - /* 'nfds' is a 32 bit value and 'struct pollfd' is typically 8 bytes - big, so at 2^29 sockets this value might wrap. When a process gets - the capability to actually handle over 500 million sockets this - calculation needs an integer overflow check. 
*/ - ufds = malloc(nfds * sizeof(struct pollfd)); - if(!ufds) - return CURLM_OUT_OF_MEMORY; - ufds_malloc = TRUE; - } nfds = 0; + memset(ufds, 0, ufds_len * sizeof(struct pollfd)); + memset(&ps, 0, sizeof(ps)); - /* only do the second loop if we found descriptors in the first stage run - above */ - - if(curlfds) { - /* Add the curl handles to our pollfds first */ - for(data = multi->easyp; data; data = data->next) { - multi_getsock(data, &ps); + /* Add the curl handles to our pollfds first */ + for(data = multi->easyp; data; data = data->next) { + multi_getsock(data, &ps); - for(i = 0; i < ps.num; i++) { - struct pollfd *ufd = &ufds[nfds++]; + for(i = 0; i < ps.num; i++) { + short events = 0; #ifdef USE_WINSOCK - long mask = 0; + long mask = 0; #endif - ufd->fd = ps.sockets[i]; - ufd->events = 0; - if(ps.actions[i] & CURL_POLL_IN) { + if(ps.actions[i] & CURL_POLL_IN) { #ifdef USE_WINSOCK - mask |= FD_READ|FD_ACCEPT|FD_CLOSE; + mask |= FD_READ|FD_ACCEPT|FD_CLOSE; #endif - ufd->events |= POLLIN; - } - if(ps.actions[i] & CURL_POLL_OUT) { + events |= POLLIN; + } + if(ps.actions[i] & CURL_POLL_OUT) { #ifdef USE_WINSOCK - mask |= FD_WRITE|FD_CONNECT|FD_CLOSE; - reset_socket_fdwrite(ps.sockets[i]); + mask |= FD_WRITE|FD_CONNECT|FD_CLOSE; + reset_socket_fdwrite(ps.sockets[i]); #endif - ufd->events |= POLLOUT; + events |= POLLOUT; + } + if(events) { + if(nfds && ps.sockets[i] == ufds[nfds-1].fd) { + ufds[nfds-1].events |= events; } + else { + if(nfds >= ufds_len) { + if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc)) + return CURLM_OUT_OF_MEMORY; + } + DEBUGASSERT(nfds < ufds_len); + ufds[nfds].fd = ps.sockets[i]; + ufds[nfds].events = events; + ++nfds; + } + } #ifdef USE_WINSOCK + if(mask) { if(WSAEventSelect(ps.sockets[i], multi->wsa_event, mask) != 0) { if(ufds_malloc) free(ufds); return CURLM_INTERNAL_ERROR; } -#endif } +#endif } } + curl_nfds = nfds; /* what curl internally used in ufds */ + /* Add external file descriptors from poll-like struct curl_waitfd */ 
for(i = 0; i < extra_nfds; i++) { #ifdef USE_WINSOCK @@ -1422,6 +1429,11 @@ static CURLMcode multi_wait(struct Curl_multi *multi, return CURLM_INTERNAL_ERROR; } #endif + if(nfds >= ufds_len) { + if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc)) + return CURLM_OUT_OF_MEMORY; + } + DEBUGASSERT(nfds < ufds_len); ufds[nfds].fd = extra_fds[i].fd; ufds[nfds].events = 0; if(extra_fds[i].events & CURL_WAIT_POLLIN) @@ -1436,6 +1448,11 @@ static CURLMcode multi_wait(struct Curl_multi *multi, #ifdef ENABLE_WAKEUP #ifndef USE_WINSOCK if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { + if(nfds >= ufds_len) { + if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc)) + return CURLM_OUT_OF_MEMORY; + } + DEBUGASSERT(nfds < ufds_len); ufds[nfds].fd = multi->wakeup_pair[0]; ufds[nfds].events = POLLIN; ++nfds; @@ -1475,7 +1492,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi, struct, the bit values of the actual underlying poll() implementation may not be the same as the ones in the public libcurl API! */ for(i = 0; i < extra_nfds; i++) { - unsigned r = ufds[curlfds + i].revents; + unsigned r = ufds[curl_nfds + i].revents; unsigned short mask = 0; #ifdef USE_WINSOCK curl_socket_t s = extra_fds[i].fd; @@ -1508,7 +1525,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi, #ifdef USE_WINSOCK /* Count up all our own sockets that had activity, and remove them from the event. 
*/ - if(curlfds) { + if(curl_nfds) { for(data = multi->easyp; data; data = data->next) { multi_getsock(data, &ps); @@ -1529,7 +1546,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi, #else #ifdef ENABLE_WAKEUP if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { - if(ufds[curlfds + extra_nfds].revents & POLLIN) { + if(ufds[curl_nfds + extra_nfds].revents & POLLIN) { char buf[64]; ssize_t nread; while(1) { diff --git a/tests/http/scorecard.py b/tests/http/scorecard.py index 446a1bc5cfb8..d4e19c101deb 100644 --- a/tests/http/scorecard.py +++ b/tests/http/scorecard.py @@ -49,13 +49,15 @@ def __init__(self, env: Env, nghttpx: Optional[Nghttpx], caddy: Optional[Caddy], verbose: int, - curl_verbose: int): + curl_verbose: int, + download_parallel: int = 0): self.verbose = verbose self.env = env self.httpd = httpd self.nghttpx = nghttpx self.caddy = caddy self._silent_curl = not curl_verbose + self._download_parallel = download_parallel def info(self, msg): if self.verbose > 0: @@ -138,6 +140,7 @@ def transfer_single(self, url: str, proto: str, count: int): return { 'count': count, 'samples': sample_size, + 'max-parallel': 1, 'speed': mean(samples) if len(samples) else -1, 'errors': errors, 'stats': RunProfile.AverageStats(profiles), @@ -164,6 +167,7 @@ def transfer_serial(self, url: str, proto: str, count: int): return { 'count': count, 'samples': sample_size, + 'max-parallel': 1, 'speed': mean(samples) if len(samples) else -1, 'errors': errors, 'stats': RunProfile.AverageStats(profiles), @@ -174,6 +178,7 @@ def transfer_parallel(self, url: str, proto: str, count: int): samples = [] errors = [] profiles = [] + max_parallel = self._download_parallel if self._download_parallel > 0 else count url = f'{url}?[0-{count - 1}]' self.info(f'parallel...') for i in range(sample_size): @@ -182,7 +187,7 @@ def transfer_parallel(self, url: str, proto: str, count: int): with_headers=False, with_profile=True, extra_args=['--parallel', - '--parallel-max', str(count)]) + 
'--parallel-max', str(max_parallel)]) err = self._check_downloads(r, count) if err: errors.append(err) @@ -193,6 +198,7 @@ def transfer_parallel(self, url: str, proto: str, count: int): return { 'count': count, 'samples': sample_size, + 'max-parallel': max_parallel, 'speed': mean(samples) if len(samples) else -1, 'errors': errors, 'stats': RunProfile.AverageStats(profiles), @@ -436,7 +442,7 @@ def print_score(self, score): for mkey, mval in server_score[sskey].items(): if mkey not in measures: measures.append(mkey) - m_names[mkey] = f'{mkey}({mval["count"]}x)' + m_names[mkey] = f'{mkey}({mval["count"]}x{mval["max-parallel"]})' print('Downloads') print(f' {"Server":<8} {"Size":>8}', end='') @@ -543,6 +549,8 @@ def main(): default=None, help="evaluate download size") parser.add_argument("--download-count", action='store', type=int, default=50, help="perform that many downloads") + parser.add_argument("--download-parallel", action='store', type=int, + default=0, help="perform that many downloads in parallel (default all)") parser.add_argument("-r", "--requests", action='store_true', default=False, help="evaluate requests") parser.add_argument("--request-count", action='store', type=int, @@ -607,7 +615,8 @@ def main(): assert caddy.start() card = ScoreCard(env=env, httpd=httpd, nghttpx=nghttpx, caddy=caddy, - verbose=args.verbose, curl_verbose=args.curl_verbose) + verbose=args.verbose, curl_verbose=args.curl_verbose, + download_parallel=args.download_parallel) score = card.score_proto(proto=protocol, handshakes=handshakes, downloads=downloads,