Skip to content

Commit

Permalink
Render pipeline topology correctly, add missing nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
DaoDaoNoCode committed May 9, 2024
1 parent ee7e684 commit 4329ea9
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 125 deletions.
45 changes: 25 additions & 20 deletions frontend/src/concepts/pipelines/kfTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,17 @@ export type InputOutputArtifactType = {
};

export type InputOutputDefinition = {
artifacts?: Record<
string,
{
artifactType: InputOutputArtifactType;
}
>;
artifacts?: InputOutputDefinitionArtifacts;
parameters?: ParametersKF;
};

export type InputOutputDefinitionArtifacts = Record<
string,
{
artifactType: InputOutputArtifactType;
}
>;

type GroupNodeComponent = {
dag: DAG;
};
Expand Down Expand Up @@ -234,17 +236,22 @@ export type TaskKF = {
inputs?: {
artifacts?: Record<
string,
{
taskOutputArtifact?: {
/** Artifact node name */
outputArtifactKey: string;
/**
* The task string for runAfter
* @see DAG.tasks
*/
producerTask: string;
};
}
EitherNotBoth<
{
taskOutputArtifact?: {
/** Artifact node name */
outputArtifactKey: string;
/**
* The task string for runAfter
* @see DAG.tasks
*/
producerTask: string;
};
},
{
componentInputArtifact: string;
}
>
>;
parameters?: Record<
string,
Expand Down Expand Up @@ -326,9 +333,7 @@ export type PipelineSpec = {
};
root: {
dag: DAG;
inputDefinitions?: {
parameters: ParametersKF;
};
inputDefinitions?: InputOutputDefinition;
};
schemaVersion: string;
sdkVersion: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,33 +417,44 @@ describe('pipeline topology parseUtils', () => {
taskInfo: { name: 'task-2' },
triggerPolicy: { strategy: TriggerStrategy.ALL_UPSTREAM_TASKS_SUCCEEDED },
},
'task-3': {
cachingOptions: { enableCache: true },
componentRef: { name: 'comp-task-3' },
dependentTasks: [],
inputs: {
artifacts: {
'task-3-artifact-3': {
componentInputArtifact: 'test-artifact',
},
},
},
taskInfo: { name: 'task-3' },
triggerPolicy: { strategy: TriggerStrategy.ALL_UPSTREAM_TASKS_SUCCEEDED },
},
};
const consoleWarnSpy = jest.spyOn(global.console, 'warn');

it('returns empty object when no task artifacts exist', () => {
const result = parseTasksForArtifactRelationship({
const result = parseTasksForArtifactRelationship('root', {
'task-1': { ...testTasks['task-1'], inputs: {} },
'task-2': { ...testTasks['task-2'], inputs: {} },
});
expect(result).toEqual({});
});

it('returns task artifact map when artifacts are provided', () => {
const result = parseTasksForArtifactRelationship(testTasks);
const result = parseTasksForArtifactRelationship('root', testTasks);

expect(result).toEqual({
'some-dag-task-2': [
{ outputArtifactKey: 'task-1-artifact-name', artifactId: 'task-1-artifact-1' },
],
'some-dag-task-1': [
{ outputArtifactKey: 'task-2-artifact-name', artifactId: 'task-2-artifact-2' },
],
'task-1': [{ artifactNodeId: 'GROUP.root.ARTIFACT.some-dag-task-2.task-1-artifact-name' }],
'task-2': [{ artifactNodeId: 'GROUP.root.ARTIFACT.some-dag-task-1.task-2-artifact-name' }],
'task-3': [{ artifactNodeId: 'GROUP.root.ARTIFACT..test-artifact' }],
});
});

describe('returns warning with unmapped artifact for a task when', () => {
it('no producerTask is found', () => {
const result = parseTasksForArtifactRelationship({
const result = parseTasksForArtifactRelationship('root', {
...testTasks,
'task-2': {
...testTasks['task-2'],
Expand All @@ -464,17 +475,17 @@ describe('pipeline topology parseUtils', () => {
taskOutputArtifact: { outputArtifactKey: 'task-2-artifact-name', producerTask: '' },
});
expect(result).toEqual({
'some-dag-task-2': [
'task-1': [
{
artifactId: 'task-1-artifact-1',
outputArtifactKey: 'task-1-artifact-name',
artifactNodeId: 'GROUP.root.ARTIFACT.some-dag-task-2.task-1-artifact-name',
},
],
'task-3': [{ artifactNodeId: 'GROUP.root.ARTIFACT..test-artifact' }],
});
});

it('no outputArtifactKey is found', () => {
const result = parseTasksForArtifactRelationship({
const result = parseTasksForArtifactRelationship('root', {
...testTasks,
'task-2': {
...testTasks['task-2'],
Expand All @@ -495,17 +506,17 @@ describe('pipeline topology parseUtils', () => {
taskOutputArtifact: { outputArtifactKey: '', producerTask: 'some-dag-task-1' },
});
expect(result).toEqual({
'some-dag-task-2': [
'task-1': [
{
artifactId: 'task-1-artifact-1',
outputArtifactKey: 'task-1-artifact-name',
artifactNodeId: 'GROUP.root.ARTIFACT.some-dag-task-2.task-1-artifact-name',
},
],
'task-3': [{ artifactNodeId: 'GROUP.root.ARTIFACT..test-artifact' }],
});
});

it('no taskOutputArtifact is found', () => {
const result = parseTasksForArtifactRelationship({
const result = parseTasksForArtifactRelationship('root', {
'task-1': {
...testTasks['task-1'],
inputs: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import { ICON_TASK_NODE_TYPE } from '~/concepts/topology/utils';
import { EXECUTION_TASK_NODE_TYPE } from '~/concepts/topology/const';

describe('usePipelineTaskTopology', () => {
beforeEach(() => {
jest.spyOn(console, 'warn').mockImplementation(jest.fn());
});
it('returns the correct number of nodes', () => {
const renderResult = testHook(usePipelineTaskTopology)(mockLargePipelineSpec);
const nodes = renderResult.result.current;
Expand All @@ -14,9 +17,9 @@ describe('usePipelineTaskTopology', () => {
const groups = nodes.filter((n) => n.type === EXECUTION_TASK_NODE_TYPE);
const artifactNodes = nodes.filter((n) => n.type === ICON_TASK_NODE_TYPE);

expect(nodes).toHaveLength(86);
expect(nodes).toHaveLength(107);
expect(tasks).toHaveLength(35);
expect(groups).toHaveLength(5);
expect(artifactNodes).toHaveLength(46);
expect(artifactNodes).toHaveLength(67);
});
});
59 changes: 41 additions & 18 deletions frontend/src/concepts/pipelines/topology/parseUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,47 @@ export const parseComponentsForArtifactRelationship = (
);

export type TaskArtifactMap = {
[taskName: string]: { outputArtifactKey: string; artifactId: string }[] | undefined;
[taskName: string]: { artifactNodeId: string }[] | undefined;
};
export const parseTasksForArtifactRelationship = (tasks: DAG['tasks']): TaskArtifactMap =>
Object.values(tasks).reduce<TaskArtifactMap>(
(map, taskValue) =>
Object.entries(taskValue.inputs?.artifacts ?? {}).reduce(
(artifactItems, [artifactId, value]) => {
const { producerTask: taskId, outputArtifactKey } = value.taskOutputArtifact || {};
if (!taskId || !outputArtifactKey) {
// eslint-disable-next-line no-console
console.warn('Issue constructing artifact node', value);
return artifactItems;
}

export const parseTasksForArtifactRelationship = (
groupId: string | undefined,
tasks: DAG['tasks'],
): TaskArtifactMap =>
Object.entries(tasks).reduce<TaskArtifactMap>(
(map, [taskId, taskValue]) =>
Object.entries(taskValue.inputs?.artifacts ?? {}).reduce((artifactItems, [, value]) => {
// artifact without inputs
if (value.componentInputArtifact) {
return {
...artifactItems,
[taskId]: [
...(artifactItems[taskId] ?? []),
{
outputArtifactKey,
artifactId,
artifactNodeId: idForTaskArtifact(groupId, '', value.componentInputArtifact),
},
],
};
},
map,
),
}

// else, artifacts with inputs from tasks
const { producerTask, outputArtifactKey } = value.taskOutputArtifact || {};

if (!producerTask || !outputArtifactKey) {
// eslint-disable-next-line no-console
console.warn('Issue constructing artifact node', value);
return artifactItems;
}

return {
...artifactItems,
[taskId]: [
...(artifactItems[taskId] ?? []),
{
artifactNodeId: idForTaskArtifact(groupId, producerTask, outputArtifactKey),
},
],
};
}, map),
{},
);

Expand Down Expand Up @@ -314,3 +328,12 @@ export const parseVolumeMounts = (
name: pvc.taskOutputParameter?.producerTask ?? '',
}));
};

export const idForTaskArtifact = (
groupId: string | undefined,
taskId: string,
artifactId: string,
): string =>
groupId
? `GROUP.${groupId}.ARTIFACT.${taskId}.${artifactId}`
: `ARTIFACT.${taskId}.${artifactId}`;
Loading

0 comments on commit 4329ea9

Please sign in to comment.