Skip to content

Commit

Permalink
Feat: Allow controlling in which task phases log links are shown (fly…
Browse files Browse the repository at this point in the history
…teorg#4726)

* Add ShowWhilePending arg to TaskLog flyteidl message

Signed-off-by: Fabio Graetz <[email protected]>

* Allow showing specific logs already during queued phase

Signed-off-by: Fabio Graetz <[email protected]>

* Use core.PhaseInfoQueuedWithTaskInfo instead of core.PhaseInfoQueued in plugins so log links are available

Signed-off-by: Fabio Graetz <[email protected]>

* Bump phase version in pytorch plugin

Signed-off-by: Fabio Graetz <[email protected]>

* Fix nil containerId in pending phase

Signed-off-by: Fabio Graetz <[email protected]>

* Undo changes from rebase in ray.go

Signed-off-by: Fabio Graetz <[email protected]>

* Regenerate protos

Signed-off-by: Fabio Graetz <[email protected]>

* Fix after rebasing

Signed-off-by: Fabio Graetz <[email protected]>

* Add HideOnceFinished option to TaskLog proto message

Signed-off-by: Fabio Graetz <[email protected]>

* Hide certain logs once finished

Signed-off-by: Fabio Graetz <[email protected]>

* Move log link filtering (by phase) from propeller to admin

Signed-off-by: Fabio Graetz <[email protected]>

* Move bumping of plugin state phase version into function

Signed-off-by: Fabio Graetz <[email protected]>

* Move helper function which bumps phase version to k8s plugin package

Signed-off-by: Fabio Graetz <[email protected]>

* Consistently bump phase version when reason changes in pod, pytorch, tensorflow, and mpi plugins

Signed-off-by: Fabio Graetz <[email protected]>

* Make controlling lifetime of log links work with dask plugin

Signed-off-by: Fabio Graetz <[email protected]>

* Make controlling lifetime of log links work with ray plugin

Signed-off-by: Fabio Graetz <[email protected]>

* Make controlling lifetime of log links work with spark plugin

Signed-off-by: Fabio Graetz <[email protected]>

* Don't return pluginsCore.PhaseInfoUndefined but already known phaseInfo if we fail to update the phase version

Signed-off-by: Fabio Graetz <[email protected]>

* Remove now obsolete logic to check whether dask job is queued

Signed-off-by: Fabio Graetz <[email protected]>

* Adapt docstring explaining why we treat queued and init phase the same while filtering log links

Signed-off-by: Fabio Graetz <[email protected]>

* Make propeller tests pass

Signed-off-by: Fabio Graetz <[email protected]>

* Make pluginmachinery/flytek8s tests pass

Signed-off-by: Fabio Graetz <[email protected]>

* Fix dask, pytorch, tensorflow, and mpi tests

Signed-off-by: Fabio Graetz <[email protected]>

* Make log link filtering by phase work for map tasks

Signed-off-by: Fabio Graetz <[email protected]>

* Add tests for filtering log links when updating task execution

Signed-off-by: Fabio Graetz <[email protected]>

* Show All user logs while queueing phase as before

Signed-off-by: Fabio Graetz <[email protected]>

* Fix spark tests

Signed-off-by: Fabio Graetz <[email protected]>

* Fix after rebase

Signed-off-by: Fabio Graetz <[email protected]>

* Fix flyteidl go.mod

Signed-off-by: Fabio Graetz <[email protected]>

* Fix mpi test

Signed-off-by: Fabio Graetz <[email protected]>

* Add tests for PR flyteorg#4726 (flyteorg#5200)

* Add tests to ensure the phase version is bumped in kubeflow plugin if reason changes within the same phase

Signed-off-by: Fabio Graetz <[email protected]>

* Test that ray and dask plugins bump phase version in GetTaskPhase

Signed-off-by: Fabio Graetz <[email protected]>

* Test phase version increase when reason changes for spark plugin

Signed-off-by: Fabio Graetz <[email protected]>

* Fix ray tests after rebase

Signed-off-by: Fabio Graetz <[email protected]>

* Make lint pass

Signed-off-by: Fabio Graetz <[email protected]>

---------

Signed-off-by: Fabio Graetz <[email protected]>

* Update flyteplugins/go/tasks/logs/logging_utils.go

Signed-off-by: Fabio M. Graetz, Ph.D. <[email protected]>
Signed-off-by: Fabio Graetz <[email protected]>

* Update go.mod after flyteidl make generate

Signed-off-by: Fabio Graetz <[email protected]>

* Restrict numpy version in single binary e2e tests

Signed-off-by: Fabio Graetz <[email protected]>

---------

Signed-off-by: Fabio Graetz <[email protected]>
Signed-off-by: Fabio M. Graetz, Ph.D. <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Fabio Grätz <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Signed-off-by: Vladyslav Libov <[email protected]>
  • Loading branch information
3 people authored and VladyslavLibov committed Aug 16, 2024
1 parent cf4a9ed commit 4589d62
Show file tree
Hide file tree
Showing 39 changed files with 953 additions and 253 deletions.
37 changes: 36 additions & 1 deletion flyteadmin/pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,31 @@ func mergeMetadata(existing, latest *event.TaskExecutionMetadata) *event.TaskExe
return existing
}

