Skip to content

Commit

Permalink
Add tests for filtering log links when updating task execution
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Graetz <[email protected]>
  • Loading branch information
fg91 committed Apr 11, 2024
1 parent 24bddac commit a845802
Show file tree
Hide file tree
Showing 2 changed files with 297 additions and 1 deletion.
2 changes: 1 addition & 1 deletion flyteadmin/pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
}
taskExecutionClosure.Metadata = mergeMetadata(taskExecutionClosure.Metadata, request.Event.Metadata)

if isPhaseChange {
if isPhaseChange && taskExecutionClosure.Metadata != nil && len(taskExecutionClosure.Metadata.ExternalResources) > 0 {
filterExternalResourceLogsByPhase(taskExecutionClosure.Metadata.ExternalResources, request.Event.Phase)
}

Expand Down
296 changes: 296 additions & 0 deletions flyteadmin/pkg/repositories/transformers/task_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,183 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) {

}

func TestUpdateTaskExecutionModelFilterLogLinks(t *testing.T) {
existingClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_QUEUED,
StartedAt: taskEventOccurredAtProto,
CreatedAt: taskEventOccurredAtProto,
UpdatedAt: taskEventOccurredAtProto,
Logs: []*core.TaskLog{},
Reason: "task submitted to k8s",
Reasons: []*admin.Reason{
{
OccurredAt: taskEventOccurredAtProto,
Message: "task submitted to k8s",
},
},
}

closureBytes, err := proto.Marshal(existingClosure)
assert.Nil(t, err)

existingTaskExecution := models.TaskExecution{
TaskExecutionKey: models.TaskExecutionKey{
TaskKey: models.TaskKey{
Project: sampleTaskID.Project,
Domain: sampleTaskID.Domain,
Name: sampleTaskID.Name,
Version: sampleTaskID.Version,
},
NodeExecutionKey: models.NodeExecutionKey{
NodeID: sampleNodeExecID.NodeId,
ExecutionKey: models.ExecutionKey{
Project: sampleNodeExecID.ExecutionId.Project,
Domain: sampleNodeExecID.ExecutionId.Domain,
Name: sampleNodeExecID.ExecutionId.Name,
},
},
RetryAttempt: &retryAttemptValue,
},
Phase: "TaskExecutionPhase_TASK_PHASE_QUEUED",
InputURI: "input uri",
Closure: closureBytes,
StartedAt: &taskEventOccurredAt,
TaskExecutionCreatedAt: &taskEventOccurredAt,
TaskExecutionUpdatedAt: &taskEventOccurredAt,
}

occuredAt := taskEventOccurredAt.Add(time.Minute)
occuredAtProto, err := ptypes.TimestampProto(occuredAt)
assert.Nil(t, err)

updatedEventRequest := &admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
TaskId: sampleTaskID,
ParentNodeExecutionId: sampleNodeExecID,
Phase: core.TaskExecution_QUEUED,
OccurredAt: occuredAtProto,
Logs: []*core.TaskLog{
{
Uri: "uri-show-pending",
ShowWhilePending: true,
},
{
Uri: "uri-default",
},
},
Reason: "task update",
},
}

err = UpdateTaskExecutionModel(context.TODO(), updatedEventRequest, &existingTaskExecution,
interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient())
assert.Nil(t, err)

updatedClosure := &admin.TaskExecutionClosure{}
err = proto.Unmarshal(existingTaskExecution.Closure, updatedClosure)
assert.Nil(t, err)

assert.Equal(t, updatedClosure.Logs, []*core.TaskLog{
{
Uri: "uri-show-pending",
ShowWhilePending: true,
},
},
)

}

func TestUpdateTaskExecutionModelFilterLogLinksArray(t *testing.T) {
existingClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_RUNNING,
StartedAt: taskEventOccurredAtProto,
CreatedAt: taskEventOccurredAtProto,
UpdatedAt: taskEventOccurredAtProto,
Logs: []*core.TaskLog{},
Reason: "task started",
Reasons: []*admin.Reason{
{
OccurredAt: taskEventOccurredAtProto,
Message: "task started",
},
},
Metadata: &event.TaskExecutionMetadata{
ExternalResources: []*event.ExternalResourceInfo{
{
Logs: []*core.TaskLog{
{
Uri: "uri-default",
},
{
Uri: "uri-hide-finished",
HideOnceFinished: true,
},
},
},
},
},
}

