Skip to content

Commit

Permalink
Profiling LDMS Operations
Browse files Browse the repository at this point in the history
This commit introduces profiling capabilities to LDMS operations,
including lookup, update, set_delete, and stream_publish. It collects
timestamps for key events, such as API calls, request/response
exchanges, and completion notifications. The feature is disabled by
default but can be configured using the 'profiling' configuration command
to enable, disable, or reset data collection. This enhancement will aid
in performance analysis and optimization.
  • Loading branch information
nichamon authored and tom95858 committed Dec 12, 2024
1 parent ff176bb commit 5160ffc
Show file tree
Hide file tree
Showing 16 changed files with 1,153 additions and 79 deletions.
2 changes: 1 addition & 1 deletion ldms/python/ldmsd/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pkgpythondir=${pythondir}/ldmsd
pkgpython_PYTHON = __init__.py ldmsd_setup.py ldmsd_util.py ldmsd_communicator.py ldmsd_config.py
pkgpython_PYTHON = __init__.py ldmsd_setup.py ldmsd_util.py ldmsd_communicator.py ldmsd_config.py ldmsd_profiling.py
dist_bin_SCRIPTS = ldmsd_controller
23 changes: 23 additions & 0 deletions ldms/python/ldmsd/ldmsd_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
'set_sec_mod' : {'req_attr': ['regex'], 'opt_attr': ['uid', 'gid', 'perm']},
'log_status' : {'req_attr' : [], 'opt_attr' : ['name']},
'stats_reset' : {'req_attr' : [], 'opt_attr' : ['list']},
'profiling' : {'req_attr' : [], 'opt_attr' : ['enable', 'reset']},
##### Failover. #####
'failover_config': {
'req_attr': [
Expand Down Expand Up @@ -617,6 +618,9 @@ class LDMSD_Request(object):
SET_SEC_MOD = 0x600 + 19
LOG_STATUS = 0x600 + 20
STATS_RESET = 0x600 + 21
# IDs 0x600 + 22 to 0x600 + 30 are reserved to match command-line options handlers
# defined in ldmsd_request.h. These must stay in sync with the C implementation.
PROFILING = 0x600 + 31

FAILOVER_CONFIG = 0x700
FAILOVER_PEERCFG_START = 0x700 + 1
Expand Down Expand Up @@ -732,6 +736,7 @@ class LDMSD_Request(object):
'failover_stop' : {'id' : FAILOVER_STOP},
'set_route' : {'id': SET_ROUTE},
'xprt_stats' : {'id' : XPRT_STATS},
'profiling' : {'id' : PROFILING},
'thread_stats' : {'id' : THREAD_STATS},
'prdcr_stats' : {'id' : PRDCR_STATS},
'set_stats' : {'id' : SET_STATS},
Expand Down Expand Up @@ -3393,6 +3398,24 @@ def xprt_stats(self, reset=False, level=0):
self.close()
return errno.ENOTCONN, str(e)

def profiling(self, enable = None, reset = None):
    """Send a PROFILING request to the daemon and return its reply.

    Only arguments that are not None are attached to the request:
    `enable` is carried in the TYPE attribute and `reset` in the RESET
    attribute.

    Returns the tuple (errcode, msg) taken from the response. If sending,
    receiving, or reading the response raises, the connection is closed
    and (errno.ENOTCONN, str(exception)) is returned instead.
    """
    candidates = (
        (LDMSD_Req_Attr.TYPE, enable),
        (LDMSD_Req_Attr.RESET, reset),
    )
    attrs = [
        LDMSD_Req_Attr(attr_id = attr_id, value = value)
        for attr_id, value in candidates
        if value is not None
    ]
    request = LDMSD_Request(command_id = LDMSD_Request.PROFILING,
                            attrs = attrs)
    try:
        request.send(self)
        reply = request.receive(self)
        return reply['errcode'], reply['msg']
    except Exception as e:
        # Any failure is reported as a lost connection after closing it.
        self.close()
        return errno.ENOTCONN, str(e)

def thread_stats(self, reset=False):
"""Query the daemon's I/O thread utilization data"""
if reset is None:
Expand Down
25 changes: 24 additions & 1 deletion ldms/python/ldmsd/ldmsd_controller
Original file line number Diff line number Diff line change
Expand Up @@ -2410,6 +2410,30 @@ class LdmsdCmdParser(cmd.Cmd):
# Completion hook for the 'xprt_stats' command: hands the text typed so far
# to the shared attribute-list completer. `line`, `begidx` and `endidx` are
# accepted but not used here.
def complete_xprt_stats(self, text, line, begidx, endidx):
return self.__complete_attr_list('xprt_stats', text)

def do_profiling(self, arg):
    """
    Enable/disable and query the LDMS operation profiling data
    The command was intended for diagnostic or study to improve ldmsd performance.
    The command always reports the cached profiling data if exists.
    Parameters:
    [enable=] True to enable LDMS profiling
    [reset=] True to reset and free cached profiling data after the report
    """
    # handle_args() yields the parsed attributes; a falsy result means
    # there is nothing to send.
    arg = self.handle_args('profiling', arg)
    if not arg:
        return
    rc, msg = self.comm.profiling(**arg)
    # An empty message means there is nothing to report.
    if msg == "":
        return
    if rc != 0:
        print(f"Error: {rc} {msg}")
        return
    stats = fmt_status(msg)
    print(stats)

def do_updtr_task(self, arg):
"""
Report the updater tasks
Expand Down Expand Up @@ -3212,7 +3236,6 @@ class LdmsdCmdParser(cmd.Cmd):
cmd, arg = line[:i], line[i:].strip()
return cmd, arg, line


if __name__ == "__main__":
is_debug = True
try:
Expand Down
45 changes: 45 additions & 0 deletions ldms/python/ldmsd/ldmsd_profiling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import json
import pandas as pd

from ldmsd.ldmsd_communicator import *

def profiling_as_json(xprt, host, port):
    """Fetch the daemon's profiling report and return it parsed from JSON.

    A Communicator is created from `xprt`, `host` and `port`, connected,
    and asked for the profiling data. The message part of the reply is
    decoded with json.loads(), so a reply that is not valid JSON raises
    the decoder's exception.

    The connection is closed before returning, whether or not the
    request succeeded, so repeated calls do not accumulate open
    connections.
    """
    comm = Communicator(xprt=xprt, host=host, port=port)
    comm.connect()
    try:
        # The reply is (errcode, msg); only the message carries the report.
        _errcode, msg = comm.profiling()
    finally:
        comm.close()
    return json.loads(msg)

def get_hosts(o):
    """Return the keys of the 'xprt' section of the profiling object `o`."""
    xprt_section = o['xprt']
    return xprt_section.keys()

def get_streams(o):
    """Return the keys of the 'stream' section of the profiling object `o`."""
    stream_section = o['stream']
    return stream_section.keys()

def lookup_df(o, host):
    """Build a DataFrame from the 'LOOKUP' entry of `host` under o['xprt']."""
    records = o['xprt'][host]['LOOKUP']
    return pd.DataFrame(records)

def update_df(o, host):
    """Build a DataFrame from the 'UPDATE' entry of `host` under o['xprt']."""
    records = o['xprt'][host]['UPDATE']
    return pd.DataFrame(records)

def send_df(o, host):
    """Build a DataFrame from the 'SEND' entry of `host` under o['xprt']."""
    records = o['xprt'][host]['SEND']
    return pd.DataFrame(records)

def set_delete_df(o, host):
    """Build a DataFrame from the 'SET_DELETE' entry of `host` under o['xprt']."""
    records = o['xprt'][host]['SET_DELETE']
    return pd.DataFrame(records)

def stream_publish_df(o, host):
    """Build a DataFrame from the 'STREAM_PUBLISH' entry of `host` under o['xprt']."""
    records = o['xprt'][host]['STREAM_PUBLISH']
    return pd.DataFrame(records)

def stream_by_stream_df(o, stream_name = None, src = None):
    """Build a DataFrame from the 'stream' section of the profiling object.

    Starting at o['stream'], the selection is narrowed by `stream_name`
    and then by `src`, each applied only when it is not None. Whatever
    remains is passed to the DataFrame constructor.

    NOTE(review): when `src` is given without `stream_name`, `src` is
    used to index o['stream'] directly -- confirm this is intended.
    """
    selection = o['stream']
    for key in (stream_name, src):
        if key is None:
            continue
        selection = selection[key]
    return pd.DataFrame(selection)
99 changes: 97 additions & 2 deletions ldms/src/core/ldms.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,83 @@ const char *ldms_xprt_op_names[] = {
"DIR_REP",
"SEND",
"RECV",
"STREAM_PUBLISH",
"STREAM_SUBSCRIBE",
"STREAM_UNSUBSCRIBE"
};
static char *type_names[];

/*
 * Per-operation profiling state. The functions below index this table by
 * enum ldms_xprt_ops_e value.
 *
 * PROFILING_CFG_UNSUPPORTED marks operations for which LDMS does not
 * support profiling; ldms_profiling_enable() and ldms_profiling_disable()
 * never change those entries. Every other operation starts out as
 * PROFILING_CFG_DISABLED.
 */
int __enable_profiling[LDMS_XPRT_OP_COUNT] = {
PROFILING_CFG_DISABLED, /* lookup */
PROFILING_CFG_DISABLED, /* update */
PROFILING_CFG_UNSUPPORTED, /* Publish */
PROFILING_CFG_DISABLED, /* set_delete */
PROFILING_CFG_UNSUPPORTED, /* dir_req */
PROFILING_CFG_UNSUPPORTED, /* dir_rep */
PROFILING_CFG_DISABLED, /* send */
PROFILING_CFG_UNSUPPORTED, /* receive */
PROFILING_CFG_DISABLED, /* stream_publish */
PROFILING_CFG_UNSUPPORTED, /* stream_subscribe */
PROFILING_CFG_UNSUPPORTED /* stream_unsubscribe */
};

/*
 * Enable profiling for LDMS transport operations.
 *
 * ops_cnt  Number of entries in ops. A negative value enables profiling
 *          for every operation that supports it; ops and ops_err are not
 *          consulted in that case.
 * ops      Array of ops_cnt operations to enable.
 * ops_err  Optional (may be NULL) array of ops_cnt ints. It is zeroed
 *          first; entry i is then set to EINVAL if ops[i] is not a valid
 *          operation, or ENOSYS if profiling is unsupported for ops[i].
 *
 * Returns 0 if every requested operation was enabled, or -1 if ops is
 * NULL while ops_cnt is positive, or if at least one requested operation
 * was invalid or unsupported. Valid, supported operations in the list are
 * still enabled when -1 is returned.
 */
int ldms_profiling_enable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err)
{
	int i;
	int rc = 0;

	if (ops_cnt < 0) {
		for (i = 0; i < LDMS_XPRT_OP_COUNT; i++) {
			if (__enable_profiling[i] != PROFILING_CFG_UNSUPPORTED)
				__enable_profiling[i] = PROFILING_CFG_ENABLED;
		}
		return 0;
	}

	/* Nothing sensible can be done with a missing operation list. */
	if (ops_cnt > 0 && !ops)
		return -1;

	if (ops_err)
		bzero(ops_err, sizeof(int) * ops_cnt);

	for (i = 0; i < ops_cnt; i++) {
		/*
		 * The unsigned comparison also rejects negative values, which
		 * would otherwise index before the start of the table.
		 */
		if ((unsigned int)ops[i] >= (unsigned int)LDMS_XPRT_OP_COUNT) {
			rc = -1;
			/* ops_err is optional; never write through NULL. */
			if (ops_err)
				ops_err[i] = EINVAL;
			continue;
		}
		if (__enable_profiling[ops[i]] == PROFILING_CFG_UNSUPPORTED) {
			rc = -1;
			if (ops_err)
				ops_err[i] = ENOSYS;
		} else {
			__enable_profiling[ops[i]] = PROFILING_CFG_ENABLED;
		}
	}
	return rc;
}

/*
 * Disable profiling for LDMS transport operations.
 *
 * ops_cnt  Number of entries in ops. A negative value disables profiling
 *          for every operation that supports it; ops and ops_err are not
 *          consulted in that case.
 * ops      Array of ops_cnt operations to disable.
 * ops_err  Optional (may be NULL) array of ops_cnt ints. It is zeroed
 *          first; entry i is then set to EINVAL if ops[i] is not a valid
 *          operation, or ENOSYS if profiling is unsupported for ops[i].
 *
 * Returns 0 if every requested operation was disabled, or -1 if ops is
 * NULL while ops_cnt is positive, or if at least one requested operation
 * was invalid or unsupported. Valid, supported operations in the list are
 * still disabled when -1 is returned.
 */
int ldms_profiling_disable(int ops_cnt, enum ldms_xprt_ops_e *ops, int *ops_err)
{
	int i;
	int rc = 0;

	if (ops_cnt < 0) {
		for (i = 0; i < LDMS_XPRT_OP_COUNT; i++) {
			if (__enable_profiling[i] != PROFILING_CFG_UNSUPPORTED)
				__enable_profiling[i] = PROFILING_CFG_DISABLED;
		}
		return 0;
	}

	/* Nothing sensible can be done with a missing operation list. */
	if (ops_cnt > 0 && !ops)
		return -1;

	if (ops_err)
		bzero(ops_err, sizeof(int) * ops_cnt);

	for (i = 0; i < ops_cnt; i++) {
		/*
		 * Validate the index before touching the table, as
		 * ldms_profiling_enable() does; the unsigned comparison also
		 * rejects negative values.
		 */
		if ((unsigned int)ops[i] >= (unsigned int)LDMS_XPRT_OP_COUNT) {
			rc = -1;
			if (ops_err)
				ops_err[i] = EINVAL;
			continue;
		}
		if (__enable_profiling[ops[i]] == PROFILING_CFG_UNSUPPORTED) {
			rc = -1;
			if (ops_err)
				ops_err[i] = ENOSYS;
		} else {
			__enable_profiling[ops[i]] = PROFILING_CFG_DISABLED;
		}
	}
	return rc;
}

static struct ldms_digest_s null_digest;

/* This function is useful for displaying data structures stored in
Expand Down Expand Up @@ -739,7 +813,8 @@ static void sync_update_cb(ldms_t x, ldms_set_t s, int status, void *arg)
sem_post(&x->sem);
}

int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void *arg)
int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb,
void *arg, struct ldms_op_ctxt *op_ctxt)
{
ldms_t xprt = ldms_xprt_get(x);
int rc;
Expand Down Expand Up @@ -777,6 +852,21 @@ int __ldms_xprt_update(ldms_t x, struct ldms_set *set, ldms_update_cb_t cb, void
ldms_t __ldms_xprt_to_rail(ldms_t x);
int ldms_xprt_update(struct ldms_set *set, ldms_update_cb_t cb, void *arg)
{
int rc;
struct ldms_op_ctxt *op_ctxt = NULL;

assert(set);

if (set->curr_updt_ctxt)
return EBUSY;

if (ENABLED_PROFILING(LDMS_XPRT_OP_UPDATE)) {
op_ctxt = calloc(1, sizeof(*op_ctxt));
if (!op_ctxt)
return ENOMEM;
op_ctxt->op_type = LDMS_XPRT_OP_UPDATE;
(void)clock_gettime(CLOCK_REALTIME, &(op_ctxt->update_profile.app_req_ts));
}
/*
* We convert the transport handle to a rail handle using
* __ldms_xprt_to_rail() and pass it to x->ops.update().
Expand All @@ -786,7 +876,12 @@ int ldms_xprt_update(struct ldms_set *set, ldms_update_cb_t cb, void *arg)
* when the update completes.
*/
ldms_t r = __ldms_xprt_to_rail(set->xprt);
return r->ops.update(r, set, cb, arg);
rc = r->ops.update(r, set, cb, arg, op_ctxt);
if (rc) {
set->curr_updt_ctxt = NULL;
free(op_ctxt);
}
return rc;
}

void __ldms_set_on_xprt_term(ldms_set_t set, ldms_t xprt)
Expand Down
Loading

0 comments on commit 5160ffc

Please sign in to comment.