Skip to content

Commit

Permalink
Store parent workflow execution in visibility
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou committed Nov 15, 2023
1 parent 82f103d commit b0a075f
Show file tree
Hide file tree
Showing 10 changed files with 274 additions and 47 deletions.
2 changes: 2 additions & 0 deletions common/persistence/sql/sqlplugin/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type (
Encoding string
TaskQueue string
SearchAttributes *VisibilitySearchAttributes
ParentWorkflowID *string
ParentRunID *string
}

// VisibilitySelectFilter contains the column names within executions_visibility table that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type (
Memo *commonpb.Memo
TaskQueue string
SearchAttributes *commonpb.SearchAttributes
ParentExecution *commonpb.WorkflowExecution
}

// RecordWorkflowExecutionStartedRequest is used to add a record of a newly started execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,10 @@ func (s *visibilityStore) serializePageToken(token *visibilityPageToken) ([]byte
return data, nil
}

func (s *visibilityStore) generateESDoc(request *store.InternalVisibilityRequestBase, visibilityTaskKey string) (map[string]interface{}, error) {
func (s *visibilityStore) generateESDoc(
request *store.InternalVisibilityRequestBase,
visibilityTaskKey string,
) (map[string]interface{}, error) {
doc := map[string]interface{}{
searchattribute.VisibilityTaskKey: visibilityTaskKey,
searchattribute.NamespaceID: request.NamespaceID,
Expand All @@ -1023,6 +1026,13 @@ func (s *visibilityStore) generateESDoc(request *store.InternalVisibilityRequest
searchattribute.TaskQueue: request.TaskQueue,
}

if request.ParentWorkflowID != nil {
doc[searchattribute.ParentWorkflowID] = *request.ParentWorkflowID
}
if request.ParentRunID != nil {
doc[searchattribute.ParentRunID] = *request.ParentRunID
}

if len(request.Memo.GetData()) > 0 {
doc[searchattribute.Memo] = request.Memo.GetData()
doc[searchattribute.MemoEncoding] = request.Memo.GetEncodingType().String()
Expand Down Expand Up @@ -1143,6 +1153,10 @@ func (s *visibilityStore) parseESDoc(docID string, docSource json.RawMessage, sa
record.StateTransitionCount = fieldValueParsed.(int64)
case searchattribute.HistorySizeBytes:
record.HistorySizeBytes = fieldValueParsed.(int64)
case searchattribute.ParentWorkflowID:
record.ParentWorkflowID = fieldValueParsed.(string)
case searchattribute.ParentRunID:
record.ParentRunID = fieldValueParsed.(string)
default:
// All custom and predefined search attributes are handled here.
if customSearchAttributes == nil {
Expand Down
8 changes: 8 additions & 0 deletions common/persistence/visibility/store/sql/visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,8 @@ func (s *VisibilityStore) generateVisibilityRow(
Encoding: request.Memo.EncodingType.String(),
TaskQueue: request.TaskQueue,
SearchAttributes: searchAttributes,
ParentWorkflowID: request.ParentWorkflowID,
ParentRunID: request.ParentRunID,
}, nil
}

Expand Down Expand Up @@ -622,6 +624,12 @@ func (s *VisibilityStore) rowToInfo(
if row.StateTransitionCount != nil {
info.StateTransitionCount = *row.StateTransitionCount
}
if row.ParentWorkflowID != nil {
info.ParentWorkflowID = *row.ParentWorkflowID
}
if row.ParentRunID != nil {
info.ParentRunID = *row.ParentRunID
}
return info, nil
}

Expand Down
4 changes: 4 additions & 0 deletions common/persistence/visibility/store/visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ type (
Memo *commonpb.DataBlob
TaskQueue string
SearchAttributes *commonpb.SearchAttributes
ParentWorkflowID string
ParentRunID string
}

// InternalListWorkflowExecutionsResponse is response from ListWorkflowExecutions
Expand Down Expand Up @@ -114,6 +116,8 @@ type (
Memo *commonpb.DataBlob
TaskQueue string
SearchAttributes *commonpb.SearchAttributes
ParentWorkflowID *string
ParentRunID *string
}

// InternalRecordWorkflowExecutionStartedRequest request to RecordWorkflowExecutionStarted
Expand Down
30 changes: 27 additions & 3 deletions common/persistence/visibility/visibility_manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,9 @@ func (p *visibilityManagerImpl) GetWorkflowExecution(
return &manager.GetWorkflowExecutionResponse{Execution: execution}, err
}

func (p *visibilityManagerImpl) newInternalVisibilityRequestBase(request *manager.VisibilityRequestBase) (*store.InternalVisibilityRequestBase, error) {
func (p *visibilityManagerImpl) newInternalVisibilityRequestBase(
request *manager.VisibilityRequestBase,
) (*store.InternalVisibilityRequestBase, error) {
if request == nil {
return nil, nil
}
Expand All @@ -292,6 +294,15 @@ func (p *visibilityManagerImpl) newInternalVisibilityRequestBase(request *manage
return nil, err
}

var (
parentWorkflowID *string
parentRunID *string
)
if request.ParentExecution != nil {
parentWorkflowID = &request.ParentExecution.WorkflowId
parentRunID = &request.ParentExecution.RunId
}

return &store.InternalVisibilityRequestBase{
NamespaceID: request.NamespaceID.String(),
WorkflowID: request.Execution.GetWorkflowId(),
Expand All @@ -305,10 +316,14 @@ func (p *visibilityManagerImpl) newInternalVisibilityRequestBase(request *manage
TaskQueue: request.TaskQueue,
Memo: memoBlob,
SearchAttributes: request.SearchAttributes,
ParentWorkflowID: parentWorkflowID,
ParentRunID: parentRunID,
}, nil
}

func (p *visibilityManagerImpl) convertInternalListResponse(internalResponse *store.InternalListWorkflowExecutionsResponse) (*manager.ListWorkflowExecutionsResponse, error) {
func (p *visibilityManagerImpl) convertInternalListResponse(
internalResponse *store.InternalListWorkflowExecutionsResponse,
) (*manager.ListWorkflowExecutionsResponse, error) {
if internalResponse == nil {
return nil, nil
}
Expand All @@ -327,7 +342,9 @@ func (p *visibilityManagerImpl) convertInternalListResponse(internalResponse *st
return resp, nil
}

func (p *visibilityManagerImpl) convertInternalWorkflowExecutionInfo(internalExecution *store.InternalWorkflowExecutionInfo) (*workflowpb.WorkflowExecutionInfo, error) {
func (p *visibilityManagerImpl) convertInternalWorkflowExecutionInfo(
internalExecution *store.InternalWorkflowExecutionInfo,
) (*workflowpb.WorkflowExecutionInfo, error) {
if internalExecution == nil {
return nil, nil
}
Expand All @@ -352,6 +369,13 @@ func (p *visibilityManagerImpl) convertInternalWorkflowExecutionInfo(internalExe
Status: internalExecution.Status,
}

if internalExecution.ParentWorkflowID != "" {
executionInfo.ParentExecution = &commonpb.WorkflowExecution{
WorkflowId: internalExecution.ParentWorkflowID,
RunId: internalExecution.ParentRunID,
}
}

// for close records
if internalExecution.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
executionInfo.CloseTime = &internalExecution.CloseTime
Expand Down
12 changes: 9 additions & 3 deletions common/searchattribute/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ const (
BatcherNamespace = "BatcherNamespace"
BatcherUser = "BatcherUser"
HistorySizeBytes = "HistorySizeBytes"
ParentWorkflowID = "ParentWorkflowId"
ParentRunID = "ParentRunId"

TemporalNamespaceDivision = "TemporalNamespaceDivision"

Expand Down Expand Up @@ -83,6 +85,8 @@ var (
ExecutionDuration: enumspb.INDEXED_VALUE_TYPE_INT,
StateTransitionCount: enumspb.INDEXED_VALUE_TYPE_INT,
HistorySizeBytes: enumspb.INDEXED_VALUE_TYPE_INT,
ParentWorkflowID: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
ParentRunID: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
}

// predefined are internal search attributes which are passed and stored in SearchAttributes object together with custom search attributes.
Expand All @@ -100,9 +104,9 @@ var (

// reserved are internal field names that can't be used as search attribute names.
reserved = map[string]struct{}{
NamespaceID: {},
MemoEncoding: {},
Memo: {},
NamespaceID: {},
MemoEncoding: {},
Memo: {},
// Used in the Elasticsearch bulk processor, not needed in SQL databases.
VisibilityTaskKey: {},
}
Expand All @@ -123,6 +127,8 @@ var (
StateTransitionCount: "state_transition_count",
Memo: "memo",
MemoEncoding: "encoding",
ParentWorkflowID: "parent_workflow_id",
ParentRunID: "parent_run_id",
}

sqlDbCustomSearchAttributes = map[string]enumspb.IndexedValueType{
Expand Down
9 changes: 9 additions & 0 deletions service/history/visibility_queue_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,14 @@ func (t *visibilityQueueTaskExecutor) getVisibilityRequestBase(
searchAttributes = getSearchAttributes(copyMapPayload(executionInfo.SearchAttributes))
)

var parentExecution *commonpb.WorkflowExecution
if executionInfo.ParentWorkflowId != "" && executionInfo.ParentRunId != "" {
parentExecution = &commonpb.WorkflowExecution{
WorkflowId: executionInfo.ParentWorkflowId,
RunId: executionInfo.ParentRunId,
}
}

// Data from mutable state used to build VisibilityRequestBase must be deep
// copied to ensure that the mutable state is not accessed after the workflow
// lock is released and that there is no data race.
Expand All @@ -368,6 +376,7 @@ func (t *visibilityQueueTaskExecutor) getVisibilityRequestBase(
Memo: visibilityMemo,
TaskQueue: executionInfo.TaskQueue,
SearchAttributes: searchAttributes,
ParentExecution: parentExecution,
}
}

Expand Down
Loading

0 comments on commit b0a075f

Please sign in to comment.