Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flytepropeller][flyteadmin] Streaming Decks V2 #6053

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flyteadmin/pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecution
"failed to marshal occurredAt into a timestamp proto with error: %v", err)
}
closure.StartedAt = startedAtProto
closure.DeckUri = request.GetEvent().GetDeckUri()
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var childExecutionID = &core.WorkflowExecutionIdentifier{
const dynamicWorkflowClosureRef = "s3://bucket/admin/metadata/workflow"

const testInputURI = "fake://bucket/inputs.pb"
const DeckURI = "fake://bucket/deck.html"

var testInputs = &core.LiteralMap{
Literals: map[string]*core.Literal{
Expand All @@ -65,6 +66,7 @@ func TestAddRunningState(t *testing.T) {
Event: &event.NodeExecutionEvent{
Phase: core.NodeExecution_RUNNING,
OccurredAt: startedAtProto,
DeckUri: DeckURI,
},
}
nodeExecutionModel := models.NodeExecution{}
Expand All @@ -73,6 +75,7 @@ func TestAddRunningState(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, startedAt, *nodeExecutionModel.StartedAt)
assert.True(t, proto.Equal(startedAtProto, closure.GetStartedAt()))
assert.Equal(t, DeckURI, closure.GetDeckUri())
}

func TestAddTerminalState_OutputURI(t *testing.T) {
Expand All @@ -84,6 +87,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) {
OutputUri: outputURI,
},
OccurredAt: occurredAtProto,
DeckUri: DeckURI,
},
}
startedAt := occurredAt.Add(-time.Minute)
Expand All @@ -99,6 +103,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, outputURI, closure.GetOutputUri())
assert.Equal(t, time.Minute, nodeExecutionModel.Duration)
assert.Equal(t, DeckURI, closure.GetDeckUri())
}

func TestAddTerminalState_OutputData(t *testing.T) {
Expand Down
138 changes: 119 additions & 19 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@
)

const pluginContextKey = contextutils.Key("plugin")
const FLYTE_ENABLE_DECK = string("FLYTE_ENABLE_DECK")

type DeckStatus int

const (
DeckUnknown DeckStatus = iota
DeckEnabled
DeckDisabled
)

type metrics struct {
pluginPanics labeled.Counter
Expand Down Expand Up @@ -71,10 +80,47 @@
return taskType + "_" + pluginID
}

func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) {
func (p *pluginRequestedTransition) AddDeckURI(tCtx *taskExecutionContext) {
var deckURI *storage.DataReference
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue

if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{}
}

Check warning on line 90 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L83-L90

Added lines #L83 - L90 were not covered by tests

p.execInfo.OutputInfo.DeckURI = deckURI

Check warning on line 92 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L92

Added line #L92 was not covered by tests
}

func (p *pluginRequestedTransition) AddDeckURIIfDeckExists(ctx context.Context, tCtx *taskExecutionContext) error {
reader := tCtx.ow.GetReader()
if reader == nil && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
return nil
}

Check warning on line 100 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L95-L100

Added lines #L95 - L100 were not covered by tests

exists, err := reader.DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return regErrors.Wrapf(err, "failed to check existence of deck file")
}

Check warning on line 106 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L102-L106

Added lines #L102 - L106 were not covered by tests

if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{}
}

Check warning on line 110 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L108-L110

Added lines #L108 - L110 were not covered by tests

if exists {
deckURIValue := tCtx.ow.GetDeckPath()
p.execInfo.OutputInfo.DeckURI = &deckURIValue
}

Check warning on line 115 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L112-L115

Added lines #L112 - L115 were not covered by tests

return nil

Check warning on line 117 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L117

Added line #L117 was not covered by tests
}

func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) {

Check warning on line 120 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L120

Added line #L120 was not covered by tests
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider impact of removing deckPath parameter

The method signature for CacheHit has been modified to remove the deckPath parameter, but it seems this parameter may still be needed based on the usage in the code. Consider if this change could cause issues with deck path handling.

Code suggestion
Check the AI-generated fix before applying
Suggested change
func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) {
func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) {

Code Review Run #f3ef5e


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

p.ttype = handler.TransitionTypeEphemeral
p.pInfo = pluginCore.PhaseInfoSuccess(nil)
p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})
p.ObserveSuccess(outputPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})

Check warning on line 123 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L123

Added line #L123 was not covered by tests
}

func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) {
Expand Down Expand Up @@ -144,10 +190,13 @@
return ToTaskExecutionEvent(input)
}

func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
DeckURI: deckPath,
func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
}
} else {
p.execInfo.OutputInfo.OutputURI = outputPath

Check warning on line 199 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L199

Added line #L199 was not covered by tests
Comment on lines +192 to +198
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider validating OutputURI before update

Consider checking if the OutputURI is empty before updating it. The current implementation may overwrite an existing valid output URI with an empty one.

Code suggestion
Check the AI-generated fix before applying
 -	if p.execInfo.OutputInfo == nil {
 +	if p.execInfo.OutputInfo == nil && len(outputPath) > 0 {
  		p.execInfo.OutputInfo = &handler.OutputInfo{
  			OutputURI: outputPath,
  		}
  	} else {
 -		p.execInfo.OutputInfo.OutputURI = outputPath
 +		if len(outputPath) > 0 {
 +			p.execInfo.OutputInfo.OutputURI = outputPath
 +		}
  	}

Code Review Run #ecb1b4


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

}

p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{
Expand All @@ -171,7 +220,8 @@
}

logger.Debugf(ctx, "Task still running")
return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(nil)), nil
// Here will send the deck uri to flyteadmin
return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(&p.execInfo)), nil
}