func filterExternalResourceLogsByPhase(externalResources []*event.ExternalResourceInfo, phase core.TaskExecution_Phase) {
for _, externalResource := range externalResources {
externalResource.Logs = filterLogsByPhase(externalResource.Logs, phase)
}
}

func filterLogsByPhase(logs []*core.TaskLog, phase core.TaskExecution_Phase) []*core.TaskLog {
filteredLogs := make([]*core.TaskLog, 0, len(logs))

for _, l := range logs {
if common.IsTaskExecutionTerminal(phase) && l.HideOnceFinished {
continue
}
// Some plugins like e.g. Dask, Ray start with or very quickly transition to core.TaskExecution_INITIALIZING
// once the CR has been created even though the underlying pods are still pending. We thus treat queued and
// initializing the same here.
if (phase == core.TaskExecution_QUEUED || phase == core.TaskExecution_INITIALIZING) && !l.ShowWhilePending {
continue
}
filteredLogs = append(filteredLogs, l)

}
return filteredLogs
}

func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionEventRequest, taskExecutionModel *models.TaskExecution,
inlineEventDataPolicy interfaces.InlineEventDataPolicy, storageClient *storage.DataStore) error {
err := handleTaskExecutionInputs(ctx, taskExecutionModel, request, storageClient)
Expand All @@ -384,6 +409,7 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
return errors.NewFlyteAdminErrorf(codes.Internal,
"failed to unmarshal task execution closure with error: %+v", err)
}
isPhaseChange := taskExecutionModel.Phase != request.Event.Phase.String()
existingTaskPhase := taskExecutionModel.Phase
taskExecutionModel.Phase = request.Event.Phase.String()
taskExecutionModel.PhaseVersion = request.Event.PhaseVersion
Expand All @@ -393,7 +419,11 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
reportedAt = request.Event.OccurredAt
}
taskExecutionClosure.UpdatedAt = reportedAt
taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs)

mergedLogs := mergeLogs(taskExecutionClosure.Logs, request.Event.Logs)
filteredLogs := filterLogsByPhase(mergedLogs, request.Event.Phase)
taskExecutionClosure.Logs = filteredLogs

