diff --git a/workflow/artifacts/oss/oss.go b/workflow/artifacts/oss/oss.go index 82fb2c977527..e4461558a0c0 100644 --- a/workflow/artifacts/oss/oss.go +++ b/workflow/artifacts/oss/oss.go @@ -159,9 +159,45 @@ func (ossDriver *ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) return err } -func (ossDriver *ArtifactDriver) OpenStream(a *wfv1.Artifact) (io.ReadCloser, error) { - // todo: this is a temporary implementation which loads file to disk first - return common.LoadToStream(a, ossDriver) +// OpenStream opens a stream reader for an artifact from OSS compliant storage +func (ossDriver *ArtifactDriver) OpenStream(inputArtifact *wfv1.Artifact) (io.ReadCloser, error) { + var stream io.ReadCloser + err := waitutil.Backoff(defaultRetry, + func() (bool, error) { + log.Infof("OSS OpenStream, key: %s", inputArtifact.OSS.Key) + osscli, err := ossDriver.newOSSClient() + if err != nil { + return !isTransientOSSErr(err), err + } + bucketName := inputArtifact.OSS.Bucket + err = setBucketLogging(osscli, bucketName) + if err != nil { + return !isTransientOSSErr(err), err + } + bucket, err := osscli.Bucket(bucketName) + if err != nil { + return !isTransientOSSErr(err), err + } + s, origErr := bucket.GetObject(inputArtifact.OSS.Key) + if origErr == nil { + stream = s + return true, nil + } + if !IsOssErrCode(err, "NoSuchKey") { + return !isTransientOSSErr(origErr), fmt.Errorf("failed to get file: %w", origErr) + } + isDir, err := IsOssDirectory(bucket, inputArtifact.OSS.Key) + if err != nil { + return !isTransientOSSErr(err), fmt.Errorf("failed to test if %s/%s is a directory: %w", bucketName, inputArtifact.OSS.Key, err) + } + if !isDir { + return false, origErr + } + // directory case: + // todo: make a .tgz file which can be streamed to user + return false, errors.New(errors.CodeNotImplemented, "Directory Stream capability currently unimplemented for OSS") + }) + return stream, err } // Save stores an artifact to OSS compliant storage, e.g., uploading a local file to OSS bucket