Skip to content

Commit

Permalink
Execute narrow lookup before broad task lookup
Browse files Browse the repository at this point in the history
Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>
Co-authored-by: CarterFendley <[email protected]>
  • Loading branch information
3 people committed Oct 21, 2024
1 parent 93d1f3c commit b8efd3c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
22 changes: 17 additions & 5 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1247,18 +1247,30 @@ func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error {
if outputParameterKey == "" {
return cfg.paramError(fmt.Errorf("output parameter key is empty"))
}
tasks, err := cfg.mlmd.GetExecutionsInDAG(cfg.ctx, cfg.dag, cfg.pipeline, false)

// Get a list of tasks for the current DAG first.
tasks, err := cfg.mlmd.GetExecutionsInDAG(cfg.ctx, cfg.dag, cfg.pipeline, true)
if err != nil {
return cfg.paramError(err)
}
glog.V(4).Infof("tasks: %#v", tasks)
// The producer is the task that produces the output that we need to
// consume.

// Check to see if the producer is in the list of tasks.
producer, ok := tasks[producerTaskName]
if !ok {
return cfg.paramError(fmt.Errorf("producer task, %v, not in tasks", producerTaskName))
// If the producer is not in the list of tasks for the current DAG,
// look up all of the tasks in the context (which includes other DAGs).
tasks, err = cfg.mlmd.GetExecutionsInDAG(cfg.ctx, cfg.dag, cfg.pipeline, false)
if err != nil {
return cfg.paramError(err)
}
producer, ok = tasks[producerTaskName]
if !ok {
return cfg.paramError(fmt.Errorf("producer task, %v, not in tasks", producerTaskName))
}
}

glog.V(4).Info("producer: ", producer)
glog.V(4).Infof("tasks: %#v", tasks)
currentTask := producer
currentSubTaskMaybeDAG := true
// Continue looping until we reach a sub-task that is NOT a DAG.
Expand Down
7 changes: 7 additions & 0 deletions backend/src/v2/metadata/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,13 @@ func (c *Client) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pip
}
existing, ok := executionsMap[taskName]
if ok {
// TODO: Duplicate task names are not handled here, which leaves a
// known, unsolved edge case. Given three nested pipelines where A
// calls B and B calls C: if B and C share a task that A does not
// have, but A depends on that task through a producer subtask, then
// calling GetExecutionsInDAG returns the error below.

// TODO(Bobgy): to support retry, we need to handle multiple tasks with the same task name.
return nil, fmt.Errorf("two tasks have the same task name %q, id1=%v id2=%v", taskName, existing.GetID(), execution.GetID())
}
Expand Down

0 comments on commit b8efd3c

Please sign in to comment.