From 4a6deecfa618e11d3ecc9f733fdeac4baacab13f Mon Sep 17 00:00:00 2001 From: "Yuan (Terry) Tang" Date: Mon, 18 Sep 2023 21:03:18 -0400 Subject: [PATCH] feat: Support artifact streaming for HTTP/Artifactory artifact driver (#11823) --- workflow/artifacts/http/http.go | 60 ++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/workflow/artifacts/http/http.go b/workflow/artifacts/http/http.go index ed2c300b116e..4c8578be1202 100644 --- a/workflow/artifacts/http/http.go +++ b/workflow/artifacts/http/http.go @@ -21,29 +21,22 @@ type ArtifactDriver struct { var _ common.ArtifactDriver = &ArtifactDriver{} -// Load reads the artifact from the HTTP URL -func (h *ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error { - lf, err := os.Create(path) - if err != nil { - return err - } - defer func() { - _ = lf.Close() - }() +func (h *ArtifactDriver) retrieveContent(inputArtifact *wfv1.Artifact) (http.Response, error) { var req *http.Request var url string + var err error if inputArtifact.Artifactory != nil && inputArtifact.HTTP == nil { url = inputArtifact.Artifactory.URL req, err = http.NewRequest(http.MethodGet, url, nil) if err != nil { - return err + return http.Response{}, err } req.SetBasicAuth(h.Username, h.Password) - } else { + } else if inputArtifact.Artifactory == nil && inputArtifact.HTTP != nil { url = inputArtifact.HTTP.URL req, err = http.NewRequest(http.MethodGet, url, nil) if err != nil { - return err + return http.Response{}, err } for _, h := range inputArtifact.HTTP.Headers { req.Header.Add(h.Name, h.Value) @@ -51,30 +44,51 @@ func (h *ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error { if h.Username != "" && h.Password != "" { req.SetBasicAuth(h.Username, h.Password) } + } else { + return http.Response{}, errors.InternalErrorf("Either Artifactory or HTTP artifact needs to be configured") } - res, err := h.Client.Do(req) + // Note that we will close the response body in either `Load()` + // or `ArtifactServer.returnArtifact()`, which is the caller of `OpenStream()`. + res, err := h.Client.Do(req) //nolint:bodyclose if err != nil { - return err + return http.Response{}, err } - defer func() { - _ = res.Body.Close() - }() if res.StatusCode == 404 { - return errors.New(errors.CodeNotFound, res.Status) + return http.Response{}, errors.New(errors.CodeNotFound, res.Status) } if res.StatusCode < 200 || res.StatusCode >= 300 { - return errors.InternalErrorf("loading file from %s failed with reason: %s", url, res.Status) + return http.Response{}, errors.InternalErrorf("loading content from %s failed with reason: %s", url, res.Status) } + return *res, nil +} +// Load reads the artifact from the HTTP URL +func (h *ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error { + lf, err := os.Create(path) + if err != nil { + return err + } + defer func() { + _ = lf.Close() + }() + res, err := h.retrieveContent(inputArtifact) + if err != nil { + return err + } + defer func() { + _ = res.Body.Close() + }() _, err = io.Copy(lf, res.Body) - return err } -func (h *ArtifactDriver) OpenStream(a *wfv1.Artifact) (io.ReadCloser, error) { - // todo: this is a temporary implementation which loads file to disk first - return common.LoadToStream(a, h) +func (h *ArtifactDriver) OpenStream(inputArtifact *wfv1.Artifact) (io.ReadCloser, error) { + res, err := h.retrieveContent(inputArtifact) + if err != nil { + return nil, err + } + return res.Body, nil } // Save writes the artifact to the URL