if len(request.Event.Reasons) > 0 {
for _, reason := range request.Event.Reasons {
taskExecutionClosure.Reasons = append(
Expand Down Expand Up @@ -437,6 +467,11 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to merge task event custom_info with error: %v", err)
}
taskExecutionClosure.Metadata = mergeMetadata(taskExecutionClosure.Metadata, request.Event.Metadata)

if isPhaseChange && taskExecutionClosure.Metadata != nil && len(taskExecutionClosure.Metadata.ExternalResources) > 0 {
filterExternalResourceLogsByPhase(taskExecutionClosure.Metadata.ExternalResources, request.Event.Phase)
}

if request.Event.EventVersion > taskExecutionClosure.EventVersion {
taskExecutionClosure.EventVersion = request.Event.EventVersion
}
Expand Down
296 changes: 296 additions & 0 deletions flyteadmin/pkg/repositories/transformers/task_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,183 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) {

}

func TestUpdateTaskExecutionModelFilterLogLinks(t *testing.T) {
existingClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_QUEUED,
StartedAt: taskEventOccurredAtProto,
CreatedAt: taskEventOccurredAtProto,
UpdatedAt: taskEventOccurredAtProto,
Logs: []*core.TaskLog{},
Reason: "task submitted to k8s",
Reasons: []*admin.Reason{
{
OccurredAt: taskEventOccurredAtProto,
Message: "task submitted to k8s",
},
},
}

closureBytes, err := proto.Marshal(existingClosure)
assert.Nil(t, err)

existingTaskExecution := models.TaskExecution{
TaskExecutionKey: models.TaskExecutionKey{
TaskKey: models.TaskKey{
Project: sampleTaskID.Project,
Domain: sampleTaskID.Domain,
Name: sampleTaskID.Name,
Version: sampleTaskID.Version,
},
NodeExecutionKey: models.NodeExecutionKey{
NodeID: sampleNodeExecID.NodeId,
ExecutionKey: models.ExecutionKey{
Project: sampleNodeExecID.ExecutionId.Project,
Domain: sampleNodeExecID.ExecutionId.Domain,
Name: sampleNodeExecID.ExecutionId.Name,
},
},
RetryAttempt: &retryAttemptValue,
},
Phase: "TaskExecutionPhase_TASK_PHASE_QUEUED",
InputURI: "input uri",
Closure: closureBytes,
StartedAt: &taskEventOccurredAt,
TaskExecutionCreatedAt: &taskEventOccurredAt,
TaskExecutionUpdatedAt: &taskEventOccurredAt,
}

occuredAt := taskEventOccurredAt.Add(time.Minute)
occuredAtProto, err := ptypes.TimestampProto(occuredAt)
assert.Nil(t, err)

updatedEventRequest := &admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
TaskId: sampleTaskID,
ParentNodeExecutionId: sampleNodeExecID,
Phase: core.TaskExecution_QUEUED,
OccurredAt: occuredAtProto,
Logs: []*core.TaskLog{
{
Uri: "uri-show-pending",
ShowWhilePending: true,
},
{
Uri: "uri-default",
},
},
Reason: "task update",
},
}

err = UpdateTaskExecutionModel(context.TODO(), updatedEventRequest, &existingTaskExecution,
interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient())
assert.Nil(t, err)

updatedClosure := &admin.TaskExecutionClosure{}
err = proto.Unmarshal(existingTaskExecution.Closure, updatedClosure)
assert.Nil(t, err)

assert.Equal(t, updatedClosure.Logs, []*core.TaskLog{
{
Uri: "uri-show-pending",
ShowWhilePending: true,
},
},
)

}

func TestUpdateTaskExecutionModelFilterLogLinksArray(t *testing.T) {
existingClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_RUNNING,
StartedAt: taskEventOccurredAtProto,
CreatedAt: taskEventOccurredAtProto,
UpdatedAt: taskEventOccurredAtProto,
Logs: []*core.TaskLog{},
Reason: "task started",
Reasons: []*admin.Reason{
{
OccurredAt: taskEventOccurredAtProto,
Message: "task started",
},
},
Metadata: &event.TaskExecutionMetadata{
ExternalResources: []*event.ExternalResourceInfo{
{
Logs: []*core.TaskLog{
{
Uri: "uri-default",
},
{
Uri: "uri-hide-finished",
HideOnceFinished: true,
},
},
},
},
},
}

closureBytes, err := proto.Marshal(existingClosure)
assert.Nil(t, err)

