From 6f433dc745124095d591ec9288d75a0e581f01c2 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Fri, 13 Dec 2024 14:13:40 -0800 Subject: [PATCH] IWF-415: Improve activity logging --- service/api/handler.go | 43 ++++++++++------------------- service/common/log/truncate.go | 14 ++++++++++ service/interpreter/activityImpl.go | 9 +++--- 3 files changed, 34 insertions(+), 32 deletions(-) create mode 100644 service/common/log/truncate.go diff --git a/service/api/handler.go b/service/api/handler.go index 92d43575..52fb244f 100644 --- a/service/api/handler.go +++ b/service/api/handler.go @@ -1,7 +1,6 @@ package api import ( - "encoding/json" "github.com/indeedeng/iwf/config" "net/http" @@ -48,7 +47,7 @@ func (h *handler) apiV1WorkflowStart(c *gin.Context) { invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) resp, errResp := h.svc.ApiV1WorkflowStartPost(c.Request.Context(), req) if errResp != nil { @@ -65,7 +64,7 @@ func (h *handler) apiV1WorkflowWaitForStateCompletion(c *gin.Context) { invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) resp, errResp := h.svc.ApiV1WorkflowWaitForStateCompletion(c.Request.Context(), req) if errResp != nil { @@ -82,7 +81,7 @@ func (h *handler) apiV1WorkflowSignal(c *gin.Context) { invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) errResp := h.svc.ApiV1WorkflowSignalPost(c.Request.Context(), req) if errResp != nil { @@ -99,7 +98,7 @@ func (h *handler) apiV1WorkflowStop(c *gin.Context) { invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) errResp := h.svc.ApiV1WorkflowStopPost(c.Request.Context(), req) if errResp != nil { @@ -116,7 +115,7 @@ func (h *handler) apiV1WorkflowInternalDump(c *gin.Context) { invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) resp, errResp := h.svc.ApiV1WorkflowDumpPost(c.Request.Context(), req) if errResp != nil { @@ -133,7 +132,7 @@ func (h *handler) apiV1WorkflowConfigUpdate(c *gin.Context) { invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) errResp := h.svc.ApiV1WorkflowConfigUpdate(c.Request.Context(), req) if errResp != nil { @@ -150,7 +149,7 @@ func (h *handler) apiV1WorkflowTriggerContinueAsNew(c *gin.Context) { invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) errResp := h.svc.ApiV1WorkflowTriggerContinueAsNew(c.Request.Context(), req) if errResp != nil { @@ -167,7 +166,7 @@ func (h *handler) apiV1WorkflowSearch(c *gin.Context) { invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) resp, errResp := h.svc.ApiV1WorkflowSearchPost(c.Request.Context(), req) if errResp != nil { @@ -184,7 +183,7 @@ func (h *handler) apiV1WorkflowRpc(c *gin.Context) { invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) resp, errResp := h.svc.ApiV1WorkflowRpcPost(c.Request.Context(), req) if errResp != nil { @@ -210,7 +209,7 @@ func (h *handler) apiV1WorkflowGetDataObjects(c *gin.Context) { invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) resp, errResp := h.svc.ApiV1WorkflowGetQueryAttributesPost(c.Request.Context(), req) if errResp != nil { @@ -227,7 +226,7 @@ func (h *handler) apiV1WorkflowSetDataObjects(c *gin.Context) { invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) errResp := h.svc.ApiV1WorkflowSetQueryAttributesPost(c.Request.Context(), req) if errResp != nil { @@ -244,7 +243,7 @@ func (h *handler) apiV1WorkflowGetSearchAttributes(c *gin.Context) { invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) resp, errResp := h.svc.ApiV1WorkflowGetSearchAttributesPost(c.Request.Context(), req) if errResp != nil { @@ -261,7 +260,7 @@ func (h *handler) apiV1WorkflowSetSearchAttributes(c *gin.Context) { invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) errResp := h.svc.ApiV1WorkflowSetSearchAttributesPost(c.Request.Context(), req) if errResp != nil { @@ -286,7 +285,7 @@ func (h *handler) doApiV1WorkflowGetPost(c *gin.Context, waitIfStillRunning bool invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) var resp *iwfidl.WorkflowGetResponse var errResp *errors.ErrorAndStatus @@ -310,7 +309,7 @@ func (h *handler) apiV1WorkflowReset(c *gin.Context) { invalidRequestSchema(c) return } - h.logger.Debug("received API request", tag.Value(h.toJsonLogging(req))) + h.logger.Debug("received API request", tag.Value(log.ToJsonAndTruncateForLogging(req))) resp, errResp := h.svc.ApiV1WorkflowResetPost(c.Request.Context(), req) if errResp != nil { @@ -345,15 +344,3 @@ func invalidRequestSchema(c *gin.Context) { func (h *handler) processError(c *gin.Context, resp *errors.ErrorAndStatus) { c.JSON(resp.StatusCode, resp.Error) } - -func (h *handler) toJsonLogging(req any) string { - str, err := json.Marshal(req) - if len(str) > 1000 { - str = str[0:1000] - } - if err != nil { - h.logger.Error("error when serializing request", tag.Error(err), tag.DefaultValue(req)) - return "" - } - return string(str) -} diff --git a/service/common/log/truncate.go b/service/common/log/truncate.go new file mode 100644 index 00000000..21df6c6d --- /dev/null +++ b/service/common/log/truncate.go @@ -0,0 +1,14 @@ +package log + +import "encoding/json" + +func ToJsonAndTruncateForLogging(req any) string { + str, err := json.Marshal(req) + if len(str) > 1000 { + str = str[0:1000] + } + if err != nil { + return "Error when serializing request: " + err.Error() + } + return string(str) +} diff --git a/service/interpreter/activityImpl.go b/service/interpreter/activityImpl.go index bba26aa3..4b32309d 100644 --- a/service/interpreter/activityImpl.go +++ b/service/interpreter/activityImpl.go @@ -7,6 +7,7 @@ import ( "github.com/indeedeng/iwf/service" "github.com/indeedeng/iwf/service/common/compatibility" "github.com/indeedeng/iwf/service/common/event" + "github.com/indeedeng/iwf/service/common/log" "github.com/indeedeng/iwf/service/common/ptr" "github.com/indeedeng/iwf/service/common/rpc" "github.com/indeedeng/iwf/service/common/urlautofix" @@ -31,7 +32,7 @@ func StateApiWaitUntil( stateApiWaitUntilStartTime := time.Now().UnixMilli() provider := getActivityProviderByType(backendType) logger := provider.GetLogger(ctx) - logger.Info("StateStartActivity", "input", input) + logger.Info("StateStartActivity", "input", log.ToJsonAndTruncateForLogging(input)) iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(input.IwfWorkerUrl) svcCfg := env.GetSharedConfig() @@ -116,7 +117,7 @@ func StateApiExecute( stateApiExecuteStartTime := time.Now().UnixMilli() provider := getActivityProviderByType(backendType) logger := provider.GetLogger(ctx) - logger.Info("StateDecideActivity", "input", input) + logger.Info("StateDecideActivity", "input", log.ToJsonAndTruncateForLogging(input)) iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(input.IwfWorkerUrl) svcCfg := env.GetSharedConfig() @@ -330,7 +331,7 @@ func DumpWorkflowInternal( ) (*iwfidl.WorkflowDumpResponse, error) { provider := getActivityProviderByType(backendType) logger := provider.GetLogger(ctx) - logger.Info("DumpWorkflowInternal", "input", req) + logger.Info("DumpWorkflowInternal", "input", log.ToJsonAndTruncateForLogging(req)) svcCfg := env.GetSharedConfig() apiAddress := svcCfg.GetApiServiceAddressWithDefault() @@ -359,7 +360,7 @@ func InvokeWorkerRpc( ) (*InvokeRpcActivityOutput, error) { provider := getActivityProviderByType(backendType) logger := provider.GetLogger(ctx) - logger.Info("invoke worker RPC by activity", "input", req) + logger.Info("invoke worker RPC by activity", "input", log.ToJsonAndTruncateForLogging(req)) apiMaxSeconds := env.GetSharedConfig().Api.MaxWaitSeconds