diff --git a/.github/workflows/Dockerfile.buster b/.github/workflows/Dockerfile.buster index 9c394c9681..8f405e8a9a 100644 --- a/.github/workflows/Dockerfile.buster +++ b/.github/workflows/Dockerfile.buster @@ -16,6 +16,7 @@ RUN apt-get update && \ rm -f "$TEMP_DEB" && \ echo "deb http://apt.postgresql.org/pub/repos/apt buster-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \ wget -qO - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - && \ + apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 467B942D3A79BD29 && \ apt-get update && \ apt-get install -y --no-install-recommends \ git cmake-data=3.18* cmake=3.18* make g++ gperf netcat \ diff --git a/builtin-functions/_functions.txt b/builtin-functions/_functions.txt index bdadcb4c00..da5ba4a0a2 100644 --- a/builtin-functions/_functions.txt +++ b/builtin-functions/_functions.txt @@ -1085,7 +1085,7 @@ function curl_errno ($curl_handle ::: int) ::: int; function curl_close ($curl_handle ::: int) ::: void; /** @kphp-extern-func-info resumable */ -function curl_exec_concurrently($curl_handle ::: int, $timeout ::: float = 1.0): ?string; +function curl_exec_concurrently($curl_handle ::: int, $timeout ::: float = 1.0): string|false; function curl_multi_init () ::: int; function curl_multi_add_handle ($multi_handle ::: int, $curl_handle ::: int) ::: int|false; diff --git a/net/net-aes-keys.cpp b/net/net-aes-keys.cpp index cc93746186..97d1520701 100644 --- a/net/net-aes-keys.cpp +++ b/net/net-aes-keys.cpp @@ -22,6 +22,8 @@ #include "common/server/signals.h" #include "common/wrappers/memory-utils.h" +DEFINE_VERBOSITY(net_crypto_aes) + static_assert(AES_KEY_MIN_LEN >= sizeof(((aes_key_t *) NULL)->id), "key_size"); static aes_key_t **aes_loaded_keys; @@ -47,7 +49,7 @@ bool aes_key_add(aes_key_t *aes_key) { aes_key_t *added_key = aes_loaded_keys[i]; if (aes_key->id == added_key->id || !strcmp(aes_key->filename, added_key->filename)) { - vkprintf(2, "Cannot add AES key %d(\"%s\"): already added %d(\"%s\")\n", aes_key->id, aes_key->filename, added_key->id, added_key->filename); + tvkprintf(net_crypto_aes, 1, "Cannot add AES key %d(\"%s\"): already added %d(\"%s\")\n", aes_key->id, aes_key->filename, added_key->id, added_key->filename); return false; } } @@ -55,7 +57,7 @@ bool aes_key_add(aes_key_t *aes_key) { aes_loaded_keys = static_cast(realloc(aes_loaded_keys, sizeof(aes_key) * (aes_loaded_keys_size + 1))); aes_loaded_keys[aes_loaded_keys_size++] = aes_key; - vkprintf(1, "Add AES key %u(\"%s\")\n", aes_key->id, aes_key->filename); + tvkprintf(net_crypto_aes, 1, "Add AES key %u(\"%s\")\n", aes_key->id, aes_key->filename); return true; } @@ -65,7 +67,7 @@ static bool aes_key_set_default(const char *filename) { for (size_t i = 0; i < aes_loaded_keys_size; ++i) { aes_key_t *key = aes_loaded_keys[i]; if (!strcmp(key->filename, filename)) { - vkprintf(1, "Setting default AES key to: %d(\"%s\")\n", key->id, key->filename); + tvkprintf(net_crypto_aes, 1, "Setting default AES key to: %d(\"%s\")\n", key->id, key->filename); default_aes_key = key; return true; } @@ -91,23 +93,23 @@ aes_key_t *aes_key_load_memory(const char* filename, uint8_t *key, int32_t key_l static aes_key_t *aes_key_load_fd(int fd, const char *filename) { struct stat st; if (fstat(fd, &st) == -1) { - vkprintf(2, "Cannot fstat() AES key fd: %d(\"%s\"): %m\n", fd, filename); + tvkprintf(net_crypto_aes, 1, "Cannot fstat() AES key fd: %d(\"%s\"): %m\n", fd, filename); return NULL; } if (st.st_size < AES_KEY_MIN_LEN) { - vkprintf(2, "Ignoring too small AES key: %jd(min %d)\n", (intmax_t)(st.st_size), AES_KEY_MIN_LEN); + tvkprintf(net_crypto_aes, 1, "Ignoring too small AES key: %jd(min %d)\n", (intmax_t)(st.st_size), AES_KEY_MIN_LEN); return NULL; } if (st.st_size > AES_KEY_MAX_LEN) { - vkprintf(2, "Ignoring too large AES key: %jd(max %d)\n", (intmax_t)(st.st_size), AES_KEY_MAX_LEN); + tvkprintf(net_crypto_aes, 1, "Ignoring too large AES key: %jd(max %d)\n", (intmax_t)(st.st_size), AES_KEY_MAX_LEN); return NULL; } char buffer[AES_KEY_MAX_LEN]; if (!read_exact(fd, buffer, st.st_size)) { - vkprintf(2, "Cannot read AES key fd: %d: %m\n", fd); + tvkprintf(net_crypto_aes, 1, "Cannot read AES key fd: %d: %m\n", fd); return NULL; } @@ -125,7 +127,7 @@ static bool aes_key_load_file(int fd, const char *path) { close(fd); if (!key) { free(tmp_path); - vkprintf(1, "Cannot load AES key from fd: %d(\"%s\"): %m\n", fd, path); + tvkprintf(net_crypto_aes, 1, "Cannot load AES key from fd: %d(\"%s\"): %m\n", fd, path); return false; } @@ -145,14 +147,14 @@ static bool aes_key_load_dir(int fd) { const int fd = openat(dir_fd, entry->d_name, O_NOFOLLOW); if (fd == -1) { if(errno != ELOOP) { - vkprintf(1, "Cannot openat() AES key dir entry: \"%s\": %m\n", entry->d_name); + tvkprintf(net_crypto_aes, 1, "Cannot openat() AES key dir entry: \"%s\": %m\n", entry->d_name); } continue; } struct stat st; if (fstat(fd, &st) == -1) { - vkprintf(1, "Cannot fstatat() AES key dir entry: \"%s\": %m\n", entry->d_name); + tvkprintf(net_crypto_aes, 1, "Cannot fstatat() AES key dir entry: \"%s\": %m\n", entry->d_name); continue; } @@ -168,7 +170,7 @@ static bool aes_key_load_dir(int fd) { char buffer[NAME_MAX + 1]; if (readlinkat(dir_fd, "default", buffer, sizeof(buffer)) == -1) { assert(!closedir(dir)); - vkprintf(1, "Cannot readlinkat() \"default\" AES key symlink\n"); + tvkprintf(net_crypto_aes, 1, "Cannot readlinkat() \"default\" AES key symlink\n"); return false; } assert(!closedir(dir)); @@ -179,14 +181,14 @@ static bool aes_key_load_dir(int fd) { bool aes_key_load_path(const char *path) { const int fd = open(path, O_RDONLY); if (fd == -1) { - vkprintf(1, "Cannot open() AES key path: \"%s\": %m\n", path); + tvkprintf(net_crypto_aes, 1, "Cannot open() AES key path: \"%s\": %m\n", path); return false; } struct stat st; if (fstat(fd, &st) == -1) { close(fd); - vkprintf(1, "Cannot fstat() AES key path fd: %d: %m\n", fd); + tvkprintf(net_crypto_aes, 1, "Cannot fstat() AES key path fd: %d: %m\n", fd); return false; } @@ -199,7 +201,7 @@ bool aes_key_load_path(const char *path) { } close(fd); - vkprintf(1, "Unexpected file type for AES key path: %u\n", S_IFMT & st.st_mode); + tvkprintf(net_crypto_aes, 1, "Unexpected file type for AES key path: %u\n", S_IFMT & st.st_mode); return false; } diff --git a/net/net-aes-keys.h b/net/net-aes-keys.h index 5dd35f9ca1..786e458ffc 100644 --- a/net/net-aes-keys.h +++ b/net/net-aes-keys.h @@ -8,9 +8,13 @@ #include #include +#include "common/kprintf.h" + #define AES_KEY_MIN_LEN 32 #define AES_KEY_MAX_LEN 256 +DECLARE_VERBOSITY(net_crypto_aes); + struct aes_key { const char *filename; int32_t id; diff --git a/net/net-connections.cpp b/net/net-connections.cpp index 01337e4c3d..c610a6e53b 100644 --- a/net/net-connections.cpp +++ b/net/net-connections.cpp @@ -50,6 +50,8 @@ #define USE_EPOLLET 0 #define MAX_RECONNECT_INTERVAL 20 +DEFINE_VERBOSITY(net_connections) + static int bucket_salt; int max_connection; @@ -185,6 +187,7 @@ int set_write_timer(struct connection *c); 15.08.2013: now C_WANTRD is never cleared anymore (we don't understand what bug we were fixing originally by this) */ int server_writer(struct connection *c) { + tvkprintf(net_connections, 3, "server write to conn %d\n", c->fd); int r, s, t = 0, check_watermark; char *to; @@ -223,7 +226,7 @@ int server_writer(struct connection *c) { set_write_timer(c); break; } - vkprintf(7, "limited write to connection %d by %d bytes\n", c->fd, max_bytes); + tvkprintf(net_connections, 4, "limited write to connection %d by %d bytes\n", c->fd, max_bytes); } if (c->limit_per_write && max_bytes > c->limit_per_write) { @@ -243,21 +246,15 @@ int server_writer(struct connection *c) { kprintf("Too much EAGAINs for connection %d (%s), dropping\n", c->fd, sockaddr_storage_to_string(&c->remote_endpoint)); fail_connection(c, -123); } + } else { + tvkprintf(net_connections, 1, "writev(): %m\n"); } } else { c->eagain_count = 0; } - if (verbosity > 2) { - if (r < 0) { - perror("send()"); - } - if (verbosity > 6) { - kprintf("send/writev() to %d: %d written out of %d in %d chunks at %p (%.*s)\n", c->fd, r, s, iovcnt, to, ((unsigned)r < 64) ? r : 64, to); - } else { - kprintf("send/writev() to %d: %d written out of %d in %d chunks\n", c->fd, r, s, iovcnt); - } - } + tvkprintf(net_connections, 4, "send/writev() to %d: %d written out of %d in %d chunks at %p (%.*s)\n", + c->fd, r, s, iovcnt, to, ((unsigned)r < 64) ? r : 64, to); if (r > 0) { @@ -315,6 +312,7 @@ int server_writer(struct connection *c) { NEED_MORE_BYTES=0x7fffffff : need at least one byte more */ int server_reader(struct connection *c) { + tvkprintf(net_connections, 3, "server read from conn %d\n", c->fd); int res = 0, r, r1, s; char *to; @@ -341,7 +339,7 @@ int server_reader(struct connection *c) { s = get_write_space(&c->In); if (s <= 0) { - vkprintf(0, "error while reading from connection #%d (type %p(%s); in.bytes=%d, out.bytes=%d, tmp.bytes=%d; peer %s): cannot allocate read buffer\n", + kprintf("error while reading from connection #%d (type %p(%s); in.bytes=%d, out.bytes=%d, tmp.bytes=%d; peer %s): cannot allocate read buffer\n", c->fd, c->type, c->type ? c->type->title : "", c->In.total_bytes + c->In.unprocessed_bytes, c->Out.total_bytes + c->Out.unprocessed_bytes, c->Tmp ? c->Tmp->total_bytes : 0, sockaddr_storage_to_string(&c->remote_endpoint)); @@ -363,7 +361,7 @@ int server_reader(struct connection *c) { if (r >= 0) { assert(!(msg.msg_flags & MSG_TRUNC || msg.msg_flags & MSG_CTRUNC)); - vkprintf(3, "Ancillary data size: %" PRIu64 "\n", static_cast(msg.msg_controllen)); + tvkprintf(net_connections, 4, "Ancillary data size: %" PRIu64 "\n", static_cast(msg.msg_controllen)); if (c->type->ancillary_data_received) { for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { @@ -380,9 +378,9 @@ int server_reader(struct connection *c) { c->flags |= C_NORD; } - vkprintf(3, "recv() from %d: %d read out of %d\n", c->fd, r, s); + tvkprintf(net_connections, 4, "recv() from %d: %d read out of %d\n", c->fd, r, s); if (r < 0 && errno != EAGAIN) { - vkprintf(2, "recv(): %s\n", strerror(errno)); + tvkprintf(net_connections, 1, "recv(): %s\n", strerror(errno)); } if (r > 0) { @@ -413,9 +411,7 @@ int server_reader(struct connection *c) { advance_read_ptr(&c->In, r1); c->skip_bytes = s += r1; - if (verbosity > 2) { - fprintf(stderr, "skipped %d bytes, %d more to skip\n", r1, -s); - } + tvkprintf(net_connections, 4, "skipped %d bytes, %d more to skip\n", r1, -s); if (s) { continue; } @@ -426,8 +422,7 @@ int server_reader(struct connection *c) { if (r1 >= s) { c->skip_bytes = s = 0; } - - vkprintf(2, "fetched %d bytes, %d available bytes, %d more to load\n", r, r1, s ? s - r1 : 0); + tvkprintf(net_connections, 4, "fetched %d bytes, %d available bytes, %d more to load\n", r, r1, s ? s - r1 : 0); if (s) { continue; } @@ -499,6 +494,7 @@ int server_reader(struct connection *c) { int clear_connection_write_timeout(struct connection *c); int server_close_connection(struct connection *c, int who __attribute__((unused))) { + tvkprintf(net_connections, 3, "server close conn %d\n", c->fd); struct conn_query *q; clear_connection_timeout(c); @@ -556,6 +552,7 @@ void compute_next_reconnect(conn_target_t *S) { } int client_close_connection(struct connection *c, int who __attribute__((unused))) { + tvkprintf(net_connections, 3, "client close conn %d\n", c->fd); struct conn_query *q; conn_target_t *S = c->target; @@ -642,6 +639,7 @@ static inline int compute_conn_events(struct connection *c) { #endif void close_special_connection(struct connection *c) { + tvkprintf(net_connections, 3, "close special conn %d\n", c->fd); if (c->basic_type != ct_listen) { --active_special_connections; on_active_special_connections_update_callback(); @@ -653,7 +651,7 @@ void close_special_connection(struct connection *c) { } int force_clear_connection(struct connection *c) { - vkprintf(3, "socket %d: forced closing\n", c->fd); + tvkprintf(net_connections, 3, "force clean conn %d\n", c->fd); if (c->status != conn_connecting) { active_connections--; if (c->flags & C_SPECIAL) { @@ -724,18 +722,17 @@ int server_read_write(struct connection *c) { int res, inconn_mask = c->flags | ~C_INCONN; event_t *ev = c->ev; - vkprintf(3, "BEGIN processing connection %d, status=%d, flags=%d, pending=%d; epoll_ready=%d, ev->ready=%d\n", c->fd, c->status, c->flags, c->pending_queries, + tvkprintf(net_connections, 3, "begin processing connection %d, status=%d, flags=%d, pending=%d; epoll_ready=%d, ev->ready=%d\n", c->fd, c->status, c->flags, c->pending_queries, ev->epoll_ready, ev->ready); c->flags |= C_INCONN; if (!c->interrupted && ev->epoll_ready & (EPOLLHUP | EPOLLERR | EPOLLRDHUP | EPOLLPRI)) { if ((ev->epoll_ready & EPOLLIN) && (c->flags & C_WANTRD) && !(c->flags & (C_NORD | C_FAILED | C_STOPREAD))) { - vkprintf(3, "reading buffered data from socket %d, before closing\n", c->fd); + tvkprintf(net_connections, 4, "reading buffered data from socket %d, before closing\n", c->fd); c->type->reader(c); } - - vkprintf(1 + !(ev->epoll_ready & EPOLLPRI), "socket %d: disconnected (epoll_ready=%02x), cleaning\n", c->fd, ev->epoll_ready); + tvkprintf(net_connections, 3 + !(ev->epoll_ready & EPOLLPRI), "socket %d: disconnected (epoll_ready=%02x), closing ignored %d\n", c->fd, ev->epoll_ready, c->ignored); // When connection is closed on client side, we have two options to do: // (1) Close connection on server side immediately // (2) Delay closing connection on server side, until we leave the ignore_user_abort "critical section" @@ -761,7 +758,7 @@ int server_read_write(struct connection *c) { if (c->status == conn_connecting) { /* connecting... */ if (ev->ready & EVT_WRITE) { - vkprintf(1, "socket #%d to %s becomes active\n", c->fd, sockaddr_storage_to_string(&c->target->endpoint)); + tvkprintf(net_connections, 4, "socket #%d to %s becomes active\n", c->fd, sockaddr_storage_to_string(&c->target->endpoint)); conn_target_t *S = c->target; S->active_outbound_connections++; active_outbound_connections++; @@ -775,7 +772,7 @@ int server_read_write(struct connection *c) { } c->type->check_ready(c); - vkprintf(3, "socket #%d: ready=%d\n", c->fd, c->ready); + tvkprintf(net_connections, 4, "socket #%d: ready=%d\n", c->fd, c->ready); } if (c->status == conn_connecting) { c->flags &= inconn_mask; @@ -831,7 +828,7 @@ int server_read_write(struct connection *c) { If we have run out of buffers for c->In, c->error = -1, res = -1. As much output bytes have been encrypted as possible. */ - vkprintf(3, "server_reader=%d, ready=%02x, state=%02x\n", res, c->ev->ready, c->ev->state); + tvkprintf(net_connections, 4, "server_reader=%d, ready=%02x, state=%02x\n", res, c->ev->ready, c->ev->state); if (res || c->skip_bytes) { /* we have processed as much inbound queries as possible, leaving main loop */ break; @@ -861,7 +858,7 @@ int server_read_write(struct connection *c) { } if (c->error || c->status == conn_error || (c->status == conn_write_close && !(c->flags & C_WANTWR)) || (c->flags & C_FAILED)) { - vkprintf(1, "socket %d: closing and cleaning (error code=%d)\n", c->fd, c->error); + tvkprintf(net_connections, 1, "conn %d: closing and cleaning (error code=%d)\n", c->fd, c->error); if (c->interrupted) { // Here is delayed connection closing described above at case (2) @@ -884,12 +881,13 @@ int server_read_write(struct connection *c) { c->flags &= inconn_mask; - vkprintf(3, "END processing connection %d, status=%d, flags=%d, pending=%d\n", c->fd, c->status, c->flags, c->pending_queries); + tvkprintf(net_connections, 3, "finish processing connection %d, status=%d, flags=%d, pending=%d\n", c->fd, c->status, c->flags, c->pending_queries); return compute_conn_events(c); } int server_read_write_gateway(int fd __attribute__((unused)), void *data, event_t *ev) { + tvkprintf(net_connections, 3, "server read write gateway on fd %d", fd); struct connection *c = (struct connection *)data; assert(c); assert(c->type); @@ -907,7 +905,7 @@ int server_read_write_gateway(int fd __attribute__((unused)), void *data, event_ if (ev->epoll_ready & EPOLLERR) { int error; if (!socket_error(c->fd, &error)) { - vkprintf(1, "got error for tcp socket #%d, %s : %s\n", c->fd, sockaddr_storage_to_string(&c->remote_endpoint), strerror(error)); + tvkprintf(net_connections, 1, "got error for tcp socket #%d, %s : %s\n", c->fd, sockaddr_storage_to_string(&c->remote_endpoint), strerror(error)); } } } @@ -917,7 +915,7 @@ int server_read_write_gateway(int fd __attribute__((unused)), void *data, event_ int conn_timer_wakeup_gateway(event_timer_t *et) { struct connection *c = container_of(et, struct connection, timer); - vkprintf(2, "ALARM: awakening connection %d at %p, status=%d, pending=%d\n", c->fd, c, c->status, c->pending_queries); + tvkprintf(net_connections, 3, "ALARM: awakening connection %d at %p, status=%d, pending=%d\n", c->fd, c, c->status, c->pending_queries); c->flags |= C_ALARM; put_event_into_heap(c->ev); return 0; @@ -925,7 +923,7 @@ int conn_timer_wakeup_gateway(event_timer_t *et) { int conn_write_timer_wakeup_gateway(event_timer_t *et) { struct connection *c = container_of(et, struct connection, write_timer); - vkprintf(2, "writer wakeup: awakening connection %d at %p, status=%d\n", c->fd, c, c->status); + tvkprintf(net_connections, 3, "writer wakeup: awakening connection %d at %p, status=%d\n", c->fd, c, c->status); if (out_total_processed_bytes(c) + out_total_unprocessed_bytes(c) > 0) { c->flags = (c->flags | C_WANTWR) & ~C_NOWR; } @@ -964,6 +962,7 @@ int clear_connection_write_timeout(struct connection *c) { } int fail_connection(struct connection *c, int err) { + tvkprintf(net_connections, 3, "fail on conn %d, err %d\n", c->fd, err); if (!(c->flags & C_FAILED)) { if (err != -17) { total_failed_connections++; @@ -1058,7 +1057,7 @@ int accept_new_connections(struct connection *cc) { const int cfd = accept4(cc->fd, (struct sockaddr *)&peer, &peer_addrlen, SOCK_CLOEXEC); if (cfd < 0) { if (!acc) { - vkprintf(errno == EAGAIN ? 1 : 0, "accept(%d) unexpectedly returns %d: %m\n", cc->fd, cfd); + tvkprintf(net_connections, 1, "accept(%d) unexpectedly returns %d: %m\n", cc->fd, cfd); } break; } @@ -1079,14 +1078,14 @@ int accept_new_connections(struct connection *cc) { if (getsockopt(cfd, SOL_SOCKET, SO_PEERCRED, &creds, &len) == 0) { creds_found = true; } else { - kprintf("Failed to dectect credentials by getsockopt, let's try message: %s\n", strerror(errno)); + kprintf("failed to dectect credentials by getsockopt, let's try message: %s\n", strerror(errno)); if (!socket_enable_unix_passcred(cfd)) { kprintf("Cannot enable SO_PASSCRED on UNIX socket: %s\n", strerror(errno)); close(cfd); continue; } } - vkprintf(2, "Enabled SO_PASSCRED on UNIX socket: fd %d\n", cfd); + tvkprintf(net_connections, 4, "Enabled SO_PASSCRED on UNIX socket: fd %d\n", cfd); } else { assert(peer_addrlen == sizeof(struct sockaddr_in)); assert(peer.ss_family == AF_INET); @@ -1094,7 +1093,7 @@ int accept_new_connections(struct connection *cc) { } char buffer_peer[SOCKADDR_STORAGE_BUFFER_SIZE], buffer_self[SOCKADDR_STORAGE_BUFFER_SIZE]; - vkprintf(1, "accepted incoming connection of type %s, flags=%x, at %s -> %s, fd=%d\n", cc->type->title, cc->flags, sockaddr_storage_to_buffer(&peer, buffer_peer), + tvkprintf(net_connections, 3, "accepted incoming connection of type %s, flags=%x, at %s -> %s, fd=%d\n", cc->type->title, cc->flags, sockaddr_storage_to_buffer(&peer, buffer_peer), sockaddr_storage_to_buffer(&self, buffer_self), cfd); int flags; @@ -1145,7 +1144,7 @@ int accept_new_connections(struct connection *cc) { if (cc->flags & C_UNIX) { c->flags |= C_UNIX; } - vkprintf(2, "Accepted connection flags: %x\n", c->flags); + tvkprintf(net_connections, 3, "accepted connection flags: %08x\n", c->flags); c->first_query = c->last_query = (struct conn_query *)c; if (c->type->init_accepted(c) >= 0) { @@ -1166,7 +1165,7 @@ int accept_new_connections(struct connection *cc) { if (cc->flags & C_SPECIAL) { c->flags |= C_SPECIAL; if (active_special_connections >= max_special_connections) { - vkprintf(active_special_connections >= max_special_connections + 16 ? 0 : 1, + tvkprintf(net_connections, active_special_connections >= max_special_connections + 16 ? 0 : 3, "ERROR: forced to accept connection when special connections limit was reached (%d of %d)\n", active_special_connections, max_special_connections); } @@ -1203,7 +1202,7 @@ int server_check_ready(struct connection *c) { } void ancillary_data_received(struct connection *c, const struct cmsghdr *cmsg) { - vkprintf(2, "Ancillary data received\n"); + tvkprintf(net_connections, 4, "Ancillary data received\n"); if (c->flags & C_UNIX) { const void *payload = CMSG_DATA(cmsg); @@ -1212,7 +1211,7 @@ void ancillary_data_received(struct connection *c, const struct cmsghdr *cmsg) { if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_CREDENTIALS) { assert(payload_size == sizeof(struct ucred)); const struct ucred *credentials = static_cast(payload); - vkprintf(1, "Credentials received: PID: %d, UID: %d, GID: %d\n", credentials->pid, credentials->uid, credentials->gid); + tvkprintf(net_connections, 4, "Credentials received: PID: %d, UID: %d, GID: %d\n", credentials->pid, credentials->uid, credentials->gid); c->credentials = *credentials; assert(socket_disable_unix_passcred(c->fd)); return; @@ -1562,13 +1561,12 @@ static void convert_target(const conn_target_t **source, conn_target_t *converte // `was_created` return value: 1 = created new, 2 = refcnt changed from 0 to 1, 0 = existed before with refcnt > 0. conn_target_t *create_target(const conn_target_t *source, int *was_created) { - vkprintf(2, "%s: %s\n", __func__, sockaddr_storage_to_string(&source->endpoint)); conn_target_t conn_target_converted; convert_target(&source, &conn_target_converted); conn_target_t *t = find_target(source, lookup, NULL); - vkprintf(2, "%s: %s t->refcnt %d\n", __func__, sockaddr_storage_to_string(&source->endpoint), t ? t->refcnt : 0); + tvkprintf(net_connections, 4, "%s: %s t->refcnt %d\n", __func__, sockaddr_storage_to_string(&source->endpoint), t ? t->refcnt : 0); if (t) { assert(t->refcnt >= 0); t->min_connections = source->min_connections; @@ -1619,7 +1617,7 @@ static int free_target(conn_target_t *S) { assert(S->first_conn == (struct connection *)S); assert(S->first_query == (struct conn_query *)S); - vkprintf(1, "Freeing unused target to %s\n", sockaddr_storage_to_string(&S->endpoint)); + tvkprintf(net_connections, 4, "Freeing unused target to %s\n", sockaddr_storage_to_string(&S->endpoint)); assert(S == find_target(S, erase, NULL)); remove_target_from_list(S); @@ -1640,7 +1638,7 @@ int destroy_target(conn_target_t *S) { assert(S); assert(S->type); assert(S->refcnt > 0); - vkprintf(1, "%s: %s refcnt %d\n", __func__, sockaddr_storage_to_string(&S->endpoint), S->refcnt); + tvkprintf(net_connections, 4, "%s: %s refcnt %d\n", __func__, sockaddr_storage_to_string(&S->endpoint), S->refcnt); if (!--S->refcnt) { remove_target_from_list(S); insert_target_into_list(S, &InactiveTargets); @@ -1752,19 +1750,17 @@ int create_new_connections(conn_target_t *S) { struct sockaddr_storage endpoint = S->endpoint; convert_endpoint(&S->endpoint, &endpoint); - vkprintf(2, "Creating NEW connection to %s\n", sockaddr_storage_to_string(&endpoint)); - const int cfd = S->type->create_outbound ? S->type->create_outbound(&endpoint) : make_client_socket(&endpoint); if (cfd < 0) { compute_next_reconnect(S); - vkprintf(1, "error connecting to %s: %m\n", sockaddr_storage_to_string(&endpoint)); + tvkprintf(net_connections, 1, "error connecting to %s: %m\n", sockaddr_storage_to_string(&endpoint)); return count; } if (cfd >= MAX_EVENTS || cfd >= MAX_CONNECTIONS) { close(cfd); compute_next_reconnect(S); - vkprintf(1, "out of sockets when connecting to %s\n", sockaddr_storage_to_string(&endpoint)); + tvkprintf(net_connections, 1, "out of sockets when connecting to %s\n", sockaddr_storage_to_string(&endpoint)); return count; } @@ -1813,7 +1809,7 @@ int create_new_connections(conn_target_t *S) { } char buffer_local[SOCKADDR_STORAGE_BUFFER_SIZE], buffer_remote[SOCKADDR_STORAGE_BUFFER_SIZE]; - vkprintf(2, "Created new outbound connection %s -> %s\n", sockaddr_storage_to_buffer(&c->local_endpoint, buffer_local), + tvkprintf(net_connections, 4, "created new outbound connection %s -> %s\n", sockaddr_storage_to_buffer(&c->local_endpoint, buffer_local), sockaddr_storage_to_buffer(&c->remote_endpoint, buffer_remote)); if (c->type->init_outbound(c) >= 0) { @@ -1837,7 +1833,7 @@ int create_new_connections(conn_target_t *S) { h->prev->next = c; h->prev = c; - vkprintf(2, "outbound connection: handle %d to %s\n", c->fd, sockaddr_storage_to_string(&endpoint)); + tvkprintf(net_connections, 4, "bind handle %d to %s\n", c->fd, sockaddr_storage_to_buffer(&c->remote_endpoint, buffer_remote)); } return count; } @@ -1952,7 +1948,7 @@ void install_client_connection(conn_target_t *S, int fd) { int conn_event_wakeup_gateway(event_timer_t *et) { struct conn_query *q = container_of(et, struct conn_query, timer); - vkprintf(2, "ALARM: awakened pending query %p [%d -> %d]\n", q, q->requester ? q->requester->fd : -1, q->outbound ? q->outbound->fd : -1); + tvkprintf(net_connections, 4, "ALARM: awakened pending query %p [%d -> %d]\n", q, q->requester ? q->requester->fd : -1, q->outbound ? q->outbound->fd : -1); return q->cq_type->wakeup(q); } @@ -1993,23 +1989,23 @@ int push_conn_query_into_list(struct conn_query *q, struct conn_query *h) { } int insert_conn_query(struct conn_query *q) { - vkprintf(2, "insert_conn_query(%p)\n", q); + tvkprintf(net_connections, 4, "insert_conn_query(%p)\n", q); struct conn_query *h = (struct conn_query *)q->outbound; return insert_conn_query_into_list(q, h); } int push_conn_query(struct conn_query *q) { - vkprintf(2, "push_conn_query(%p)\n", q); + tvkprintf(net_connections, 4, "push_conn_query(%p)\n", q); struct conn_query *h = (struct conn_query *)q->outbound; return push_conn_query_into_list(q, h); } int delete_conn_query(struct conn_query *q) { if (!q->prev && !q->next) { - vkprintf(2, "delete_conn_query (%p) at second time\n", q); + tvkprintf(net_connections, 4, "delete_conn_query (%p) at second time\n", q); return 0; } - vkprintf(2, "delete_conn_query (%p)\n", q); + tvkprintf(net_connections, 4, "delete_conn_query (%p)\n", q); assert(q->prev && q->next); q->next->prev = q->prev; q->prev->next = q->next; @@ -2017,7 +2013,7 @@ int delete_conn_query(struct conn_query *q) { if (q->requester && q->requester->generation == q->req_generation) { if (!--q->requester->pending_queries) { /* wake up master */ - vkprintf(2, "socket %d was the last one, waking master %d\n", q->outbound ? q->outbound->fd : -1, q->requester->fd); + tvkprintf(net_connections, 4, "socket %d was the last one, waking master %d\n", q->outbound ? q->outbound->fd : -1, q->requester->fd); if (!q->requester->ev->in_queue) { put_event_into_heap(q->requester->ev); } @@ -2033,11 +2029,11 @@ int delete_conn_query(struct conn_query *q) { // arseny30: added for php-engine int delete_conn_query_from_requester(struct conn_query *q) { - vkprintf(2, "delete_conn_query_from_requester (%p)\n", q); + tvkprintf(net_connections, 4, "delete_conn_query_from_requester (%p)\n", q); if (q->requester && q->requester->generation == q->req_generation) { if (!--q->requester->pending_queries) { /* wake up master */ - vkprintf(2, "socket %d was the last one, waking master %d\n", q->outbound ? q->outbound->fd : -1, q->requester->fd); + tvkprintf(net_connections, 4, "socket %d was the last one, waking master %d\n", q->outbound ? q->outbound->fd : -1, q->requester->fd); if (!q->requester->ev->in_queue) { put_event_into_heap(q->requester->ev); } @@ -2141,7 +2137,7 @@ void cond_dump_connection_buffers_stats() { int write_out_chk(struct connection *c, const void *data, int len) { int res = write_out(&c->Out, data, len); if (res < len && !(c->flags & C_FAILED)) { - vkprintf(0, "error while writing to connection #%d (type %p(%s); in.bytes=%d, out.bytes=%d, tmp.bytes=%d; peer %s): %d bytes written out of %d\n", c->fd, + kprintf("error while writing to connection #%d (type %p(%s); in.bytes=%d, out.bytes=%d, tmp.bytes=%d; peer %s): %d bytes written out of %d\n", c->fd, c->type, c->type ? c->type->title : "", c->In.total_bytes + c->In.unprocessed_bytes, c->Out.total_bytes + c->Out.unprocessed_bytes, c->Tmp ? c->Tmp->total_bytes : 0, sockaddr_storage_to_string(&c->remote_endpoint), res, len); fail_connection(c, -666); diff --git a/net/net-connections.h b/net/net-connections.h index 7f4548e77e..2b8cbf5754 100644 --- a/net/net-connections.h +++ b/net/net-connections.h @@ -10,12 +10,15 @@ #include #include "common/macos-ports.h" +#include "common/kprintf.h" #include "net/net-buffers.h" #include "net/net-events.h" #include "net/net-msg.h" #include "net/net-sockaddr-storage.h" +DECLARE_VERBOSITY(net_connections); + #define MAX_TARGETS 65536 #define PRIME_TARGETS 99961 diff --git a/net/net-crypto-aes.cpp b/net/net-crypto-aes.cpp index ffa9971628..33987578e2 100644 --- a/net/net-crypto-aes.cpp +++ b/net/net-crypto-aes.cpp @@ -141,7 +141,7 @@ static int initialize_pseudo_random() { if (fd >= 0) { r = read(fd, rand_buf, 16); if (r > 0) { - vkprintf(2, "added %d bytes of real entropy to the seed\n", r); + tvkprintf(net_crypto_aes, 4, "added %d bytes of real entropy to the seed\n", r); } close(fd); diff --git a/net/net-dc.cpp b/net/net-dc.cpp index 2a9b1ba6b8..580c409b68 100644 --- a/net/net-dc.cpp +++ b/net/net-dc.cpp @@ -74,7 +74,7 @@ int is_same_data_center(struct connection *c, int is_client) { } char buffer_local[SOCKADDR_STORAGE_BUFFER_SIZE], buffer_remote[SOCKADDR_STORAGE_BUFFER_SIZE]; - vkprintf(3, "check_same_data_center(%d): %s -> %s\n", c->fd, sockaddr_storage_to_buffer(&c->local_endpoint, buffer_local), + vkprintf(4, "check_same_data_center(%d): %s -> %s\n", c->fd, sockaddr_storage_to_buffer(&c->local_endpoint, buffer_local), sockaddr_storage_to_buffer(&c->remote_endpoint, buffer_remote)); if (c->flags & C_IPV6) { // we don't use it now assert(c->local_endpoint.ss_family == AF_INET6); diff --git a/net/net-events.h b/net/net-events.h index c22c1bd079..e3116d0018 100644 --- a/net/net-events.h +++ b/net/net-events.h @@ -92,12 +92,8 @@ static inline int epoll_fetch_events(int timeout) { events = 0; } if (events < 0) { - tvkprintf(net_events, 0, "epoll_wait(): %m\n"); + kprintf("epoll_wait(): %m\n"); } - if (verbosity > 2 && events) { - tvkprintf(net_events, 0, "epoll_wait(%d, ...) = %d\n", main_thread_reactor.epoll_fd, events); - } - net_reactor_fetch_events(&main_thread_reactor, events); return events; diff --git a/net/net-http-server.cpp b/net/net-http-server.cpp index a4d97606fa..f4915b2099 100644 --- a/net/net-http-server.cpp +++ b/net/net-http-server.cpp @@ -98,8 +98,6 @@ struct http_server_functions default_http_server = { int hts_default_execute (struct connection *c, int op) { struct hts_data *D = HTS_DATA(c); - vkprintf(1, "http_server: op=%d, header_size=%d\n", op, D->header_size); - if (op != htqt_empty) { netw_queries++; if (op != htqt_get) { @@ -133,6 +131,7 @@ int hts_init_accepted (struct connection *c __attribute__((unused))) {//TODO: th } int hts_close_connection (struct connection *c, int who) { + tvkprintf(net_connections, 3, "server close http conn %d\n", c->fd); http_connections--; if (HTS_FUNC(c)->ht_close != NULL) { @@ -208,6 +207,7 @@ int write_http_error (struct connection *c, int code) { } int hts_parse_execute (struct connection *c) { + tvkprintf(net_connections, 3, "server start processing http conn %d\n", c->fd); struct hts_data *D = HTS_DATA(c); char *ptr, *ptr_s, *ptr_e; int len; @@ -222,7 +222,7 @@ int hts_parse_execute (struct connection *c) { } while (ptr < ptr_e && c->parse_state != htqp_done) { - + tvkprintf(net_connections, 4, "server parse conn %d in state %d\n", c->fd, c->parse_state); switch (c->parse_state) { case htqp_start: //fprintf (stderr, "htqp_start: ptr=%p (%.8s), hsize=%d, qf=%d, words=%d\n", ptr, ptr, D->header_size, D->query_flags, D->query_words); @@ -632,6 +632,7 @@ int hts_parse_execute (struct connection *c) { int hts_std_wakeup (struct connection *c) { + tvkprintf(net_connections, 3, "server standard http wakeup on conn %d\n", c->fd); if (c->status == conn_wait_net || c->status == conn_wait_aio) { c->status = conn_expect_query; HTS_FUNC(c)->ht_wakeup (c); @@ -647,6 +648,7 @@ int hts_std_wakeup (struct connection *c) { } int hts_std_alarm (struct connection *c) { + tvkprintf(net_connections, 3, "server standard http alarm on conn %d\n", c->fd); HTS_FUNC(c)->ht_alarm (c); if (c->Out.total_bytes > 0) { c->flags |= C_WANTWR; diff --git a/net/net-ifnet.cpp b/net/net-ifnet.cpp index 44252ffbef..6f2a7e1a13 100644 --- a/net/net-ifnet.cpp +++ b/net/net-ifnet.cpp @@ -85,7 +85,7 @@ unsigned get_my_ipv4() { if (force_ipv4_mask != 0xff000000 || force_ipv4_ip != (10 << 24)) { assert(my_ip != 0 && "can't choose ip in given subnet"); } - vkprintf (1, "using main IP %d.%d.%d.%d/%d at interface %s\n", (my_ip >> 24), (my_ip >> 16) & 255, (my_ip >> 8) & 255, my_ip & 255, + vkprintf (2, "using main IP %d.%d.%d.%d/%d at interface %s\n", (my_ip >> 24), (my_ip >> 16) & 255, (my_ip >> 8) & 255, my_ip & 255, __builtin_clz(~my_netmask), my_iface ?: "(none)"); freeifaddrs(ifa_first); return my_ip; @@ -106,10 +106,10 @@ int get_my_ipv6(unsigned char ipv6[16]) { continue; } memcpy(ip, &((struct sockaddr_in6 *)ifa->ifa_addr)->sin6_addr, 16); - vkprintf (2, "test IP %s at interface %s\n", ipv6_to_print(ip), ifa->ifa_name); + vkprintf (4, "test IP %s at interface %s\n", ipv6_to_print(ip), ifa->ifa_name); if ((ip[0] & 0xf0) != 0x30 && (ip[0] & 0xf0) != 0x20) { - vkprintf (2, "not a global ipv6 address\n"); + vkprintf (4, "not a global ipv6 address\n"); continue; } @@ -139,7 +139,7 @@ int get_my_ipv6(unsigned char ipv6[16]) { m++; } } - vkprintf (1, "using main IP %s/%d at interface %s\n", ipv6_to_print(ipv6), m, my_iface); + vkprintf (2, "using main IP %s/%d at interface %s\n", ipv6_to_print(ipv6), m, my_iface); freeifaddrs(ifa_first); return 1; } diff --git a/net/net-memcache-client.cpp b/net/net-memcache-client.cpp index 1ec72c5de9..5a8cbd408a 100644 --- a/net/net-memcache-client.cpp +++ b/net/net-memcache-client.cpp @@ -364,7 +364,7 @@ static int mcc_parse_execute (struct connection *c) { D->response_len += len; if (D->response_flags & 48) { - vkprintf (0, "bad response from memcache server at %s, connection %d\n", sockaddr_storage_to_string(&c->remote_endpoint), c->fd); + kprintf("bad response from memcache server at %s, connection %d\n", sockaddr_storage_to_string(&c->remote_endpoint), c->fd); c->error = -1; return 0; } @@ -420,7 +420,7 @@ static int mcc_parse_execute (struct connection *c) { return 0; } } else if (D->response_type == mcrt_NONCE) { - vkprintf (0, "bad response in NONCE from memcache server at %s, connection %d\n", sockaddr_storage_to_string(&c->remote_endpoint), c->fd); + kprintf("bad response in NONCE from memcache server at %s, connection %d\n", sockaddr_storage_to_string(&c->remote_endpoint), c->fd); c->status = conn_error; c->error = -1; return 0; @@ -450,7 +450,7 @@ static int mcc_parse_execute (struct connection *c) { } if (D->response_flags & 48) { //write_out (&c->Out, "CLIENT_ERROR\r\n", 14); - vkprintf (0, "bad response from memcache server at %s, connection %d\n", sockaddr_storage_to_string(&c->remote_endpoint), c->fd); + kprintf("bad response from memcache server at %s, connection %d\n", sockaddr_storage_to_string(&c->remote_endpoint), c->fd); c->status = conn_error; c->error = -1; return 0; @@ -471,7 +471,7 @@ static int mcc_parse_execute (struct connection *c) { int mcc_connected (struct connection *c) { c->last_query_sent_time = precise_now; - vkprintf (2, "connection #%d: connected, crypto_flags = %d\n", c->fd, MCC_DATA(c)->crypto_flags); + vkprintf(4, "connection #%d: connected, crypto_flags = %d\n", c->fd, MCC_DATA(c)->crypto_flags); if (MCC_FUNC(c)->mc_check_perm) { int res = MCC_FUNC(c)->mc_check_perm (c); if (res < 0) { @@ -627,7 +627,7 @@ int mcc_start_crypto (struct connection *c, char *key, int key_len) { int mcc_flush_query (struct connection *c) { if (c->crypto) { int pad_bytes = c->type->crypto_needed_output_bytes (c); - vkprintf(2, "mcc_flush_query: padding with %d bytes\n", pad_bytes); + vkprintf(4, "mc: flush query (padding with %d bytes)\n", pad_bytes); if (pad_bytes > 0) { static char pad_str[16] = {'\n', '\n', '\n', '\n', '\n', '\n', '\n', '\n', '\n', '\n', '\n', '\n', '\n', '\n', '\n', '\n'}; assert (pad_bytes <= 15); diff --git a/net/net-memcache-server.cpp b/net/net-memcache-server.cpp index 881d21efa3..6beebaefd5 100644 --- a/net/net-memcache-server.cpp +++ b/net/net-memcache-server.cpp @@ -127,7 +127,7 @@ int mcs_execute (struct connection *c, int op) { skip = D->query_len - D->key_offset - D->key_len; assert (advance_skip_read_ptr (&c->In, skip) == skip); - vkprintf(1, "mc_set: op=%d, key '%s', key_len=%d, flags=%lld, time=%lld, value_len=%lld\n", op, key_buffer, D->key_len, D->args[0], D->args[1], D->args[2]); + vkprintf(4, "mc: set op=%d, key '%s', key_len=%d, flags=%lld, time=%lld, value_len=%lld\n", op, key_buffer, D->key_len, D->args[0], D->args[1], D->args[2]); restart_set: @@ -177,7 +177,7 @@ int mcs_execute (struct connection *c, int op) { key_buffer[D->key_len] = 0; D->query_type = op -= mct_set_resume - mct_set; - vkprintf(1, "mc_set_resume: op=%d, key '%s', key_len=%d, flags=%lld, time=%lld, value_len=%lld\n", op, key_buffer, D->key_len, D->args[0], D->args[1], D->args[2]); + vkprintf(4, "mc: set_resume op=%d, key '%s', key_len=%d, flags=%lld, time=%lld, value_len=%lld\n", op, key_buffer, D->key_len, D->args[0], D->args[1], D->args[2]); goto restart_set; @@ -270,10 +270,10 @@ int mcs_execute (struct connection *c, int op) { key_buffer[D->key_len] = 0; if (op == mct_delete) { - vkprintf(1, "mc_delete: key '%s', key_len=%d\n", key_buffer, D->key_len); + vkprintf(4, "mc_delete: key '%s', key_len=%d\\n", key_buffer, D->key_len); res = MCS_FUNC(c)->mc_delete (c, key_buffer, D->key_len); } else { - vkprintf(1, "mc_incr: op=%d, key '%s', key_len=%d, arg=%lld\n", op, key_buffer, D->key_len, D->args[0]); + vkprintf(4, "mc_incr: op=%d, key '%s', key_len=%d, arg=%lld\n", op, key_buffer, D->key_len, D->args[0]); res = MCS_FUNC(c)->mc_incr (c, op - mct_incr, key_buffer, D->key_len, D->args[0]); } @@ -329,7 +329,7 @@ int mcs_parse_execute (struct connection *c) { int len; long long tt; - vkprintf(2, "c->pending_queries = %d, c->status = %d\n", c->pending_queries, c->status); + vkprintf(4, "in mcs_execute c->pending_queries = %d, c->status = %d\n", c->pending_queries, c->status); while (c->status == conn_expect_query || c->status == conn_reading_query) { len = nbit_ready_bytes (&c->Q); ptr = ptr_s = static_cast(nbit_get_ptr (&c->Q)); @@ -588,7 +588,7 @@ int mcs_parse_execute (struct connection *c) { if (!strncmp (key_buffer, "@#$AuTh$#@", 10)) { assert (advance_skip_read_ptr (&c->In, D->query_len) == D->query_len); - vkprintf(1, "got AUTH: delete '%s'\n", key_buffer); + vkprintf(4, "mc got AUTH: delete '%s'\n", key_buffer); if (c->In.total_bytes) { c->status = conn_error; @@ -630,9 +630,9 @@ int mcs_parse_execute (struct connection *c) { if (!MCS_FUNC(c)->execute) { MCS_FUNC(c)->execute = mcs_execute; } - vkprintf(2, "c->pending_queries = %d, c->status = %d\n", c->pending_queries, c->status); + vkprintf(4, "mc c->pending_queries = %d, c->status = %d\n", c->pending_queries, c->status); int res = MCS_FUNC(c)->execute (c, D->query_type); - vkprintf(2, "c->pending_queries = %d, c->status = %d, res = %d\n", c->pending_queries, c->status, res); + vkprintf(4, "mc c->pending_queries = %d, c->status = %d, res = %d\n", c->pending_queries, c->status, res); if (res > 0) { c->status = conn_reading_query; return res; // need more bytes @@ -648,7 +648,6 @@ int mcs_parse_execute (struct connection *c) { c->status = conn_expect_query; } - vkprintf(2, "c->pending_queries = %d, c->status = %d\n", c->pending_queries, c->status); assert ((c->pending_queries && (c->status == conn_wait_net || c->status == conn_wait_aio)) || (!c->pending_queries && c->status == conn_expect_query) || c->status == conn_error); if (c->status == conn_wait_net || c->status == conn_wait_aio) { @@ -729,7 +728,7 @@ int return_one_key_list (struct connection *c, const char *key, int key_len, int const size_t buff_size = 16; static char buff[buff_size]; - vkprintf(1, "result = %d\n", res); + vkprintf(4, "result = %d\n", res); if (!R_cnt) { if (res == 0x7fffffff) { @@ -802,7 +801,7 @@ int return_one_key_list_long(struct connection *c, const char *key, int key_len, const size_t buf_size = 16; static char buff[buf_size]; - vkprintf(1, "result = %d\n", res); + vkprintf(4, "result = %d\n", res); if (!R_cnt) { if (res == 0x7fffffff) { diff --git a/net/net-mysql-client.cpp b/net/net-mysql-client.cpp index 87e11cd610..9a569c842e 100644 --- a/net/net-mysql-client.cpp +++ b/net/net-mysql-client.cpp @@ -168,7 +168,7 @@ int sqlc_flush_packet (struct connection *c, int packet_len) { } assert (packet_len == pad_bytes); } - vkprintf (2, "sqlc_flush_query: padding with %d bytes\n", pad_bytes); + vkprintf(4, "sqlc_flush_query: padding with %d bytes\n", pad_bytes); if (pad_bytes > 0) { static char pad_str[16]; assert (pad_bytes <= 15); @@ -191,13 +191,13 @@ static int sqlc_inner_authorise (struct connection *c) { struct mysql_auth_packet_end *T; char scramble_len = 20; - vkprintf(2, "server_auth_packet received, len=%d\n", D->packet_len); + vkprintf(4, "server_auth_packet received, len=%d\n", D->packet_len); if (len >= 0x800) { - vkprintf(1, "server_auth_packet too large\n"); + vkprintf(4, "server_auth_packet too large\n"); return -1; } if (len < 46) { - vkprintf(1, "server_auth_packet too small\n"); + vkprintf(4, "server_auth_packet too small\n"); return -1; } assert (force_ready_bytes (&c->In, len+4) >= len+4); @@ -205,7 +205,7 @@ static int sqlc_inner_authorise (struct connection *c) { q = p + len; if (*p != 10) { - vkprintf(1, "server_auth_packet has bad protocol version\n"); + vkprintf(4, "server_auth_packet has bad protocol version\n"); return -1; } @@ -218,7 +218,7 @@ static int sqlc_inner_authorise (struct connection *c) { } if (p == q) { - vkprintf(1, "unterminated version string in server_auth_packet\n"); + vkprintf(4, "unterminated version string in server_auth_packet\n"); return -1; } @@ -233,27 +233,27 @@ static int sqlc_inner_authorise (struct connection *c) { size_t expected_length = sizeof (struct mysql_auth_packet_end); if (((struct mysql_auth_packet_end*)p)->proto_len){ if (strncmp(((struct mysql_auth_packet_end*)p)->proto, "mysql_native_password", 21)){ - vkprintf(1, "unknown auth_proto %s\n", ((struct mysql_auth_packet_end*)p)->proto); + vkprintf(4, "unknown auth_proto %s\n", ((struct mysql_auth_packet_end*)p)->proto); return -1; } expected_length += 22; } if (q - p != expected_length) { - vkprintf (1, "server_auth_packet has incorrect size\n"); + vkprintf(4, "server_auth_packet has incorrect size\n"); return -1; } int res = !SQLC_FUNC(c)->is_mysql_same_datacenter_check_disabled() && SQLC_FUNC(c)->sql_check_perm ? SQLC_FUNC(c)->sql_check_perm (c) : 1; if (res < 0 || !(res &= 3)) { - vkprintf (1, "check_perm forbids access for connection %d\n", c->fd); + vkprintf(4, "check_perm forbids access for connection %d\n", c->fd); return -1; } D->crypto_flags = res; - vkprintf(2, "crypto flags here = %d\n", D->crypto_flags); + vkprintf(4, "crypto flags here = %d\n", D->crypto_flags); if ((res & 2) && p - r >= 8 && !memcmp(r, "5.0.239-", 8) && SQLC_FUNC(c)->sql_init_crypto && SQLC_FUNC(c)->sql_init_crypto (c, r + 8, p - r - 9) > 0) { D->crypto_flags &= 2; @@ -262,10 +262,10 @@ static int sqlc_inner_authorise (struct connection *c) { D->crypto_flags &= 1; } - vkprintf(2, "crypto flags adjusted %d\n", D->crypto_flags); + vkprintf(4, "crypto flags adjusted %d\n", D->crypto_flags); if (!(D->crypto_flags & 3)) { - vkprintf(1, "unable to initialise cryptography, closing connection %d\n", c->fd); + vkprintf(4, "unable to initialise cryptography, closing connection %d\n", c->fd); return -1; } @@ -275,7 +275,7 @@ static int sqlc_inner_authorise (struct connection *c) { const char *sql_user = SQLC_FUNC(c)->sql_get_user(c); const char *sql_password = SQLC_FUNC(c)->sql_get_password(c); - vkprintf(1, "mysql db: %s; user: %s; password: *\n", sql_database, sql_user); + vkprintf(4, "mysql db: %s; user: %s; password: *\n", sql_database, sql_user); sha1 ((unsigned char *)sql_password, strlen (sql_password), (unsigned char *)stage1_hash); memcpy (password_sha1, T->scramble1, 8); @@ -310,7 +310,7 @@ int sqlc_parse_execute (struct connection *c) { struct sqlc_data *D = SQLC_DATA(c); int len = nbit_total_ready_bytes (&c->Q); static unsigned int psize; - vkprintf(2, "sqlc_parse_execute(%d), status=%d, bytes=%d, packet_state=%d, packet_len=%d\n", c->fd, c->status, len, D->packet_state, D->packet_len); + vkprintf(4, "sqlc_parse_execute(%d), status=%d, bytes=%d, packet_state=%d, packet_len=%d\n", c->fd, c->status, len, D->packet_state, D->packet_len); char *p; while (len > 0 && !(c->flags & (C_FAILED | C_STOPREAD))) { @@ -337,7 +337,7 @@ int sqlc_parse_execute (struct connection *c) { } /* complete packet ready */ c->last_response_time = precise_now; - vkprintf(2, "client packet ready: len=%d, seq_num=%d\n", D->packet_len, D->packet_seq); + vkprintf(4, "client packet ready: len=%d, seq_num=%d\n", D->packet_len, D->packet_seq); if (D->auth_state == sql_noauth) { int res = sqlc_inner_authorise(c); if (res < 0) { @@ -368,11 +368,11 @@ int sqlc_parse_execute (struct connection *c) { if (SQLC_FUNC(c)->sql_authorized (c)) { c->status = conn_error; c->error = -1; - vkprintf(2, "sql authorisation failed\n"); + vkprintf(4, "sql authorisation failed\n"); return 0; } - vkprintf(2, "outcoming authorization successful\n"); + vkprintf(4, "outcoming authorization successful\n"); } else if (D->auth_state == sql_auth_initdb) { @@ -380,7 +380,7 @@ int sqlc_parse_execute (struct connection *c) { if (D->packet_len == 0 || *p) { c->status = conn_error; c->error = -1; - vkprintf(2, "ok packet expected in response to initdb\n"); + vkprintf(4, "ok packet expected in response to initdb\n"); return 0; } D->auth_state = sql_auth_ok; @@ -393,7 +393,7 @@ int sqlc_parse_execute (struct connection *c) { kprintf ("ok packet expected\n"); return 0; }*/ - vkprintf(2, "outcoming initdb successful\n"); + vkprintf(4, "outcoming initdb successful\n"); SQLC_FUNC(c)->sql_becomes_ready (c); return 0; } else { @@ -402,7 +402,7 @@ int sqlc_parse_execute (struct connection *c) { assert (D->auth_state == sql_auth_ok); //dump_connection_buffers (c); - vkprintf(2, "execute, op=%d\n", op); + vkprintf(4, "execute, op=%d\n", op); int keep_total_bytes = c->In.total_bytes; @@ -526,7 +526,7 @@ int sqlc_default_check_perm (struct connection *c) { } int sqlc_init_crypto (struct connection *c, char *key, int key_len) { - vkprintf(2, "sqlc_init_crypto (%p [fd=%d], '%.*s' [%d])\n", c, c->fd, key_len, key, key_len); + vkprintf(4, "sqlc_init_crypto (%p [fd=%d], '%.*s' [%d])\n", c, c->fd, key_len, key, key_len); if (c->crypto) { return -1; @@ -544,7 +544,7 @@ int sqlc_init_crypto (struct connection *c, char *key, int key_len) { int mytime = time (0); - vkprintf(3, "remote time %d, local %d\n", utime, mytime); + vkprintf(4, "remote time %d, local %d\n", utime, mytime); if (abs (mytime - utime) > 10) { return -1; diff --git a/net/net-reactor.cpp b/net/net-reactor.cpp index 3a6624aace..7693f65d83 100644 --- a/net/net-reactor.cpp +++ b/net/net-reactor.cpp @@ -287,7 +287,7 @@ void net_reactor_fetch_events(net_reactor_ctx_t *ctx, int num_events) { void net_reactor_update_timer_counters(net_reactor_ctx_t *ctx, int timeout) { const double wait_time = ctx->last_wait - ctx->wait_start; if (wait_time > (timeout / 1000.0 + 0.5)) { - tvkprintf(net_events, 0, "epoll-wait worked too long: %.3fs\n", wait_time); + kprintf("epoll-wait worked too long: %.3fs\n", wait_time); } ctx->total_idle_time += wait_time; ctx->average_idle_time += wait_time; @@ -488,7 +488,7 @@ int net_reactor_run_timers(net_reactor_ctx_t *ctx) { wait_time = ctx->timer_heap[1]->wakeup_time - precise_now; if (wait_time > 0) { // do not remove this useful debug! - tvkprintf(net_events, 3, "%d event timers, next in %.3f seconds\n", ctx->timer_heap_size, wait_time); + tvkprintf(net_events, 4, "%d event timers, next in %.3f seconds\n", ctx->timer_heap_size, wait_time); return (int)(std::min(100.0, wait_time) * 1000) + 1; // min to prevent integer overflow } diff --git a/net/net-socket-options.cpp b/net/net-socket-options.cpp index cdc3a270b5..4c8734dd75 100644 --- a/net/net-socket-options.cpp +++ b/net/net-socket-options.cpp @@ -150,7 +150,7 @@ void socket_maximize_sndbuf(int socket, int max) { } } - vkprintf(2, "<%d send buffer was %d, now %d\n", socket, old_size, last_good); + vkprintf(4, "<%d send buffer was %d, now %d\n", socket, old_size, last_good); } void socket_maximize_rcvbuf(int socket, int max) { @@ -185,5 +185,5 @@ void socket_maximize_rcvbuf(int socket, int max) { } } - vkprintf(2, ">%d receive buffer was %d, now %d\n", socket, old_size, last_good); + vkprintf(4, ">%d receive buffer was %d, now %d\n", socket, old_size, last_good); } diff --git a/net/net-socket.cpp b/net/net-socket.cpp index 72bca87bec..989401a102 100644 --- a/net/net-socket.cpp +++ b/net/net-socket.cpp @@ -54,7 +54,7 @@ const char *unix_socket_path(const char *directory, const char *owner, uint16_t static struct sockaddr_un path; if (snprintf(path.sun_path, sizeof(path.sun_path), "%s/%s/%d", directory, owner, port) >= sizeof(path.sun_path)) { - vkprintf(0, "Too long UNIX socket path: \"%s/%s/%d\": %zu bytes exceeds\n", directory, owner, port, sizeof(path.sun_path)); + kprintf("Too long UNIX socket path: \"%s/%s/%d\": %zu bytes exceeds\n", directory, owner, port, sizeof(path.sun_path)); return NULL; } @@ -81,7 +81,7 @@ int prepare_unix_socket_directory(const char *directory, const char *username, c int dirfd = open(directory, O_DIRECTORY); if (dirfd == -1) { if (errno == ENOENT) { - vkprintf(1, "Trying to create UNIX socket directory: \"%s\"\n", directory); + vkprintf(4, "Trying to create UNIX socket directory: \"%s\"\n", directory); const mode_t dirmode = S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH; if (mkdir(directory, dirmode) == -1 && errno != EEXIST) { vkprintf(1, "Cannot mkdir() UNIX socket directory: \"%s\": %s\n", directory, strerror(errno)); @@ -257,7 +257,7 @@ int server_socket(int port, struct in_addr in_addr, int backlog, int mode) { addr.sin_port = htons(port); addr.sin_addr = in_addr; if (bind(sfd, (struct sockaddr *) &addr, sizeof(addr)) == -1) { - vkprintf(0, "bind(%s:%d): %s\n", inet_ntoa(in_addr), port, strerror(errno)); + kprintf("bind(%s:%d): %s\n", inet_ntoa(in_addr), port, strerror(errno)); close(sfd); return -1; } @@ -270,7 +270,7 @@ int server_socket(int port, struct in_addr in_addr, int backlog, int mode) { addr.sin6_addr = in6addr_any; if (bind(sfd, (struct sockaddr *) &addr, sizeof(addr)) == -1) { - vkprintf(0, "bind(%s:%d): %s\n", inet_ntoa(in_addr), port, strerror(errno)); + kprintf("bind(%s:%d): %s\n", inet_ntoa(in_addr), port, strerror(errno)); close(sfd); return -1; } @@ -303,11 +303,11 @@ int server_socket_unix(const struct sockaddr_un *addr, int backlog, int mode) { socket_maximize_rcvbuf(fd, 0); socket_maximize_sndbuf(fd, 0); - vkprintf(1, "Unlinking UNIX socket path: \"%s\"\n", addr->sun_path); + vkprintf(4, "Unlinking UNIX socket path: \"%s\"\n", addr->sun_path); unlink(addr->sun_path); if (bind(fd, (struct sockaddr *) addr, sizeof(*addr)) == -1) { - vkprintf(0, "bind(%s): %s\n", addr->sun_path, strerror(errno)); + kprintf("bind(%s): %s\n", addr->sun_path, strerror(errno)); close(fd); return -1; } @@ -407,7 +407,7 @@ int client_socket_unix(const struct sockaddr_un *addr, int mode) { socket_maximize_sndbuf(fd, 0); socket_maximize_rcvbuf(fd, 0); if (connect(fd, (struct sockaddr *) addr, sizeof(*addr)) == -1 && errno != EINPROGRESS) { - vkprintf(0, "Cannot connect() to \"%s\": %s\n", addr->sun_path, strerror(errno)); + kprintf("Cannot connect() to \"%s\": %s\n", addr->sun_path, strerror(errno)); close(fd); return -1; } diff --git a/net/net-tcp-connections.cpp b/net/net-tcp-connections.cpp index 9f71a0298e..e2fb8ca5ec 100644 --- a/net/net-tcp-connections.cpp +++ b/net/net-tcp-connections.cpp @@ -93,10 +93,10 @@ int prealloc_tcp_buffers() { for (i = tcp_buffers_number - 1; i >= 0; i--) { msg_buffer_t *X = alloc_msg_buffer(tcp_buffers_size); if (!X) { - vkprintf(0, "**FATAL**: cannot allocate tcp receive buffer\n"); + kprintf("cannot allocate tcp receive buffer : calling exit(2)\n"); exit(2); } - vkprintf(3, "allocated %d byte tcp receive buffer #%d at %p\n", msg_buffer_size(X), i, X); + tvkprintf(net_connections, 4, "allocated %d byte tcp receive buffer #%d at %p\n", msg_buffer_size(X), i, X); tcp_recv_buffers[i] = X; tcp_recv_iovec[i + 1].iov_base = X->data; tcp_recv_iovec[i + 1].iov_len = msg_buffer_size(X); @@ -115,6 +115,7 @@ int prealloc_tcp_buffers() { 15.08.2013: now C_WANTRD is never cleared anymore (we don't understand what bug we were fixing originally by this) */ int tcp_server_writer(struct connection *c) { + tvkprintf(net_connections, 3, "tcp server writer on conn %d\n", c->fd); int r, s, t = 0, check_watermark; assert(c->status != conn_connecting); @@ -146,11 +147,10 @@ int tcp_server_writer(struct connection *c) { r = writev(c->fd, iov, iovcnt); - if (verbosity > 2) { - kprintf("send/writev() to %d: %d written out of %d in %d chunks\n", c->fd, r, s, iovcnt); - if (r < 0) { - perror("send()"); - } + tvkprintf(net_connections, 4, "send/writev() to %d: %d written out of %d in %d chunks\n", c->fd, r, s, iovcnt); + + if (r < 0) { + tvkprintf(net_connections, 1, "writev(): %m\n"); } if (r > 0) { @@ -250,8 +250,7 @@ static int tcp_server_reader_inner(struct connection *c, bool once) { r = recvmsg(c->fd, &msg, MSG_DONTWAIT); if (r >= 0) { assert(!(msg.msg_flags & MSG_TRUNC || msg.msg_flags & MSG_CTRUNC)); - - vkprintf(4, "Ancillary data size: %" PRIu64 "\n", static_cast(msg.msg_controllen)); + tvkprintf(net_connections, 4, "Ancillary data size: %" PRIu64 "\n", static_cast(msg.msg_controllen)); if (c->type->ancillary_data_received) { for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { @@ -268,10 +267,9 @@ static int tcp_server_reader_inner(struct connection *c, bool once) { if (r < s || once) { c->flags |= C_NORD; } - - vkprintf(3, "recv() from %d: %d read out of %d. Crypto = %d\n", c->fd, r, s, c->crypto != 0); + tvkprintf(net_connections, 4, "recv() from %d: %d read out of %d. Crypto = %d\n", c->fd, r, s, c->crypto != 0); if (r < 0 && errno != EAGAIN) { - vkprintf(1, "recv(): %s\n", strerror(errno)); + tvkprintf(net_connections, 1, "recv(): %s\n", strerror(errno)); } if (r > 0) { @@ -320,7 +318,7 @@ static int tcp_server_reader_inner(struct connection *c, bool once) { for (i = 0; i < p - 1; i++) { msg_buffer_t *X = alloc_msg_buffer(tcp_buffers_size); if (!X) { - vkprintf(0, "**FATAL**: cannot allocate tcp receive buffer\n"); + kprintf("**FATAL**: cannot allocate tcp receive buffer\n"); exit(2); } tcp_recv_buffers[i] = X; @@ -344,8 +342,7 @@ static int tcp_server_reader_inner(struct connection *c, bool once) { rwm_fetch_data(cin, 0, r1); c->skip_bytes = s += r1; - - vkprintf(3, "skipped %d bytes, %d more to skip\n", r1, -s); + tvkprintf(net_connections, 4, "skipped %d bytes, %d more to skip\n", r1, -s); if (s) { continue; } @@ -356,8 +353,7 @@ static int tcp_server_reader_inner(struct connection *c, bool once) { if (r1 >= s) { c->skip_bytes = s = 0; } - - vkprintf(2, "fetched %d bytes, %d available bytes, %d more to load\n", r, r1, s ? s - r1 : 0); + tvkprintf(net_connections, 4, "fetched %d bytes, %d available bytes, %d more to load\n", r, r1, s ? s - r1 : 0); if (s) { continue; } @@ -423,10 +419,12 @@ static int tcp_server_reader_inner(struct connection *c, bool once) { } int tcp_server_reader(struct connection *c) { + tvkprintf(net_connections, 3, "tcp server reader on conn %d\n", c->fd); return tcp_server_reader_inner(c, true); } int tcp_server_reader_till_end(struct connection *c) { + tvkprintf(net_connections, 3, "tcp server reader till end on conn %d\n", c->fd); return tcp_server_reader_inner(c, false); } diff --git a/net/net-tcp-rpc-client.cpp b/net/net-tcp-rpc-client.cpp index 6c78fe0131..2a32e34c8a 100644 --- a/net/net-tcp-rpc-client.cpp +++ b/net/net-tcp-rpc-client.cpp @@ -70,7 +70,7 @@ conn_type_t ct_tcp_rpc_client = get_default_tcp_rpc_client_conn_type(); int tcp_rpcc_default_execute (struct connection *c, int op, raw_message_t *raw) { - vkprintf (1, "rpcc_execute: fd=%d, op=%d, len=%d\n", c->fd, op, raw->total_bytes); + tvkprintf(net_connections, 3, "rpcc_execute: fd=%d, op=%d, len=%d\n", c->fd, op, raw->total_bytes); if (op == TL_RPC_PING && raw->total_bytes == 12) { c->last_response_time = precise_now; static int Q[12]; @@ -80,7 +80,7 @@ int tcp_rpcc_default_execute (struct connection *c, int op, raw_message_t *raw) P[1] = Q[1]; P[2] = Q[2]; - vkprintf (2, "received ping from %s (val = %lld)\n", sockaddr_storage_to_string(&c->remote_endpoint), *(long long *)(P + 1)); + tvkprintf(net_connections, 4, "received ping from %s (val = %lld)\n", sockaddr_storage_to_string(&c->remote_endpoint), *(long long *)(P + 1)); tcp_rpc_conn_send_data (c, 12, P); return 0; } @@ -101,8 +101,7 @@ static int tcp_rpcc_process_nonce_packet (struct connection *c, raw_message_t *m } assert (rwm_fetch_data (msg, &P, D->packet_len) == D->packet_len); - - vkprintf (2, "Processing nonce packet, crypto schema: %d, key select: %d\n", P.crypto_schema, P.key_select); + tvkprintf(net_connections, 4, "Processing nonce packet, crypto schema: %d, key select: %d\n", P.crypto_schema, P.key_select); switch (P.crypto_schema) { case RPC_CRYPTO_NONE: @@ -136,12 +135,12 @@ static int tcp_rpcc_process_nonce_packet (struct connection *c, raw_message_t *m default: return -4; } - vkprintf (2, "Processed nonce packet, crypto flags = %d\n", D->crypto_flags); + tvkprintf(net_connections, 4, "Processed nonce packet, crypto flags = %d\n", D->crypto_flags); return 0; } static int tcp_rpcc_send_handshake_packet (struct connection *c) { - vkprintf(2, "tcp_rpcc_send_handshake_packet\n"); + tvkprintf(net_connections, 4, "tcp_rpcc_send_handshake_packet\n"); struct tcp_rpc_data *D = TCP_RPC_DATA (c); static struct tcp_rpc_handshake_packet P; if (!PID.ip) { @@ -183,7 +182,7 @@ static int tcp_rpcc_send_handshake_error_packet (struct connection *c, int error } static int tcp_rpcc_process_handshake_packet (struct connection *c, raw_message_t *msg) { - vkprintf(2, "tcp_rpcc_process_handshake_packet\n"); + tvkprintf(net_connections, 4, "tcp_rpcc_process_handshake_packet\n"); struct tcp_rpc_data *D = TCP_RPC_DATA(c); static struct tcp_rpc_handshake_packet P; @@ -223,7 +222,7 @@ static int tcp_rpcc_process_handshake_packet (struct connection *c, raw_message_ } int tcp_rpcc_parse_execute (struct connection *c) { - vkprintf (4, "%s. in_total_bytes = %d\n", __func__, c->in.total_bytes); + tvkprintf (net_connections, 4, "%s. in_total_bytes = %d\n", __func__, c->in.total_bytes); struct tcp_rpc_data *D = TCP_RPC_DATA(c); int len; @@ -239,7 +238,7 @@ int tcp_rpcc_parse_execute (struct connection *c) { } assert (rwm_fetch_lookup (&c->in, &D->packet_len, 4) == 4); if (D->packet_len <= 0 || (D->packet_len & 3) || (D->packet_len > TCP_RPCC_FUNC(c)->max_packet_len && TCP_RPCC_FUNC(c)->max_packet_len > 0)) { - vkprintf (1, "error while parsing packet: bad packet length %d\n", D->packet_len); + tvkprintf(net_connections, 1, "error while parsing packet: bad packet length %d\n", D->packet_len); c->status = conn_error; c->error = -1; return 0; @@ -251,7 +250,7 @@ int tcp_rpcc_parse_execute (struct connection *c) { continue; } if (D->packet_len < 16) { - vkprintf (1, "error while parsing packet: bad packet length %d\n", D->packet_len); + tvkprintf(net_connections, 1, "error while parsing packet: bad packet length %d\n", D->packet_len); c->status = conn_error; c->error = -1; return 0; @@ -274,7 +273,7 @@ int tcp_rpcc_parse_execute (struct connection *c) { assert (rwm_fetch_data_back (&msg, &crc32, 4) == 4); D->packet_crc32 = rwm_custom_crc32 (&msg, D->packet_len - 4, D->custom_crc_partial); if (crc32 != D->packet_crc32) { - vkprintf(1, "error while parsing packet: crc32 = %08x != %08x\n", D->packet_crc32, crc32); + tvkprintf(net_connections, 1, "error while parsing packet: crc32 = %08x != %08x\n", D->packet_crc32, crc32); c->status = conn_error; c->error = -1; rwm_free (&msg); @@ -286,16 +285,13 @@ int tcp_rpcc_parse_execute (struct connection *c) { assert (rwm_fetch_lookup (&msg, &D->packet_type, 4) == 4); D->packet_len -= 12; - if (verbosity > 2) { - kprintf ("received packet from connection %d\n", c->fd); - rwm_dump (&msg); - } - + tvkprintf(net_connections, 4, "received packet from connection %d\n", c->fd); +// rwm_dump (&msg); int res = -1; if (D->packet_num != D->in_packet_num) { - vkprintf (1, "error while parsing packet: got packet num %d, expected %d\n", D->packet_num, D->in_packet_num); + tvkprintf(net_connections, 1, "error while parsing packet: got packet num %d, expected %d\n", D->packet_num, D->in_packet_num); c->status = conn_error; c->error = -1; rwm_free (&msg); @@ -387,7 +383,7 @@ int tcp_rpcc_connected (struct connection *c) { } char buffer_local[SOCKADDR_STORAGE_BUFFER_SIZE], buffer_remote[SOCKADDR_STORAGE_BUFFER_SIZE]; - vkprintf(2, "RPC connection #%d: %s -> %s connected, crypto_flags = %d\n", c->fd, sockaddr_storage_to_buffer(&c->local_endpoint, buffer_local), + tvkprintf(net_connections, 4, "RPC connection #%d: %s -> %s connected, crypto_flags = %d\n", c->fd, sockaddr_storage_to_buffer(&c->local_endpoint, buffer_local), sockaddr_storage_to_buffer(&c->remote_endpoint, buffer_remote), TCP_RPC_DATA(c)->crypto_flags); assert (TCP_RPCC_FUNC(c)->rpc_init_crypto); @@ -440,7 +436,7 @@ int tcp_rpcc_init_fake_crypto (struct connection *c) { int tcp_rpcc_init_outbound (struct connection *c) { - vkprintf (3, "rpcc_init_outbound (%d)\n", c->fd); + tvkprintf(net_connections, 4, "rpcc_init_outbound (%d)\n", c->fd); struct tcp_rpc_data *D = TCP_RPC_DATA(c); c->last_query_sent_time = precise_now; D->custom_crc_partial = crc32_partial; @@ -500,8 +496,7 @@ int tcp_rpcc_init_crypto (struct connection *c) { int tcp_rpcc_start_crypto (struct connection *c, char *nonce, int key_select) { struct tcp_rpc_data *D = TCP_RPC_DATA(c); - - vkprintf (2, "rpcc_start_crypto: key_select = %d\n", key_select); + tvkprintf(net_connections, 4, "rpcc_start_crypto: key_select = %d\n", key_select); if (c->crypto) { return -1; @@ -531,7 +526,7 @@ int tcp_rpcc_start_crypto (struct connection *c, char *nonce, int key_select) { void tcp_rpcc_flush_crypto (struct connection *c) { if (c->crypto) { int pad_bytes = c->type->crypto_needed_output_bytes (c); - vkprintf (2, "rpcc_flush_packet: padding with %d bytes\n", pad_bytes); + tvkprintf(net_connections, 4, "rpcc_flush_packet: padding with %d bytes\n", pad_bytes); if (pad_bytes > 0) { assert (!(pad_bytes & 3)); static int pad_str[3] = {4, 4, 4}; @@ -554,7 +549,7 @@ int tcp_rpcc_flush_packet_later (struct connection *c) { int tcp_rpcc_flush (struct connection *c) { if (c->crypto) { int pad_bytes = c->type->crypto_needed_output_bytes (c); - vkprintf (2, "rpcs_flush: padding with %d bytes\n", pad_bytes); + tvkprintf(net_connections, 4, "rpcs_flush: padding with %d bytes\n", pad_bytes); if (pad_bytes > 0) { assert (!(pad_bytes & 3)); static int pad_str[3] = {4, 4, 4}; diff --git a/net/net-tcp-rpc-common.cpp b/net/net-tcp-rpc-common.cpp index c96f610999..a07f4a5807 100644 --- a/net/net-tcp-rpc-common.cpp +++ b/net/net-tcp-rpc-common.cpp @@ -26,7 +26,7 @@ OPTION_PARSER(OPT_RPC, "no-crc32c", no_argument, "Force use of CRC32 instead of // Flag 1 - can not edit this message. Need to make copy. void tcp_rpc_conn_send (struct connection *c, raw_message_t *raw, int flags) { - vkprintf (3, "%s: sending message of size %d to conn fd=%d\n", __func__, raw->total_bytes, c->fd); + tvkprintf(net_connections, 4, "%s: sending message of size %d to conn fd=%d\n", __func__, raw->total_bytes, c->fd); assert (!(raw->total_bytes & 3)); int Q[2]; Q[0] = raw->total_bytes + 12; @@ -51,7 +51,7 @@ void tcp_rpc_conn_send_data (struct connection *c, int len, void *Q) { } void net_rpc_send_ping (struct connection *c, long long ping_id) { - vkprintf (2, "Sending ping to fd=%d. ping_id = %lld\n", c->fd, ping_id); + tvkprintf(net_connections, 4, "Sending ping to fd=%d. ping_id = %lld\n", c->fd, ping_id); assert(c->flags & C_RAWMSG); static int P[20]; P[0] = TL_RPC_PING; diff --git a/net/net-tcp-rpc-server.cpp b/net/net-tcp-rpc-server.cpp index d089c86f6f..2f9954c99a 100644 --- a/net/net-tcp-rpc-server.cpp +++ b/net/net-tcp-rpc-server.cpp @@ -99,7 +99,7 @@ struct tcp_rpc_server_functions default_tcp_rpc_server = { }; int tcp_rpcs_default_execute (struct connection *c, int op, raw_message_t *raw) { - vkprintf (1, "rpcs_execute: fd=%d, op=%d, len=%d\n", c->fd, op, raw->total_bytes); + tvkprintf(net_connections, 4, "rpcs_execute: fd=%d, op=%d, len=%d\n", c->fd, op, raw->total_bytes); if (op == TL_RPC_PING && raw->total_bytes == 12) { c->last_response_time = precise_now; static int Q[12]; @@ -109,7 +109,7 @@ int tcp_rpcs_default_execute (struct connection *c, int op, raw_message_t *raw) P[1] = Q[1]; P[2] = Q[2]; - vkprintf (2, "received ping from %s (val = %lld)\n", sockaddr_storage_to_string(&c->remote_endpoint), *(long long *)(P + 1)); + tvkprintf(net_connections, 4, "received ping from %s (val = %lld)\n", sockaddr_storage_to_string(&c->remote_endpoint), *(long long *)(P + 1)); tcp_rpc_conn_send_data (c, 12, P); flush_later (c); @@ -238,7 +238,7 @@ static int tcp_rpcs_process_handshake_packet (struct connection *c, raw_message_ assert (rwm_fetch_data (msg, &P, D->packet_len) == D->packet_len); memcpy (&D->remote_pid, &P.sender_pid, sizeof (struct process_id)); if (matches_pid(&PID, &P.peer_pid) == no_pid_match) { - vkprintf (1, "PID mismatch during handshake: local %08x:%hu:%hu:%u, remote %08x:%hu:%hu:%u\n", + tvkprintf(net_connections, 1, "PID mismatch during handshake: local %08x:%hu:%hu:%u, remote %08x:%hu:%hu:%u\n", PID.ip, PID.port, PID.pid, PID.utime, P.peer_pid.ip, P.peer_pid.port, P.peer_pid.pid, P.peer_pid.utime); tcp_rpcs_send_handshake_error_packet (c, -4); return -4; @@ -260,7 +260,7 @@ static int __raw_msg_to_conn (void *extra, const void *data, int len) { } int tcp_rpcs_parse_execute (struct connection *c) { - vkprintf (4, "%s. in_total_bytes = %d\n", __func__, c->in.total_bytes); + tvkprintf(net_connections, 4, "%s. in_total_bytes = %d\n", __func__, c->in.total_bytes); struct tcp_rpc_data *D = TCP_RPC_DATA(c); int len; @@ -281,7 +281,7 @@ int tcp_rpcs_parse_execute (struct connection *c) { D->packet_len &= 0x7fffffff; } if ((D->packet_len > TCP_RPCS_FUNC(c)->max_packet_len && TCP_RPCS_FUNC(c)->max_packet_len > 0)) { - vkprintf (1, "error while parsing packet: bad packet length %d\n", D->packet_len); + tvkprintf(net_connections, 1, "error while parsing packet: bad packet length %d\n", D->packet_len); c->status = conn_error; c->error = -1; return 0; @@ -289,7 +289,7 @@ int tcp_rpcs_parse_execute (struct connection *c) { if (D->packet_len <= 0 || (D->packet_len & 0xc0000003)) { if (D->in_packet_num <= -2 && (D->packet_len == 0x656c6564 || D->packet_len == 0x74617473 || D->packet_len == 0x73726576 || D->packet_len == 0x20746567 || D->packet_len == 0x20746573 || D->packet_len == 0x20646461 || D->packet_len == 0x6c706572 || D->packet_len == 0x72636e69 || D->packet_len == 0x72636564) && TCP_RPCS_FUNC(c)->memcache_fallback_type) { - vkprintf (1, "switching to memcache fallback for connection %d\n", c->fd); + tvkprintf(net_connections, 4, "switching to memcache fallback for connection %d\n", c->fd); memset (c->custom_data, 0, sizeof (c->custom_data)); c->type = static_cast(TCP_RPCS_FUNC(c)->memcache_fallback_type); c->extra = TCP_RPCS_FUNC(c)->memcache_fallback_extra; @@ -307,7 +307,7 @@ int tcp_rpcs_parse_execute (struct connection *c) { rwm_process (&c->in, c->in.total_bytes, cb, c); rwm_free (&c->in); if (c->type->init_accepted (c) < 0) { - vkprintf (1, "memcache init_accepted() returns error for connection %d\n", c->fd); + tvkprintf(net_connections, 1, "memcache init_accepted() returns error for connection %d\n", c->fd); c->status = conn_error; c->error = -33; return 0; @@ -316,7 +316,7 @@ int tcp_rpcs_parse_execute (struct connection *c) { return c->type->parse_execute (c); } if (D->in_packet_num <= -2 && (D->packet_len == 0x44414548 || D->packet_len == 0x54534f50 || D->packet_len == 0x20544547) && TCP_RPCS_FUNC(c)->http_fallback_type) { - vkprintf (1, "switching to http fallback for connection %d\n", c->fd); + tvkprintf(net_connections, 4, "switching to http fallback for connection %d\n", c->fd); memset (c->custom_data, 0, sizeof (c->custom_data)); c->type = static_cast(TCP_RPCS_FUNC(c)->http_fallback_type); c->extra = TCP_RPCS_FUNC(c)->http_fallback_extra; @@ -334,7 +334,7 @@ int tcp_rpcs_parse_execute (struct connection *c) { rwm_process (&c->in, c->in.total_bytes, cb, c); rwm_free (&c->in); if (c->type->init_accepted (c) < 0) { - vkprintf (1, "http init_accepted() returns error for connection %d\n", c->fd); + tvkprintf(net_connections, 1, "http init_accepted() returns error for connection %d\n", c->fd); c->status = conn_error; c->error = -33; return 0; @@ -342,7 +342,7 @@ int tcp_rpcs_parse_execute (struct connection *c) { nbit_set (&c->Q, &c->In); return c->type->parse_execute (c); } - vkprintf (1, "error while parsing packet: bad packet length %d\n", D->packet_len); + tvkprintf(net_connections, 1, "error while parsing packet: bad packet length %d\n", D->packet_len); c->status = conn_error; c->error = -1; return 0; @@ -354,7 +354,7 @@ int tcp_rpcs_parse_execute (struct connection *c) { continue; } if (D->packet_len < 16) { - vkprintf (1, "error while parsing packet: bad packet length %d\n", D->packet_len); + tvkprintf(net_connections, 1, "error while parsing packet: bad packet length %d\n", D->packet_len); c->status = conn_error; c->error = -1; return 0; @@ -372,7 +372,7 @@ int tcp_rpcs_parse_execute (struct connection *c) { assert (rwm_fetch_data_back (&msg, &crc32, 4) == 4); D->packet_crc32 = rwm_custom_crc32 (&msg, D->packet_len - 4, D->custom_crc_partial); if (crc32 != D->packet_crc32) { - vkprintf(1, "error while parsing packet: crc32 = %08x != %08x\n", D->packet_crc32, crc32); + tvkprintf(net_connections, 1, "error while parsing packet: crc32 = %08x != %08x\n", D->packet_crc32, crc32); c->status = conn_error; c->error = -1; rwm_free (&msg); @@ -396,7 +396,7 @@ int tcp_rpcs_parse_execute (struct connection *c) { } if (!(D->crypto_flags & 256) && D->packet_num != D->in_packet_num) { - vkprintf (1, "error while parsing packet: got packet num %d, expected %d\n", D->packet_num, D->in_packet_num); + tvkprintf(net_connections, 1, "error while parsing packet: got packet num %d, expected %d\n", D->packet_num, D->in_packet_num); c->status = conn_error; c->error = -1; rwm_free (&msg); @@ -506,7 +506,7 @@ int tcp_rpcs_init_accepted (struct connection *c) { if (TCP_RPCS_FUNC(c)->rpc_check_perm) { int res = TCP_RPCS_FUNC(c)->rpc_check_perm (c); - vkprintf(4, "rpcs_check_perm for connection %d: %s -> %s = %d\n", c->fd, sockaddr_storage_to_string(&c->remote_endpoint), sockaddr_storage_to_string(&c->local_endpoint), res); + tvkprintf(net_connections, 4, "rpcs_check_perm for connection %d: %s -> %s = %d\n", c->fd, sockaddr_storage_to_string(&c->remote_endpoint), sockaddr_storage_to_string(&c->local_endpoint), res); if (res < 0) { return res; } @@ -634,7 +634,7 @@ int tcp_rpcs_init_crypto (struct connection *c, struct tcp_rpc_nonce_packet *P) int tcp_rpcs_flush_packet (struct connection *c) { if (c->crypto) { int pad_bytes = c->type->crypto_needed_output_bytes (c); - vkprintf (2, "tcp_rpcs_flush_packet: padding with %d bytes\n", pad_bytes); + tvkprintf(net_connections, 4, "tcp_rpcs_flush_packet: padding with %d bytes\n", pad_bytes); if (pad_bytes > 0) { assert (!(pad_bytes & 3)); static int pad_str[3] = {4, 4, 4}; @@ -648,7 +648,7 @@ int tcp_rpcs_flush_packet (struct connection *c) { int tcp_rpcs_flush (struct connection *c) { if (c->crypto) { int pad_bytes = c->type->crypto_needed_output_bytes (c); - vkprintf (2, "rpcs_flush: padding with %d bytes\n", pad_bytes); + tvkprintf(net_connections, 4, "rpcs_flush: padding with %d bytes\n", pad_bytes); if (pad_bytes > 0) { assert (!(pad_bytes & 3)); static int pad_str[3] = {4, 4, 4}; diff --git a/runtime/curl.cpp b/runtime/curl.cpp index 5fb066cf27..82382e343d 100644 --- a/runtime/curl.cpp +++ b/runtime/curl.cpp @@ -18,6 +18,7 @@ #include "common/macos-ports.h" #include "common/smart_ptrs/singleton.h" #include "common/wrappers/to_array.h" +#include "common/dl-utils-lite.h" #include "net/net-events.h" #include "net/net-reactor.h" #include "server/curl-adaptor.h" @@ -661,6 +662,7 @@ bool f$curl_setopt_array(curl_easy easy_id, const array &options) noexcep } mixed f$curl_exec(curl_easy easy_id) noexcept { + constexpr double long_curl_query = 2 * 1e-1; // 0.2 sec auto *easy_context = get_context(easy_id); if (!easy_context) { return false; @@ -672,8 +674,13 @@ mixed f$curl_exec(curl_easy easy_id) noexcept { } easy_context->cleanup_for_next_request(); + double request_start_time = dl_time(); easy_context->error_num = dl::critical_section_call(curl_easy_perform, easy_context->easy_handle); - + double request_finish_time = dl_time(); + if (request_finish_time - request_start_time >= long_curl_query) { + kprintf("LONG curl query : %f. Curl id = %d, url = %.100s\n", request_finish_time - request_start_time, + easy_context->uniq_id, easy_context->get_info(CURLINFO_EFFECTIVE_URL).as_string().c_str()); + } if (easy_context->error_num != CURLE_OK && easy_context->error_num != CURLE_PARTIAL_FILE) { if (kphp_tracing::is_turned_on()) { kphp_tracing::on_curl_exec_fail(easy_context->uniq_id, -easy_context->error_num); // error_num > 0, pass negative diff --git a/runtime/resumable.cpp b/runtime/resumable.cpp index 1c815d9e2a..d4c46a4b55 100644 --- a/runtime/resumable.cpp +++ b/runtime/resumable.cpp @@ -329,7 +329,7 @@ Resumable *get_started_resumable(int64_t resumable_id) { static void add_resumable_to_queue(int64_t resumable_id, forked_resumable_info *resumable) { int64_t queue_id = resumable->queue_id; wait_queue *q = get_wait_queue(queue_id); - tvkprintf(resumable, 1, "Push resumable %" PRIi64 " to queue %" PRIi64 "(%" PRIi64 ", %" PRIi64 ", %" PRIi32 ") at %.6lf\n", + tvkprintf(resumable, 2, "Push resumable %" PRIi64 " to queue %" PRIi64 "(%" PRIi64 ", %" PRIi64 ", %" PRIi32 ") at %.6lf\n", resumable_id, resumable->queue_id, q->resumable_id, q->first_finished_function, q->left_functions, (update_precise_now(), get_precise_now())); resumable->queue_id = q->first_finished_function; @@ -445,7 +445,7 @@ static void resumable_add_finished(int64_t resumable_id) { finished_resumables_size *= 2; } - tvkprintf(resumable, 1, "Resumable %" PRIi64 " put to position %" PRIu32 " of finished list\n", resumable_id, finished_resumables_count); + tvkprintf(resumable, 3, "Resumable %" PRIi64 " put to position %" PRIu32 " of finished list\n", resumable_id, finished_resumables_count); finished_resumables[finished_resumables_count++] = resumable_id; } @@ -532,7 +532,7 @@ static void continue_resumable(resumable_info *res, int64_t resumable_id) noexce } void resumable_run_ready(int64_t resumable_id) { - tvkprintf(resumable, 1, "Run ready %" PRIi64 "\n", resumable_id); + tvkprintf(resumable, 2, "Run ready resumable %" PRIi64 "\n", resumable_id); if (resumable_id > 1000000000) { forked_resumable_info *res = get_forked_resumable_info(resumable_id); php_assert(res->queue_id >= 0); @@ -555,7 +555,7 @@ void run_scheduler(double timeout) __attribute__((section("run_scheduler_section static int64_t scheduled_resumable_id = 0; void run_scheduler(double dead_line_time) { - tvkprintf(resumable, 1, "Run scheduler %" PRIu32 "\n", finished_resumables_count); + tvkprintf(resumable, 2, "Run scheduler with finished resumables count %" PRIu32 "\n", finished_resumables_count); int32_t left_resumables = 1000; bool force_run_next = false; while (resumable_has_finished() && --left_resumables >= 0) { @@ -575,7 +575,7 @@ void run_scheduler(double dead_line_time) { started_resumable_info *res = get_started_resumable_info(resumable_id); php_assert(res->continuation == nullptr); - tvkprintf(resumable, 1, "Process %" PRIi64 "(%s) with parent %" PRIi64 " in scheduler\n", + tvkprintf(resumable, 3, "Process %" PRIi64 "(%s) with parent %" PRIi64 " in scheduler\n", resumable_id, is_yielded ? "yielded" : "not yielded", res->parent_id); int64_t parent_id = res->parent_id; if (parent_id == 0) { @@ -936,7 +936,7 @@ static int64_t wait_queue_push(int64_t queue_id, int64_t resumable_id) { if (resumable->queue_id == 0) { resumable->queue_id = queue_id; - tvkprintf(resumable, 1, "Link resumable %" PRIi64 " with queue %" PRIi64 " at %.6lf\n", + tvkprintf(resumable, 3, "Link resumable %" PRIi64 " with queue %" PRIi64 " at %.6lf\n", resumable_id, queue_id, (update_precise_now(), get_precise_now())); q->left_functions++; } else { @@ -1152,7 +1152,7 @@ static void process_wait_queue_timeout(kphp_event_timer *timer) { Optional f$wait_queue_next(int64_t queue_id, double timeout) { resumable_finished = true; - tvkprintf(resumable, 1, "Waiting for queue %" PRIi64 "\n", queue_id); + tvkprintf(resumable, 3, "Waiting for queue %" PRIi64 "\n", queue_id); if (!is_wait_queue_id(queue_id)) { if (queue_id != -1) { php_warning("Wrong queue_id %" PRIi64 " in function wait_queue_next", queue_id); diff --git a/server/php-engine.cpp b/server/php-engine.cpp index 4338ff9ab8..a1a0c62992 100644 --- a/server/php-engine.cpp +++ b/server/php-engine.cpp @@ -542,7 +542,7 @@ int hts_func_execute(connection *c, int op) { return -501; } - vkprintf (1, "in hts_execute: connection #%d, op=%d, header_size=%d, data_size=%d, http_version=%d\n", + vkprintf (1, "start http server execute: connection #%d, op=%d, header_size=%d, data_size=%d, http_version=%d\n", c->fd, op, D->header_size, D->data_size, D->http_ver); if (!vk::any_of_equal(D->query_type, htqt_get, htqt_post, htqt_head)) { @@ -553,7 +553,7 @@ int hts_func_execute(connection *c, int op) { if (D->data_size > 0) { int have_bytes = get_total_ready_bytes(&c->In); if (have_bytes < D->data_size + D->header_size && D->data_size < MAX_POST_SIZE) { - vkprintf (1, "-- need %d more bytes, waiting\n", D->data_size + D->header_size - have_bytes); + vkprintf (2, "-- need %d more bytes, waiting\n", D->data_size + D->header_size - have_bytes); return D->data_size + D->header_size - have_bytes; } } @@ -565,18 +565,12 @@ int hts_func_execute(connection *c, int op) { qHeadersLen = D->header_size - D->first_line_size; assert (D->first_line_size > 0 && D->first_line_size <= D->header_size); - vkprintf (1, "===============\n%.*s\n==============\n", D->header_size, ReqHdr); - vkprintf (1, "%d,%d,%d,%d\n", D->host_offset, D->host_size, D->uri_offset, D->uri_size); - - vkprintf (1, "hostname: '%.*s'\n", D->host_size, ReqHdr + D->host_offset); - vkprintf (1, "URI: '%.*s'\n", D->uri_size, ReqHdr + D->uri_offset); - // D->query_flags &= ~QF_KEEPALIVE; if (0 < D->data_size && D->data_size < MAX_POST_SIZE) { assert (read_in(&c->In, Post, D->data_size) == D->data_size); Post[D->data_size] = 0; - vkprintf (1, "have %d POST bytes: `%.80s`\n", D->data_size, Post); + vkprintf (2, "have %d POST bytes: `%.80s`\n", D->data_size, Post); qPost = Post; qPostLen = D->data_size; } else { @@ -606,7 +600,7 @@ int hts_func_execute(connection *c, int op) { return -418; } - vkprintf (1, "OK, lets do something\n"); + vkprintf (1, "start response processing on fd %d\n", c->fd); const char *query_type_str = nullptr; switch (D->query_type) { @@ -896,7 +890,7 @@ static void send_rpc_error(connection *c, long long req_id, int error_code, cons } int rpcx_execute(connection *c, int op, raw_message *raw) { - vkprintf(1, "rpcx_execute: fd=%d, op=%d, len=%d\n", c->fd, op, raw->total_bytes); + vkprintf(2, "rpc execute: fd=%d, op=%d, len=%d\n", c->fd, op, raw->total_bytes); int len = raw->total_bytes; diff --git a/server/php-master.cpp b/server/php-master.cpp index 03f198b74c..2518371584 100644 --- a/server/php-master.cpp +++ b/server/php-master.cpp @@ -87,6 +87,8 @@ extern const char *engine_tag; #define PHP_MASTER_VERSION "0.1" +DEFINE_VERBOSITY(master_process) + DEFINE_VERBOSITY(graceful_restart) static int cpu_cnt; @@ -367,7 +369,8 @@ int signal_fd; int changed = 0; int failed = 0; int socket_fd = -1; -int to_kill = 0, to_run = 0, to_exit = 0; +int to_exit = 0; +int general_workers_to_kill = 0, general_workers_to_run = 0; int job_workers_to_kill = 0, job_workers_to_run = 0; long long generation; int receive_fd_attempts_cnt = 0; @@ -407,7 +410,7 @@ void delete_worker(worker_info_t *w) { } void terminate_worker(worker_info_t *w) { - vkprintf(1, "kill_worker: send SIGTERM to [pid = %d]\n", (int)w->pid); + kprintf("master terminate worker: send SIGTERM to [pid = %d]\n", (int)w->pid); kill(w->pid, SIGTERM); w->is_dying = 1; w->kill_time = my_now + 35; @@ -446,10 +449,8 @@ void kill_hanging_workers() { continue; } if (!worker->is_dying && worker->last_activity_time + get_max_hanging_time_sec() <= my_now) { - vkprintf(1, "No stats received from worker [pid = %d]. Terminate it\n", static_cast(worker->pid)); - if (workers[i]->type == WorkerType::job_worker) { - tvkprintf(job_workers, 1, "No stats received from job worker [pid = %d]. Terminate it\n", static_cast(worker->pid)); - } + tvkprintf(master_process, 1, "No stats received from %s [pid = %d]. Terminate it\n", + worker->type == WorkerType::general_worker ? "general worker" : "job worker", static_cast(worker->pid)); workers_hung++; terminate_worker(worker); last_terminated = my_now; @@ -460,10 +461,8 @@ void kill_hanging_workers() { for (int i = 0; i < vk::singleton::get().get_all_alive(); i++) { if (workers[i]->is_dying && workers[i]->kill_time <= my_now && workers[i]->kill_flag == 0) { - vkprintf(1, "kill_hanging_worker: send SIGKILL to [pid = %d]\n", (int)workers[i]->pid); - if (workers[i]->type == WorkerType::job_worker) { - tvkprintf(job_workers, 1, "kill hanging job worker: send SIGKILL to [pid = %d]\n", (int)workers[i]->pid); - } + kprintf("master kill hanging %s : send SIGKILL to [pid = %d]\n", + workers[i]->type == WorkerType::general_worker ? "general worker" : "job worker", static_cast(workers[i]->pid)); kill(workers[i]->pid, SIGKILL); workers_killed++; @@ -499,7 +498,7 @@ WorkerType start_master() { vk::singleton::get().get_master_name(), vk::singleton::get().get_cluster_name(), vk::singleton::get().get_shmem_name(), vk::singleton::get().get_socket_name()); - vkprintf(1, "start master: begin\n"); + tvkprintf(master_process, 1, "start master initilaizing\n"); sigemptyset(&empty_mask); @@ -530,7 +529,7 @@ WorkerType start_master() { int attempts_to_start = 2; int is_inited = 0; while (attempts_to_start-- > 0) { - vkprintf(1, "attempt to init master. [left attempts = %d]\n", attempts_to_start); + tvkprintf(master_process, 1, "attempt to init master. [left attempts = %d]\n", attempts_to_start); shared_data_lock(shared_data); shared_data_update(shared_data); @@ -544,7 +543,7 @@ WorkerType start_master() { shared_data_unlock(shared_data); if (!is_inited) { - vkprintf(1, "other restart is in progress. sleep 5 seconds. [left attempts = %d]\n", attempts_to_start); + tvkprintf(master_process, 1, "other restart is in progress. sleep 5 seconds. [left attempts = %d]\n", attempts_to_start); sleep(5); } else { break; @@ -552,11 +551,11 @@ WorkerType start_master() { } if (!is_inited) { - vkprintf(0, "Failed to init master. It seems that two other masters are running\n"); + kprintf("Failed to init master. It seems that two other masters are running\n"); _exit(1); } - vkprintf(1, "start master: end\n"); + kprintf("finish master initialization\n"); return run_master(); } @@ -573,12 +572,12 @@ int run_worker(WorkerType worker_type) { log_server_critical("fork error on launching %s worker: %s", (worker_type == WorkerType::general_worker ? "general" : "job"), strerror(errno)); assert(false); } - assert (new_pid != -1 && "failed to fork"); + dl_assert(new_pid != -1, "failed to fork"); if (new_pid == 0) { prctl(PR_SET_PDEATHSIG, SIGKILL); // TODO: or SIGTERM if (getppid() != me->pid) { - vkprintf(0, "parent is dead just after start\n"); + kprintf("parent process is dead just after start\n"); exit(123); } @@ -651,8 +650,7 @@ int run_worker(WorkerType worker_type) { } dl_restore_signal_mask(); - - vkprintf(1, "new worker launched [pid = %d]\n", (int)new_pid); + kprintf("master create %s [pid = %d]\n", worker_type == WorkerType::general_worker ? "general worker" : "job worker", (int)new_pid); worker_info_t *worker = workers[vk::singleton::get().get_all_alive() - 1] = new_worker(); worker->pid = new_pid; @@ -671,7 +669,7 @@ int run_worker(WorkerType worker_type) { } void remove_worker(pid_t pid) { - vkprintf(2, "remove workers [pid = %d]\n", static_cast(pid)); + tvkprintf(master_process, 1, "master remove worker [pid = %d]\n", static_cast(pid)); const auto &workers_control = vk::singleton::get(); for (int i = 0; i < workers_control.get_all_alive(); i++) { if (workers[i]->pid == pid) { @@ -684,7 +682,7 @@ void remove_worker(pid_t pid) { delete_worker(workers[i]); workers[i] = workers[workers_control.get_all_alive()]; - vkprintf(1, "worker removed: [general running = %d] [general dying = %d]\n", + tvkprintf(master_process, 1, "worker removed: [general running = %d] [general dying = %d]\n", int{workers_control.get_running_count(WorkerType::general_worker)}, int{workers_control.get_dying_count(WorkerType::general_worker)}); return; @@ -1143,7 +1141,7 @@ int php_master_http_execute(struct connection *c, int op) { struct hts_data *D = HTS_DATA(c); char ReqHdr[MAX_HTTP_HEADER_SIZE]; - vkprintf(1, "in php_master_http_execute: connection #%d, op=%d, header_size=%d, data_size=%d, http_version=%d\n", + tvkprintf(master_process, 1, "master http execute connection #%d, op=%d, header_size=%d, data_size=%d, http_version=%d\n", c->fd, op, D->header_size, D->data_size, D->http_ver); if (D->query_type != htqt_get) { @@ -1156,12 +1154,6 @@ int php_master_http_execute(struct connection *c, int op) { assert (D->first_line_size > 0 && D->first_line_size <= D->header_size); - vkprintf(1, "===============\n%.*s\n==============\n", D->header_size, ReqHdr); - vkprintf(1, "%d,%d,%d,%d\n", D->host_offset, D->host_size, D->uri_offset, D->uri_size); - - vkprintf(1, "hostname: '%.*s'\n", D->host_size, ReqHdr + D->host_offset); - vkprintf(1, "URI: '%.*s'\n", D->uri_size, ReqHdr + D->uri_offset); - const char *allowed_query = "/server-status"; if (D->uri_size == strlen(allowed_query) && strncmp(ReqHdr + D->uri_offset, allowed_query, static_cast(D->uri_size)) == 0) { std::string stat_html = get_master_stats_html(); @@ -1176,11 +1168,10 @@ int php_master_http_execute(struct connection *c, int op) { /*** Main loop functions ***/ - void run_master_off_in_graceful_shutdown() { - vkprintf(2, "state: master_state::off_in_graceful_shutdown\n"); + kprintf("master off in graceful shutdown\n"); assert(state == master_state::off_in_graceful_shutdown); - to_kill = vk::singleton::get().get_running_count(WorkerType::general_worker); + general_workers_to_kill = vk::singleton::get().get_running_count(WorkerType::general_worker); if (all_http_workers_killed()) { if (all_job_workers_killed()) { to_exit = 1; @@ -1192,9 +1183,9 @@ void run_master_off_in_graceful_shutdown() { } void run_master_off_in_graceful_restart() { - vkprintf(2, "state: master_state::off_in_graceful_restart\n"); + kprintf("master off in graceful restart\n"); assert (other->is_alive); - vkprintf(2, "other->to_kill_generation > me->generation --- %lld > %lld\n", other->to_kill_generation, me->generation); + tvkprintf(graceful_restart, 2, "other->to_kill_generation > me->generation --- %lld > %lld\n", other->to_kill_generation, me->generation); if (other->is_alive && other->ask_http_fds_generation > me->generation) { send_fds_via_socket(vk::singleton::get().http_socket_fds()); @@ -1205,7 +1196,7 @@ void run_master_off_in_graceful_restart() { if (other->to_kill_generation > me->generation) { // old master kills as many workers as new master told - to_kill = other->to_kill; + general_workers_to_kill = other->to_kill; } if (all_http_workers_killed()) { @@ -1231,7 +1222,7 @@ bool init_http_sockets_if_needed() { bool can_ask_http_fds = other->is_alive && other->own_http_fds && other->http_ports_count == me->http_ports_count && std::equal(me->http_ports, me->http_ports + me->http_ports_count, other->http_ports); if (!can_ask_http_fds) { - vkprintf(1, "Create http sockets\n"); + tvkprintf(master_process, 1, "master create http sockets\n"); bool ok = http_ctx.master_create_http_sockets(); assert(ok && "failed to create HTTP sockets"); @@ -1272,7 +1263,7 @@ bool init_http_sockets_if_needed() { } void run_master_on() { - vkprintf(2, "state: master_state::on\n"); + tvkprintf(master_process, 3, "master state on\n"); static double prev_attempt = 0; if (!master_sfd_inited && !other->is_alive && prev_attempt + 1 < my_now) { @@ -1284,7 +1275,7 @@ void run_master_on() { failed_cnt++; if (failed_cnt > 2000) { - vkprintf(-1, "cannot open master server socket at port %d: %m\n", master_port); + kprintf("cannot open master server socket at port %d: %m\n", master_port); exit(1); } } else { @@ -1305,7 +1296,7 @@ void run_master_on() { if (done) { const auto &control = vk::singleton::get(); const int total_workers = control.get_alive_count(WorkerType::general_worker) + (other->is_alive ? other->running_http_workers_n + other->dying_http_workers_n : 0); - to_run = std::max(0, int{control.get_count(WorkerType::general_worker)} - total_workers); + general_workers_to_run = std::max(0, int{control.get_count(WorkerType::general_worker)} - total_workers); job_workers_to_run = control.get_count(WorkerType::job_worker) - control.get_alive_count(WorkerType::job_worker); if (other->is_alive) { @@ -1318,7 +1309,7 @@ void run_master_on() { bool warmup_timeout_expired = warm_up_ctx.warmup_timeout_expired(); if (set_to_kill > 0 && (need_more_workers_for_warmup || is_instance_cache_hot_enough || warmup_timeout_expired)) { // new master tells to old master how many workers it must kill - vkprintf(1, "[set_to_kill = %d] [need_more_workers_for_warmup = %d] [is_instance_cache_hot_enough = %d] [new_instance_cache_size / old_instance_cache_size = %u / %u] [warmup_timeout_expired = %d]\n", + tvkprintf(graceful_restart, 1, "[set_to_kill = %d] [need_more_workers_for_warmup = %d] [is_instance_cache_hot_enough = %d] [new_instance_cache_size / old_instance_cache_size = %u / %u] [warmup_timeout_expired = %d]\n", set_to_kill, need_more_workers_for_warmup, is_instance_cache_hot_enough, me->instance_cache_elements_cached, other->instance_cache_elements_cached, warmup_timeout_expired); @@ -1334,7 +1325,7 @@ void run_master_on() { } int signal_epoll_handler(int fd, void *data __attribute__((unused)), event_t *ev __attribute__((unused))) { - vkprintf(2, "signal_epoll_handler\n"); + tvkprintf(master_process, 2, "master epoll handler\n"); signalfd_siginfo fdsi{}; int s = (int)read(signal_fd, &fdsi, sizeof(signalfd_siginfo)); if (s == -1) { @@ -1342,7 +1333,7 @@ int signal_epoll_handler(int fd, void *data __attribute__((unused)), event_t *ev return 0; } dl_assert (s == sizeof(signalfd_siginfo), dl_pstr("got %d bytes of %d expected", s, (int)sizeof(signalfd_siginfo))); - vkprintf(2, "signal %u received\n", fdsi.ssi_signo); + tvkprintf(master_process, 2, "signal %u received\n", fdsi.ssi_signo); if (fdsi.ssi_signo == SIGTERM && !in_sigterm) { const char *message = "master got SIGTERM, starting graceful shutdown.\n"; kwrite(2, message, strlen(message)); @@ -1413,6 +1404,12 @@ static void cron() { instance_cache_purge_expired_elements(); check_and_instance_cache_try_swap_memory(); confdata_binlog_update_cron(); + tvkprintf(master_process, 3, "master process cron work [utime = %llu, stime = %llu, alive_workers_count = %d]. " + "General workers details [running general workers = %d, waiting general workers = %d, ready for accept general workers = %d]. " + "Job workers details [running job workers = %d, waiting job workers = %d, ready for accept job workers = %d].\n", + utime, stime, alive_workers_count, + general_workers_stat.running_workers, general_workers_stat.waiting_workers, general_workers_stat.ready_for_accept_workers, + job_workers_stat.running_workers, job_workers_stat.waiting_workers, job_workers_stat.ready_for_accept_workers); } auto get_steady_tp_ms_now() noexcept { @@ -1472,15 +1469,14 @@ WorkerType run_master() { auto prev_cron_start_tp = get_steady_tp_ms_now(); WarmUpContext::get().reset(); while (true) { - vkprintf(2, "run_master iteration: begin\n"); + tvkprintf(master_process, 3, "start master interation\n"); my_now = dl_time(); changed = 0; failed = 0; - to_kill = 0; - to_run = 0; to_exit = 0; + general_workers_to_kill = general_workers_to_run = 0; job_workers_to_kill = job_workers_to_run = 0; update_workers(); @@ -1520,8 +1516,9 @@ WorkerType run_master() { me->generation = generation; - if (to_kill != 0 || to_run != 0 || job_workers_to_kill != 0 || job_workers_to_run != 0) { - vkprintf(1, "[to_kill = %d] [to_run = %d] [job_workers_to_kill = %d] [job_workers_to_run = %d]\n", to_kill, to_run, job_workers_to_kill, job_workers_to_run); + if (general_workers_to_kill != 0 || general_workers_to_run != 0 || job_workers_to_kill != 0 || job_workers_to_run != 0) { + tvkprintf(master_process, 2, "[general_workers_to_kill = %d] [general_workers_to_run = %d] [job_workers_to_kill = %d] [job_workers_to_run = %d]\n", + general_workers_to_kill, general_workers_to_run, job_workers_to_kill, job_workers_to_run); } for (int i = 0; i < job_workers_to_kill; ++i) { @@ -1534,14 +1531,15 @@ WorkerType run_master() { } } - while (to_kill-- > 0) { + for (int i = 0; i < general_workers_to_kill; ++i) { kill_worker(WorkerType::general_worker); } - while (to_run-- > 0 && !failed) { + for (int i = 0; i < general_workers_to_run && !failed; ++i) { if (run_worker(WorkerType::general_worker)) { return WorkerType::general_worker; } } + kill_hanging_workers(); me->running_http_workers_n = vk::singleton::get().get_running_count(WorkerType::general_worker); @@ -1550,7 +1548,7 @@ WorkerType run_master() { if (state != master_state::off_in_graceful_shutdown) { if (changed && other->is_alive) { - vkprintf(1, "wakeup other master [pid = %d]\n", (int)other->pid); + tvkprintf(graceful_restart, 1, "wakeup other master [pid = %d]\n", (int)other->pid); kill(other->pid, SIGPOLL); } } @@ -1558,7 +1556,7 @@ WorkerType run_master() { shared_data_unlock(shared_data); if (to_exit) { - vkprintf(1, "all workers killed. Exit\n"); + kprintf("master kill all workers. Exit\n"); _exit(0); } @@ -1570,7 +1568,7 @@ WorkerType run_master() { workers_send_signal(SIGUSR1); } - vkprintf(2, "run_master iteration: end\n"); + tvkprintf(master_process, 3, "finish master interation\n"); using namespace std::chrono_literals; auto wait_time = 1s - (get_steady_tp_ms_now() - prev_cron_start_tp); diff --git a/server/php-master.h b/server/php-master.h index 45684b9e44..d2988ec4b6 100644 --- a/server/php-master.h +++ b/server/php-master.h @@ -7,4 +7,6 @@ #include "net/net-connections.h" #include "server/workers-control.h" +DECLARE_VERBOSITY(master_process); + WorkerType start_master(); diff --git a/server/php-mc-connections.cpp b/server/php-mc-connections.cpp index 7501eab18f..7ddaeb9ac3 100644 --- a/server/php-mc-connections.cpp +++ b/server/php-mc-connections.cpp @@ -101,7 +101,7 @@ int memcache_client_execute(connection *c, int op) { int len, x = 0; char *ptr; - vkprintf (1, "proxy_mc_client: op=%d, key_len=%d, arg#=%d, response_len=%d\n", op, D->key_len, D->arg_num, D->response_len); + tvkprintf(php_connections, 3, "proxy_mc_client: op=%d, key_len=%d, arg#=%d, response_len=%d\n", op, D->key_len, D->arg_num, D->response_len); if (op == mcrt_empty) { return SKIP_ALL_BYTES; @@ -110,10 +110,8 @@ int memcache_client_execute(connection *c, int op) { conn_query *cur_query = nullptr; if (c->first_query == (conn_query *)c) { if (op != mcrt_VERSION) { - vkprintf (-1, "response received for empty query list? op=%d\n", op); - if (verbosity > -2) { - dump_connection_buffers(c); - } + kprintf("response received for empty query list? op=%d\n", op); + dump_connection_buffers(c); D->response_flags |= 16; return SKIP_ALL_BYTES; } @@ -142,7 +140,7 @@ int memcache_client_execute(connection *c, int op) { assert (len > 0); ptr = reinterpret_cast(nbit_get_ptr(&c->Q)); } else { - vkprintf(-1, "error at VALUE: op=%d, key_len=%d, arg_num=%d, value_len=%lld\n", op, D->key_len, D->arg_num, D->args[1]); + kprintf("error at VALUE: op=%d, key_len=%d, arg_num=%d, value_len=%lld\n", op, D->key_len, D->arg_num, D->args[1]); D->response_flags |= 16; return SKIP_ALL_BYTES; @@ -151,7 +149,7 @@ int memcache_client_execute(connection *c, int op) { nbit_advance(&c->Q, 1); } if (ptr[0] != '\r' || (len > 1 ? ptr[1] : *((char *)nbit_get_ptr(&c->Q))) != '\n') { - vkprintf(-1, "missing cr/lf at VALUE: op=%d, key_len=%d, arg_num=%d, value_len=%lld\n", op, D->key_len, D->arg_num, D->args[1]); + kprintf("missing cr/lf at VALUE: op=%d, key_len=%d, arg_num=%d, value_len=%lld\n", op, D->key_len, D->arg_num, D->args[1]); assert (0); @@ -160,7 +158,7 @@ int memcache_client_execute(connection *c, int op) { } len = 2; - //vkprintf (1, "mcc_value: op=%d, key_len=%d, flags=%lld, time=%lld, value_len=%lld\n", op, D->key_len, D->args[0], D->args[1], D->args[2]); + tvkprintf (php_connections, 3, "mcc_value: op=%d, key_len=%d, flags=%lld, time=%lld, value_len=%lld\n", op, D->key_len, D->args[0], D->args[1], D->args[2]); query_len = (int)(D->response_len + D->args[1] + len); reader = create_data_reader(c, query_len); @@ -177,7 +175,7 @@ int memcache_client_execute(connection *c, int op) { } case mcrt_VERSION: c->unreliability >>= 1; - vkprintf (3, "mcc_got_version: op=%d, key_len=%d, unreliability=%d\n", op, D->key_len, c->unreliability); + tvkprintf(php_connections, 3, "mcc_got_version: op=%d, key_len=%d, unreliability=%d\n", op, D->key_len, c->unreliability); if (cur_query != nullptr) { query_len = D->response_len; @@ -194,7 +192,7 @@ int memcache_client_execute(connection *c, int op) { return SKIP_ALL_BYTES; case mcrt_CLIENT_ERROR: - vkprintf (-1, "CLIENT_ERROR received from connection %d (%s)\n", c->fd, sockaddr_storage_to_string(&c->remote_endpoint)); + kprintf("CLIENT_ERROR received from connection %d (%s)\n", c->fd, sockaddr_storage_to_string(&c->remote_endpoint)); //client_errors_received++; /* fallthrough */ case mcrt_ERROR: diff --git a/server/php-queries-types.cpp b/server/php-queries-types.cpp index f5bf05a6ce..222b7cb0d2 100644 --- a/server/php-queries-types.cpp +++ b/server/php-queries-types.cpp @@ -38,8 +38,7 @@ void php_query_rpc_answer::run(PhpWorker *worker) noexcept { connection *c = worker->conn; int *q = (int *)data; int qsize = data_len; - - vkprintf (2, "going to send %d bytes as an answer [req_id = %016llx]\n", qsize, worker->req_id); + tvkprintf(php_connections, 3, "going to send %d bytes as an answer [req_id = %016llx]\n", qsize, worker->req_id); send_rpc_query(c, q[2] == 0 ? TL_RPC_REQ_RESULT : TL_RPC_REQ_ERROR, worker->req_id, q, qsize); } php_script->query_readed(); diff --git a/server/php-queries.cpp b/server/php-queries.cpp index acc21ed4b5..e6ff1ae581 100644 --- a/server/php-queries.cpp +++ b/server/php-queries.cpp @@ -32,6 +32,8 @@ #define MAX_NET_ERROR_LEN 128 +DEFINE_VERBOSITY(php_connections) + static char last_net_error[MAX_NET_ERROR_LEN + 1]; static void save_last_net_error(const char *s) { diff --git a/server/php-queries.h b/server/php-queries.h index 7263a59f7a..868c37ca48 100644 --- a/server/php-queries.h +++ b/server/php-queries.h @@ -10,9 +10,12 @@ #include #include "common/sanitizer.h" +#include "common/kprintf.h" #include "server/slot-ids-factory.h" #include "server/php-queries-types.h" +DECLARE_VERBOSITY(php_connections); + extern SlotIdsFactory parallel_job_ids_factory; extern SlotIdsFactory external_db_requests_factory; diff --git a/server/php-runner.cpp b/server/php-runner.cpp index 04e05ad6dd..4c127a04d3 100644 --- a/server/php-runner.cpp +++ b/server/php-runner.cpp @@ -35,6 +35,8 @@ #include "server/server-stats.h" #include "server/signal-handlers.h" +DEFINE_VERBOSITY(php_runner); + query_stats_t query_stats; long long query_stats_id = 1; @@ -312,16 +314,12 @@ void PhpScript::finish() noexcept { kprintf("Critical error during script execution: %s\n", error_message); kphp_tracing::on_php_script_finish_terminated(); } - if (save_state == run_state_t::error || script_mem_stats.real_memory_used >= 100000000) { - if (data != nullptr) { - http_query_data *http_data = data->http_data; - if (http_data != nullptr) { - assert (http_data->headers); - kprintf("HEADERS: len = %d\n%.*s\nEND HEADERS\n", http_data->headers_len, min(http_data->headers_len, 1 << 16), http_data->headers); - kprintf("POST: len = %d\n%.*s\nEND POST\n", http_data->post_len, min(http_data->post_len, 1 << 16), http_data->post == nullptr ? "" : http_data->post); - } - } + if (error_type == script_error_t::memory_limit || script_mem_stats.real_memory_used > max_memory / 2) { + kprintf("Detailed memory stats: total allocations = %zd, total memory allocated = %zd, huge memory pieces = %zd, small memory pieces = %zd, defragmentation calls = %zd," + "real memory used = %zd, max real memory used = %zd, memory used = %zd, max memory used = %zd, memory_limit = %zd\n", + script_mem_stats.total_allocations, script_mem_stats.total_memory_allocated, script_mem_stats.huge_memory_pieces, script_mem_stats.small_memory_pieces, script_mem_stats.defragmentation_calls, + script_mem_stats.real_memory_used, script_mem_stats.max_real_memory_used, script_mem_stats.memory_used, script_mem_stats.max_memory_used, script_mem_stats.memory_limit); } const size_t buf_size = 5000; @@ -339,7 +337,7 @@ void PhpScript::finish() noexcept { } } } - kprintf("[worked = %.3lf, net = %.3lf, script = %.3lf, queries_cnt = %5d, long_queries_cnt = %5d, static_memory = %9d, peak_memory = %9d, total_memory = %9d] %s\n", + kprintf("[worked = %.3lf, net = %.3lf, script = %.3lf, queries_cnt = %5d, long_queries_cnt = %5d, heap_memory_used = %9d, peak_script_memory = %9d, total_script_memory = %9d] %s\n", script_time + net_time, net_time, script_time, queries_cnt, long_queries_cnt, (int)dl::get_heap_memory_used(), (int)script_mem_stats.max_real_memory_used, diff --git a/server/php-runner.h b/server/php-runner.h index e9f5566c0a..eef7ec6d3f 100644 --- a/server/php-runner.h +++ b/server/php-runner.h @@ -9,6 +9,7 @@ #include "common/dl-utils-lite.h" #include "common/mixin/not_copyable.h" #include "common/sanitizer.h" +#include "common/kprintf.h" #include "server/php-engine-vars.h" #include "server/php-init-scripts.h" @@ -16,6 +17,8 @@ #include "server/php-query-data.h" #include "server/ucontext-portable.h" +DECLARE_VERBOSITY(php_runner); + enum class run_state_t { finished, uncleared, diff --git a/server/php-sql-connections.cpp b/server/php-sql-connections.cpp index e3ef59d6a9..5ae426f3f7 100644 --- a/server/php-sql-connections.cpp +++ b/server/php-sql-connections.cpp @@ -223,7 +223,7 @@ int sqlp_becomes_ready(connection *c) { int sqlp_check_ready(connection *c) { if (c->status == conn_ready && c->In.total_bytes > 0) { - vkprintf (-1, "have %d bytes in outbound sql connection %d in state ready, closing connection\n", c->In.total_bytes, c->fd); + kprintf("have %d bytes in outbound sql connection %d in state ready, closing connection\n", c->In.total_bytes, c->fd); c->status = conn_error; c->error = -3; fail_connection(c, -3); @@ -237,7 +237,7 @@ int sqlp_check_ready(connection *c) { if (c->status == conn_wait_answer || c->status == conn_reading_answer) { if (!(c->flags & C_FAILED) && c->last_query_sent_time < precise_now - RESPONSE_FAIL_TIMEOUT - c->last_query_time && c->last_response_time < precise_now - RESPONSE_FAIL_TIMEOUT - c ->last_query_time && !(SQLC_DATA(c)->extra_flags & 1)) { - vkprintf (1, "failing outbound connection %d, status=%d, response_status=%d, last_response=%.6f, last_query=%.6f, now=%.6f\n", c->fd, c->status, SQLC_DATA(c) + tvkprintf(php_connections, 1, "failing outbound connection %d, status=%d, response_status=%d, last_response=%.6f, last_query=%.6f, now=%.6f\n", c->fd, c->status, SQLC_DATA(c) ->response_state, c->last_response_time, c->last_query_sent_time, precise_now); c->error = -5; fail_connection(c, -5); @@ -259,7 +259,7 @@ int sqlp_authorized(connection *c) { SQLC_DATA(c)->auth_state = sql_auth_ok; c->status = conn_ready; SQLC_DATA(c)->packet_state = 0; - vkprintf (1, "outcoming initdb successful\n"); + tvkprintf(php_connections, 1, "outcoming initdb successful\n"); SQLC_FUNC(c)->sql_becomes_ready(c); return 0; } @@ -292,11 +292,10 @@ int proxy_client_execute(connection *c, int op) { if (b_len >= 5) { field_cnt = buffer[4] & 0xff; } - - vkprintf (1, "proxy_db_client: op=%d, packet_len=%d, response_state=%d, field_num=%d\n", op, D->packet_len, D->response_state, field_cnt); + tvkprintf(php_connections, 3, "proxy client execute: op=%d, packet_len=%d, response_state=%d, field_num=%d\n", op, D->packet_len, D->response_state, field_cnt); if (c->first_query == (conn_query *)c) { - vkprintf (-1, "response received for empty query list? op=%d\n", op); + kprintf("response received for empty query list? op=%d\n", op); return SKIP_ALL_BYTES; } @@ -325,7 +324,7 @@ int proxy_client_execute(connection *c, int op) { dl_unreachable ("looks like unused code"); SQLC_DATA(c)->extra_flags |= 2; if (c->first_query->requester->generation != c->first_query->req_generation) { - vkprintf (1, "outbound connection %d: nowhere to forward replication stream, closing\n", c->fd); + tvkprintf (php_connections, 3, "outbound connection %d: nowhere to forward replication stream, closing\n", c->fd); c->status = conn_error; } } else if (field_cnt == 0 || field_cnt == 0xff) { @@ -369,7 +368,7 @@ int proxy_client_execute(connection *c, int op) { } break; case resp_done: - vkprintf (-1, "unexpected packet from server!\n"); + kprintf("unexpected packet from server!\n"); assert (0); } diff --git a/server/php-worker.cpp b/server/php-worker.cpp index 7d241fc8a3..47c882d1f5 100644 --- a/server/php-worker.cpp +++ b/server/php-worker.cpp @@ -32,32 +32,42 @@ double PhpWorker::enter_lifecycle() noexcept { } on_wakeup(); + tvkprintf(php_runner, 3, "PHP-worker enter lifecycle [php-script state = %d, conn status = %d] lifecycle [req_id = %016llx]\n", + php_script ? static_cast(php_script->state) : -1, conn->status, req_id); paused = false; do { switch (state) { case phpq_try_start: + tvkprintf(php_runner, 1, "PHP-worker try start [req_id = %016llx]\n", req_id); state_try_start(); break; case phpq_init_script: + tvkprintf(php_runner, 1, "PHP-worker init PHP-script [req_id = %016llx]\n", req_id); state_init_script(); break; case phpq_run: + tvkprintf(php_runner, 3, "execute PHP-worker [req_id = %016llx]\n", req_id); state_run(); break; case phpq_free_script: + get_utime_monotonic(); + tvkprintf(php_runner, 1, "PHP-worker free PHP-script [query worked = %.5lf] [query waited for start = %.5lf] [req_id = %016llx]\n", precise_now - start_time, start_time - init_time, req_id); state_free_script(); break; case phpq_finish: + tvkprintf(php_runner, 1, "finish PHP-worker [req_id = %016llx]\n", req_id); state_finish(); return 0; } get_utime_monotonic(); } while (!paused); + tvkprintf(php_runner, 3, "PHP-worker [php-script state = %d, conn status = %d] return in net reactor [req_id = %016llx]\n", + php_script ? static_cast(php_script->state) : -1, conn->status, req_id); assert(conn->status == conn_wait_net); return get_timeout(); } @@ -67,7 +77,7 @@ void PhpWorker::terminate(int flag, script_error_t terminate_reason_, const char terminate_reason = terminate_reason_; error_message = error_message_; if (flag) { - vkprintf(0, "php_worker_terminate\n"); + kprintf("terminate PHP-worker with message %s\n", error_message_); conn = nullptr; } } @@ -90,7 +100,7 @@ void PhpWorker::state_try_start() noexcept { } if (php_worker_run_flag) { // put connection into pending_http_query - vkprintf(2, "php script [req_id = %016llx] is waiting\n", req_id); + tvkprintf(php_runner, 2, "PHP-worker is waiting [req_id = %016llx]\n", req_id); auto *pending_q = reinterpret_cast(malloc(sizeof(conn_query))); @@ -135,7 +145,6 @@ void PhpWorker::state_init_script() noexcept { get_utime_monotonic(); start_time = precise_now; - vkprintf(1, "START php script [req_id = %016llx]\n", req_id); assert(active_worker == nullptr); active_worker = this; vk::singleton::get().set_running_worker_status(); @@ -202,9 +211,11 @@ void php_worker_run_net_queue(PhpWorker *worker __attribute__((unused))) { } void PhpWorker::state_run() noexcept { + tvkprintf(php_runner, 3, "execute PHP-worker [req_id = %016llx]\n", req_id); int f = 1; while (f) { if (terminate_flag) { + tvkprintf(php_runner, 1, "PHP-worker terminate PHP-script [req_id = %016llx]\n", req_id); php_script->terminate(error_message, terminate_reason); } @@ -217,12 +228,12 @@ void PhpWorker::state_run() noexcept { paused = true; vk::singleton::get().set_wait_net_worker_status(); conn->status = conn_wait_net; - vkprintf(2, "php_script_iterate [req_id = %016llx] delayed due to net events\n", req_id); + tvkprintf(php_runner, 3, "PHP-script delayed due to net event [req_id = %016llx]\n", req_id); break; } - vkprintf(2, "before php_script_iterate [req_id = %016llx] (before swap context)\n", req_id); + tvkprintf(php_runner, 3, "PHP-worker before swap context [req_id = %016llx]\n", req_id); php_script->iterate(); - vkprintf(2, "after php_script_iterate [req_id = %016llx] (after swap context)\n", req_id); + tvkprintf(php_runner, 3, "PHP-worker after swap context [req_id = %016llx]\n", req_id);; wait(0); // check for net events break; } @@ -232,24 +243,24 @@ void PhpWorker::state_run() noexcept { paused = true; vk::singleton::get().set_wait_net_worker_status(); conn->status = conn_wait_net; - vkprintf(2, "query [req_id = %016llx] delayed due to net events\n", req_id); + tvkprintf(php_runner, 3, "PHP-worker delay query due to net events [req_id = %016llx]\n", req_id); break; } - vkprintf(2, "got query [req_id = %016llx]\n", req_id); + tvkprintf(php_runner, 2, "PHP-worker run query %016llx [req_id = %016llx]\n", query_stats_id, req_id); run_query(); php_worker_run_net_queue(this); wait(0); // check for net events break; } case run_state_t::query_running: { - vkprintf(2, "paused due to query [req_id = %016llx]\n", req_id); + tvkprintf(php_runner, 2, "PHP-worker paused due to query [req_id = %016llx]\n", req_id); f = 0; paused = true; vk::singleton::get().set_wait_net_worker_status(); break; } case run_state_t::error: { - vkprintf(2, "php script [req_id = %016llx]: ERROR (probably timeout)\n", req_id); + tvkprintf(php_runner, 1, "PHP-worker catch error [req_id = %016llx]\n", req_id); if (dl::is_malloc_replaced()) { // in case the error happened when malloc was replaced dl::rollback_malloc_replacement(); @@ -286,7 +297,7 @@ void PhpWorker::state_run() noexcept { break; } case run_state_t::finished: { - vkprintf(2, "php script [req_id = %016llx]: OK (still can return RPC_ERROR)\n", req_id); + tvkprintf(php_runner, 1, "PHP-worker finish PHP-script [req_id = %016llx]\n", req_id); script_result *res = php_script->res; set_result(res); php_script->finish(); @@ -311,7 +322,7 @@ void PhpWorker::wait(int timeout_ms) noexcept { int new_net_events_cnt = epoll_fetch_events(0); // TODO: maybe we have to wait for timers too if (epoll_event_heap_size() > 0) { - vkprintf(2, "paused for some nonblocking net activity [req_id = %016llx]\n", req_id); + tvkprintf(php_runner, 3, "PHP-worker paused for nonblocking net activity [req_id = %016llx]\n", req_id); wakeup(); return; } else { @@ -322,7 +333,7 @@ void PhpWorker::wait(int timeout_ms) noexcept { if (!net_events_empty()) { waiting = 0; } else { - vkprintf(2, "paused for some blocking net activity [req_id = %016llx] [timeout = %.3lf]\n", req_id, timeout_ms * 0.001); + tvkprintf(php_runner, 3, "PHP-worker paused for blocking net activity [req_id = %016llx] [timeout = %.3lf]\n", req_id, timeout_ms * 0.001); wakeup_time = get_utime_monotonic() + timeout_ms * 0.001; } } @@ -382,13 +393,8 @@ void PhpWorker::state_free_script() noexcept { php_worker_run_flag = 0; int f = 0; - get_utime_monotonic(); - double worked = precise_now - start_time; - double waited = start_time - init_time; - assert(active_worker == this); active_worker = nullptr; - vkprintf(1, "FINISH php script [query worked = %.5lf] [query waited for start = %.5lf] [req_id = %016llx]\n", worked, waited, req_id); vk::singleton::get().set_idle_worker_status(); if (mode == once_worker) { static int left = run_once_count; @@ -396,9 +402,6 @@ void PhpWorker::state_free_script() noexcept { turn_sigterm_on(); } } - if (worked + waited > 1.0) { - vkprintf(1, "ATTENTION php script [query worked = %.5lf] [query waited for start = %.5lf] [req_id = %016llx]\n", worked, waited, req_id); - } while (pending_http_queue.first_query != (conn_query *)&pending_http_queue && !f) { // TODO: is it correct to do it? @@ -423,7 +426,6 @@ void PhpWorker::state_free_script() noexcept { } void PhpWorker::state_finish() noexcept { - vkprintf(2, "free php script [req_id = %016llx]\n", req_id); clear_shared_job_messages(); // it's here because `phpq_free_script` state is skipped when worker->terminate_flag == true lease_on_worker_finish(this); } @@ -464,7 +466,7 @@ PhpWorker::PhpWorker(php_worker_mode_t mode_, connection *c, http_query_data *ht } else { target_fd = -1; } - vkprintf(2, "create php script [req_id = %016llx]\n", req_id); + tvkprintf(php_runner, 1, "initialize PHP-worker [req_id = %016llx]\n", req_id); } PhpWorker::~PhpWorker() {