closureBytes, err := proto.Marshal(existingClosure)
assert.Nil(t, err)

existingTaskExecution := models.TaskExecution{
TaskExecutionKey: models.TaskExecutionKey{
TaskKey: models.TaskKey{
Project: sampleTaskID.Project,
Domain: sampleTaskID.Domain,
Name: sampleTaskID.Name,
Version: sampleTaskID.Version,
},
NodeExecutionKey: models.NodeExecutionKey{
NodeID: sampleNodeExecID.NodeId,
ExecutionKey: models.ExecutionKey{
Project: sampleNodeExecID.ExecutionId.Project,
Domain: sampleNodeExecID.ExecutionId.Domain,
Name: sampleNodeExecID.ExecutionId.Name,
},
},
RetryAttempt: &retryAttemptValue,
},
Phase: "TaskExecutionPhase_TASK_PHASE_RUNNING",
InputURI: "input uri",
Closure: closureBytes,
StartedAt: &taskEventOccurredAt,
TaskExecutionCreatedAt: &taskEventOccurredAt,
TaskExecutionUpdatedAt: &taskEventOccurredAt,
}

occuredAt := taskEventOccurredAt.Add(time.Minute)
occuredAtProto, err := ptypes.TimestampProto(occuredAt)
assert.Nil(t, err)

failedEventRequest := &admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
TaskId: sampleTaskID,
ParentNodeExecutionId: sampleNodeExecID,
Phase: core.TaskExecution_FAILED,
OccurredAt: occuredAtProto,
Reason: "something went wrong",
},
}

err = UpdateTaskExecutionModel(context.TODO(), failedEventRequest, &existingTaskExecution,
interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient())
assert.Nil(t, err)

updatedClosure := &admin.TaskExecutionClosure{}
err = proto.Unmarshal(existingTaskExecution.Closure, updatedClosure)
assert.Nil(t, err)

assert.Equal(t, updatedClosure.Metadata.ExternalResources[0].Logs, []*core.TaskLog{
{
Uri: "uri-default",
},
},
)

}

func TestUpdateTaskExecutionModelSingleEvents(t *testing.T) {
existingClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_RUNNING,
Expand Down Expand Up @@ -1208,6 +1385,125 @@ func TestMergeLogs(t *testing.T) {
}
}

func TestFilterLogsByPhase(t *testing.T) {
type testCase struct {
existing []*core.TaskLog
expected []*core.TaskLog
phase core.TaskExecution_Phase
name string
}

testCases := []testCase{
{
existing: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
{
Uri: "hide-finished-uri",
ShowWhilePending: false,
HideOnceFinished: true,
},
},
expected: []*core.TaskLog{
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
},
phase: core.TaskExecution_QUEUED,
name: "Filtered logs in QUEUED phase",
},
{
existing: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
{
Uri: "hide-finished-uri",
ShowWhilePending: false,
HideOnceFinished: true,
},
},
expected: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
{
Uri: "hide-finished-uri",
ShowWhilePending: false,
HideOnceFinished: true,
},
},
phase: core.TaskExecution_RUNNING,
name: "Filtered logs in RUNNING phase",
},
{
existing: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
{
Uri: "hide-finished-uri",
ShowWhilePending: false,
HideOnceFinished: true,
},
},
expected: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
},
phase: core.TaskExecution_SUCCEEDED,
name: "Filtered logs in terminated phase",
},
}
for _, filterTestCase := range testCases {
filteredLogs := filterLogsByPhase(filterTestCase.existing, filterTestCase.phase)

assert.Equal(t, len(filterTestCase.expected), len(filteredLogs), fmt.Sprintf("%s failed", filterTestCase.name))
for idx, expectedLog := range filterTestCase.expected {
assert.True(t, proto.Equal(expectedLog, filteredLogs[idx]), fmt.Sprintf("%s failed", filterTestCase.name))
}
}
}

func TestMergeCustoms(t *testing.T) {
t.Run("nothing to do", func(t *testing.T) {
custom, err := mergeCustom(nil, nil)
Expand Down

0 comments on commit a845802

Please sign in to comment.