Skip to content

Commit

Permalink
updating externalResourceID
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Jan 4, 2024
1 parent a726c36 commit aa9f7ff
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
29 changes: 25 additions & 4 deletions flytepropeller/pkg/controller/nodes/array/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@ package array
import (
"context"
"fmt"
"strconv"
"time"

"github.com/golang/protobuf/ptypes"

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/encoding"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/common"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task"
)

type arrayEventRecorder interface {
interfaces.EventRecorder
process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32)
process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) error
finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext, taskPhase idlcore.TaskExecution_Phase, taskPhaseVersion uint32, eventConfig *config.EventConfig) error
finalizeRequired(ctx context.Context) bool
}
Expand All @@ -39,8 +42,23 @@ func (e *externalResourcesEventRecorder) RecordTaskEvent(ctx context.Context, ev
return nil
}

func (e *externalResourcesEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) {
externalResourceID := fmt.Sprintf("%s-%d", buildSubNodeID(nCtx, index), retryAttempt)
func (e *externalResourcesEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) error {
// generate externalResourceID
currentNodeUniqueID := nCtx.NodeID()
if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 {
var err error
currentNodeUniqueID, err = common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID())
if err != nil {
return err
}
}

uniqueID, err := encoding.FixedLengthUniqueIDForParts(task.IDMaxLength, []string{nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(retryAttempt))})
if err != nil {
return err
}

externalResourceID := fmt.Sprintf("%s-n%d-%d", uniqueID, index, retryAttempt)

// process events
cacheStatus := idlcore.CatalogCacheStatus_CACHE_DISABLED
Expand Down Expand Up @@ -83,6 +101,8 @@ func (e *externalResourcesEventRecorder) process(ctx context.Context, nCtx inter
// clear nodeEvents and taskEvents
e.nodeEvents = e.nodeEvents[:0]
e.taskEvents = e.taskEvents[:0]

return nil
}

func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext,
Expand Down Expand Up @@ -165,7 +185,8 @@ type passThroughEventRecorder struct {
interfaces.EventRecorder
}

func (*passThroughEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) {
func (*passThroughEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) error {
return nil
}

func (*passThroughEventRecorder) finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext,
Expand Down
12 changes: 9 additions & 3 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func (a *arrayNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExecut
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}

eventRecorder.process(ctx, nCtx, i, retryAttempt)
if err := eventRecorder.process(ctx, nCtx, i, retryAttempt); err != nil {
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}
}
}
}
Expand Down Expand Up @@ -241,7 +243,9 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}

eventRecorder.process(ctx, nCtx, i, 0)
if err := eventRecorder.process(ctx, nCtx, i, 0); err != nil {
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}
}

// transition ArrayNode to `ArrayNodePhaseExecuting`
Expand Down Expand Up @@ -331,7 +335,9 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
}
}
}
eventRecorder.process(ctx, nCtx, index, subNodeStatus.GetAttempts())
if err := eventRecorder.process(ctx, nCtx, index, subNodeStatus.GetAttempts()); err != nil {
return handler.UnknownTransition, err
}

// update subNode state
arrayNodeState.SubNodePhases.SetItem(index, uint64(subNodeStatus.GetPhase()))
Expand Down

0 comments on commit aa9f7ff

Please sign in to comment.