Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add raw sync versioned transition task for state based replication #6607

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,131 changes: 580 additions & 551 deletions api/replication/v1/message.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ message SyncVersionedTransitionTaskAttributes {
reserved 3;
reserved 4;
VersionedTransitionArtifact versioned_transition_artifact = 5;
string namespace_id = 6;
string workflow_id = 7;
string run_id = 8;
}

message VersionedTransitionArtifact {
Expand Down
14 changes: 10 additions & 4 deletions service/history/ndc/workflow_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ func (r *WorkflowStateReplicatorImpl) ReplicateVersionedTransition(
return serviceerror.NewInvalidArgument(fmt.Sprintf("unknown artifact type %T", artifactType))
}
executionState, executionInfo := func() (*persistencespb.WorkflowExecutionState, *persistencespb.WorkflowExecutionInfo) {

if snapshot != nil {
return snapshot.State.ExecutionState, snapshot.State.ExecutionInfo
}
Expand Down Expand Up @@ -384,7 +383,10 @@ func (r *WorkflowStateReplicatorImpl) applyMutation(
if err != nil {
return err
}
// TODO: localMutableState.ApplyMutation
err = localMutableState.ApplyMutation(mutation.StateMutation)
if err != nil {
return err
}

var newRunWorkflow Workflow
if versionedTransition.NewRunInfo != nil {
Expand Down Expand Up @@ -488,7 +490,11 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot(
if err != nil {
return err
}
// Todo: localMutableState.ApplySnapshot

err = localMutableState.ApplySnapshot(snapshot)
if err != nil {
return err
}

var newRunWorkflow Workflow
if versionedTransition.NewRunInfo != nil {
Expand Down Expand Up @@ -808,7 +814,7 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
return nil
}
// Fill the gap between local last event and request's first event
if historyEvents[0][0].EventId > startEventID+1 {
if len(historyEvents) > 0 && historyEvents[0][0].EventId > startEventID+1 {
err := fetchFromRemoteAndAppend(localLastItem.EventId, localLastItem.Version, historyEvents[0][0].EventId, historyEvents[0][0].Version)
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions service/history/ndc/workflow_state_replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_SameBranch_S
mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
RunId: s.runID,
}).AnyTimes()
mockMutableState.EXPECT().ApplySnapshot(versionedTransitionArtifact.GetSyncWorkflowStateSnapshotAttributes().State)
mockTransactionManager.EXPECT().UpdateWorkflow(gomock.Any(), false, gomock.Any(), nil).Return(nil).Times(1)
mockTaskRefresher.EXPECT().
PartialRefresh(gomock.Any(), gomock.Any(), EqVersionedTransition(&persistencespb.VersionedTransition{
Expand Down Expand Up @@ -770,6 +771,7 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_DifferentBra
{NamespaceFailoverVersion: 1, TransitionCount: 13}, // local transition is stale
},
}).AnyTimes()
mockMutableState.EXPECT().ApplySnapshot(versionedTransitionArtifact.GetSyncWorkflowStateSnapshotAttributes().State)
mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
RunId: s.runID,
}).AnyTimes()
Expand Down Expand Up @@ -858,6 +860,7 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_SameBranch_S
mockMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
RunId: s.runID,
}).AnyTimes()
mockMutableState.EXPECT().ApplyMutation(versionedTransitionArtifact.GetSyncWorkflowStateMutationAttributes().StateMutation)
mockTransactionManager.EXPECT().UpdateWorkflow(gomock.Any(), false, gomock.Any(), nil).Return(nil).Times(1)
mockTaskRefresher.EXPECT().
PartialRefresh(gomock.Any(), gomock.Any(), EqVersionedTransition(&persistencespb.VersionedTransition{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func NewExecutableSyncVersionedTransitionTask(
taskCreationTime time.Time,
sourceClusterName string,
replicationTask *replicationspb.ReplicationTask,
) *ExecutableVerifyVersionedTransitionTask {
task := replicationTask.GetVerifyVersionedTransitionTaskAttributes()
return &ExecutableVerifyVersionedTransitionTask{
) *ExecutableSyncVersionedTransitionTask {
task := replicationTask.GetSyncVersionedTransitionTaskAttributes()
return &ExecutableSyncVersionedTransitionTask{
ProcessToolBox: processToolBox,

WorkflowKey: definition.NewWorkflowKey(task.NamespaceId, task.WorkflowId, task.RunId),
Expand Down Expand Up @@ -165,7 +165,7 @@ func (e *ExecutableSyncVersionedTransitionTask) HandleErr(err error) error {
}
return e.Execute()
default:
e.Logger.Error("VerifyVersionedTransition replication task encountered error",
e.Logger.Error("Sync Versioned Transition replication task encountered error",
tag.WorkflowNamespaceID(e.NamespaceID),
tag.WorkflowID(e.WorkflowID),
tag.WorkflowRunID(e.RunID),
Expand Down
8 changes: 8 additions & 0 deletions service/history/replication/executable_task_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ func (e *executableTaskConverterImpl) convertOne(
taskClusterName,
replicationTask,
)
case enumsspb.REPLICATION_TASK_TYPE_SYNC_VERSIONED_TRANSITION_TASK:
return NewExecutableSyncVersionedTransitionTask(
e.processToolBox,
replicationTask.SourceTaskId,
taskCreationTime,
taskClusterName,
replicationTask,
)
default:
e.processToolBox.Logger.Error(fmt.Sprintf("unknown replication task: %v", replicationTask))
return NewExecutableUnknownTask(
Expand Down
3 changes: 3 additions & 0 deletions service/history/replication/raw_task_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,9 @@ func (c *syncVersionedTransitionTaskConverter) convert(
Attributes: &replicationspb.ReplicationTask_SyncVersionedTransitionTaskAttributes{
SyncVersionedTransitionTaskAttributes: &replicationspb.SyncVersionedTransitionTaskAttributes{
VersionedTransitionArtifact: result.VersionedTransitionArtifact,
NamespaceId: taskInfo.NamespaceID,
WorkflowId: taskInfo.WorkflowID,
RunId: taskInfo.RunID,
},
},
VersionedTransition: taskInfo.VersionedTransition,
Expand Down
3 changes: 3 additions & 0 deletions service/history/replication/raw_task_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1497,6 +1497,9 @@ func (s *rawTaskConverterSuite) TestConvertSyncVersionedTransitionTask_Mutation(
},
},
},
NamespaceId: s.namespaceID,
WorkflowId: s.workflowID,
RunId: s.runID,
},
},
VersionedTransition: task.VersionedTransition,
Expand Down
1 change: 1 addition & 0 deletions service/history/replication/sync_state_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ func (s *SyncStateRetrieverImpl) getMutation(mutableState workflow.MutableState,
SubStateMachineTombstoneBatches: tombstones,
SignalRequestedIds: signalRequestedIds,
ExecutionInfo: mutableStateClone.ExecutionInfo,
ExecutionState: mutableStateClone.ExecutionState,
}, nil
}

Expand Down
25 changes: 22 additions & 3 deletions service/history/workflow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,12 +747,31 @@ func (c *ContextImpl) mergeUpdateWithNewReplicationTasks(
newRunBranchToken := newRunTask.BranchToken
newRunID := newRunTask.RunID
taskUpdated := false

updateTask := func(task interface{}) bool {
switch t := task.(type) {
case *tasks.HistoryReplicationTask:
t.NewRunBranchToken = newRunBranchToken
t.NewRunID = newRunID
return true
case *tasks.SyncVersionedTransitionTask:
t.NewRunID = newRunID
for _, subTask := range t.TaskEquivalents {
if historyTask, ok := subTask.(*tasks.HistoryReplicationTask); ok {
historyTask.NewRunBranchToken = newRunBranchToken
historyTask.NewRunID = newRunID
return true
}
}
default:
}
return false
}

for idx := numCurrentReplicationTasks - 1; idx >= 0; idx-- {
replicationTask := currentWorkflowMutation.Tasks[tasks.CategoryReplication][idx]
if task, ok := replicationTask.(*tasks.HistoryReplicationTask); ok {
if updateTask(replicationTask) {
taskUpdated = true
task.NewRunBranchToken = newRunBranchToken
task.NewRunID = newRunID
break
}
}
Expand Down
2 changes: 2 additions & 0 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ type (
IsWorkflowPendingOnWorkflowTaskBackoff() bool
UpdateDuplicatedResource(resourceDedupKey definition.DeduplicationID)
UpdateActivityInfo(*historyservice.ActivitySyncInfo, bool) error
ApplyMutation(mutation *persistencespb.WorkflowMutableStateMutation) error
ApplySnapshot(snapshot *persistencespb.WorkflowMutableState) error
ApplyActivityTaskCancelRequestedEvent(*historypb.HistoryEvent) error
ApplyActivityTaskCanceledEvent(*historypb.HistoryEvent) error
ApplyActivityTaskCompletedEvent(*historypb.HistoryEvent) error
Expand Down
67 changes: 53 additions & 14 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -5559,27 +5559,66 @@ func (ms *MutableStateImpl) closeTransactionPrepareReplicationTasks(
eventBatches [][]*historypb.HistoryEvent,
clearBufferEvents bool,
) error {
var replicationTasks []tasks.Task
if ms.config.ReplicationMultipleBatches() {
if err := ms.eventsToReplicationTask(transactionPolicy, eventBatches); err != nil {
task, err := ms.eventsToReplicationTask(transactionPolicy, eventBatches)
if err != nil {
return err
}
replicationTasks = append(replicationTasks, task...)
} else {
for _, historyEvents := range eventBatches {
if err := ms.eventsToReplicationTask(transactionPolicy, [][]*historypb.HistoryEvent{historyEvents}); err != nil {
task, err := ms.eventsToReplicationTask(transactionPolicy, [][]*historypb.HistoryEvent{historyEvents})
if err != nil {
return err
}
replicationTasks = append(replicationTasks, task...)
}
}

ms.InsertTasks[tasks.CategoryReplication] = append(
ms.InsertTasks[tasks.CategoryReplication],
ms.syncActivityToReplicationTask(transactionPolicy)...,
)

ms.InsertTasks[tasks.CategoryReplication] = append(
ms.InsertTasks[tasks.CategoryReplication],
ms.dirtyHSMToReplicationTask(transactionPolicy, eventBatches, clearBufferEvents)...,
)
replicationTasks = append(replicationTasks, ms.syncActivityToReplicationTask(transactionPolicy)...)
replicationTasks = append(replicationTasks, ms.dirtyHSMToReplicationTask(transactionPolicy, eventBatches, clearBufferEvents)...)
var firstEventID, nextEventID int64
if len(eventBatches) > 0 {
firstEventID = eventBatches[0][0].EventId
lastBatch := eventBatches[len(eventBatches)-1]
nextEventID = lastBatch[len(lastBatch)-1].EventId + 1
}
if ms.config.EnableTransitionHistory() {
switch transactionPolicy {
case TransactionPolicyActive:
if ms.generateReplicationTask() {
now := time.Now().UTC()
workflowKey := definition.NewWorkflowKey(
ms.executionInfo.NamespaceId,
ms.executionInfo.WorkflowId,
ms.executionState.RunId,
)
transitionHistory := ms.executionInfo.TransitionHistory
syncVersionedTransitionTask := &tasks.SyncVersionedTransitionTask{
WorkflowKey: workflowKey,
VisibilityTimestamp: now,
Priority: enumsspb.TASK_PRIORITY_HIGH,
VersionedTransition: transitionHistory[len(transitionHistory)-1],
FirstEventID: firstEventID,
NextEventID: nextEventID,
TaskEquivalents: replicationTasks,
}
ms.InsertTasks[tasks.CategoryReplication] = append(
ms.InsertTasks[tasks.CategoryReplication],
syncVersionedTransitionTask,
)
}
case TransactionPolicyPassive:
default:
panic(fmt.Sprintf("unknown transaction policy: %v", transactionPolicy))
}
} else {
ms.InsertTasks[tasks.CategoryReplication] = append(
ms.InsertTasks[tasks.CategoryReplication],
replicationTasks...,
)
}

if transactionPolicy == TransactionPolicyPassive &&
len(ms.InsertTasks[tasks.CategoryReplication]) > 0 {
Expand Down Expand Up @@ -5689,15 +5728,15 @@ func (ms *MutableStateImpl) closeTransactionPrepareEvents(
func (ms *MutableStateImpl) eventsToReplicationTask(
transactionPolicy TransactionPolicy,
eventBatches [][]*historypb.HistoryEvent,
) error {
) ([]tasks.Task, error) {
switch transactionPolicy {
case TransactionPolicyActive:
if ms.generateReplicationTask() {
return ms.taskGenerator.GenerateHistoryReplicationTasks(eventBatches)
}
return nil
return nil, nil
case TransactionPolicyPassive:
return nil
return nil, nil
default:
panic(fmt.Sprintf("unknown transaction policy: %v", transactionPolicy))
}
Expand Down
2 changes: 2 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2612,6 +2612,7 @@ func (s *mutableStateSuite) TestCloseTransactionPrepareReplicationTasks_HistoryT
},
},
}
s.mockConfig.EnableTransitionHistory = func() bool { return false }

ms := s.mutableState

Expand Down Expand Up @@ -2702,6 +2703,7 @@ func (s *mutableStateSuite) TestCloseTransactionPrepareReplicationTasks_SyncHSMT
stateMachineDef := hsmtest.NewDefinition("test")
err := s.mockShard.StateMachineRegistry().RegisterMachine(stateMachineDef)
s.NoError(err)
s.mockConfig.EnableTransitionHistory = func() bool { return false }

testCases := []struct {
name string
Expand Down
28 changes: 28 additions & 0 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading