diff --git a/common/persistence/sql/sqlplugin/visibility.go b/common/persistence/sql/sqlplugin/visibility.go index 9a41a8e3493..4616519d086 100644 --- a/common/persistence/sql/sqlplugin/visibility.go +++ b/common/persistence/sql/sqlplugin/visibility.go @@ -64,6 +64,8 @@ type ( Encoding string TaskQueue string SearchAttributes *VisibilitySearchAttributes + ParentWorkflowID *string + ParentRunID *string } // VisibilitySelectFilter contains the column names within executions_visibility table that diff --git a/common/persistence/visibility/manager/visibility_manager.go b/common/persistence/visibility/manager/visibility_manager.go index 85359a0ec23..6045374df4c 100644 --- a/common/persistence/visibility/manager/visibility_manager.go +++ b/common/persistence/visibility/manager/visibility_manager.go @@ -83,6 +83,7 @@ type ( Memo *commonpb.Memo TaskQueue string SearchAttributes *commonpb.SearchAttributes + ParentExecution *commonpb.WorkflowExecution } // RecordWorkflowExecutionStartedRequest is used to add a record of a newly started execution diff --git a/common/persistence/visibility/store/elasticsearch/visibility_store.go b/common/persistence/visibility/store/elasticsearch/visibility_store.go index 173224826f3..bf8f097b125 100644 --- a/common/persistence/visibility/store/elasticsearch/visibility_store.go +++ b/common/persistence/visibility/store/elasticsearch/visibility_store.go @@ -1010,7 +1010,10 @@ func (s *visibilityStore) serializePageToken(token *visibilityPageToken) ([]byte return data, nil } -func (s *visibilityStore) generateESDoc(request *store.InternalVisibilityRequestBase, visibilityTaskKey string) (map[string]interface{}, error) { +func (s *visibilityStore) generateESDoc( + request *store.InternalVisibilityRequestBase, + visibilityTaskKey string, +) (map[string]interface{}, error) { doc := map[string]interface{}{ searchattribute.VisibilityTaskKey: visibilityTaskKey, searchattribute.NamespaceID: request.NamespaceID, @@ -1023,6 +1026,13 @@ func (s *visibilityStore) generateESDoc(request *store.InternalVisibilityRequest searchattribute.TaskQueue: request.TaskQueue, } + if request.ParentWorkflowID != nil { + doc[searchattribute.ParentWorkflowID] = *request.ParentWorkflowID + } + if request.ParentRunID != nil { + doc[searchattribute.ParentRunID] = *request.ParentRunID + } + if len(request.Memo.GetData()) > 0 { doc[searchattribute.Memo] = request.Memo.GetData() doc[searchattribute.MemoEncoding] = request.Memo.GetEncodingType().String() @@ -1143,6 +1153,10 @@ func (s *visibilityStore) parseESDoc(docID string, docSource json.RawMessage, sa record.StateTransitionCount = fieldValueParsed.(int64) case searchattribute.HistorySizeBytes: record.HistorySizeBytes = fieldValueParsed.(int64) + case searchattribute.ParentWorkflowID: + record.ParentWorkflowID = fieldValueParsed.(string) + case searchattribute.ParentRunID: + record.ParentRunID = fieldValueParsed.(string) default: // All custom and predefined search attributes are handled here. if customSearchAttributes == nil { diff --git a/common/persistence/visibility/store/sql/visibility_store.go b/common/persistence/visibility/store/sql/visibility_store.go index 5178d48307f..2189346fb29 100644 --- a/common/persistence/visibility/store/sql/visibility_store.go +++ b/common/persistence/visibility/store/sql/visibility_store.go @@ -535,6 +535,8 @@ func (s *VisibilityStore) generateVisibilityRow( Encoding: request.Memo.EncodingType.String(), TaskQueue: request.TaskQueue, SearchAttributes: searchAttributes, + ParentWorkflowID: request.ParentWorkflowID, + ParentRunID: request.ParentRunID, }, nil } @@ -622,6 +624,12 @@ func (s *VisibilityStore) rowToInfo( if row.StateTransitionCount != nil { info.StateTransitionCount = *row.StateTransitionCount } + if row.ParentWorkflowID != nil { + info.ParentWorkflowID = *row.ParentWorkflowID + } + if row.ParentRunID != nil { + info.ParentRunID = *row.ParentRunID + } return info, nil } diff --git a/common/persistence/visibility/store/visibility_store.go b/common/persistence/visibility/store/visibility_store.go index 7850dee8988..29503057165 100644 --- a/common/persistence/visibility/store/visibility_store.go +++ b/common/persistence/visibility/store/visibility_store.go @@ -85,6 +85,8 @@ type ( Memo *commonpb.DataBlob TaskQueue string SearchAttributes *commonpb.SearchAttributes + ParentWorkflowID string + ParentRunID string } // InternalListWorkflowExecutionsResponse is response from ListWorkflowExecutions @@ -114,6 +116,8 @@ type ( Memo *commonpb.DataBlob TaskQueue string SearchAttributes *commonpb.SearchAttributes + ParentWorkflowID *string + ParentRunID *string } // InternalRecordWorkflowExecutionStartedRequest request to RecordWorkflowExecutionStarted diff --git a/common/persistence/visibility/visibility_manager_impl.go b/common/persistence/visibility/visibility_manager_impl.go index ff5ed46f4ce..1ff13687001 100644 --- a/common/persistence/visibility/visibility_manager_impl.go +++ b/common/persistence/visibility/visibility_manager_impl.go @@ -283,7 +283,9 @@ func (p *visibilityManagerImpl) GetWorkflowExecution( return &manager.GetWorkflowExecutionResponse{Execution: execution}, err } -func (p *visibilityManagerImpl) newInternalVisibilityRequestBase(request *manager.VisibilityRequestBase) (*store.InternalVisibilityRequestBase, error) { +func (p *visibilityManagerImpl) newInternalVisibilityRequestBase( + request *manager.VisibilityRequestBase, +) (*store.InternalVisibilityRequestBase, error) { if request == nil { return nil, nil } @@ -292,6 +294,15 @@ func (p *visibilityManagerImpl) newInternalVisibilityRequestBase(request *manage return nil, err } + var ( + parentWorkflowID *string + parentRunID *string + ) + if request.ParentExecution != nil { + parentWorkflowID = &request.ParentExecution.WorkflowId + parentRunID = &request.ParentExecution.RunId + } + return &store.InternalVisibilityRequestBase{ NamespaceID: request.NamespaceID.String(), WorkflowID: request.Execution.GetWorkflowId(), @@ -305,10 +316,14 @@ func (p *visibilityManagerImpl) newInternalVisibilityRequestBase(request *manage TaskQueue: request.TaskQueue, Memo: memoBlob, SearchAttributes: request.SearchAttributes, + ParentWorkflowID: parentWorkflowID, + ParentRunID: parentRunID, }, nil } -func (p *visibilityManagerImpl) convertInternalListResponse(internalResponse *store.InternalListWorkflowExecutionsResponse) (*manager.ListWorkflowExecutionsResponse, error) { +func (p *visibilityManagerImpl) convertInternalListResponse( + internalResponse *store.InternalListWorkflowExecutionsResponse, +) (*manager.ListWorkflowExecutionsResponse, error) { if internalResponse == nil { return nil, nil } @@ -327,7 +342,9 @@ func (p *visibilityManagerImpl) convertInternalListResponse(internalResponse *st return resp, nil } -func (p *visibilityManagerImpl) convertInternalWorkflowExecutionInfo(internalExecution *store.InternalWorkflowExecutionInfo) (*workflowpb.WorkflowExecutionInfo, error) { +func (p *visibilityManagerImpl) convertInternalWorkflowExecutionInfo( + internalExecution *store.InternalWorkflowExecutionInfo, +) (*workflowpb.WorkflowExecutionInfo, error) { if internalExecution == nil { return nil, nil } @@ -352,6 +369,13 @@ func (p *visibilityManagerImpl) convertInternalWorkflowExecutionInfo(internalExe Status: internalExecution.Status, } + if internalExecution.ParentWorkflowID != "" { + executionInfo.ParentExecution = &commonpb.WorkflowExecution{ + WorkflowId: internalExecution.ParentWorkflowID, + RunId: internalExecution.ParentRunID, + } + } + // for close records if internalExecution.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING { executionInfo.CloseTime = &internalExecution.CloseTime diff --git a/common/searchattribute/defs.go b/common/searchattribute/defs.go index 2e742d8029b..629feba79a5 100644 --- a/common/searchattribute/defs.go +++ b/common/searchattribute/defs.go @@ -50,6 +50,8 @@ const ( BatcherNamespace = "BatcherNamespace" BatcherUser = "BatcherUser" HistorySizeBytes = "HistorySizeBytes" + ParentWorkflowID = "ParentWorkflowId" + ParentRunID = "ParentRunId" TemporalNamespaceDivision = "TemporalNamespaceDivision" @@ -83,6 +85,8 @@ var ( ExecutionDuration: enumspb.INDEXED_VALUE_TYPE_INT, StateTransitionCount: enumspb.INDEXED_VALUE_TYPE_INT, HistorySizeBytes: enumspb.INDEXED_VALUE_TYPE_INT, + ParentWorkflowID: enumspb.INDEXED_VALUE_TYPE_KEYWORD, + ParentRunID: enumspb.INDEXED_VALUE_TYPE_KEYWORD, } // predefined are internal search attributes which are passed and stored in SearchAttributes object together with custom search attributes. @@ -100,9 +104,9 @@ var ( // reserved are internal field names that can't be used as search attribute names. reserved = map[string]struct{}{ - NamespaceID: {}, - MemoEncoding: {}, - Memo: {}, + NamespaceID: {}, + MemoEncoding: {}, + Memo: {}, // Used in the Elasticsearch bulk processor, not needed in SQL databases. VisibilityTaskKey: {}, } @@ -123,6 +127,8 @@ var ( StateTransitionCount: "state_transition_count", Memo: "memo", MemoEncoding: "encoding", + ParentWorkflowID: "parent_workflow_id", + ParentRunID: "parent_run_id", } sqlDbCustomSearchAttributes = map[string]enumspb.IndexedValueType{ diff --git a/service/history/visibility_queue_task_executor.go b/service/history/visibility_queue_task_executor.go index eb537461d43..b04f39760f9 100644 --- a/service/history/visibility_queue_task_executor.go +++ b/service/history/visibility_queue_task_executor.go @@ -349,6 +349,14 @@ func (t *visibilityQueueTaskExecutor) getVisibilityRequestBase( searchAttributes = getSearchAttributes(copyMapPayload(executionInfo.SearchAttributes)) ) + var parentExecution *commonpb.WorkflowExecution + if executionInfo.ParentWorkflowId != "" && executionInfo.ParentRunId != "" { + parentExecution = &commonpb.WorkflowExecution{ + WorkflowId: executionInfo.ParentWorkflowId, + RunId: executionInfo.ParentRunId, + } + } + // Data from mutable state used to build VisibilityRequestBase must be deep // copied to ensure that the mutable state is not accessed after the workflow // lock is released and that there is no data race. @@ -368,6 +376,7 @@ func (t *visibilityQueueTaskExecutor) getVisibilityRequestBase( Memo: visibilityMemo, TaskQueue: executionInfo.TaskQueue, SearchAttributes: searchAttributes, + ParentExecution: parentExecution, } } diff --git a/service/history/visibility_queue_task_executor_test.go b/service/history/visibility_queue_task_executor_test.go index f92b5931a19..f2f0d22022b 100644 --- a/service/history/visibility_queue_task_executor_test.go +++ b/service/history/visibility_queue_task_executor_test.go @@ -34,7 +34,6 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" commonpb "go.temporal.io/api/common/v1" - enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" @@ -54,6 +53,8 @@ import ( "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/searchattribute" + "go.temporal.io/server/common/worker_versioning" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/queues" @@ -237,7 +238,22 @@ func (s *visibilityQueueTaskExecutorSuite) TestProcessCloseExecution() { persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mockVisibilityMgr.EXPECT().RecordWorkflowExecutionClosed(gomock.Any(), gomock.Any()).Return(nil) + s.mockVisibilityMgr.EXPECT().RecordWorkflowExecutionClosed( + gomock.Any(), + s.createRecordWorkflowExecutionClosedRequest( + s.namespace, + visibilityTask, + mutableState, + taskQueueName, + &commonpb.WorkflowExecution{ + WorkflowId: parentExecution.GetWorkflowId(), + RunId: parentExecution.GetRunId(), + }, + map[string]any{ + searchattribute.BuildIds: []string{worker_versioning.UnversionedSearchAttribute}, + }, + ), + ).Return(nil) resp := s.visibilityQueueTaskExecutor.Execute(context.Background(), s.newTaskExecutable(visibilityTask)) s.Nil(resp.ExecutionErr) @@ -307,7 +323,22 @@ func (s *visibilityQueueTaskExecutorSuite) TestProcessCloseExecutionWithWorkflow persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockExecutionMgr.EXPECT().SetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.SetWorkflowExecutionResponse{}, nil) - s.mockVisibilityMgr.EXPECT().RecordWorkflowExecutionClosed(gomock.Any(), gomock.Any()).Return(nil) + s.mockVisibilityMgr.EXPECT().RecordWorkflowExecutionClosed( + gomock.Any(), + s.createRecordWorkflowExecutionClosedRequest( + s.namespace, + visibilityTask, + mutableState, + taskQueueName, + &commonpb.WorkflowExecution{ + WorkflowId: parentExecution.GetWorkflowId(), + RunId: parentExecution.GetRunId(), + }, + map[string]any{ + searchattribute.BuildIds: []string{worker_versioning.UnversionedSearchAttribute}, + }, + ), + ).Return(nil) resp := s.visibilityQueueTaskExecutor.Execute(context.Background(), s.newTaskExecutable(visibilityTask)) s.Nil(resp.ExecutionErr) @@ -528,6 +559,42 @@ func (s *visibilityQueueTaskExecutorSuite) execute(task tasks.Task) error { return s.visibilityQueueTaskExecutor.Execute(context.Background(), s.newTaskExecutable(task)).ExecutionErr } +func (s *visibilityQueueTaskExecutorSuite) createVisibilityRequestBase( + namespaceName namespace.Name, + task tasks.Task, + mutableState workflow.MutableState, + taskQueueName string, + parentExecution *commonpb.WorkflowExecution, + searchAttributes map[string]any, +) *manager.VisibilityRequestBase { + encodedSearchAttributes, err := searchattribute.Encode( + searchAttributes, + &searchattribute.NameTypeMap{}, + ) + s.NoError(err) + + execution := &commonpb.WorkflowExecution{ + WorkflowId: task.GetWorkflowID(), + RunId: task.GetRunID(), + } + executionInfo := mutableState.GetExecutionInfo() + + return &manager.VisibilityRequestBase{ + NamespaceID: namespace.ID(task.GetNamespaceID()), + Namespace: namespaceName, + Execution: *execution, + WorkflowTypeName: executionInfo.WorkflowTypeName, + StartTime: timestamp.TimeValue(executionInfo.GetStartTime()), + Status: mutableState.GetExecutionState().GetStatus(), + ExecutionTime: timestamp.TimeValue(executionInfo.GetExecutionTime()), + TaskID: task.GetTaskID(), + ShardID: s.mockShard.GetShardID(), + TaskQueue: taskQueueName, + ParentExecution: parentExecution, + SearchAttributes: encodedSearchAttributes, + } +} + func (s *visibilityQueueTaskExecutorSuite) createRecordWorkflowExecutionStartedRequest( namespaceName namespace.Name, startEvent *historypb.HistoryEvent, @@ -536,26 +603,15 @@ func (s *visibilityQueueTaskExecutorSuite) createRecordWorkflowExecutionStartedR backoff time.Duration, taskQueueName string, ) *manager.RecordWorkflowExecutionStartedRequest { - execution := &commonpb.WorkflowExecution{ - WorkflowId: task.WorkflowID, - RunId: task.RunID, - } - executionInfo := mutableState.GetExecutionInfo() - executionTimestamp := timestamp.TimeValue(startEvent.GetEventTime()).Add(backoff) - return &manager.RecordWorkflowExecutionStartedRequest{ - VisibilityRequestBase: &manager.VisibilityRequestBase{ - Namespace: namespaceName, - NamespaceID: namespace.ID(task.NamespaceID), - Execution: *execution, - WorkflowTypeName: executionInfo.WorkflowTypeName, - StartTime: timestamp.TimeValue(startEvent.GetEventTime()), - ExecutionTime: executionTimestamp, - TaskID: task.TaskID, - Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, - ShardID: s.mockShard.GetShardID(), - TaskQueue: taskQueueName, - }, + VisibilityRequestBase: s.createVisibilityRequestBase( + namespaceName, + task, + mutableState, + taskQueueName, + nil, + nil, + ), } } @@ -565,25 +621,40 @@ func (s *visibilityQueueTaskExecutorSuite) createUpsertWorkflowRequest( mutableState workflow.MutableState, taskQueueName string, ) *manager.UpsertWorkflowExecutionRequest { - execution := &commonpb.WorkflowExecution{ - WorkflowId: task.WorkflowID, - RunId: task.RunID, + return &manager.UpsertWorkflowExecutionRequest{ + VisibilityRequestBase: s.createVisibilityRequestBase( + namespaceName, + task, + mutableState, + taskQueueName, + nil, + nil, + ), } - executionInfo := mutableState.GetExecutionInfo() +} - return &manager.UpsertWorkflowExecutionRequest{ - VisibilityRequestBase: &manager.VisibilityRequestBase{ - Namespace: namespaceName, - NamespaceID: namespace.ID(task.NamespaceID), - Execution: *execution, - WorkflowTypeName: executionInfo.WorkflowTypeName, - StartTime: timestamp.TimeValue(executionInfo.GetStartTime()), - ExecutionTime: timestamp.TimeValue(executionInfo.GetExecutionTime()), - TaskID: task.TaskID, - Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, - TaskQueue: taskQueueName, - ShardID: s.mockShard.GetShardID(), - }, +func (s *visibilityQueueTaskExecutorSuite) createRecordWorkflowExecutionClosedRequest( + namespaceName namespace.Name, + task *tasks.CloseExecutionVisibilityTask, + mutableState workflow.MutableState, + taskQueueName string, + parentExecution *commonpb.WorkflowExecution, + searchAttributes map[string]any, +) *manager.RecordWorkflowExecutionClosedRequest { + executionInfo := mutableState.GetExecutionInfo() + return &manager.RecordWorkflowExecutionClosedRequest{ + VisibilityRequestBase: s.createVisibilityRequestBase( + namespaceName, + task, + mutableState, + taskQueueName, + parentExecution, + searchAttributes, + ), + CloseTime: timestamp.TimeValue(executionInfo.GetCloseTime()), + HistoryLength: mutableState.GetNextEventID() - 1, + HistorySizeBytes: executionInfo.GetExecutionStats().GetHistorySize(), + StateTransitionCount: executionInfo.GetStateTransitionCount(), } } diff --git a/tests/advanced_visibility_test.go b/tests/advanced_visibility_test.go index ea1a58137e2..03a3962f883 100644 --- a/tests/advanced_visibility_test.go +++ b/tests/advanced_visibility_test.go @@ -1190,7 +1190,7 @@ func (s *advancedVisibilitySuite) TestCountGroupByWorkflow() { } } - query := `GROUP BY ExecutionStatus` + query := fmt.Sprintf(`WorkflowType = %q GROUP BY ExecutionStatus`, wt) countRequest := &workflowservice.CountWorkflowExecutionsRequest{ Namespace: s.namespace, Query: query, @@ -1915,6 +1915,94 @@ func (s *advancedVisibilitySuite) TestUpsertWorkflowExecution_InvalidKey() { s.NotNil(failedEventAttr.GetFailure()) } +func (s *advancedVisibilitySuite) TestChildWorkflow_ParentWorkflow() { + var ( + ctx = NewContext() + id = s.randomizeStr(s.T().Name()) + childWfType = "child-wf-type-" + id + wfType = "wf-type-" + id + taskQueue = "task-queue-" + id + ) + + childWf := func(ctx workflow.Context) error { + return nil + } + wf := func(ctx workflow.Context) error { + err := workflow.ExecuteChildWorkflow(ctx, childWfType).Get(ctx, nil) + return err + } + + w := worker.New(s.sdkClient, taskQueue, worker.Options{}) + w.RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{Name: wfType}) + w.RegisterWorkflowWithOptions(childWf, workflow.RegisterOptions{Name: childWfType}) + s.Require().NoError(w.Start()) + + startOptions := sdkclient.StartWorkflowOptions{ + ID: id, + TaskQueue: taskQueue, + } + run, err := s.sdkClient.ExecuteWorkflow(ctx, startOptions, wfType) + s.NoError(err) + s.NoError(run.Get(ctx, nil)) + w.Stop() + + // check child workflow has parent workflow + var childWfInfo *workflowpb.WorkflowExecutionInfo + s.Eventually( + func() bool { + resp, err := s.engine.ListWorkflowExecutions( + ctx, + &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: s.namespace, + Query: fmt.Sprintf("WorkflowType = %q", childWfType), + PageSize: defaultPageSize, + }, + ) + if err != nil { + return false + } + if len(resp.Executions) != 1 { + return false + } + childWfInfo = resp.Executions[0] + return true + }, + waitForESToSettle, + 100*time.Millisecond, + ) + s.NotNil(childWfInfo) + parentExecution := childWfInfo.GetParentExecution() + s.NotNil(parentExecution) + s.Equal(id, parentExecution.GetWorkflowId()) + + // check main workflow doesn't have parent workflow + var wfInfo *workflowpb.WorkflowExecutionInfo + s.Eventually( + func() bool { + resp, err := s.engine.ListWorkflowExecutions( + ctx, + &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: s.namespace, + Query: fmt.Sprintf("WorkflowType = %q", wfType), + PageSize: defaultPageSize, + }, + ) + if err != nil { + return false + } + if len(resp.Executions) != 1 { + return false + } + wfInfo = resp.Executions[0] + return true + }, + waitForESToSettle, + 100*time.Millisecond, + ) + s.NotNil(wfInfo) + s.Nil(wfInfo.GetParentExecution()) +} + func (s *advancedVisibilitySuite) Test_LongWorkflowID() { if s.testClusterConfig.Persistence.StoreType == config.StoreTypeSQL { // TODO: remove this when workflow_id field size is increased from varchar(255) in SQL schema.