Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split mutable_state_util.go by purpose #6577

Merged
merged 1 commit into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 0 additions & 254 deletions service/history/execution/mutable_state_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@
package execution

import (
"encoding/json"
"testing"

"golang.org/x/exp/slices"

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -144,234 +139,6 @@ func FindAutoResetPoint(
return "", nil
}

// CreatePersistenceMutableState creates a persistence mutable state based on the its in-memory version
func CreatePersistenceMutableState(t *testing.T, ms MutableState) *persistence.WorkflowMutableState {
builder := ms.(*mutableStateBuilder)
builder.FlushBufferedEvents() //nolint:errcheck
info := CopyWorkflowExecutionInfo(t, builder.GetExecutionInfo())
stats := &persistence.ExecutionStats{}
activityInfos := make(map[int64]*persistence.ActivityInfo)
for id, info := range builder.GetPendingActivityInfos() {
activityInfos[id] = CopyActivityInfo(t, info)
}
timerInfos := make(map[string]*persistence.TimerInfo)
for id, info := range builder.GetPendingTimerInfos() {
timerInfos[id] = CopyTimerInfo(t, info)
}
cancellationInfos := make(map[int64]*persistence.RequestCancelInfo)
for id, info := range builder.GetPendingRequestCancelExternalInfos() {
cancellationInfos[id] = CopyCancellationInfo(t, info)
}
signalInfos := make(map[int64]*persistence.SignalInfo)
for id, info := range builder.GetPendingSignalExternalInfos() {
signalInfos[id] = CopySignalInfo(t, info)
}
childInfos := make(map[int64]*persistence.ChildExecutionInfo)
for id, info := range builder.GetPendingChildExecutionInfos() {
childInfos[id] = CopyChildInfo(t, info)
}

builder.FlushBufferedEvents() //nolint:errcheck
var bufferedEvents []*types.HistoryEvent
if len(builder.bufferedEvents) > 0 {
bufferedEvents = append(bufferedEvents, builder.bufferedEvents...)
}
if len(builder.updateBufferedEvents) > 0 {
bufferedEvents = append(bufferedEvents, builder.updateBufferedEvents...)
}

var versionHistories *persistence.VersionHistories
if ms.GetVersionHistories() != nil {
versionHistories = ms.GetVersionHistories().Duplicate()
}
return &persistence.WorkflowMutableState{
ExecutionInfo: info,
ExecutionStats: stats,
ActivityInfos: activityInfos,
TimerInfos: timerInfos,
BufferedEvents: bufferedEvents,
SignalInfos: signalInfos,
RequestCancelInfos: cancellationInfos,
ChildExecutionInfos: childInfos,
VersionHistories: versionHistories,
}
}

// CopyWorkflowExecutionInfo copies WorkflowExecutionInfo
func CopyWorkflowExecutionInfo(t *testing.T, sourceInfo *persistence.WorkflowExecutionInfo) *persistence.WorkflowExecutionInfo {
return &persistence.WorkflowExecutionInfo{
DomainID: sourceInfo.DomainID,
WorkflowID: sourceInfo.WorkflowID,
RunID: sourceInfo.RunID,
FirstExecutionRunID: sourceInfo.FirstExecutionRunID,
ParentDomainID: sourceInfo.ParentDomainID,
ParentWorkflowID: sourceInfo.ParentWorkflowID,
ParentRunID: sourceInfo.ParentRunID,
IsCron: sourceInfo.IsCron,
InitiatedID: sourceInfo.InitiatedID,
CompletionEventBatchID: sourceInfo.CompletionEventBatchID,
CompletionEvent: sourceInfo.CompletionEvent,
TaskList: sourceInfo.TaskList,
StickyTaskList: sourceInfo.StickyTaskList,
StickyScheduleToStartTimeout: sourceInfo.StickyScheduleToStartTimeout,
WorkflowTypeName: sourceInfo.WorkflowTypeName,
WorkflowTimeout: sourceInfo.WorkflowTimeout,
DecisionStartToCloseTimeout: sourceInfo.DecisionStartToCloseTimeout,
ExecutionContext: sourceInfo.ExecutionContext,
State: sourceInfo.State,
CloseStatus: sourceInfo.CloseStatus,
LastFirstEventID: sourceInfo.LastFirstEventID,
LastEventTaskID: sourceInfo.LastEventTaskID,
NextEventID: sourceInfo.NextEventID,
LastProcessedEvent: sourceInfo.LastProcessedEvent,
StartTimestamp: sourceInfo.StartTimestamp,
LastUpdatedTimestamp: sourceInfo.LastUpdatedTimestamp,
CreateRequestID: sourceInfo.CreateRequestID,
SignalCount: sourceInfo.SignalCount,
DecisionVersion: sourceInfo.DecisionVersion,
DecisionScheduleID: sourceInfo.DecisionScheduleID,
DecisionStartedID: sourceInfo.DecisionStartedID,
DecisionRequestID: sourceInfo.DecisionRequestID,
DecisionTimeout: sourceInfo.DecisionTimeout,
DecisionAttempt: sourceInfo.DecisionAttempt,
DecisionScheduledTimestamp: sourceInfo.DecisionScheduledTimestamp,
DecisionStartedTimestamp: sourceInfo.DecisionStartedTimestamp,
DecisionOriginalScheduledTimestamp: sourceInfo.DecisionOriginalScheduledTimestamp,
CancelRequested: sourceInfo.CancelRequested,
CancelRequestID: sourceInfo.CancelRequestID,
CronSchedule: sourceInfo.CronSchedule,
ClientLibraryVersion: sourceInfo.ClientLibraryVersion,
ClientFeatureVersion: sourceInfo.ClientFeatureVersion,
ClientImpl: sourceInfo.ClientImpl,
AutoResetPoints: sourceInfo.AutoResetPoints,
Memo: sourceInfo.Memo,
SearchAttributes: sourceInfo.SearchAttributes,
PartitionConfig: sourceInfo.PartitionConfig,
Attempt: sourceInfo.Attempt,
HasRetryPolicy: sourceInfo.HasRetryPolicy,
InitialInterval: sourceInfo.InitialInterval,
BackoffCoefficient: sourceInfo.BackoffCoefficient,
MaximumInterval: sourceInfo.MaximumInterval,
ExpirationTime: sourceInfo.ExpirationTime,
MaximumAttempts: sourceInfo.MaximumAttempts,
NonRetriableErrors: sourceInfo.NonRetriableErrors,
BranchToken: sourceInfo.BranchToken,
ExpirationSeconds: sourceInfo.ExpirationSeconds,
}
}

// CopyActivityInfo copies ActivityInfo
func CopyActivityInfo(t *testing.T, sourceInfo *persistence.ActivityInfo) *persistence.ActivityInfo {
details := slices.Clone(sourceInfo.Details)

return &persistence.ActivityInfo{
Version: sourceInfo.Version,
ScheduleID: sourceInfo.ScheduleID,
ScheduledEventBatchID: sourceInfo.ScheduledEventBatchID,
ScheduledEvent: deepCopyHistoryEvent(t, sourceInfo.ScheduledEvent),
StartedID: sourceInfo.StartedID,
StartedEvent: deepCopyHistoryEvent(t, sourceInfo.StartedEvent),
ActivityID: sourceInfo.ActivityID,
RequestID: sourceInfo.RequestID,
Details: details,
ScheduledTime: sourceInfo.ScheduledTime,
StartedTime: sourceInfo.StartedTime,
ScheduleToStartTimeout: sourceInfo.ScheduleToStartTimeout,
ScheduleToCloseTimeout: sourceInfo.ScheduleToCloseTimeout,
StartToCloseTimeout: sourceInfo.StartToCloseTimeout,
HeartbeatTimeout: sourceInfo.HeartbeatTimeout,
LastHeartBeatUpdatedTime: sourceInfo.LastHeartBeatUpdatedTime,
CancelRequested: sourceInfo.CancelRequested,
CancelRequestID: sourceInfo.CancelRequestID,
TimerTaskStatus: sourceInfo.TimerTaskStatus,
Attempt: sourceInfo.Attempt,
DomainID: sourceInfo.DomainID,
StartedIdentity: sourceInfo.StartedIdentity,
TaskList: sourceInfo.TaskList,
HasRetryPolicy: sourceInfo.HasRetryPolicy,
InitialInterval: sourceInfo.InitialInterval,
BackoffCoefficient: sourceInfo.BackoffCoefficient,
MaximumInterval: sourceInfo.MaximumInterval,
ExpirationTime: sourceInfo.ExpirationTime,
MaximumAttempts: sourceInfo.MaximumAttempts,
NonRetriableErrors: sourceInfo.NonRetriableErrors,
LastFailureReason: sourceInfo.LastFailureReason,
LastWorkerIdentity: sourceInfo.LastWorkerIdentity,
LastFailureDetails: sourceInfo.LastFailureDetails,
// Not written to database - This is used only for deduping heartbeat timer creation
LastHeartbeatTimeoutVisibilityInSeconds: sourceInfo.LastHeartbeatTimeoutVisibilityInSeconds,
}
}

// CopyTimerInfo copies TimerInfo
func CopyTimerInfo(t *testing.T, sourceInfo *persistence.TimerInfo) *persistence.TimerInfo {
return &persistence.TimerInfo{
Version: sourceInfo.Version,
TimerID: sourceInfo.TimerID,
StartedID: sourceInfo.StartedID,
ExpiryTime: sourceInfo.ExpiryTime,
TaskStatus: sourceInfo.TaskStatus,
}
}

// CopyCancellationInfo copies RequestCancelInfo
func CopyCancellationInfo(t *testing.T, sourceInfo *persistence.RequestCancelInfo) *persistence.RequestCancelInfo {
return &persistence.RequestCancelInfo{
Version: sourceInfo.Version,
InitiatedID: sourceInfo.InitiatedID,
InitiatedEventBatchID: sourceInfo.InitiatedEventBatchID,
CancelRequestID: sourceInfo.CancelRequestID,
}
}

// CopySignalInfo copies SignalInfo
func CopySignalInfo(t *testing.T, sourceInfo *persistence.SignalInfo) *persistence.SignalInfo {
return &persistence.SignalInfo{
Version: sourceInfo.Version,
InitiatedEventBatchID: sourceInfo.InitiatedEventBatchID,
InitiatedID: sourceInfo.InitiatedID,
SignalRequestID: sourceInfo.SignalRequestID,
SignalName: sourceInfo.SignalName,
Input: slices.Clone(sourceInfo.Input),
Control: slices.Clone(sourceInfo.Control),
}
}

// CopyChildInfo copies ChildExecutionInfo
func CopyChildInfo(t *testing.T, sourceInfo *persistence.ChildExecutionInfo) *persistence.ChildExecutionInfo {
return &persistence.ChildExecutionInfo{
Version: sourceInfo.Version,
InitiatedID: sourceInfo.InitiatedID,
InitiatedEventBatchID: sourceInfo.InitiatedEventBatchID,
StartedID: sourceInfo.StartedID,
StartedWorkflowID: sourceInfo.StartedWorkflowID,
StartedRunID: sourceInfo.StartedRunID,
CreateRequestID: sourceInfo.CreateRequestID,
DomainID: sourceInfo.DomainID,
DomainNameDEPRECATED: sourceInfo.DomainNameDEPRECATED,
WorkflowTypeName: sourceInfo.WorkflowTypeName,
ParentClosePolicy: sourceInfo.ParentClosePolicy,
InitiatedEvent: deepCopyHistoryEvent(t, sourceInfo.InitiatedEvent),
StartedEvent: deepCopyHistoryEvent(t, sourceInfo.StartedEvent),
}
}

func deepCopyHistoryEvent(t *testing.T, e *types.HistoryEvent) *types.HistoryEvent {
if e == nil {
return nil
}
bytes, err := json.Marshal(e)
if err != nil {
panic(err)
}
var copy types.HistoryEvent
err = json.Unmarshal(bytes, &copy)
if err != nil {
panic(err)
}
return &copy
}

// GetChildExecutionDomainName gets domain name for the child workflow
// NOTE: DomainName in ChildExecutionInfo is being deprecated, and
// we should always use DomainID field instead.
Expand Down Expand Up @@ -412,27 +179,6 @@ func GetChildExecutionDomainID(
return parentDomainEntry.GetInfo().ID, nil
}

// GetChildExecutionDomainEntry get domain entry for the child workflow
// NOTE: DomainName in ChildExecutionInfo is being deprecated, and
// we should always use DomainID field instead.
// this function exists for backward compatibility reason
func GetChildExecutionDomainEntry(
t *testing.T,
childInfo *persistence.ChildExecutionInfo,
domainCache cache.DomainCache,
parentDomainEntry *cache.DomainCacheEntry,
) (*cache.DomainCacheEntry, error) {
if childInfo.DomainID != "" {
return domainCache.GetDomainByID(childInfo.DomainID)
}

if childInfo.DomainNameDEPRECATED != "" {
return domainCache.GetDomain(childInfo.DomainNameDEPRECATED)
}

return parentDomainEntry, nil
}

func trimBinaryChecksums(recentBinaryChecksums []string, currResetPoints []*types.ResetPointInfo, maxResetPoints int) ([]string, []*types.ResetPointInfo) {
numResetPoints := len(currResetPoints)
if numResetPoints >= maxResetPoints {
Expand Down
Loading
Loading