// The plugin interface available especially for testing.
Expand Down Expand Up @@ -380,6 +430,27 @@
return t.taskMetricsMap[metricNameKey], nil
}

func GetDeckStatus(ctx context.Context, tCtx *taskExecutionContext) (DeckStatus, error) {
// FLYTE_ENABLE_DECK is used when flytekit > 1.14.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update this comment

// For backward compatibility,
// we will return DeckUnknown and call a HEAD request to check if the deck file exists in the terminal state.

template, err := tCtx.tr.Read(ctx)
if err != nil {
return DeckUnknown, regErrors.Wrapf(err, "failed to read task template")
}

Check warning on line 441 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L440-L441

Added lines #L440 - L441 were not covered by tests

metadata := template.GetMetadata()
if metadata == nil {
return DeckUnknown, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this correct in older versions of flytekit? didn't tasks in the past also have this field? this means that this function will always return Disabled right for older versions of flytekit. meaning the condition on line 567 won't get triggered cuz it'll be disabled instead of unknown.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yes, you are right, thinking solution.

}

Check warning on line 446 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L445-L446

Added lines #L445 - L446 were not covered by tests
if metadata.GetGeneratesDeck() {
return DeckEnabled, nil
}

Check warning on line 449 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L448-L449

Added lines #L448 - L449 were not covered by tests

return DeckDisabled, nil
}

func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *taskExecutionContext, ts handler.TaskNodeState) (*pluginRequestedTransition, error) {
pluginTrns := &pluginRequestedTransition{}

Expand Down Expand Up @@ -464,8 +535,41 @@
}
}

// Regardless of the observed phase, we always add the DeckUri to support real-time deck functionality.
// The deck should be accessible even if the task is still running or has failed.
// It's possible that the deck URI may not exist in remote storage yet or will never exist.
// So, it is console's responsibility to handle the case when the deck URI actually does not exist.
deckStatus, err := GetDeckStatus(ctx, tCtx)
if err != nil {
return nil, err
}

Check warning on line 545 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L544-L545

Added lines #L544 - L545 were not covered by tests
if deckStatus == DeckEnabled {
pluginTrns.AddDeckURI(tCtx)
}

Check warning on line 548 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L547-L548

Added lines #L547 - L548 were not covered by tests

// Handle backward compatibility for Flyte deck display behavior.
//
// Before (legacy behavior):
// - Deck URI was only shown if the deck file existed in the terminal state.
// - We relied on a HEAD request to check if the deck file exists, then added the URI to the event.
//
// After (new behavior):
// - If `FLYTE_ENABLE_DECK = true` is set in the task template config (requires Flytekit > 1.14.0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is no longer correct right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes super nice catch

// we display the deck URI from the beginning rather than waiting until the terminal state.
//
// For backward compatibility with older Flytekit versions (which don't support `FLYTE_ENABLE_DECK`),
// we still need to check deck file existence in the terminal state. This ensures that when the deck
// isn't enabled via config or doesn't exist, we only show the URI in terminal states if the deck file
// is actually present.
switch pluginTrns.pInfo.Phase() {
case pluginCore.PhaseSuccess:
// This is for backward compatibility with older Flytekit versions.
if deckStatus == DeckUnknown {
err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx)
}

Check warning on line 569 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L568-L569

Added lines #L568 - L569 were not covered by tests
if err != nil {
return pluginTrns, err
}

Check warning on line 572 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L571-L572

Added lines #L571 - L572 were not covered by tests
// -------------------------------------
// TODO: @kumare create Issue# Remove the code after we use closures to handle dynamic nodes
// This code only exists to support Dynamic tasks. Eventually dynamic tasks will use closure nodes to execute
Expand Down Expand Up @@ -501,25 +605,21 @@
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
} else {
var deckURI *storage.DataReference
if tCtx.ow.GetReader() != nil {
exists, err := tCtx.ow.GetReader().DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file")
} else if exists {
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue
}
}
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI,
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider impact of method signature change

The ObserveSuccess method signature has been modified to remove the deckURI parameter, but the deck URI is now being added through separate methods. Consider if this change maintains backward compatibility with existing code.

Code suggestion
Check the AI-generated fix before applying
Suggested change
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(),
// TODO: Deprecated - deckURI parameter will be removed in future versions
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), nil,

Code Review Run #ecb1b4


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

&event.TaskNodeMetadata{
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
}
case pluginCore.PhaseRetryableFailure:
fallthrough
case pluginCore.PhasePermanentFailure:
// This is for backward compatibility with older Flytekit versions.
if deckStatus == DeckUnknown {
err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx)
}
if err != nil {
return pluginTrns, err
}

Check warning on line 622 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L616-L622

Added lines #L616 - L622 were not covered by tests
pluginTrns.ObservedFailure(
&event.TaskNodeMetadata{
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
Expand Down
Loading