From 3b24b0cbcd6c6d0e1291f8ed1fa94f9fc54a7fd7 Mon Sep 17 00:00:00 2001 From: feedmeapples Date: Sat, 11 Feb 2023 02:03:37 -0500 Subject: [PATCH] Support exporting history for replay --- common/defs-flags.go | 183 +++++++++++++++++----------------- common/flags.go | 22 ++-- workflow/workflow_commands.go | 49 +++++++-- 3 files changed, 147 insertions(+), 107 deletions(-) diff --git a/common/defs-flags.go b/common/defs-flags.go index 3808235b..f173bba0 100644 --- a/common/defs-flags.go +++ b/common/defs-flags.go @@ -1,134 +1,135 @@ package common + const ( // Shared flag definitions - FlagEnvDefinition = "Name of the environment to read environmental variables from." - FlagAddrDefinition = "The host and port (formatted as host:port) for the Temporal Frontend Service." - FlagNSAliasDefinition = "Identifies a Namespace in the Temporal Workflow." - FlagMetadataDefinition = "Contains gRPC metadata to send with requests (Format: key=value). Values must be in a valid JSON format." - FlagTLSCertPathDefinition = "Path to x509 certificate." - FlagTLSKeyPathDefinition = "Path to private certificate key." - FlagTLSCaPathDefinition = "Path to server CA certificate." - FlagTLSDisableHVDefinition = "Disables TLS host name verification if already enabled." - FlagTLSServerNameDefinition = "Provides an override for the target TLS server name." + FlagEnvDefinition = "Name of the environment to read environmental variables from." + FlagAddrDefinition = "The host and port (formatted as host:port) for the Temporal Frontend Service." + FlagNSAliasDefinition = "Identifies a Namespace in the Temporal Workflow." + FlagMetadataDefinition = "Contains gRPC metadata to send with requests (Format: key=value). Values must be in a valid JSON format." + FlagTLSCertPathDefinition = "Path to x509 certificate." + FlagTLSKeyPathDefinition = "Path to private certificate key." + FlagTLSCaPathDefinition = "Path to server CA certificate." + FlagTLSDisableHVDefinition = "Disables TLS host name verification if already enabled." + FlagTLSServerNameDefinition = "Provides an override for the target TLS server name." FlagContextTimeoutDefinition = "An optional timeout for the context of an RPC call (in seconds)." - FlagCodecEndpointDefinition = "Endpoint for a remote Codec Server." - FlagCodecAuthDefinition = "Sets the authorization header on requests to the Codec Server." - FlagArchiveDefinition = "List archived Workflow Executions.\nCurrently an experimental feature." + FlagCodecEndpointDefinition = "Endpoint for a remote Codec Server." + FlagCodecAuthDefinition = "Sets the authorization header on requests to the Codec Server." + FlagArchiveDefinition = "List archived Workflow Executions.\nCurrently an experimental feature." // Execution flags - FlagWorkflowId = "Workflow Id" - FlagRunIdDefinition = "Run Id" - FlagJobIDDefinition = "Batch Job Id" + FlagWorkflowId = "Workflow Id" + FlagRunIdDefinition = "Run Id" + FlagJobIDDefinition = "Batch Job Id" FlagScheduleIDDefinition = "Schedule Id" // ShowWorkflow flags - FlagOutputFilenameDefinition = "Serializes Event History to a file." - FlagMaxFieldLengthDefinition = "Maximum length for each attribute field." + FlagOutputFilenameDefinition = "Serializes Event History to a file." + FlagMaxFieldLengthDefinition = "Maximum length for each attribute field." FlagResetPointsOnlyDefinition = "Only show Workflow Events that are eligible for reset." - FlagFollowAliasDefinition = "Follow the progress of a Workflow Execution." + FlagFollowAliasDefinition = "Follow the progress of a Workflow Execution." + FlagExportPathDefinition = "Export Workflow Execution history suited for replay." // StartWorkflow flags - FlagWFTypeDefinition = "Workflow type name." - FlagTaskQueueDefinition = "Task Queue" - FlagWorkflowRunTimeoutDefinition = "Timeout (in seconds) of a single Workflow run." + FlagWFTypeDefinition = "Workflow type name." + FlagTaskQueueDefinition = "Task Queue" + FlagWorkflowRunTimeoutDefinition = "Timeout (in seconds) of a single Workflow run." FlagWorkflowExecutionTimeoutDefinition = "Timeout (in seconds) for a WorkflowExecution, including retries and continue-as-new tasks." - FlagWorkflowTaskTimeoutDefinition = "Start-to-close timeout for a Workflow Task (in seconds)." - FlagCronScheduleDefinition = "Optional Cron Schedule for the Workflow. Cron spec is formatted as: \n" + - "\t┌───────────── minute (0 - 59) \n" + - "\t│ ┌───────────── hour (0 - 23) \n" + - "\t│ │ ┌───────────── day of the month (1 - 31) \n" + - "\t│ │ │ ┌───────────── month (1 - 12) \n" + - "\t│ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday) \n" + - "\t│ │ │ │ │ \n" + - "\t* * * * *" + FlagWorkflowTaskTimeoutDefinition = "Start-to-close timeout for a Workflow Task (in seconds)." + FlagCronScheduleDefinition = "Optional Cron Schedule for the Workflow. Cron spec is formatted as: \n" + + "\t┌───────────── minute (0 - 59) \n" + + "\t│ ┌───────────── hour (0 - 23) \n" + + "\t│ │ ┌───────────── day of the month (1 - 31) \n" + + "\t│ │ │ ┌───────────── month (1 - 12) \n" + + "\t│ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday) \n" + + "\t│ │ │ │ │ \n" + + "\t* * * * *" FlagWorkflowIdReusePolicyDefinition = "Allows the same Workflow Id to be used in a new Workflow Execution. Options: AllowDuplicate, AllowDuplicateFailedOnly, RejectDuplicate, TerminateIfRunning." - FlagInputDefinition = "Optional JSON input to provide to the Workflow. Pass \"null\" for null values." - FlagInputFileDefinition = "Passes optional input for the Workflow from a JSON file. If there are multiple JSON files, concatenate them and separate by space or newline. Input from the command line will overwrite file input." - FlagSearchAttributeDefinition = "Passes Search Attribute in key=value format. Use valid JSON formats for value." - FlagMemoDefinition = "Passes a memo in key=value format.\nUse valid JSON formats for value." - FlagMemoFileDefinition = "Passes a memo as file input, with each line following key=value format. Use valid JSON formats for value." + FlagInputDefinition = "Optional JSON input to provide to the Workflow. Pass \"null\" for null values." + FlagInputFileDefinition = "Passes optional input for the Workflow from a JSON file. If there are multiple JSON files, concatenate them and separate by space or newline. Input from the command line will overwrite file input." + FlagSearchAttributeDefinition = "Passes Search Attribute in key=value format. Use valid JSON formats for value." + FlagMemoDefinition = "Passes a memo in key=value format.\nUse valid JSON formats for value." + FlagMemoFileDefinition = "Passes a memo as file input, with each line following key=value format. Use valid JSON formats for value." // Other Workflow flags - FlagResetPointsUsage = "Only show auto-reset points." - FlagPrintRawUsage = "Print properties as they are stored." - QueryFlagTypeUsage = "The Query type you want to run." - FlagWorkflowSignalUsage = "Signal Workflow Execution by Id." - FlagQueryDefinition = "Signal Workflow Executions by List Filter. See https://docs.temporal.io/concepts/what-is-a-list-filter/." - FlagSignalName = "Signal Name" - FlagInputSignal = "Input for the Signal (JSON)." - FlagInputFileSignal = "Input for the Signal from file (JSON)." - FlagCancelWorkflow = "Cancel Workflow Execution by Id." - FlagWorkflowIDTerminate = "Terminate Workflow Execution by Id." - FlagQueryTerminate = "Terminate Workflow Executions by List Filter. See https://docs.temporal.io/concepts/what-is-a-list-filter/." - FlagEventIDDefinition = "The Event Id for any Event after WorkflowTaskStarted you want to reset to (exclusive). It can be WorkflowTaskCompleted, WorkflowTaskFailed or others." - FlagQueryResetBatch = "Visibility Query of Search Attributes describing the Workflow Executions to reset. See https://docs.temporal.io/docs/tctl/workflow/list#--query." - FlagInputFileReset = "Input file that specifies Workflow Executions to reset. Each line contains one Workflow Id as the base Run and, optionally, a Run Id." - FlagExcludeFileDefinition = "Input file that specifies Workflow Executions to exclude from resetting." - FlagInputSeparatorDefinition = "Separator for the input file. The default is a tab (\t)." - FlagParallelismDefinition = "Number of goroutines to run in parallel. Each goroutine processes one line for every second." - FlagSkipCurrentOpenDefinition = "Skip a Workflow Execution if the current Run is open for the same Workflow Id as the base Run." - FlagSkipBaseDefinition = "Skip a Workflow Execution if the base Run is not the current Run." + FlagResetPointsUsage = "Only show auto-reset points." + FlagPrintRawUsage = "Print properties as they are stored." + QueryFlagTypeUsage = "The Query type you want to run." + FlagWorkflowSignalUsage = "Signal Workflow Execution by Id." + FlagQueryDefinition = "Signal Workflow Executions by List Filter. See https://docs.temporal.io/concepts/what-is-a-list-filter/." + FlagSignalName = "Signal Name" + FlagInputSignal = "Input for the Signal (JSON)." + FlagInputFileSignal = "Input for the Signal from file (JSON)." + FlagCancelWorkflow = "Cancel Workflow Execution by Id." + FlagWorkflowIDTerminate = "Terminate Workflow Execution by Id." + FlagQueryTerminate = "Terminate Workflow Executions by List Filter. See https://docs.temporal.io/concepts/what-is-a-list-filter/." + FlagEventIDDefinition = "The Event Id for any Event after WorkflowTaskStarted you want to reset to (exclusive). It can be WorkflowTaskCompleted, WorkflowTaskFailed or others." + FlagQueryResetBatch = "Visibility Query of Search Attributes describing the Workflow Executions to reset. See https://docs.temporal.io/docs/tctl/workflow/list#--query." + FlagInputFileReset = "Input file that specifies Workflow Executions to reset. Each line contains one Workflow Id as the base Run and, optionally, a Run Id." + FlagExcludeFileDefinition = "Input file that specifies Workflow Executions to exclude from resetting." + FlagInputSeparatorDefinition = "Separator for the input file. The default is a tab (\t)." + FlagParallelismDefinition = "Number of goroutines to run in parallel. Each goroutine processes one line for every second." + FlagSkipCurrentOpenDefinition = "Skip a Workflow Execution if the current Run is open for the same Workflow Id as the base Run." + FlagSkipBaseDefinition = "Skip a Workflow Execution if the base Run is not the current Run." FlagNonDeterministicDefinition = "Reset Workflow Execution only if its last Event is WorkflowTaskFailed with a nondeterministic error." - FlagDryRunDefinition = "Simulate reset without resetting any Workflow Executions." - FlagDepthDefinition = "Number of Child Workflows to expand, -1 to expand all Child Workflows." - FlagConcurrencyDefinition = "Request concurrency." - FlagNoFoldDefinition = "Disable folding. All Child Workflows within the set depth will be fetched and displayed." - + FlagDryRunDefinition = "Simulate reset without resetting any Workflow Executions." + FlagDepthDefinition = "Number of Child Workflows to expand, -1 to expand all Child Workflows." + FlagConcurrencyDefinition = "Request concurrency." + FlagNoFoldDefinition = "Disable folding. All Child Workflows within the set depth will be fetched and displayed." // Stack trace query flag definitions - FlagInputSTQDefinition = "Optional Query input, in JSON format.\nFor multiple parameters, concatenate them and separate by space." - FlagInputFileSTQDefinition = "Passes optional Query input from a JSON file.\nIf there are multiple JSON, concatenate them and separate by space or newline.\n" + "Input from the command line will overwrite file input." + FlagInputSTQDefinition = "Optional Query input, in JSON format.\nFor multiple parameters, concatenate them and separate by space." + FlagInputFileSTQDefinition = "Passes optional Query input from a JSON file.\nIf there are multiple JSON, concatenate them and separate by space or newline.\n" + "Input from the command line will overwrite file input." FlagQueryRejectConditionDefinition = "Optional flag for rejecting Queries based on Workflow state. Valid values are \"not_open\" and \"not_completed_cleanly\"." // Pagination flag definitions - FlagLimitDefinition = "Number of items to print." - FlagPagerDefinition = "Sets the pager for Temporal CLI to use. Options: less, more, favoritePager." + FlagLimitDefinition = "Number of items to print." + FlagPagerDefinition = "Sets the pager for Temporal CLI to use. Options: less, more, favoritePager." FlagNoPagerDefinition = "Disables the interactive pager." - FlagFieldsDefinition = "Customize fields to print. Set to 'long' to automatically print more of main fields." + FlagFieldsDefinition = "Customize fields to print. Set to 'long' to automatically print more of main fields." // Activity flag definitions FlagWorkflowIDDefinition = "Identifies the Workflow that the Activity is running on." - FlagRunIDDefinition = "Identifies the current Workflow Run." + FlagRunIDDefinition = "Identifies the current Workflow Run." FlagActivityIDDefinition = "Identifies the Activity Execution." - FlagResultDefinition = "Set the result value of Activity completion." - FlagIdentityDefinition = "Specify operator's identity." + FlagResultDefinition = "Set the result value of Activity completion." + FlagIdentityDefinition = "Specify operator's identity." FlagDetailDefinition = "Detail to fail the Activity." FlagReasonDefinition = "Reason to perform a given operation on the Cluster." // Cluster flag definition - FlagClusterAddressDefinition = "Frontend address of the remote Cluster." + FlagClusterAddressDefinition = "Frontend address of the remote Cluster." FlagClusterEnableConnectionDefinition = "Enable cross-cluster connection." - FlagNameDefinition = "Frontend address of the remote Cluster." + FlagNameDefinition = "Frontend address of the remote Cluster." // Schedule flag definition - FlagOverlapPolicyDefinition = "Overlap policy: Skip, BufferOne, BufferAll, CancelOther, TerminateOther, AllowAll." - FlagCalenderDefinition = `Calendar specification in JSON ({"dayOfWeek":"Fri","hour":"17","minute":"5"}) or as a Cron string ("30 2 * * 5" or "@daily").` - FlagIntervalDefinition = "Interval duration, e.g. 90m, or 90m/13m to include phase offset." - FlagStartTimeDefinition = "Overall schedule start time." - FlagEndTimeDefinition = "Overall schedule end time." - FlagJitterDefinition = "Jitter duration." - FlagTimeZoneDefinition = "Time zone (IANA name)." - FlagNotesDefinition = "Initial value of notes field." - FlagPauseDefinition = "Initial value of paused state." - FlagRemainingActionsDefinition = "Total number of actions allowed." - FlagCatchupWindowDefinition = "Maximum allowed catch-up time if server is down." - FlagPauseOnFailureDefinition = "Pause schedule after any workflow failure." + FlagOverlapPolicyDefinition = "Overlap policy: Skip, BufferOne, BufferAll, CancelOther, TerminateOther, AllowAll." + FlagCalenderDefinition = `Calendar specification in JSON ({"dayOfWeek":"Fri","hour":"17","minute":"5"}) or as a Cron string ("30 2 * * 5" or "@daily").` + FlagIntervalDefinition = "Interval duration, e.g. 90m, or 90m/13m to include phase offset." + FlagStartTimeDefinition = "Overall schedule start time." + FlagEndTimeDefinition = "Overall schedule end time." + FlagJitterDefinition = "Jitter duration." + FlagTimeZoneDefinition = "Time zone (IANA name)." + FlagNotesDefinition = "Initial value of notes field." + FlagPauseDefinition = "Initial value of paused state." + FlagRemainingActionsDefinition = "Total number of actions allowed." + FlagCatchupWindowDefinition = "Maximum allowed catch-up time if server is down." + FlagPauseOnFailureDefinition = "Pause schedule after any workflow failure." FlagSearchAttributeScheduleDefinition = "Set Search Attribute on a schedule. Format: key=value. Use valid JSON formats for value." - FlagMemoScheduleDefinition = "Set a memo on a schedule. Format: key=value. Use valid JSON formats for value." - FlagMemoFileScheduleDefinition = "Set a memo from a file. Each line should follow the format key=value. Use valid JSON formats for value." - FlagPauseScheduleDefinition = "Pauses the Schedule." - FlagUnpauseDefinition = "Unpauses the Schedule." - FlagBackfillStartTime = "Backfill start time." - FlagBackfillEndTime = "Backfill end time." - FlagPrintRawDefinition = "Print raw data as json (prefer this over -o json for scripting)." + FlagMemoScheduleDefinition = "Set a memo on a schedule. Format: key=value. Use valid JSON formats for value." + FlagMemoFileScheduleDefinition = "Set a memo from a file. Each line should follow the format key=value. Use valid JSON formats for value." + FlagPauseScheduleDefinition = "Pauses the Schedule." + FlagUnpauseDefinition = "Unpauses the Schedule." + FlagBackfillStartTime = "Backfill start time." + FlagBackfillEndTime = "Backfill end time." + FlagPrintRawDefinition = "Print raw data as json (prefer this over -o json for scripting)." // Search Attribute flags FlagNameSearchAttribute = "Search Attribute name." - FlagYesDefinition = "Confirm all prompts." + FlagYesDefinition = "Confirm all prompts." // Task Queue flags - FlagTaskQueueName = "Task Queue name." + FlagTaskQueueName = "Task Queue name." FlagTaskQueueTypeDefinition = "Task Queue type [workflow|activity]" -) \ No newline at end of file +) diff --git a/common/flags.go b/common/flags.go index a2e25da8..dab96a64 100644 --- a/common/flags.go +++ b/common/flags.go @@ -45,6 +45,7 @@ var ( FlagEnv = "env" FlagEventID = "event-id" FlagExcludeFile = "exclude-file" + FlagExportPath = "export-path" FlagFold = "fold" FlagFollowAlias = []string{"f"} FlagHeadless = "headless" @@ -263,6 +264,11 @@ var FlagsForShowWorkflow = []cli.Flag{ Value: false, Category: CategoryMain, }, + &cli.StringFlag{ + Name: FlagExportPath, + Usage: FlagExportPathDefinition, + Category: CategoryMain, + }, } var FlagsForStartWorkflow = append(FlagsForStartWorkflowT, @@ -312,13 +318,13 @@ var FlagsForStartWorkflowT = []cli.Flag{ Category: CategoryMain, }, &cli.StringFlag{ - Name: FlagCronSchedule, - Usage: FlagCronScheduleDefinition, + Name: FlagCronSchedule, + Usage: FlagCronScheduleDefinition, Category: CategoryMain, }, &cli.StringFlag{ - Name: FlagWorkflowIDReusePolicy, - Usage: FlagWorkflowIdReusePolicyDefinition, + Name: FlagWorkflowIDReusePolicy, + Usage: FlagWorkflowIdReusePolicyDefinition, Category: CategoryMain, }, &cli.StringSliceFlag{ @@ -328,8 +334,8 @@ var FlagsForStartWorkflowT = []cli.Flag{ Category: CategoryMain, }, &cli.StringFlag{ - Name: FlagInputFile, - Usage: FlagInputFileDefinition, + Name: FlagInputFile, + Usage: FlagInputFileDefinition, Category: CategoryMain, }, &cli.IntFlag{ @@ -376,8 +382,8 @@ var FlagsForStackTraceQuery = append(FlagsForExecution, []cli.Flag{ Category: CategoryMain, }, &cli.StringFlag{ - Name: FlagInputFile, - Usage: FlagInputFileSTQDefinition, + Name: FlagInputFile, + Usage: FlagInputFileSTQDefinition, Category: CategoryMain, }, &cli.StringFlag{ diff --git a/workflow/workflow_commands.go b/workflow/workflow_commands.go index 78b40515..0bd07eaf 100644 --- a/workflow/workflow_commands.go +++ b/workflow/workflow_commands.go @@ -20,6 +20,7 @@ import ( "github.com/temporalio/cli/common/stringify" "github.com/temporalio/cli/dataconverter" "github.com/temporalio/tctl-kit/pkg/color" + "github.com/temporalio/tctl-kit/pkg/iterator" "github.com/temporalio/tctl-kit/pkg/output" "github.com/temporalio/tctl-kit/pkg/pager" "github.com/urfave/cli/v2" @@ -35,6 +36,7 @@ import ( clispb "go.temporal.io/server/api/cli/v1" scommon "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/codec" "go.temporal.io/server/common/collection" "go.temporal.io/server/common/convert" "go.temporal.io/server/common/primitives/timestamp" @@ -166,7 +168,7 @@ func StartWorkflow(c *cli.Context, printProgress bool) error { } if printProgress { - return printWorkflowProgress(c, wid, resp.GetRunID(), true) + return printWorkflowProgress(c, wid, resp.GetRunID(), true, "") } return nil @@ -268,7 +270,7 @@ func (h *historyIterator) Next() (interface{}, error) { } // helper function to print workflow progress with time refresh every second -func printWorkflowProgress(c *cli.Context, wid, rid string, watch bool) error { +func printWorkflowProgress(c *cli.Context, wid, rid string, watch bool, exportPath string) error { isJSON := false if c.IsSet(output.FlagOutput) { outputFlag := c.String(output.FlagOutput) @@ -292,7 +294,7 @@ func printWorkflowProgress(c *cli.Context, wid, rid string, watch bool) error { fmt.Println(color.Magenta(c, "Progress:")) } - var lastEvent historypb.HistoryEvent // used for print result of this run + var lastEvent historypb.HistoryEvent po := &output.PrintOptions{ Fields: []string{"ID", "Time", "Type"}, @@ -301,9 +303,15 @@ func printWorkflowProgress(c *cli.Context, wid, rid string, watch bool) error { } errChan := make(chan error) go func() { - hIter := sdkClient.GetWorkflowHistory(tcCtx, wid, rid, watch, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) - iter := &historyIterator{iter: hIter, maxFieldLength: maxFieldLength, lastEvent: &lastEvent} - err = output.PrintIterator(c, iter, po) + iter := sdkClient.GetWorkflowHistory(tcCtx, wid, rid, watch, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + hIter := &historyIterator{iter: iter, maxFieldLength: maxFieldLength, lastEvent: &lastEvent} + + if exportPath != "" { + err = exportHistory(hIter, exportPath) + } else { + err = output.PrintIterator(c, hIter, po) + } + if err != nil { errChan <- err return @@ -343,6 +351,31 @@ func printWorkflowProgress(c *cli.Context, wid, rid string, watch bool) error { } } +func exportHistory(iter iterator.Iterator, exportPath string) error { + var events []*historypb.HistoryEvent + for iter.HasNext() { + event, err := iter.Next() + if err != nil { + return err + + } + events = append(events, event.(*historypb.HistoryEvent)) + } + + history := &historypb.History{} + history.Events = events + serializer := codec.NewJSONPBIndentEncoder(" ") + data, err := serializer.Encode(history) + if err != nil { + return err + } + if err := os.WriteFile(exportPath, data, 0666); err != nil { + return err + } + + return nil +} + func TerminateWorkflow(c *cli.Context) error { if c.String(common.FlagQuery) != "" { return batch.BatchTerminate(c) @@ -800,10 +833,10 @@ func printRunStatus(c *cli.Context, event *historypb.HistoryEvent) { func ShowHistory(c *cli.Context) error { wid := c.String(common.FlagWorkflowID) rid := c.String(common.FlagRunID) - + export := c.String(common.FlagExportPath) follow := c.Bool(output.FlagFollow) - return printWorkflowProgress(c, wid, rid, follow) + return printWorkflowProgress(c, wid, rid, follow, export) } // ResetWorkflow reset workflow