existingTaskExecution := models.TaskExecution{
TaskExecutionKey: models.TaskExecutionKey{
TaskKey: models.TaskKey{
Project: sampleTaskID.Project,
Domain: sampleTaskID.Domain,
Name: sampleTaskID.Name,
Version: sampleTaskID.Version,
},
NodeExecutionKey: models.NodeExecutionKey{
NodeID: sampleNodeExecID.NodeId,
ExecutionKey: models.ExecutionKey{
Project: sampleNodeExecID.ExecutionId.Project,
Domain: sampleNodeExecID.ExecutionId.Domain,
Name: sampleNodeExecID.ExecutionId.Name,
},
},
RetryAttempt: &retryAttemptValue,
},
Phase: "TaskExecutionPhase_TASK_PHASE_RUNNING",
InputURI: "input uri",
Closure: closureBytes,
StartedAt: &taskEventOccurredAt,
TaskExecutionCreatedAt: &taskEventOccurredAt,
TaskExecutionUpdatedAt: &taskEventOccurredAt,
}

occuredAt := taskEventOccurredAt.Add(time.Minute)
occuredAtProto, err := ptypes.TimestampProto(occuredAt)
assert.Nil(t, err)

failedEventRequest := &admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
TaskId: sampleTaskID,
ParentNodeExecutionId: sampleNodeExecID,
Phase: core.TaskExecution_FAILED,
OccurredAt: occuredAtProto,
Reason: "something went wrong",
},
}

err = UpdateTaskExecutionModel(context.TODO(), failedEventRequest, &existingTaskExecution,
interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient())
assert.Nil(t, err)

updatedClosure := &admin.TaskExecutionClosure{}
err = proto.Unmarshal(existingTaskExecution.Closure, updatedClosure)
assert.Nil(t, err)

assert.Equal(t, updatedClosure.Metadata.ExternalResources[0].Logs, []*core.TaskLog{
{
Uri: "uri-default",
},
},
)

}

func TestUpdateTaskExecutionModelSingleEvents(t *testing.T) {
existingClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_RUNNING,
Expand Down Expand Up @@ -1208,6 +1385,125 @@ func TestMergeLogs(t *testing.T) {
}
}

func TestFilterLogsByPhase(t *testing.T) {
type testCase struct {
existing []*core.TaskLog
expected []*core.TaskLog
phase core.TaskExecution_Phase
name string
}

testCases := []testCase{
{
existing: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
{
Uri: "hide-finished-uri",
ShowWhilePending: false,
HideOnceFinished: true,
},
},
expected: []*core.TaskLog{
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
},
phase: core.TaskExecution_QUEUED,
name: "Filtered logs in QUEUED phase",
},
{
existing: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
{
Uri: "hide-finished-uri",
ShowWhilePending: false,
HideOnceFinished: true,
},
},
expected: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
{
Uri: "hide-finished-uri",
ShowWhilePending: false,
HideOnceFinished: true,
},
},
phase: core.TaskExecution_RUNNING,
name: "Filtered logs in RUNNING phase",
},
{
existing: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
{
Uri: "hide-finished-uri",
ShowWhilePending: false,
HideOnceFinished: true,
},
},
expected: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
},
phase: core.TaskExecution_SUCCEEDED,
name: "Filtered logs in terminated phase",
},
}
for _, filterTestCase := range testCases {
filteredLogs := filterLogsByPhase(filterTestCase.existing, filterTestCase.phase)

assert.Equal(t, len(filterTestCase.expected), len(filteredLogs), fmt.Sprintf("%s failed", filterTestCase.name))
for idx, expectedLog := range filterTestCase.expected {
assert.True(t, proto.Equal(expectedLog, filteredLogs[idx]), fmt.Sprintf("%s failed", filterTestCase.name))
}
}
}

func TestMergeCustoms(t *testing.T) {
t.Run("nothing to do", func(t *testing.T) {
custom, err := mergeCustom(nil, nil)
Expand Down
6 changes: 6 additions & 0 deletions flyteidl/clients/go/assets/admin.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4589d62

Please sign in to comment.