diff --git a/ldms/python/ldmsd/Makefile.am b/ldms/python/ldmsd/Makefile.am index f4dd661dd..25fc2d903 100644 --- a/ldms/python/ldmsd/Makefile.am +++ b/ldms/python/ldmsd/Makefile.am @@ -1,3 +1,3 @@ pkgpythondir=${pythondir}/ldmsd -pkgpython_PYTHON = __init__.py ldmsd_setup.py ldmsd_util.py ldmsd_communicator.py ldmsd_config.py +pkgpython_PYTHON = __init__.py ldmsd_setup.py ldmsd_util.py ldmsd_communicator.py ldmsd_config.py ldmsd_profiling.py dist_bin_SCRIPTS = ldmsd_controller diff --git a/ldms/python/ldmsd/ldmsd_communicator.py b/ldms/python/ldmsd/ldmsd_communicator.py index ceb069943..6f043b19d 100644 --- a/ldms/python/ldmsd/ldmsd_communicator.py +++ b/ldms/python/ldmsd/ldmsd_communicator.py @@ -160,6 +160,7 @@ 'set_sec_mod' : {'req_attr': ['regex'], 'opt_attr': ['uid', 'gid', 'perm']}, 'log_status' : {'req_attr' : [], 'opt_attr' : ['name']}, 'stats_reset' : {'req_attr' : [], 'opt_attr' : ['list']}, + 'profiling' : {'req_attr' : [], 'opt_attr' : ['enable', 'reset']}, ##### Failover. ##### 'failover_config': { 'req_attr': [ @@ -617,6 +618,9 @@ class LDMSD_Request(object): SET_SEC_MOD = 0x600 + 19 LOG_STATUS = 0x600 + 20 STATS_RESET = 0x600 + 21 + # IDs 0x600 + 22 to 0x600 + 30 are reserved to match command-line options handlers + # defined in ldmsd_request.h. These must stay in sync with the C implementation. 
+ PROFILING = 0x600 + 31 FAILOVER_CONFIG = 0x700 FAILOVER_PEERCFG_START = 0x700 + 1 @@ -732,6 +736,7 @@ class LDMSD_Request(object): 'failover_stop' : {'id' : FAILOVER_STOP}, 'set_route' : {'id': SET_ROUTE}, 'xprt_stats' : {'id' : XPRT_STATS}, + 'profiling' : {'id' : PROFILING}, 'thread_stats' : {'id' : THREAD_STATS}, 'prdcr_stats' : {'id' : PRDCR_STATS}, 'set_stats' : {'id' : SET_STATS}, @@ -3393,6 +3398,24 @@ def xprt_stats(self, reset=False, level=0): self.close() return errno.ENOTCONN, str(e) + def profiling(self, enable = None, reset = None): + attrs = [] + if enable is not None: + attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.TYPE, + value = enable)) + if reset is not None: + attrs.append(LDMSD_Req_Attr(attr_id = LDMSD_Req_Attr.RESET, + value = reset)) + req = LDMSD_Request( + command_id=LDMSD_Request.PROFILING, attrs=attrs) + try: + req.send(self) + resp = req.receive(self) + return resp['errcode'], resp['msg'] + except Exception as e: + self.close() + return errno.ENOTCONN, str(e) + def thread_stats(self, reset=False): """Query the daemon's I/O thread utilization data""" if reset is None: diff --git a/ldms/python/ldmsd/ldmsd_controller b/ldms/python/ldmsd/ldmsd_controller index 14e7724b0..c69426e6c 100755 --- a/ldms/python/ldmsd/ldmsd_controller +++ b/ldms/python/ldmsd/ldmsd_controller @@ -2410,6 +2410,30 @@ class LdmsdCmdParser(cmd.Cmd): def complete_xprt_stats(self, text, line, begidx, endidx): return self.__complete_attr_list('xprt_stats', text) + def do_profiling(self, arg): + """ + Enable/disable and query the LDMS operation profiling data + + The command is intended for diagnostics and studies aimed at improving ldmsd performance. + + The command always reports the cached profiling data if any exists.
+ + Parameters: + [enable=] True to enable LDMS profiling + [reset=] True to reset and free cached profiling data after the report + """ + arg = self.handle_args('profiling', arg) + if not arg: + return + rc, msg = self.comm.profiling(**arg) + if msg == "": + return + if rc != 0: + print(f"Error: {rc} {msg}") + return + stats = fmt_status(msg) + print(stats) + def do_updtr_task(self, arg): """ Report the updater tasks @@ -3212,7 +3236,6 @@ class LdmsdCmdParser(cmd.Cmd): cmd, arg = line[:i], line[i:].strip() return cmd, arg, line - if __name__ == "__main__": is_debug = True try: diff --git a/ldms/python/ldmsd/ldmsd_profiling.py b/ldms/python/ldmsd/ldmsd_profiling.py new file mode 100644 index 000000000..73b408f2c --- /dev/null +++ b/ldms/python/ldmsd/ldmsd_profiling.py @@ -0,0 +1,45 @@ +import json +import pandas as pd + +from ldmsd.ldmsd_communicator import * + +def profiling_as_json(xprt, host, port): + comm = Communicator(xprt=xprt, host=host, port=port) + comm.connect() + o = comm.profiling() + return json.loads(o[1]) + +def get_hosts(o): + return o['xprt'].keys() + +def get_streams(o): + return o['stream'].keys() + +def lookup_df(o, host): + df = pd.DataFrame(o['xprt'][host]['LOOKUP']) + return df + +def update_df(o, host): + df = pd.DataFrame(o['xprt'][host]['UPDATE']) + return df + +def send_df(o, host): + df = pd.DataFrame(o['xprt'][host]['SEND']) + return df + +def set_delete_df(o, host): + df = pd.DataFrame(o['xprt'][host]['SET_DELETE']) + return df + +def stream_publish_df(o, host): + df = pd.DataFrame(o['xprt'][host]['STREAM_PUBLISH']) + return df + +def stream_by_stream_df(o, stream_name = None, src = None): + d = o['stream'] + if stream_name is not None: + d = d[stream_name] + if src is not None: + d = d[src] + df = pd.DataFrame(d) + return df diff --git a/ldms/src/core/ldms.c b/ldms/src/core/ldms.c index 33882ef38..a4825167f 100644 --- a/ldms/src/core/ldms.c +++ b/ldms/src/core/ldms.c @@ -100,9 +100,83 @@ const char *ldms_xprt_op_names[] = {
"DIR_REP", "SEND", "RECV", + "STREAM_PUBLISH", + "STREAM_SUBSCRIBE", + "STREAM_UNSUBSCRIBE" }; static char *type_names[]; +/* PROFILING_CFG_UNSUPPORTED means that LDMS doesn't support profiling for that operation. */ +int __enable_profiling[LDMS_XPRT_OP_COUNT] = { + PROFILING_CFG_DISABLED, /* lookup */ + PROFILING_CFG_DISABLED, /* update */ + PROFILING_CFG_UNSUPPORTED, /* Publish */ + PROFILING_CFG_DISABLED, /* set_delete */ + PROFILING_CFG_UNSUPPORTED, /* dir_req */ + PROFILING_CFG_UNSUPPORTED, /* dir_rep */ + PROFILING_CFG_DISABLED, /* send */ + PROFILING_CFG_UNSUPPORTED, /* receive */ + PROFILING_CFG_DISABLED, /* stream_publish */ + PROFILING_CFG_UNSUPPORTED, /* stream_subscribe */ + PROFILING_CFG_UNSUPPORTED /* stream_unsubscribe */ +}; + +int ldms_profiling_enable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err) +{ + int i; + int rc = 0; + + if (ops_cnt < 0) { + for (i = 0; i < LDMS_XPRT_OP_COUNT; i++) { + if (__enable_profiling[i] != PROFILING_CFG_UNSUPPORTED) + __enable_profiling[i] = PROFILING_CFG_ENABLED; + } + } else { + if (ops_err) + bzero(ops_err, sizeof(int) * ops_cnt); + for (i = 0; i < ops_cnt; i++) { + if (ops[i] >= LDMS_XPRT_OP_COUNT) { + ops_err[i] = EINVAL; + continue; + } + if (__enable_profiling[ops[i]] == PROFILING_CFG_UNSUPPORTED) { + rc = -1; + if (ops_err) + ops_err[i] = ENOSYS; + } else { + __enable_profiling[ops[i]] = PROFILING_CFG_ENABLED; + } + } + } + return rc; +} + +int ldms_profiling_disable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err) +{ + int i; + int rc = 0; + + if (ops_cnt < 0) { + for (i = 0; i < LDMS_XPRT_OP_COUNT; i++) { + if (__enable_profiling[i] != PROFILING_CFG_UNSUPPORTED) + __enable_profiling[i] = PROFILING_CFG_DISABLED; + } + } else { + if (ops_err) + bzero(ops_err, sizeof(int) * ops_cnt); + for (i = 0; i < ops_cnt; i++) { + if (__enable_profiling[ops[i]] == PROFILING_CFG_UNSUPPORTED) { + rc = -1; + if (ops_err) + ops_err[i] = ENOSYS; + } else { + __enable_profiling[ops[i]] = PROFILING_CFG_DISABLED; + } + } + } + return
rc; +} + static struct ldms_digest_s null_digest; /* This function is useful for displaying data structures stored in @@ -739,7 +813,8 @@ static void sync_update_cb(ldms_t x, ldms_set_t s, int status, void *arg) sem_post(&x->sem); } -int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg) +int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, + void *arg, struct ldms_op_ctxt *op_ctxt) { ldms_t xprt = ldms_xprt_get(x); int rc; @@ -777,6 +852,21 @@ int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void ldms_t __ldms_xprt_to_rail(ldms_t x); int ldms_xprt_update(struct ldms_set *set, ldms_update_cb_t cb, void *arg) { + int rc; + struct ldms_op_ctxt *op_ctxt = NULL; + + assert(set); + + if (set->curr_updt_ctxt) + return EBUSY; + + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) + return ENOMEM; + op_ctxt->op_type = LDMS_XPRT_OP_UPDATE; + (void)clock_gettime(CLOCK_REALTIME, &(op_ctxt->update_profile.app_req_ts)); + } /* * We convert the transport handle to a rail handle using * __ldms_xprt_to_rail() and pass it to x->ops.update(). @@ -786,7 +876,12 @@ int ldms_xprt_update(struct ldms_set *set, ldms_update_cb_t cb, void *arg) * when the update completes. 
*/ ldms_t r = __ldms_xprt_to_rail(set->xprt); - return r->ops.update(r, set, cb, arg); + rc = r->ops.update(r, set, cb, arg, op_ctxt); + if (rc) { + set->curr_updt_ctxt = NULL; + free(op_ctxt); + } + return rc; } void __ldms_set_on_xprt_term(ldms_set_t set, ldms_t xprt) diff --git a/ldms/src/core/ldms.h b/ldms/src/core/ldms.h index 260e500ef..c93c1baa2 100644 --- a/ldms/src/core/ldms.h +++ b/ldms/src/core/ldms.h @@ -939,6 +939,7 @@ int ldms_xprt_addr(ldms_t x, struct ldms_addr *local_addr, */ const char *ldms_addr_ntop(struct ldms_addr *addr, char *buff, size_t sz); + /** * \brief Convert a CIDR IP address string to \c ldms_addr * @@ -1471,6 +1472,8 @@ struct ldms_stream_close_event_s { typedef struct ldms_stream_event_s { ldms_t r; /* rail */ enum ldms_stream_event_type type; + struct timespec recv_ts; + uint32_t hop_num; union { struct ldms_stream_recv_data_s recv; struct ldms_stream_return_status_s status; @@ -1562,11 +1565,28 @@ struct ldms_stream_counters_s { *(p) = LDMS_STREAM_COUNTERS_INITIALIZER; \ } while (0) +struct ldms_stream_hop { + struct timespec recv_ts; + struct timespec send_ts; +}; + +#define STREAM_MAX_PROFILE_HOPS 8 +struct ldms_stream_profile { + uint32_t hop_cnt; + struct ldms_stream_hop hops[OVIS_FLEX]; +}; +struct ldms_stream_profile_ent { + TAILQ_ENTRY(ldms_stream_profile_ent) ent; + struct ldms_stream_profile profiles; +}; +TAILQ_HEAD(ldms_stream_profile_list, ldms_stream_profile_ent); + /* stream statistics by src */ struct ldms_stream_src_stats_s { struct rbn rbn; /* key ==> src */ struct ldms_addr src; struct ldms_stream_counters_s rx; /* total rx from src */ + struct ldms_stream_profile_list profiles; }; /* stats of stream-client pair */ @@ -2063,9 +2083,94 @@ typedef enum ldms_xprt_ops_e { LDMS_XPRT_OP_DIR_REP, LDMS_XPRT_OP_SEND, LDMS_XPRT_OP_RECV, + LDMS_XPRT_OP_STREAM_PUBLISH, + LDMS_XPRT_OP_STREAM_SUBSCRIBE, + LDMS_XPRT_OP_STREAM_UNSUBSCRIBE, LDMS_XPRT_OP_COUNT } ldms_xprt_ops_t; +struct ldms_op_ctxt { + enum ldms_xprt_ops_e 
op_type; + union { + struct lookup_profile_s { + struct timespec app_req_ts; + struct timespec req_send_ts; + struct timespec req_recv_ts; + struct timespec share_ts; + struct timespec rendzv_ts; + struct timespec read_ts; + struct timespec complete_ts; + struct timespec deliver_ts; + } lookup_profile; + struct update_profile { + struct timespec app_req_ts; + struct timespec read_ts; + struct timespec read_complete_ts; + struct timespec deliver_ts; + } update_profile; + struct set_delete_profile_s { + struct timespec send_ts; + struct timespec recv_ts; + struct timespec ack_ts; + } set_del_profile; + struct send_profile_s { + struct timespec app_req_ts; + struct timespec send_ts; + struct timespec complete_ts; + struct timespec deliver_ts; + } send_profile; + struct strm_publish_profile_s { + uint32_t hop_num; + struct timespec recv_ts; + struct timespec send_ts; /* to remote client */ + } stream_pub_profile; + }; + TAILQ_ENTRY(ldms_op_ctxt) ent; +}; +TAILQ_HEAD(ldms_op_ctxt_list, ldms_op_ctxt); + +#define PROFILING_CFG_DISABLED 0 +#define PROFILING_CFG_ENABLED 1 +#define PROFILING_CFG_UNSUPPORTED 2 + +/** + * Enable/disable LDMS operations' profiling + * + * If profiling is enabled, LDMS collects the following timestamps: + * for LOOKUP: when ldms_xprt_lookup() is called, + * when LDMS sends the lookup request to the peer, + * when the peer receives the lookup request, + * when the peer shares the set memory, + * when LDMS receives the shared memory, + * when LDMS reads the memory, + * when LDMS receives the read completion, + * and when LDMS delivers the lookup data to the application + * for UPDATE: when ldms_xprt_update() is called, + * when LDMS reads the set data, + * when LDMS receives the updated set data, + * when LDMS delivers the update completion to the application + * for SEND: when ldms_xprt_send() is called, + * when LDMS sends the data to the peer, + * when LDMS receives the send completion event, + * when LDMS delivers the send completion to the 
application + * for STREAM_PUBLISH: when ldms_stream_publish() is called, + * when LDMS publishes the stream data, + * when LDMS delivers the stream data to clients + * NOTE: LDMS collects the timestamps at each hop where stream data gets forwarded + * + * \param ops_cnt Number of operations in \c ops. + * -1 to enable/disable profiling of all operations + * \param ops Array of operations to enable their profiling + * \param ops_err Array to store an error of each given operation + * + * \return 0 on success; Otherwise, -1 is given. + * In this case, an error code will be assigned in the \c ops_err + * ENOSYS if the operation does not support profiling; + * EINVAL if the given operation does not exist. + */ +int ldms_profiling_enable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err); +int ldms_profiling_disable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err); + extern const char *ldms_xprt_op_names[]; struct ldms_xprt_rate_data { @@ -2173,15 +2278,25 @@ typedef struct ldms_xprt_stats { struct timespec disconnected; struct timespec last_op; struct ldms_stats_entry ops[LDMS_XPRT_OP_COUNT]; + struct ldms_op_ctxt_list op_ctxt_lists[LDMS_XPRT_OP_COUNT]; } *ldms_xprt_stats_t; +#define LDMS_PERF_M_STATS 1 +#define LDMS_PERF_M_PROFILNG 2 +#define LDMS_PERF_M_ALL LDMS_PERF_M_STATS | LDMS_PERF_M_PROFILNG + /** * \brief Retrieve transport request statistics * + * The function gets the statistics and then reset it if \c reset is not 0. + * To only reset the statistics, \c stats must be NULL. 
+ * * \param x The transport handle - * \param s Pointer to an ldms_xprt_stats structure + * \param stats Pointer to an ldms_xprt_stats structure + * \param reset Reset the statistics after getting the statistics if not 0 + * */ -extern void ldms_xprt_stats(ldms_t x, ldms_xprt_stats_t stats); +extern void ldms_xprt_stats(ldms_t x, ldms_xprt_stats_t stats, int mask, int reset); /* * Metric template for: diff --git a/ldms/src/core/ldms_private.h b/ldms/src/core/ldms_private.h index 6be464a3c..3fb4cb932 100644 --- a/ldms/src/core/ldms_private.h +++ b/ldms/src/core/ldms_private.h @@ -149,6 +149,15 @@ struct ldms_set { struct ldms_context *notify_ctxt; /* Notify req context */ ldms_heap_t heap; struct ldms_heap_instance heap_inst; + + /* + * Context of the ongoing update operation on the set + * + * This field tracks the context of the current update operation. + * Subsequent updates are blocked until the current operation completes. + * The field is NULL when no update is in progress. + */ + struct ldms_op_ctxt *curr_updt_ctxt; }; /* Convenience macro to roundup a value to a multiple of the _s parameter */ @@ -157,7 +166,8 @@ struct ldms_set { extern int __ldms_xprt_push(ldms_set_t s, int push_flags); extern int __ldms_remote_lookup(ldms_t _x, const char *path, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg); + ldms_lookup_cb_t cb, void *cb_arg, + struct ldms_op_ctxt *op_ctxt); extern int __ldms_remote_dir(ldms_t x, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags); extern int __ldms_remote_dir_cancel(ldms_t x); extern struct ldms_set * diff --git a/ldms/src/core/ldms_rail.c b/ldms/src/core/ldms_rail.c index 16d8ee9a3..cb84882d9 100644 --- a/ldms/src/core/ldms_rail.c +++ b/ldms/src/core/ldms_rail.c @@ -76,6 +76,9 @@ extern ovis_log_t xlog; ovis_log(xlog, OVIS_LERROR, fmt, ## __VA_ARGS__); \ } while (0); +/* The definition is in ldms.c. 
*/ +extern int __enable_profiling[LDMS_XPRT_OP_COUNT]; + static int __rail_connect(ldms_t _r, struct sockaddr *sa, socklen_t sa_len, ldms_event_cb_t cb, void *cb_arg); static int __rail_is_connected(ldms_t _r); @@ -86,13 +89,14 @@ static int __rail_sockaddr(ldms_t _r, struct sockaddr *local_sa, struct sockaddr *remote_sa, socklen_t *sa_len); static void __rail_close(ldms_t _r); -static int __rail_send(ldms_t _r, char *msg_buf, size_t msg_len); +static int __rail_send(ldms_t _r, char *msg_buf, size_t msg_len, + struct ldms_op_ctxt *op_ctxt); static size_t __rail_msg_max(ldms_t x); static int __rail_dir(ldms_t _r, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags); static int __rail_dir_cancel(ldms_t _r); static int __rail_lookup(ldms_t _r, const char *name, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg); -static void __rail_stats(ldms_t _r, ldms_xprt_stats_t stats); + ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt); +static void __rail_stats(ldms_t _r, ldms_xprt_stats_t stats, int mask, int is_reset); static ldms_t __rail_get(ldms_t _r); /* ref get */ static void __rail_put(ldms_t _r); /* ref put */ @@ -102,7 +106,8 @@ static uint64_t __rail_conn_id(ldms_t _r); static const char *__rail_type_name(ldms_t _r); static void __rail_priority_set(ldms_t _r, int prio); static void __rail_cred_get(ldms_t _r, ldms_cred_t lcl, ldms_cred_t rmt); -static int __rail_update(ldms_t _r, struct ldms_set *set, ldms_update_cb_t cb, void *arg); +static int __rail_update(ldms_t _r, struct ldms_set *set, ldms_update_cb_t cb, void *arg, + struct ldms_op_ctxt *op_ctxt); static int __rail_get_threads(ldms_t _r, pthread_t *out, int n); static ldms_set_t __rail_set_by_name(ldms_t x, const char *set_name); @@ -199,6 +204,7 @@ ldms_t ldms_xprt_rail_new(const char *xprt_name, ldms_rail_t r; zap_t zap; int i; + enum ldms_xprt_ops_e op_e; if (n <= 0) { errno = EINVAL; @@ -228,6 +234,7 @@ ldms_t ldms_xprt_rail_new(const char *xprt_name, r->recv_quota = recv_quota; 
r->recv_rate_limit = rate_limit; rbt_init(&r->stream_client_rbt, __str_rbn_cmp); + snprintf(r->name, sizeof(r->name), "%s", xprt_name); snprintf(r->auth_name, sizeof(r->auth_name), "%s", auth_name); if (auth_av_list) { @@ -248,6 +255,9 @@ ldms_t ldms_xprt_rail_new(const char *xprt_name, r->eps[i].remote_is_rail = -1; rbt_init(&r->eps[i].sbuf_rbt, __stream_buf_cmp); TAILQ_INIT(&r->eps[i].sbuf_tq); + for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) { + TAILQ_INIT(&(r->eps[i].op_ctxt_lists[op_e])); + } } zap = __ldms_zap_get(xprt_name); @@ -1066,20 +1076,31 @@ int __rail_rep_send_raw(struct ldms_rail_ep_s *rep, void *data, int len) return rc; } -static int __rail_send(ldms_t _r, char *msg_buf, size_t msg_len) +static int __rail_send(ldms_t _r, char *msg_buf, size_t msg_len, + struct ldms_op_ctxt *op_ctxt) { /* send over ep0 for now */ ldms_rail_t r = (ldms_rail_t)_r; int rc; struct ldms_rail_ep_s *rep; /* an endpoint inside the rail */ + pthread_mutex_lock(&r->mutex); if (r->eps[0].state != LDMS_RAIL_EP_CONNECTED) { rc = ENOTCONN; goto out; } rep = &r->eps[0]; - rc = ldms_xprt_send(rep->ep, msg_buf, msg_len); + + if (ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) { + TAILQ_INSERT_TAIL(&(rep->op_ctxt_lists[LDMS_XPRT_OP_SEND]), + op_ctxt, ent); + } + rc = rep->ep->ops.send(rep->ep, msg_buf, msg_len, op_ctxt); if (rc) { + if (ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) { + TAILQ_REMOVE(&(rep->op_ctxt_lists[LDMS_XPRT_OP_SEND]), + op_ctxt, ent); + } /* release the acquired quota if send failed */ __rep_quota_release(rep, msg_len); } @@ -1145,24 +1166,29 @@ struct ldms_rail_lookup_ctxt_s { ldms_lookup_cb_t app_cb; void *cb_arg; enum ldms_lookup_flags flags; + struct ldms_op_ctxt *op_ctxt; } *ldms_rail_lookup_ctxt_t; void __rail_lookup_cb(ldms_t x, enum ldms_lookup_status status, int more, ldms_set_t s, void *arg) { ldms_rail_lookup_ctxt_t lc = arg; + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + (void)clock_gettime(CLOCK_REALTIME, &lc->op_ctxt->lookup_profile.deliver_ts); + } 
lc->app_cb((void*)lc->r, status, more, s, lc->cb_arg); if (!more) free(lc); } static int __rail_lookup(ldms_t _r, const char *name, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg) + ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt) { ldms_rail_t r = (ldms_rail_t)_r; int rc; struct ldms_rail_ep_s *rep; ldms_rail_lookup_ctxt_t lc; + pthread_mutex_lock(&r->mutex); if (r->state != LDMS_RAIL_EP_CONNECTED) { rc = ENOTCONN; @@ -1177,19 +1203,27 @@ static int __rail_lookup(ldms_t _r, const char *name, enum ldms_lookup_flags fla lc->app_cb = cb; lc->cb_arg = cb_arg; lc->flags = flags; + lc->op_ctxt = op_ctxt; rep = &r->eps[r->lookup_rr++]; r->lookup_rr %= r->n_eps; - rc = ldms_xprt_lookup(rep->ep, name, flags, __rail_lookup_cb, lc); + + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + TAILQ_INSERT_TAIL(&(rep->op_ctxt_lists[LDMS_XPRT_OP_LOOKUP]), op_ctxt, ent); + } + rc = rep->ep->ops.lookup(rep->ep, name, flags, __rail_lookup_cb, lc, op_ctxt); if (rc) { /* synchronous error */ free(lc); + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + TAILQ_REMOVE(&rep->op_ctxt_lists[LDMS_XPRT_OP_LOOKUP], op_ctxt, ent); + } } out: pthread_mutex_unlock(&r->mutex); return rc; } -static void __rail_stats(ldms_t _r, ldms_xprt_stats_t stats) +static void __rail_stats(ldms_t _r, ldms_xprt_stats_t stats, int mask, int is_reset) { /* TODO IMPLEMENT ME */ assert(0 == "Not Implemented"); @@ -1263,17 +1297,24 @@ void __rail_update_cb(ldms_t x, ldms_set_t s, int flags, void *arg) { struct ldms_rail_ep_s *rep = ldms_xprt_ctxt_get(x); ldms_rail_update_ctxt_t uc = arg; + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + struct ldms_op_ctxt *op_ctxt = s->curr_updt_ctxt; + + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->update_profile.deliver_ts); + s->curr_updt_ctxt = NULL; + } uc->app_cb((ldms_t)rep->rail, s, flags, uc->cb_arg); if (!(flags & LDMS_UPD_F_MORE)) { free(uc); } } -static int __rail_update(ldms_t _r, struct ldms_set *set, - ldms_update_cb_t cb, void *arg) 
+static int __rail_update(ldms_t _r, struct ldms_set *set, ldms_update_cb_t cb, + void *arg, struct ldms_op_ctxt *op_ctxt) { ldms_rail_t r = (void*)_r; ldms_rail_update_ctxt_t uc; + struct ldms_rail_ep_s *rep; int rc; uc = calloc(1, sizeof(*uc)); @@ -1282,9 +1323,19 @@ static int __rail_update(ldms_t _r, struct ldms_set *set, uc->r = r; uc->app_cb = cb; uc->cb_arg = arg; - rc = set->xprt->ops.update(set->xprt, set, __rail_update_cb, uc); + + rep = ldms_xprt_ctxt_get(set->xprt); + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + TAILQ_INSERT_TAIL(&(rep->op_ctxt_lists[LDMS_XPRT_OP_UPDATE]), op_ctxt, ent); + set->curr_updt_ctxt = op_ctxt; + } + rc = set->xprt->ops.update(set->xprt, set, __rail_update_cb, uc, op_ctxt); if (rc) { /* synchronously error, clean up the context */ + set->curr_updt_ctxt = NULL; + + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) + TAILQ_REMOVE(&(rep->op_ctxt_lists[LDMS_XPRT_OP_UPDATE]), op_ctxt, ent); free(uc); } return rc; @@ -1404,6 +1455,13 @@ void __rail_process_send_quota(ldms_t x, struct ldms_request *req) __rep_flush_sbuf_tq(rep); } +struct ldms_op_ctxt_list * +__rail_op_ctxt_list(struct ldms_xprt *x, enum ldms_xprt_ops_e op_e) +{ + struct ldms_rail_ep_s *rep = ldms_xprt_ctxt_get(x); + return &(rep->op_ctxt_lists[op_e]); +} + int ldms_xprt_rail_send_quota_get(ldms_t _r, uint64_t *quotas, int n) { ldms_rail_t r; @@ -1463,6 +1521,18 @@ zap_ep_t __rail_get_zap_ep(ldms_t x) return r->eps[0].ep->zap_ep; } +void timespec_hton(struct timespec *ts) +{ + ts->tv_nsec = htobe64(ts->tv_nsec); + ts->tv_sec = htobe64(ts->tv_sec); +} + +void timespec_ntoh(struct timespec *ts) +{ + ts->tv_nsec = be64toh(ts->tv_nsec); + ts->tv_sec = be64toh(ts->tv_sec); +} + int sockaddr2ldms_addr(struct sockaddr *sa, struct ldms_addr *la) { union ldms_sockaddr *lsa = (void*)sa; @@ -1639,8 +1709,6 @@ size_t format_set_delete_req(struct ldms_request *req, uint64_t xid, void __rail_on_set_delete(ldms_t _r, struct ldms_set *s, ldms_set_delete_cb_t cb_fn) { - /* - */ 
assert(XTYPE_IS_RAIL(_r->xtype)); ldms_rail_t r = (ldms_rail_t)_r; @@ -1651,6 +1719,8 @@ void __rail_on_set_delete(ldms_t _r, struct ldms_set *s, struct xprt_set_coll_entry *ent; int i; ldms_t x; + struct ldms_rail_ep_s *rep; + struct ldms_op_ctxt *op_ctxt; x = NULL; @@ -1697,6 +1767,19 @@ void __rail_on_set_delete(ldms_t _r, struct ldms_set *s, req = (struct ldms_request *)(ctxt + 1); len = format_set_delete_req(req, (uint64_t)(unsigned long)ctxt, ldms_set_instance_name_get(s)); + if (ENABLED_PROFILING(LDMS_XPRT_OP_SET_DELETE)) { + rep = (struct ldms_rail_ep_s *)ldms_xprt_ctxt_get(x); + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) { + ovis_log(xlog, OVIS_LCRIT, "%s:%s:%d Memory allocation failure\n", + __FILE__, __func__, __LINE__); + /* Let the routine continue */ + } else { + ctxt->op_ctxt = op_ctxt; + TAILQ_INSERT_TAIL(&rep->op_ctxt_lists[LDMS_XPRT_OP_SET_DELETE], op_ctxt, ent); + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->set_del_profile.send_ts); + } + } zap_err_t zerr = zap_send(x->zap_ep, req, len); if (zerr) { char name[128]; @@ -1707,6 +1790,8 @@ void __rail_on_set_delete(ldms_t _r, struct ldms_set *s, __FILE__, __func__, __LINE__, zerr, name); x->zerrno = zerr; __ldms_free_ctxt(x, ctxt); + if (ENABLED_PROFILING(LDMS_XPRT_OP_SET_DELETE)) + TAILQ_REMOVE(&rep->op_ctxt_lists[LDMS_XPRT_OP_SET_DELETE], op_ctxt, ent); } pthread_mutex_unlock(&x->lock); } @@ -1714,20 +1799,41 @@ void __rail_on_set_delete(ldms_t _r, struct ldms_set *s, int __rep_flush_sbuf_tq(struct ldms_rail_ep_s *rep) { int rc; + struct ldms_op_ctxt *op_ctxt = NULL; struct __pending_sbuf_s *p; while ((p = TAILQ_FIRST(&rep->sbuf_tq))) { rc = __rep_quota_acquire(rep, p->sbuf->msg->msg_len); if (rc) goto out; + + if (ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) { + rc = ENOMEM; + goto out; + } + op_ctxt->op_type = LDMS_XPRT_OP_STREAM_PUBLISH; + op_ctxt->stream_pub_profile.hop_num = p->hop_num; + 
op_ctxt->stream_pub_profile.recv_ts = p->recv_ts; + TAILQ_INSERT_TAIL(&(rep->op_ctxt_lists[LDMS_XPRT_OP_STREAM_PUBLISH]), + op_ctxt, ent); + } rc = __rep_publish(rep, p->sbuf->name, p->sbuf->msg->name_hash, p->sbuf->msg->stream_type, &p->sbuf->msg->src, p->sbuf->msg->msg_gn, &p->sbuf->msg->cred, p->sbuf->msg->perm, - p->sbuf->data, - p->sbuf->data_len); + p->sbuf->msg->hop_cnt, + p->sbuf->msg->hops, + p->sbuf->data, p->sbuf->data_len, + &(op_ctxt->stream_pub_profile)); if (rc) { __rep_quota_release(rep, p->sbuf->msg->msg_len); + if (ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { + TAILQ_REMOVE(&(rep->op_ctxt_lists[LDMS_XPRT_OP_STREAM_PUBLISH]), + op_ctxt, ent); + free(op_ctxt); + } goto out; } TAILQ_REMOVE(&rep->sbuf_tq, p, entry); diff --git a/ldms/src/core/ldms_rail.h b/ldms/src/core/ldms_rail.h index 63d865477..438a6ef04 100644 --- a/ldms/src/core/ldms_rail.h +++ b/ldms/src/core/ldms_rail.h @@ -114,6 +114,8 @@ struct ldms_rail_rate_quota_s { }; struct __pending_sbuf_s { + uint32_t hop_num; + struct timespec recv_ts; TAILQ_ENTRY(__pending_sbuf_s) entry; struct __stream_buf_s *sbuf; }; @@ -133,6 +135,14 @@ struct ldms_rail_ep_s { uint64_t pending_ret_quota; /* pending return quota */ int in_eps_stq; TAILQ_HEAD(, __pending_sbuf_s) sbuf_tq; /* pending fwd stream msgs */ + /* + * Array of operation context lists, indexed by `ldms_xprt_ops_e`. + * + * Each list stores operation contexts for the corresponding operation type. + * Operation contexts track profiling data for various operations, + * such as lookups, updates, and stream operations. + */ + struct ldms_op_ctxt_list op_ctxt_lists[LDMS_XPRT_OP_COUNT]; }; typedef struct ldms_rail_dir_ctxt_s { diff --git a/ldms/src/core/ldms_stream.c b/ldms/src/core/ldms_stream.c index 186359e8a..460216b0f 100644 --- a/ldms/src/core/ldms_stream.c +++ b/ldms/src/core/ldms_stream.c @@ -81,6 +81,9 @@ #include "ldms_stream.h" #include "ldms_qgroup.h" +/* The definition is in ldms.c. 
*/ +extern int __enable_profiling[LDMS_XPRT_OP_COUNT]; + static ovis_log_t __ldms_stream_log = NULL; /* see __ldms_stream_init() below */ #define __LOG(LVL, FMT, ...) ovis_log(__ldms_stream_log, LVL, FMT, ##__VA_ARGS__ ); @@ -229,12 +232,19 @@ static int __part_send(struct ldms_rail_ep_s *rep, return rc; } +/* The implementations are in ldms_rail.c. */ +extern void timespec_ntoh(struct timespec *ts); +extern void timespec_hton(struct timespec *ts); + int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, uint32_t hash, - ldms_stream_type_t stream_type, + ldms_stream_type_t stream_type, struct ldms_addr *src, uint64_t msg_gn, ldms_cred_t cred, int perm, - const char *data, size_t data_len) + uint32_t hop_cnt, + struct ldms_stream_hop * hops, + const char *data, size_t data_len, + struct strm_publish_profile_s *pts) { int rc = 0; int name_len = strlen(stream_name) + 1; @@ -248,6 +258,7 @@ int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, } else { bzero(&msg.src, sizeof(msg.src)); } + msg.msg_gn = htobe64(msg_gn); msg.msg_len = htobe32(name_len + data_len); msg.stream_type = htobe32(stream_type); @@ -255,6 +266,32 @@ int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, msg.cred.gid = htobe32(cred->gid); msg.perm = htobe32(perm); msg.name_hash = hash; + msg.hop_cnt = htobe32(hop_cnt); + if (hops && hop_cnt) { + size_t sz = hop_cnt * sizeof(struct ldms_stream_hop); + memcpy(&msg.hops, hops, sz); + /* + * The timespecs in hops are in network order already, + * so don't convert them. + */ + } + + (void)clock_gettime(CLOCK_REALTIME, &(pts->send_ts)); + if (hop_cnt <= STREAM_MAX_PROFILE_HOPS) { + /* + * We store the receive and send time only + * when the stream data has been forwarded + * at most STREAM_MAX_PROFILE_HOPS times. + * + * We ignore the timestamps after + * the STREAM_MAX_PROFILE_HOPS'th.
+ * + */ + msg.hops[hop_cnt].recv_ts = pts->recv_ts; + msg.hops[hop_cnt].send_ts = pts->send_ts; + timespec_hton(&msg.hops[hop_cnt].recv_ts); + timespec_hton(&msg.hops[hop_cnt].send_ts); + } rc = __part_send(rep, &msg.src, msg_gn, &msg, sizeof(msg), /* msg hdr */ stream_name, name_len, /* name */ @@ -279,11 +316,14 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg) int rc; uint64_t addr_port; uint64_t hash; + struct ldms_op_ctxt *op_ctxt = NULL; + if (ev->type == LDMS_STREAM_EVENT_CLOSE) return 0; assert( ev->type == LDMS_STREAM_EVENT_RECV ); if (!XTYPE_IS_RAIL(ev->recv.client->x->xtype)) return ENOTSUP; + r = (ldms_rail_t)ev->recv.client->x; switch (ev->recv.src.sa_family) { case 0: @@ -325,6 +365,8 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg) struct __pending_sbuf_s *e; e = malloc(sizeof(*e)); if (e) { + e->hop_num = _ev->pub.hop_num; + e->recv_ts = _ev->pub.recv_ts; e->sbuf = sbuf; ref_get(&sbuf->ref, "pending"); TAILQ_INSERT_TAIL(&r->eps[ep_idx].sbuf_tq, e, entry); @@ -332,14 +374,29 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg) goto out; } + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) + return ENOMEM; + op_ctxt->op_type = LDMS_XPRT_OP_STREAM_PUBLISH; + op_ctxt->stream_pub_profile.hop_num = _ev->pub.hop_num; + op_ctxt->stream_pub_profile.recv_ts = _ev->pub.recv_ts; rc = __rep_publish(&r->eps[ep_idx], ev->recv.name, ev->recv.name_hash, ev->recv.type, &ev->recv.src, ev->recv.msg_gn, &ev->recv.cred, ev->recv.perm, - ev->recv.data, - ev->recv.data_len); - if (rc) + _ev->sbuf->msg->hop_cnt, + _ev->sbuf->msg->hops, + ev->recv.data, ev->recv.data_len, + &(op_ctxt->stream_pub_profile)); + if (rc || !ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { + free(op_ctxt); + } else { + TAILQ_INSERT_TAIL(&(r->eps[ep_idx].op_ctxt_lists[LDMS_XPRT_OP_STREAM_PUBLISH]), + op_ctxt, ent); + } + if (rc) { __rate_quota_release(&ev->recv.client->rate_quota, ev->recv.data_len); + } out: return rc; } @@ -533,13 +590,15 @@ 
__stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, uint32_t hash, ldms_stream_type_t stream_type, ldms_cred_t cred, uint32_t perm, - const char *data, size_t data_len) + const char *data, size_t data_len, + uint32_t hop_cnt, struct timespec *recv_ts) { int rc = 0, gc; struct ldms_stream_s *s; struct ldms_stream_client_entry_s *sce, *next_sce; struct ldms_stream_client_s *c; struct timespec now; + size_t sz; s = __stream_get(stream_name, NULL); if (!s) { @@ -549,6 +608,8 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, struct __stream_event_s _ev = { .pub = { + .hop_num = hop_cnt, + .recv_ts = *recv_ts, .type = LDMS_STREAM_EVENT_RECV, .recv = { .src = {0}, @@ -577,10 +638,11 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, pthread_rwlock_wrlock(&s->rwlock); clock_gettime(CLOCK_REALTIME, &now); __counters_update(&s->rx, &now, data_len); - if (__stream_stats_level > 1) { + if ((__stream_stats_level > 1) || ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { /* stats by src */ struct rbn *rbn = rbt_find(&s->src_stats_rbt, &_ev.pub.recv.src); struct ldms_stream_src_stats_s *ss; + struct ldms_stream_profile_ent *prof; if (rbn) { ss = container_of(rbn, struct ldms_stream_src_stats_s, rbn); } else { @@ -595,8 +657,30 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, rbn_init(&ss->rbn, &ss->src); ss->rx = LDMS_STREAM_COUNTERS_INITIALIZER; rbt_ins(&s->src_stats_rbt, &ss->rbn); + TAILQ_INIT(&ss->profiles); } __counters_update(&ss->rx, &now, data_len); + if (sbuf && ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { + /* Receive the stream data from remote server, so cache the profile */ + sz = (sbuf->msg->hop_cnt+1) * sizeof(struct ldms_stream_hop); + prof = calloc(1, sizeof(*prof) + sz); + if (!prof) { + /* error in stats shall not break the normal + * operations + */ + pthread_rwlock_unlock(&s->rwlock); + goto skip_stats; + } + prof->profiles.hop_cnt = sbuf->msg->hop_cnt; + memcpy(&(prof->profiles.hops), 
&(sbuf->msg->hops), sz); + int i; + for (i = 0; i < prof->profiles.hop_cnt; i++) { + timespec_ntoh(&prof->profiles.hops[i].recv_ts); + timespec_ntoh(&prof->profiles.hops[i].send_ts); + } + prof->profiles.hops[prof->profiles.hop_cnt].recv_ts = *recv_ts; + TAILQ_INSERT_TAIL(&ss->profiles, prof, ent); + } } pthread_rwlock_unlock(&s->rwlock); skip_stats: @@ -627,9 +711,7 @@ __stream_deliver(struct __stream_buf_s *sbuf, uint64_t msg_gn, ref_get(&c->ref, "callback"); pthread_rwlock_unlock(&s->rwlock); _ev.pub.recv.client = c; - /* TODO: Start: Get timing for application's stream handling time. */ rc = c->cb_fn(&_ev.pub, c->cb_arg); - /* TODO: End: Get timing for application's stream handling time. */ if (__stream_stats_level > 0) { pthread_rwlock_wrlock(&c->rwlock); if (rc) { @@ -871,8 +953,7 @@ int __publish_cred_check(ldms_cred_t cred) int ldms_stream_publish(ldms_t x, const char *stream_name, ldms_stream_type_t stream_type, - ldms_cred_t cred, - uint32_t perm, + ldms_cred_t cred, uint32_t perm, const char *data, size_t data_len) { ldms_rail_t r; @@ -883,6 +964,11 @@ int ldms_stream_publish(ldms_t x, const char *stream_name, int rc; uint32_t hash; int ep_idx; + struct ldms_op_ctxt *op_ctxt = NULL; + struct ldms_op_ctxt_list *op_ctxt_list; + struct timespec recv_ts; + + (void)clock_gettime(CLOCK_REALTIME, &recv_ts); msg_gn = __atomic_fetch_add(&stream_gn, 1, __ATOMIC_SEQ_CST); @@ -912,14 +998,30 @@ int ldms_stream_publish(ldms_t x, const char *stream_name, rc = __rep_quota_acquire(&r->eps[ep_idx], q); if (rc) return rc; - return __rep_publish(&r->eps[ep_idx], stream_name, hash, - stream_type, 0, msg_gn, cred, perm, data, - data_len); + + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) + return ENOMEM; + op_ctxt->op_type = LDMS_XPRT_OP_PUBLISH; + op_ctxt->stream_pub_profile.hop_num = 0; + op_ctxt->stream_pub_profile.recv_ts = recv_ts; + rc = __rep_publish(&r->eps[ep_idx], stream_name, hash, + stream_type, 0, msg_gn, cred, perm, + 0, NULL, data, data_len, + 
&(op_ctxt->stream_pub_profile)); + + if (rc || !ENABLED_PROFILING(LDMS_XPRT_OP_STREAM_PUBLISH)) { + free(op_ctxt); + } else { + op_ctxt_list = &(r->eps[ep_idx].op_ctxt_lists[LDMS_XPRT_OP_PUBLISH]); + TAILQ_INSERT_TAIL(op_ctxt_list, op_ctxt, ent); + } + return rc; } /* else publish locally */ return __stream_deliver(0, msg_gn, stream_name, name_len, hash, - stream_type, cred, perm, data, data_len); + stream_type, cred, perm, data, data_len, 0, &recv_ts); } static void __client_ref_free(void *arg) @@ -1205,6 +1307,7 @@ static void __process_stream_msg(ldms_t x, struct ldms_request *req) { struct ldms_rail_ep_s *rep = ldms_xprt_ctxt_get(x); + struct ldms_thrstat *thrstat; /* no need to take lock ; only one zap thread working on this tree */ struct rbn *rbn; struct __stream_buf_s *sbuf; @@ -1267,7 +1370,7 @@ __process_stream_msg(ldms_t x, struct ldms_request *req) fmsg = (void*)req->stream_part.part_msg; flen = be32toh(fmsg->msg_len); - sbuf = malloc(sizeof(*sbuf) + sizeof(*fmsg) + flen); + sbuf = calloc(1, sizeof(*sbuf) + sizeof(*fmsg) + flen); if (!sbuf) return; ref_init(&sbuf->ref, "init", __stream_buf_s_ref_free, sbuf); @@ -1304,6 +1407,8 @@ __process_stream_msg(ldms_t x, struct ldms_request *req) sbuf->msg->cred.uid = be32toh(sbuf->msg->cred.uid); sbuf->msg->cred.gid = be32toh(sbuf->msg->cred.gid); sbuf->msg->perm = be32toh(sbuf->msg->perm); + sbuf->msg->hop_cnt = be32toh(sbuf->msg->hop_cnt); + sbuf->msg->hop_cnt++; /* sbuf->msg->name_hash does not need byte conversion */ sbuf->name = sbuf->msg->msg; @@ -1318,11 +1423,13 @@ __process_stream_msg(ldms_t x, struct ldms_request *req) goto cleanup; } + thrstat = zap_thrstat_ctxt_get(x->zap_ep); __stream_deliver(sbuf, sbuf->msg->msg_gn, sbuf->name, sbuf->name_len, sbuf->msg->name_hash, sbuf->msg->stream_type, &sbuf->msg->cred, sbuf->msg->perm, - sbuf->data, sbuf->data_len); + sbuf->data, sbuf->data_len, + sbuf->msg->hop_cnt, &(thrstat->last_op_start)); cleanup: rbt_del(&rep->sbuf_rbt, &sbuf->rbn); @@ -1457,6 +1564,7 @@ 
void __stream_req_recv(ldms_t x, int cmd, struct ldms_request *req) { assert(0 == XTYPE_IS_RAIL(x->xtype)); /* x is NOT a rail */ assert(x->event_cb == __rail_cb); + switch (cmd) { case LDMS_CMD_STREAM_MSG: __process_stream_msg(x, req); @@ -1546,6 +1654,15 @@ int ldms_stream_stats_level_get() return __atomic_load_n(&__stream_stats_level, __ATOMIC_SEQ_CST); } +void __stream_profiling_purge(struct ldms_stream_profile_list *profiles) +{ + struct ldms_stream_profile_ent *prf; + while ((prf = TAILQ_FIRST(profiles))) { + TAILQ_REMOVE(profiles, prf, ent); + free(prf); + } +} + void __src_stats_rbt_purge(struct rbt *rbt) { struct rbn *rbn; @@ -1553,6 +1670,7 @@ void __src_stats_rbt_purge(struct rbt *rbt) while ((rbn = rbt_min(rbt))) { rbt_del(rbt, rbn); sss = container_of(rbn, struct ldms_stream_src_stats_s, rbn); + __stream_profiling_purge(&sss->profiles); free(sss); } } @@ -1562,6 +1680,8 @@ int __src_stats_rbt_copy(struct rbt *t0, struct rbt *t1, int is_reset) { struct rbn *rbn; struct ldms_stream_src_stats_s *s0, *s1; + struct ldms_stream_profile_ent *prf0, *prf1; + size_t sz; int rc; for (rbn = rbt_min(t0); rbn; rbn = rbn_succ(rbn)) { s0 = container_of(rbn, struct ldms_stream_src_stats_s, rbn); @@ -1571,8 +1691,24 @@ int __src_stats_rbt_copy(struct rbt *t0, struct rbt *t1, int is_reset) goto err_0; } *s1 = *s0; - if (is_reset) + TAILQ_INIT(&s1->profiles); + + /* Copy the profiles */ + TAILQ_FOREACH(prf0, &s0->profiles, ent) { + sz = prf0->profiles.hop_cnt * sizeof(struct ldms_stream_hop); + prf1 = calloc(1, sizeof(*prf1) + sz); + if (!prf1) { + rc = ENOMEM; + goto err_0; + } + prf1->profiles.hop_cnt = prf0->profiles.hop_cnt; + memcpy(&(prf1->profiles.hops), &(prf0->profiles.hops), sz); + TAILQ_INSERT_TAIL(&s1->profiles, prf1, ent); + } + if (is_reset) { LDMS_STREAM_COUNTERS_INIT(&s0->rx); + __stream_profiling_purge(&s0->profiles); + } rbn_init(&s1->rbn, &s1->src); rbt_ins(t1, &s1->rbn); } diff --git a/ldms/src/core/ldms_stream.h b/ldms/src/core/ldms_stream.h index 
7e796b3aa..ba896a23f 100644 --- a/ldms/src/core/ldms_stream.h +++ b/ldms/src/core/ldms_stream.h @@ -135,6 +135,9 @@ struct ldms_stream_full_msg_s { struct ldms_cred cred; /* credential of the originator */ uint32_t perm; /* 0777 style permission */ uint32_t name_hash; + /* Allocate space to collect profile data for 8 hops */ + uint32_t hop_cnt; + struct ldms_stream_hop hops[STREAM_MAX_PROFILE_HOPS+1]; char msg[OVIS_FLEX]; /* `msg` format: * .----------------------. @@ -169,10 +172,11 @@ struct __stream_buf_s { /* for internal use */ int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, - uint32_t hash, - ldms_stream_type_t stream_type, + uint32_t hash, ldms_stream_type_t stream_type, struct ldms_addr *src, uint64_t msg_gn, ldms_cred_t cred, int perm, - const char *data, size_t data_len); - + uint32_t hop_cnt, + struct ldms_stream_hop *hops, + const char *data, size_t data_len, + struct strm_publish_profile_s *pts); #endif /* __LDMS_STREAM_H__ */ diff --git a/ldms/src/core/ldms_xprt.c b/ldms/src/core/ldms_xprt.c index c3e0d3776..fa53f5c97 100644 --- a/ldms/src/core/ldms_xprt.c +++ b/ldms/src/core/ldms_xprt.c @@ -6,8 +6,7 @@ * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU - * General Public License (GPL) Version 2, available from the file - * COPYING in the main directory of this source tree, or the BSD-type +` * COPYING in the main directory of this source tree, or the BSD-type * license below: * * Redistribution and use in source and binary forms, with or without @@ -84,6 +83,9 @@ extern ovis_log_t xlog; ovis_log(xlog, level, fmt, ## __VA_ARGS__); \ } while (0); +/* The definition is in ldms_xprt.c. */ +extern int __enable_profiling[LDMS_XPRT_OP_COUNT]; + /** * zap callback function. */ @@ -95,6 +97,10 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev); */ void ldms_zap_auto_cb(zap_ep_t zep, zap_event_t ev); +/* The implementation is in ldms_rail.c.
*/ +struct ldms_op_ctxt_list * +__rail_op_ctxt_list(ldms_t x, enum ldms_xprt_ops_e op_e); + #if 0 #define TF() XPRT_LOG(NULL, OVIS_LALWAYS, "%s:%d\n", __FUNCTION__, __LINE__) #else @@ -744,10 +750,15 @@ void ldms_xprt_put(ldms_t x) x->ops.put(x); } +/* The implementations are in ldms_rail.c. */ +extern void timespec_hton(struct timespec *ts); +extern void timespec_ntoh(struct timespec *ts); + static void process_set_delete_request(struct ldms_xprt *x, struct ldms_request *req) { struct ldms_reply reply; struct ldms_set *set; + size_t len; /* * Always notify the application about peer set delete. If we happened @@ -777,8 +788,11 @@ static void process_set_delete_request(struct ldms_xprt *x, struct ldms_request reply.hdr.xid = req->hdr.xid; reply.hdr.cmd = htonl(LDMS_CMD_SET_DELETE_REPLY); reply.hdr.rc = 0; - reply.hdr.len = htonl(sizeof(reply.hdr)); - zap_err_t zerr = zap_send(x->zap_ep, &reply, sizeof(reply.hdr)); + len = sizeof(reply.hdr) + sizeof(reply.set_del); + reply.hdr.len = htonl(len); + (void)clock_gettime(CLOCK_REALTIME, &reply.set_del.recv_ts); + timespec_hton(&reply.set_del.recv_ts); + zap_err_t zerr = zap_send(x->zap_ep, &reply, len); if (zerr != ZAP_ERR_OK) { x->zerrno = zerr; XPRT_LOG(x, OVIS_LERROR, "%s: zap_send synchronously error. 
" @@ -790,6 +804,12 @@ static void process_set_delete_reply(struct ldms_xprt *x, struct ldms_reply *reply, struct ldms_context *ctxt) { + if (ENABLED_PROFILING(LDMS_XPRT_OP_SET_DELETE)) { + struct ldms_thrstat *thrstat = zap_thrstat_ctxt_get(x->zap_ep); + memcpy(&ctxt->op_ctxt->set_del_profile.ack_ts, &thrstat->last_op_start, sizeof(struct timespec)); + timespec_ntoh(&reply->set_del.recv_ts); + memcpy(&ctxt->op_ctxt->set_del_profile.recv_ts, &reply->set_del.recv_ts, sizeof(struct timespec)); + } ctxt->set_delete.cb(x, reply->hdr.rc, ctxt->set_delete.s, ctxt->set_delete.cb_arg); pthread_mutex_lock(&x->lock); __ldms_free_ctxt(x, ctxt); @@ -1149,7 +1169,7 @@ process_cancel_push_request(struct ldms_xprt *x, struct ldms_request *req) return; } -static void __copy_set_info_to_lookup_msg(char *buffer, ldms_name_t schema, +static void *__copy_set_info_to_lookup_msg(char *buffer, ldms_name_t schema, ldms_name_t inst_name, struct ldms_set *set) { @@ -1200,6 +1220,7 @@ static void __copy_set_info_to_lookup_msg(char *buffer, ldms_name_t schema, str = (ldms_name_t)&(str->name[str->len]); } str->len = 0; + return (void*)str + sizeof(str->len); } /* Caller should hold the set lock */ @@ -1303,6 +1324,7 @@ static int __add_lookup_peer(struct ldms_xprt *x, struct ldms_set *set) return rc; } +#define LU_PARAM_PRFL_MARKER "lu_prflng" static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, uint64_t xid, int more) { @@ -1312,6 +1334,7 @@ static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, ldms_name_t name = get_instance_name(set->meta); ldms_name_t schema = get_schema_name(set->meta); + struct ldms_thrstat *thrstat = zap_thrstat_ctxt_get(x->zap_ep); /* * The lookup.set_info encodes schema name, instance name * and the set info key value pairs as follows. 
@@ -1349,11 +1372,24 @@ static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, * | last value string | * S S * +---------------------------+ + * | 0 | + * +---------------------------+ + * | LU_PARAM_PRFL_MARKER | + * +---------------------------+ + * | struct timespec | + * | (request receiving ts) | + * +---------------------------+ + * | struct timespec | + * | (sharing ts) | + * +---------------------------+ */ int set_info_cnt; size_t set_info_len; size_t msg_len; struct ldms_rendezvous_msg *msg; + struct timespec *req_recv_ts, *share_ts; + char *prfl_marker; + size_t prfl_marker_len = strlen(LU_PARAM_PRFL_MARKER) + 1; pthread_mutex_lock(&set->lock); __get_set_info_sz(set, &set_info_cnt, &set_info_len); @@ -1364,13 +1400,26 @@ static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, * +1 for the terminating string of length 0 */ + sizeof(struct ldms_name) * (2 + (set_info_cnt) * 2 + 1) - + name->len + schema->len + set_info_len; + + name->len + schema->len + set_info_len + /* + * Encode the request receiving timestamp + * and the sharing timestamp + */ + + prfl_marker_len + sizeof(struct timespec) * 2; + msg = calloc(1, msg_len); if (!msg) { pthread_mutex_unlock(&set->lock); return ENOMEM; } - __copy_set_info_to_lookup_msg(msg->lookup.set_info, schema, name, set); + prfl_marker = __copy_set_info_to_lookup_msg(msg->lookup.set_info, schema, name, set); + /* Embed the profiling timestamps in the lookup reply message */ + strcpy(prfl_marker, LU_PARAM_PRFL_MARKER); + req_recv_ts = (struct timespec *)(prfl_marker + prfl_marker_len); + memcpy(req_recv_ts, &thrstat->last_op_start, sizeof(struct timespec)); + share_ts = req_recv_ts+1; + (void)clock_gettime(CLOCK_REALTIME, share_ts); + /* Fill the set details */ pthread_mutex_unlock(&set->lock); msg->hdr.xid = xid; msg->hdr.cmd = htonl(LDMS_XPRT_RENDEZVOUS_LOOKUP); @@ -1381,7 +1430,6 @@ static int __send_lookup_reply(struct ldms_xprt *x, struct ldms_set *set, msg->lookup.meta_len = 
htonl(__le32_to_cpu(set->meta->meta_sz)); msg->lookup.card = htonl(__le32_to_cpu(set->meta->card)); msg->lookup.array_card = htonl(__le32_to_cpu(set->meta->array_card)); - XPRT_LOG(x, OVIS_LDEBUG, "%s(): x %p: sharing ... remote lookup ctxt %p\n", __func__, x, (void *)xid); zap_err_t zerr = zap_share(x->zap_ep, set->lmap, (const char *)msg, msg_len); @@ -1549,6 +1597,24 @@ static int do_read_all(ldms_t x, ldms_set_t s, ldms_update_cb_t cb, void *arg) goto out; } assert(x == ctxt->x); + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + ctxt->op_ctxt = s->curr_updt_ctxt; + if (0 == ctxt->op_ctxt->update_profile.read_ts.tv_sec) { + /* + * If the data read timestamp is not set, + * record the current time as the start of the read operation. + * + * The read operation may involve reading the entire set at once, + * reading the meta followed by data, + * or reading multiple times to obtain the updated copy of the set. + */ + (void)clock_gettime(CLOCK_REALTIME, &ctxt->op_ctxt->update_profile.read_ts); + } else { + /* + * Continue reading the set. The read operation has already started. + */ + } + } rc = zap_read(x->zap_ep, s->rmap, zap_map_addr(s->rmap), s->lmap, zap_map_addr(s->lmap), len, ctxt); if (rc) { @@ -1575,6 +1641,20 @@ static int do_read_meta(ldms_t x, ldms_set_t s, ldms_update_cb_t cb, void *arg) goto out; } assert(x == ctxt->x); + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + ctxt->op_ctxt = s->curr_updt_ctxt; + if (0 == ctxt->op_ctxt->update_profile.read_ts.tv_sec) { + /* + * If the data read timestamp is not set, + * record the current time as the start of the read operation. + */ + (void)clock_gettime(CLOCK_REALTIME, &ctxt->op_ctxt->update_profile.read_ts); + } else { + /* + * Continue reading the set. The read operation has already started. 
+ */ + } + } rc = zap_read(x->zap_ep, s->rmap, zap_map_addr(s->rmap), s->lmap, zap_map_addr(s->lmap), meta_sz, ctxt); if (rc) { @@ -1598,7 +1678,6 @@ static int do_read_data(ldms_t x, ldms_set_t s, int idx_from, int idx_to, ctxt = __ldms_alloc_ctxt(x, sizeof(*ctxt), LDMS_CONTEXT_UPDATE, s, cb, arg, idx_from, idx_to); - if (!ctxt) { rc = ENOMEM; goto out; @@ -1609,6 +1688,20 @@ static int do_read_data(ldms_t x, ldms_set_t s, int idx_from, int idx_to, dlen = (idx_to - idx_from + 1) * data_sz; assert(x == ctxt->x); + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + ctxt->op_ctxt = s->curr_updt_ctxt; + if (0 == ctxt->op_ctxt->update_profile.read_ts.tv_sec) { + /* + * If the data read timestamp is not set, + * record the current time as the start of the read operation. + */ + (void)clock_gettime(CLOCK_REALTIME, &ctxt->op_ctxt->update_profile.read_ts); + } else { + /* + * Continue reading the set. The read operation has already started. + */ + } + } rc = zap_read(x->zap_ep, s->rmap, zap_map_addr(s->rmap) + doff, s->lmap, zap_map_addr(s->lmap) + doff, dlen, ctxt); if (rc) { @@ -1758,6 +1851,12 @@ static void process_lookup_reply(struct ldms_xprt *x, struct ldms_reply *reply, struct ldms_context *ctxt) { + struct ldms_thrstat *thrstat; + + thrstat = zap_thrstat_ctxt_get(x->zap_ep); + memcpy(&ctxt->op_ctxt->lookup_profile.complete_ts, &thrstat->last_op_start, + sizeof(struct timespec)); + int rc = ntohl(reply->hdr.rc); if (!rc) { /* A peer should only receive error in lookup_reply. @@ -2538,14 +2637,40 @@ static void handle_zap_read_complete(zap_ep_t zep, zap_event_t ev) switch (ctxt->type) { case LDMS_CONTEXT_UPDATE: thrstat->last_op = LDMS_THRSTAT_OP_UPDATE_REPLY; + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + /* + * If read complete timestamp is already set, + * we replace it with a new timestamp. + * + * We collect the timestamp of the beginning of the first read and + * the timestamp of the completion of the last read.
+ */ + memcpy(&ctxt->op_ctxt->update_profile.read_complete_ts, + &thrstat->last_op_start, + sizeof(struct timespec)); + } __handle_update_data(x, ctxt, ev); break; case LDMS_CONTEXT_UPDATE_META: - thrstat->last_op = LDMS_THRSTAT_OP_UPDATE_REPLY; + if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) { + /* + * With the same reason as in the LDMS_CONTEXT_UPDATE case, + * we set or reset the read complete timestamp. + */ + memcpy(&ctxt->op_ctxt->update_profile.read_complete_ts, + &thrstat->last_op_start, + sizeof(struct timespec)); + } __handle_update_meta(x, ctxt, ev); break; case LDMS_CONTEXT_LOOKUP_READ: thrstat->last_op = LDMS_THRSTAT_OP_LOOKUP_REPLY; + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + memcpy(&ctxt->op_ctxt->lookup_profile.complete_ts, + &thrstat->last_op_start, + sizeof(struct timespec)); + } + __handle_lookup(x, ctxt, ev); break; default: @@ -2602,7 +2727,7 @@ static const ldms_name_t __lookup_set_info_find(const char *set_info, return NULL; } -static int __process_lookup_set_info(struct ldms_set *lset, char *set_info) +static int __process_lookup_set_info(struct ldms_set *lset, char *set_info, char **_buf) { int rc = 0; ldms_name_t key, value; @@ -2628,6 +2753,7 @@ static int __process_lookup_set_info(struct ldms_set *lset, char *set_info) key = (ldms_name_t)(&value->name[value->len]); value = (ldms_name_t)(&key->name[key->len]); } + *_buf = (char *)value; if (!dir_upd) { /* Check if a key-value pair is removed from the set info or not */ pair = LIST_FIRST(&lset->remote_info); @@ -2659,6 +2785,12 @@ static void handle_rendezvous_lookup(zap_ep_t zep, zap_event_t ev, struct ldms_set *lset; int rc; ldms_name_t schema_name, inst_name; + char *prfl_maker = 0; + struct timespec *req_recv_ts; + struct timespec *share_ts; + + struct ldms_thrstat *thrstat = zap_thrstat_ctxt_get(zep); + struct ldms_op_ctxt *op_ctxt = ctxt->op_ctxt; #ifdef DEBUG if (!__is_lookup_name_good(x, lu, ctxt)) { @@ -2695,13 +2827,28 @@ static void handle_rendezvous_lookup(zap_ep_t zep, 
zap_event_t ev, rc = errno; goto callback; } - /* Drop when deleting the set TODO: don't forget to drop this */ lset->xprt = ldms_xprt_get(x); lset->rmap = ev->map; /* lset now owns ev->map */ lset->remote_set_id = lm->lookup.set_id; pthread_mutex_lock(&lset->lock); - (void)__process_lookup_set_info(lset, &inst_name->name[inst_name->len]); + (void)__process_lookup_set_info(lset, &inst_name->name[inst_name->len], &prfl_maker); + + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + if (prfl_maker < (char *)lm + lm->hdr.len) { + /* The message is from v4.5.1+ version, + * which includes the lookup profiling timestamps. + */ + if (0 == strcmp(prfl_maker, LU_PARAM_PRFL_MARKER)) { + req_recv_ts = (struct timespec *)(prfl_maker + strlen(LU_PARAM_PRFL_MARKER) + 1); + share_ts = req_recv_ts+1; + memcpy(&op_ctxt->lookup_profile.rendzv_ts, &thrstat->last_op_start, sizeof(struct timespec)); + memcpy(&op_ctxt->lookup_profile.req_recv_ts, req_recv_ts, sizeof(struct timespec)); + memcpy(&op_ctxt->lookup_profile.share_ts, share_ts, sizeof(struct timespec)); + } + } + } + pthread_mutex_unlock(&lset->lock); pthread_mutex_lock(&x->lock); @@ -2719,8 +2866,12 @@ static void handle_rendezvous_lookup(zap_ep_t zep, zap_event_t ev, rd_ctxt->sem = ctxt->sem; rd_ctxt->sem_p = ctxt->sem_p; rd_ctxt->rc = ctxt->rc; + rd_ctxt->op_ctxt = ctxt->op_ctxt; pthread_mutex_unlock(&x->lock); assert((zep == x->zap_ep) && (x == rd_ctxt->x)); + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->lookup_profile.read_ts); + } rc = zap_read(zep, lset->rmap, zap_map_addr(lset->rmap), lset->lmap, zap_map_addr(lset->lmap), @@ -2950,6 +3101,7 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) struct ldms_xprt *x = zap_get_ucontext(zep); struct ldms_thrstat *thrstat; struct ldms_thrstat_entry *thrstat_e = NULL; + struct ldms_op_ctxt *op_ctxt = NULL; if (x == NULL) return; @@ -3112,7 +3264,13 @@ static void ldms_zap_cb(zap_ep_t zep, zap_event_t ev) * Applications know 
only the connection is connecting. */ } else { - if (x->event_cb && (uint64_t)ev->context == LDMS_CMD_SEND_MSG) { + if (x->event_cb && ev->context) { + if (ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) { + op_ctxt = (struct ldms_op_ctxt *)ev->context; + memcpy(&op_ctxt->send_profile.complete_ts, &thrstat->last_op_start, + sizeof(struct timespec)); + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->send_profile.deliver_ts); + } event.type = LDMS_XPRT_EVENT_SEND_COMPLETE; x->event_cb(x, &event, x->event_cb_arg); } @@ -3244,12 +3402,13 @@ static int __ldms_xprt_sockaddr(ldms_t x, struct sockaddr *local_sa, struct sockaddr *remote_sa, socklen_t *sa_len); static void __ldms_xprt_close(ldms_t x); -static int __ldms_xprt_send(ldms_t x, char *msg_buf, size_t msg_len); +static int __ldms_xprt_send(ldms_t x, char *msg_buf, size_t msg_len, + struct ldms_op_ctxt *op_ctxt); static size_t __ldms_xprt_msg_max(ldms_t x); static int __ldms_xprt_dir(ldms_t x, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags); static int __ldms_xprt_lookup(ldms_t x, const char *path, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg); -static void __ldms_xprt_stats(ldms_t x, ldms_xprt_stats_t stats); + ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt); +static void __ldms_xprt_stats(ldms_t x, ldms_xprt_stats_t stats, int mask, int is_reset); static int __ldms_xprt_dir_cancel(ldms_t x); static ldms_t __ldms_xprt_get(ldms_t x); /* ref get */ @@ -3261,7 +3420,8 @@ static const char *__ldms_xprt_type_name(ldms_t x); static void __ldms_xprt_priority_set(ldms_t x, int prio); static void __ldms_xprt_cred_get(ldms_t x, ldms_cred_t lcl, ldms_cred_t rmt); static void __ldms_xprt_event_cb_set(ldms_t x, ldms_event_cb_t cb, void *cb_arg); -int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg); +int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg, + struct ldms_op_ctxt *op_ctxt); int __ldms_xprt_get_threads(ldms_t x, pthread_t 
*out, int n); zap_ep_t __ldms_xprt_get_zap_ep(ldms_t x); static ldms_set_t __ldms_xprt_set_by_name(ldms_t x, const char *set_name); @@ -3511,7 +3671,8 @@ size_t format_cancel_notify_req(struct ldms_request *req, uint64_t xid, return len; } -static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len) +static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len, + struct ldms_op_ctxt *op_ctxt) { struct ldms_xprt *x = _x; struct ldms_request *req; @@ -3536,6 +3697,7 @@ static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len) rc = ENOMEM; goto err_0; } + ctxt->op_ctxt = op_ctxt; req = (struct ldms_request *)(ctxt + 1); req->hdr.xid = 0; req->hdr.cmd = htonl(LDMS_CMD_SEND_MSG); @@ -3545,7 +3707,10 @@ static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len) sizeof(struct ldms_send_cmd_param) + msg_len; req->hdr.len = htonl(len); - rc = zap_send2(x->zap_ep, req, len, (void*)(uint64_t)LDMS_CMD_SEND_MSG); + if (ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) { + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->send_profile.send_ts); + } + rc = zap_send2(x->zap_ep, req, len, (void*)op_ctxt); #ifdef DEBUG if (rc) { XPRT_LOG(x, OVIS_LDEBUG, "send: error. 
put ref %p.\n", x->zap_ep); @@ -3560,7 +3725,19 @@ static int __ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len) int ldms_xprt_send(ldms_t _x, char *msg_buf, size_t msg_len) { - return _x->ops.send(_x, msg_buf, msg_len); + int rc; + struct ldms_op_ctxt *op_ctxt = NULL; + if (ENABLED_PROFILING(LDMS_XPRT_OP_SEND)) { + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) + return ENOMEM; + op_ctxt->op_type = LDMS_XPRT_OP_SEND; + (void)clock_gettime(CLOCK_REALTIME, &(op_ctxt->send_profile.app_req_ts)); + } + rc = _x->ops.send(_x, msg_buf, msg_len, op_ctxt); + if (rc) + free(op_ctxt); + return rc; } static size_t __ldms_xprt_msg_max(ldms_t x) @@ -3699,7 +3876,8 @@ int ldms_xprt_dir_cancel(ldms_t x) int __ldms_remote_lookup(ldms_t _x, const char *path, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *arg) + ldms_lookup_cb_t cb, void *arg, + struct ldms_op_ctxt *op_ctxt) { struct ldms_xprt *x = _x; struct ldms_request *req; @@ -3749,6 +3927,10 @@ int __ldms_remote_lookup(ldms_t _x, const char *path, XPRT_LOG(x, OVIS_LDEBUG, "remote_lookup: get ref %p: active_lookup = %d\n", x->zap_ep, x->active_lookup); #endif /* DEBUG */ + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + ctxt->op_ctxt = op_ctxt; + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->lookup_profile.req_send_ts); + } zap_err_t zerr = zap_send(x->zap_ep, req, len); if (zerr) { pthread_mutex_lock(&x->lock); @@ -3777,37 +3959,91 @@ static void sync_lookup_cb(ldms_t x, enum ldms_lookup_status status, int more, } static int __ldms_xprt_lookup(ldms_t x, const char *path, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg) + ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt) { int rc; if ((flags & !cb) || strlen(path) > LDMS_LOOKUP_PATH_MAX) return EINVAL; if (!cb) { - rc = __ldms_remote_lookup(x, path, flags, sync_lookup_cb, cb_arg); + rc = __ldms_remote_lookup(x, path, flags, sync_lookup_cb, cb_arg, op_ctxt); if (rc) return rc; sem_wait(&x->sem); rc = x->sem_rc; } else - 
rc = __ldms_remote_lookup(x, path, flags, cb, cb_arg); + rc = __ldms_remote_lookup(x, path, flags, cb, cb_arg, op_ctxt); return rc; } int ldms_xprt_lookup(ldms_t x, const char *path, enum ldms_lookup_flags flags, ldms_lookup_cb_t cb, void *cb_arg) { - return x->ops.lookup(x, path, flags, cb, cb_arg); + int rc; + struct ldms_op_ctxt *op_ctxt = NULL; + + if (ENABLED_PROFILING(LDMS_XPRT_OP_LOOKUP)) { + op_ctxt = calloc(1, sizeof(*op_ctxt)); + if (!op_ctxt) + return ENOMEM; + op_ctxt->op_type = LDMS_XPRT_OP_LOOKUP; + (void)clock_gettime(CLOCK_REALTIME, &op_ctxt->lookup_profile.app_req_ts); + } + rc = x->ops.lookup(x, path, flags, cb, cb_arg, op_ctxt); + if (rc) + free(op_ctxt); + return rc; } -static void __ldms_xprt_stats(ldms_t _x, ldms_xprt_stats_t stats) +static void __ldms_xprt_stats(ldms_t _x, ldms_xprt_stats_t stats, int mask, int is_reset) { + struct ldms_op_ctxt_list *src_list, *dst_list; + struct ldms_op_ctxt *src, *dst; + enum ldms_xprt_ops_e op_e; + + if (!stats) + goto reset; *stats = _x->stats; + for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) { + TAILQ_INIT(&stats->op_ctxt_lists[op_e]); + dst_list = &stats->op_ctxt_lists[op_e]; + src_list = __rail_op_ctxt_list(_x, op_e); + + TAILQ_FOREACH(src, src_list, ent) { + dst = malloc(sizeof(*dst)); + if (!dst) { + ovis_log(NULL, OVIS_LCRIT, "Memory allocation failure.\n"); + return; + } + memcpy(dst, src, sizeof(*dst)); + dst->ent.tqe_next = NULL; + dst->ent.tqe_prev = NULL; + TAILQ_INSERT_TAIL(dst_list, dst, ent); + } + } + reset: + if (!is_reset) + return; + if (mask & LDMS_PERF_M_STATS) { + /* last_op and ops could also be reset by ldms_xprt_rate_data(). 
*/ + /* don't reset the connect/disconnect time */ + memset(&_x->stats.last_op, 0, sizeof(_x->stats.last_op)); + memset(&_x->stats.ops, 0, sizeof(_x->stats.ops)); + } + if (mask & LDMS_PERF_M_PROFILNG) { + for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) { + src_list = __rail_op_ctxt_list(_x, op_e); + while ((src = TAILQ_FIRST(src_list))) { + TAILQ_REMOVE(src_list, src, ent); + free(src); + } + } + } } -void ldms_xprt_stats(ldms_t _x, ldms_xprt_stats_t stats) +void ldms_xprt_stats(ldms_t _x, ldms_xprt_stats_t stats, int mask, int is_reset) { - _x->ops.stats(_x, stats); + _x->ops.stats(_x, stats, mask, is_reset); } static int send_req_notify(ldms_t _x, ldms_set_t s, uint32_t flags, diff --git a/ldms/src/core/ldms_xprt.h b/ldms/src/core/ldms_xprt.h index 7a98300f5..9a61330da 100644 --- a/ldms/src/core/ldms_xprt.h +++ b/ldms/src/core/ldms_xprt.h @@ -281,6 +281,8 @@ struct ldms_rendezvous_lookup_param { uint32_t card; /* card of dict */ uint32_t schema_len; uint32_t array_card; /* card of array */ + // struct timespec req_recv; /* Timestamp when server has received the lookup request. */ + // struct timespec share; /* Timestamp when server has called zap_share(). 
*/ /* schema name, then instance name, and then set_info key value pairs */ char set_info[OVIS_FLEX]; }; @@ -346,6 +348,10 @@ struct ldms_stream_sub_reply { char msg[0]; }; +struct ldms_set_delete_reply { + struct timespec recv_ts; +}; + struct ldms_reply { struct ldms_reply_hdr hdr; union { @@ -354,6 +360,7 @@ struct ldms_reply { struct ldms_auth_challenge_reply auth_challenge; struct ldms_push_reply push; struct ldms_stream_sub_reply sub; + struct ldms_set_delete_reply set_del; }; }; #pragma pack() @@ -377,6 +384,7 @@ struct ldms_context { int rc; struct ldms_xprt *x; ldms_context_type_t type; + struct ldms_op_ctxt *op_ctxt; union { struct { ldms_dir_cb_t cb; @@ -449,13 +457,13 @@ struct ldms_xprt_ops_s { struct sockaddr *remote_sa, socklen_t *sa_len); void (*close)(ldms_t x); - int (*send)(ldms_t x, char *msg_buf, size_t msg_len); + int (*send)(ldms_t x, char *msg_buf, size_t msg_len, struct ldms_op_ctxt *op_ctxt); size_t (*msg_max)(ldms_t x); int (*dir)(ldms_t x, ldms_dir_cb_t cb, void *cb_arg, uint32_t flags); int (*dir_cancel)(ldms_t x); int (*lookup)(ldms_t t, const char *name, enum ldms_lookup_flags flags, - ldms_lookup_cb_t cb, void *cb_arg); - void (*stats)(ldms_t x, ldms_xprt_stats_t stats); + ldms_lookup_cb_t cb, void *cb_arg, struct ldms_op_ctxt *op_ctxt); + void (*stats)(ldms_t x, ldms_xprt_stats_t stats, int mask, int is_reset); ldms_t (*get)(ldms_t x); /* ref get */ void (*put)(ldms_t x); /* ref put */ @@ -465,7 +473,8 @@ struct ldms_xprt_ops_s { const char *(*type_name)(ldms_t x); void (*priority_set)(ldms_t x, int prio); void (*cred_get)(ldms_t x, ldms_cred_t lcl, ldms_cred_t rmt); - int (*update)(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg); + int (*update)(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg, + struct ldms_op_ctxt *op_ctxt); int (*get_threads)(ldms_t x, pthread_t *out, int n); @@ -559,4 +568,10 @@ void ldms_xprt_auth_begin(ldms_t xprt); int ldms_xprt_auth_send(ldms_t _x, const char *msg_buf, size_t 
msg_len); void ldms_xprt_auth_end(ldms_t xprt, int result); +/* ======================== + * LDMS operation profiling + * ======================== + */ +#define ENABLED_PROFILING(_OP_) (__enable_profiling[_OP_] == 1) + #endif diff --git a/ldms/src/ldmsd/ldmsd_request.c b/ldms/src/ldmsd/ldmsd_request.c index 5ef9156f2..5813008b9 100644 --- a/ldms/src/ldmsd/ldmsd_request.c +++ b/ldms/src/ldmsd/ldmsd_request.c @@ -264,6 +264,7 @@ static int update_time_stats_handler(ldmsd_req_ctxt_t reqc); static int set_sec_mod_handler(ldmsd_req_ctxt_t reqc); static int log_status_handler(ldmsd_req_ctxt_t reqc); static int stats_reset_handler(ldmsd_req_ctxt_t reqc); +static int profiling_handler(ldmsd_req_ctxt_t req); /* these are implemented in ldmsd_failover.c */ int failover_config_handler(ldmsd_req_ctxt_t req_ctxt); @@ -565,6 +566,10 @@ static struct request_handler_entry request_handler[] = { LDMSD_SET_DEFAULT_AUTHZ_REQ, set_default_authz_handler, XUG | MOD }, + [LDMSD_PROFILING_REQ] = { + LDMSD_PROFILING_REQ, profiling_handler, XALL + }, + /* FAILOVER user commands */ [LDMSD_FAILOVER_CONFIG_REQ] = { LDMSD_FAILOVER_CONFIG_REQ, failover_config_handler, XUG | MOD, @@ -7066,7 +7071,7 @@ static char *__xprt_stats_as_json(size_t *json_sz, int reset, int level) first = 0; } - ldms_xprt_stats(x, &xs); + ldms_xprt_stats(x, &xs, LDMS_PERF_M_STATS, reset); xprt_count += 1; switch (ep_state) { @@ -7238,6 +7243,254 @@ static int xprt_stats_handler(ldmsd_req_ctxt_t req) return ENOMEM; } +double __ts2double(struct timespec ts) +{ + return ts.tv_sec + ((double)ts.tv_nsec)/1000000000.0; +} + +json_t *__ldms_op_profiling_as_json(struct ldms_op_ctxt *xc, enum ldms_xprt_ops_e op_e) +{ + json_t *stat; + stat = json_object(); + switch (op_e) { + case LDMS_XPRT_OP_LOOKUP: + json_object_set_new(stat, "app_req", + json_real(__ts2double(xc->lookup_profile.app_req_ts))); + json_object_set_new(stat, "req_send", + json_real(__ts2double(xc->lookup_profile.req_send_ts))); + json_object_set_new(stat, 
"req_recv", + json_real(__ts2double(xc->lookup_profile.req_recv_ts))); + json_object_set_new(stat, "share", + json_real(__ts2double(xc->lookup_profile.share_ts))); + json_object_set_new(stat, "rendzv", + json_real(__ts2double(xc->lookup_profile.rendzv_ts))); + json_object_set_new(stat, "read", + json_real(__ts2double(xc->lookup_profile.read_ts))); + json_object_set_new(stat, "complete", + json_real(__ts2double(xc->lookup_profile.complete_ts))); + json_object_set_new(stat, "deliver", + json_real(__ts2double(xc->lookup_profile.deliver_ts))); + break; + case LDMS_XPRT_OP_UPDATE: + json_object_set_new(stat, "app_req", + json_real(__ts2double(xc->update_profile.app_req_ts))); + json_object_set_new(stat, "read_start", + json_real(__ts2double(xc->update_profile.read_ts))); + json_object_set_new(stat, "read_complete", + json_real(__ts2double(xc->update_profile.read_complete_ts))); + json_object_set_new(stat, "deliver", + json_real(__ts2double(xc->update_profile.deliver_ts))); + break; + case LDMS_XPRT_OP_SEND: + json_object_set_new(stat, "app_req", + json_real(__ts2double(xc->send_profile.app_req_ts))); + json_object_set_new(stat, "send", + json_real(__ts2double(xc->send_profile.send_ts))); + json_object_set_new(stat, "complete", + json_real(__ts2double(xc->send_profile.complete_ts))); + json_object_set_new(stat, "deliver", + json_real(__ts2double(xc->send_profile.deliver_ts))); + break; + case LDMS_XPRT_OP_SET_DELETE: + json_object_set_new(stat, "send", + json_real(__ts2double(xc->set_del_profile.send_ts))); + json_object_set_new(stat, "recv", + json_real(__ts2double(xc->set_del_profile.recv_ts))); + json_object_set_new(stat, "acknowledge", + json_real(__ts2double(xc->set_del_profile.ack_ts))); + break; + case LDMS_XPRT_OP_STREAM_PUBLISH: + json_object_set_new(stat, "hop_cnt", + json_integer(xc->stream_pub_profile.hop_num)); + json_object_set_new(stat, "recv", + json_real(__ts2double(xc->stream_pub_profile.recv_ts))); + json_object_set_new(stat, "send", + 
json_real(__ts2double(xc->stream_pub_profile.send_ts))); + break; + default: + break; + } + return stat; +} + +int __stream_profiling_as_json(json_t **_jobj, int is_reset) { + json_t *jobj, *strm_jobj, *src_jobj, *hop_jobj, *prf_array, *prf_jobj; + struct ldms_stream_stats_tq_s *tq; + struct ldms_stream_stats_s *ss; + struct ldms_stream_src_stats_s *strm_src; + struct ldms_stream_profile_ent *prf; + struct ldms_addr addr; + char addr_buf[128] = ""; + struct rbn *rbn; + int i, rc = 0; + + jobj = json_object(); + tq = ldms_stream_stats_tq_get(NULL, 0, is_reset); + if (!tq) { + /* no stream ... nothing to do here. */ + goto out; + } + TAILQ_FOREACH(ss, tq, entry) { + strm_jobj = json_object(); + + RBT_FOREACH(rbn, &ss->src_stats_rbt) { + src_jobj = json_array(); + + strm_src = container_of(rbn, struct ldms_stream_src_stats_s, rbn); + addr = strm_src->src; + ldms_addr_ntop(&addr, addr_buf, sizeof(addr_buf)); + TAILQ_FOREACH(prf, &strm_src->profiles, ent) { + hop_jobj = json_object(); + json_object_set_new(hop_jobj, "hop_count", json_integer(prf->profiles.hop_cnt)); + prf_array = json_array(); + json_object_set_new(hop_jobj, "profile", prf_array); + for (i = 0; i < prf->profiles.hop_cnt; i++) { + prf_jobj = json_object(); + json_object_set_new(prf_jobj, "recv", + json_real(__ts2double(prf->profiles.hops[i].recv_ts))); + json_object_set_new(prf_jobj, "deliver", + json_real(__ts2double(prf->profiles.hops[i].send_ts))); + json_array_append_new(prf_array, prf_jobj); + } + json_array_append_new(src_jobj, hop_jobj); + } + json_object_set_new(strm_jobj, addr_buf, src_jobj); + } + json_object_set_new(jobj, ss->name, strm_jobj); + } + out: + *_jobj = jobj; + return rc; +} + +int __xprt_profiling_as_json(json_t **_obj, int is_reset) +{ + json_t *obj, *ep_prf, *op_prf; + ldms_t x; + struct ldms_xprt_stats stats; + struct ldms_op_ctxt *xc; + int rc; + enum ldms_xprt_ops_e op_e; + char lhostname[128], lport_no[32], rhostname[128], rport_no[32], name[161]; + + + obj = json_object(); 
+ if (!obj) { + ovis_log(config_log, OVIS_LCRIT, "Memory allocation failure\n"); + return ENOMEM; + } + for (x = ldms_xprt_first(); x; x = ldms_xprt_next(x)) { + rc = ldms_xprt_names(x, lhostname, sizeof(lhostname), + lport_no, sizeof(lport_no), + rhostname, sizeof(rhostname), + rport_no, sizeof(rport_no), + NI_NAMEREQD | NI_NUMERICSERV); + if (rc) { + if (rc == ENOTCONN) + continue; + } + + ldms_xprt_stats(x, &stats, LDMS_PERF_M_PROFILNG, is_reset); + snprintf(name, 160, "%s:%s", rhostname, rport_no); + ep_prf = json_object(); + for (op_e = 0; op_e < LDMS_XPRT_OP_COUNT; op_e++) { + op_prf = json_array(); + TAILQ_FOREACH(xc, &stats.op_ctxt_lists[op_e], ent) { + json_array_append_new(op_prf, __ldms_op_profiling_as_json(xc, op_e)); + } + json_object_set_new(ep_prf, ldms_xprt_op_names[op_e], op_prf); + + } + json_object_set_new(obj, name, ep_prf); + } + *_obj = obj; + return 0; +} + +static int profiling_handler(ldmsd_req_ctxt_t req) +{ + json_t *obj, *xprt_prf, *strm_prf; + char *json_as_str; + int rc = 0; + struct ldmsd_req_attr_s attr; + size_t str_len; + char *enable_str, *reset_str; + int is_enable = -1; /* -1 means only getting the profile data, don't enable/disable */ + int is_reset = 0; + + enable_str = ldmsd_req_attr_str_value_get_by_id(req, LDMSD_ATTR_TYPE); + if (enable_str) { + is_enable = 1; + if (0 == strcasecmp(enable_str, "false")) + is_enable = 0; /* disable */ + } + reset_str = ldmsd_req_attr_str_value_get_by_id(req, LDMSD_ATTR_RESET); + if (reset_str) { + is_reset = 1; + if (0 == strcasecmp(reset_str, "false")) + is_reset = 0; + } + + if (is_enable == 1) { + ldms_profiling_enable(-1, NULL, NULL); + } else if (is_enable == 0) { + ldms_profiling_disable(-1, NULL, NULL); + } + + /* + * The output JSON object looks like this: + * + * { + * "xprt": { + * : { + * "lookup": , + * "update": , + * "send": + * }, + * ... + * }, + * "stream" : { + * : , + * ... 
+ * } + * } + */ + obj = json_object(); + (void)__xprt_profiling_as_json(&xprt_prf, is_reset); + json_object_set_new(obj, "xprt", xprt_prf); + + (void)__stream_profiling_as_json(&strm_prf, is_reset); + json_object_set_new(obj, "stream", strm_prf); + + json_as_str = json_dumps(obj, JSON_INDENT(0)); + str_len = strlen(json_as_str) + 1; /* +1 for \0 */ + + attr.discrim = 1; + attr.attr_id = LDMSD_ATTR_JSON; + attr.attr_len = str_len; + ldmsd_hton_req_attr(&attr); + + if (ldmsd_append_reply(req, (const char *)&attr, sizeof(attr), LDMSD_REQ_SOM_F)) + goto err; + + if (ldmsd_append_reply(req, json_as_str, str_len, 0)) + goto err; + + attr.discrim = 0; + if (ldmsd_append_reply(req, (const char *)&attr.discrim, sizeof(attr.discrim), LDMSD_REQ_EOM_F)) + goto err; + + free(obj); + free(json_as_str); + return 0; +err: + free(obj); + free(json_as_str); + req->errcode = rc; + ldmsd_send_req_response(req, "Failed to get ldms_xprt's probe data"); + return ENOMEM; +} + struct store_time_thread { pid_t tid; uint64_t store_time; diff --git a/ldms/src/ldmsd/ldmsd_request.h b/ldms/src/ldmsd/ldmsd_request.h index 5dab3a501..39fdfd803 100644 --- a/ldms/src/ldmsd/ldmsd_request.h +++ b/ldms/src/ldmsd/ldmsd_request.h @@ -165,6 +165,7 @@ enum ldmsd_request { LDMSD_DEFAULT_QUOTA_REQ, LDMSD_PID_FILE_REQ, LDMSD_BANNER_MODE_REQ, + LDMSD_PROFILING_REQ, /* failover requests by user */ LDMSD_FAILOVER_CONFIG_REQ = 0x700, /* "failover_config" user command */ diff --git a/ldms/src/ldmsd/ldmsd_request_util.c b/ldms/src/ldmsd/ldmsd_request_util.c index 8cda0e123..0e300cd0f 100644 --- a/ldms/src/ldmsd/ldmsd_request_util.c +++ b/ldms/src/ldmsd/ldmsd_request_util.c @@ -123,6 +123,7 @@ struct req_str_id req_str_id_table[] = { { "prdcr_stream_status",LDMSD_PRDCR_STREAM_STATUS_REQ }, { "prdcr_subscribe", LDMSD_PRDCR_SUBSCRIBE_REQ }, { "prdcr_unsubscribe", LDMSD_PRDCR_UNSUBSCRIBE_REQ }, + { "profiling", LDMSD_PROFILING_REQ }, { "publish_kernel", LDMSD_PUBLISH_KERNEL_REQ }, { "qgroup_config", 
LDMSD_QGROUP_CONFIG_REQ }, { "qgroup_info", LDMSD_QGROUP_INFO_REQ }, @@ -330,6 +331,7 @@ const char *ldmsd_req_id2str(enum ldmsd_request req_id) case LDMSD_CMDLINE_OPTIONS_SET_REQ : return "CMDLINE_OPTION_SET_REQ"; case LDMSD_SET_SEC_MOD_REQ : return "SET_SEC_REQ"; case LDMSD_LOG_STATUS_REQ : return "LOG_STATUS_REQ"; + case LDMSD_PROFILING_REQ : return "PROFILING_REQ"; /* failover requests by user */ case LDMSD_FAILOVER_CONFIG_REQ : return "FAILOVER_CONFIG_REQ";