From 3c4db4ce8618bcc2daae65fcbfab6c77d0fbfe57 Mon Sep 17 00:00:00 2001 From: Marwan Sulaiman Date: Fri, 27 Mar 2020 13:35:52 -0400 Subject: [PATCH] pkg/storage: add External implementation (#1587) * pkg/storage: add External implementation * fix conflicts * use newly instantiated client --- cmd/proxy/actions/storage.go | 7 + config.dev.toml | 10 +- docs/content/configuration/storage.md | 40 ++++++ pkg/config/config.go | 2 + pkg/config/external.go | 6 + pkg/config/storage.go | 1 + pkg/storage/compliance/tests.go | 14 +- pkg/storage/external/client.go | 183 ++++++++++++++++++++++++++ pkg/storage/external/external_test.go | 22 ++++ pkg/storage/external/server.go | 133 +++++++++++++++++++ pkg/storage/fs/fs.go | 8 ++ pkg/storage/fs/fs_test.go | 14 +- 12 files changed, 421 insertions(+), 19 deletions(-) create mode 100644 pkg/config/external.go create mode 100644 pkg/storage/external/client.go create mode 100644 pkg/storage/external/external_test.go create mode 100644 pkg/storage/external/server.go diff --git a/cmd/proxy/actions/storage.go b/cmd/proxy/actions/storage.go index e2813af98..7defcb122 100644 --- a/cmd/proxy/actions/storage.go +++ b/cmd/proxy/actions/storage.go @@ -9,6 +9,7 @@ import ( "github.com/gomods/athens/pkg/errors" "github.com/gomods/athens/pkg/storage" "github.com/gomods/athens/pkg/storage/azureblob" + "github.com/gomods/athens/pkg/storage/external" "github.com/gomods/athens/pkg/storage/fs" "github.com/gomods/athens/pkg/storage/gcp" "github.com/gomods/athens/pkg/storage/mem" @@ -60,6 +61,12 @@ func GetStorage(storageType string, storageConfig *config.StorageConfig, timeout return nil, errors.E(op, "Invalid AzureBlob Storage Configuration") } return azureblob.New(storageConfig.AzureBlob, timeout) + case "external": + if storageConfig.External == nil { + return nil, errors.E(op, "Invalid External Storage Configuration") + } + // TODO(marwan-at-work): add client tracing + return external.NewClient(storageConfig.External.URL, nil), nil default: return nil, fmt.Errorf("storage type %s is unknown", storageType) } diff --git a/config.dev.toml b/config.dev.toml index bfe33510c..b27e12911 100755 --- a/config.dev.toml +++ b/config.dev.toml @@ -113,7 +113,7 @@ RobotsFile = "robots.txt" Timeout = 300 # StorageType sets the type of storage backend the proxy will use. -# Possible values are memory, disk, mongo, gcp, minio, s3, azureblob +# Possible values are memory, disk, mongo, gcp, minio, s3, azureblob, external # Defaults to memory # Env override: ATHENS_STORAGE_TYPE StorageType = "memory" @@ -467,3 +467,11 @@ SingleFlightType = "memory" # Name of container in the blob storage # Env override: ATHENS_AZURE_CONTAINER_NAME ContainerName = "MY_AZURE_BLOB_CONTAINER_NAME" + + [Storage.External] + # URL is the external storage URL that Athens + # will use to interact with the backend storage layer. + # See https://docs.gomods.io/configuration/storage for implementation + # details. + # Env override: ATHENS_EXTERNAL_STORAGE_URL + URL = "" diff --git a/docs/content/configuration/storage.md b/docs/content/configuration/storage.md index 4525d5365..797ded00d 100644 --- a/docs/content/configuration/storage.md +++ b/docs/content/configuration/storage.md @@ -27,6 +27,8 @@ The Athens proxy supports many storage types: - [Configuration:](#configuration-7) - [Azure Blob Storage](#azure-blob-storage) - [Configuration:](#configuration-8) +- [External Storage](#external-storage) + - [Configuration:](#configuration-9) All of them can be configured using `config.toml` file. You need to set a valid driver in `StorageType` value or you can set it in environment variable `ATHENS_STORAGE_TYPE` on your server. Also for most of the drivers you need to provide additional configuration data which will be described below. @@ -336,6 +338,44 @@ It assumes that you already have the following: # Env override: ATHENS_AZURE_CONTAINER_NAME ContainerName = "MY_AZURE_BLOB_CONTAINER_NAME" +## External Storage + +External storage lets Athens connect to your own implementation of a storage backend. +All you have to do is implement the (storage.Backend)[https://github.com/gomods/athens/blob/master/pkg/storage/backend.go#L4] interface and run it behind an http server. + +Once you implement the backend server, you must then configure Athens to use that storage backend as such: + +##### Configuration: + # Env override: ATHENS_STORAGE_TYPE + StorageType = "external" + + [Storage] + [Storage.External] + # Env override: ATHENS_EXTERNAL_STORAGE_URL + URL = "http://localhost:9090" + +Athens provides a convenience wrapper that lets you implement a storage backend with ease. See the following example: + + +```golang +package main + +import ( + "github.com/gomods/athens/pkg/storage" + "github.com/gomods/athens/pkg/storage/external" +) + +// TODO: implement storage.Backend +type myCustomStorage struct { + storage.Backend +} + +func main() { + handler := external.NewServer(&myCustomStorage{}) + http.ListenAndServe(":9090", handler) +} +``` + ## Running multiple Athens pointed at the same storage Athens has the ability to run concurrently pointed at the same storage medium, using diff --git a/pkg/config/config.go b/pkg/config/config.go index 3c82b4554..2975e942a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -286,6 +286,8 @@ func validateConfig(config Config) error { return validate.Struct(config.Storage.S3) case "azureblob": return validate.Struct(config.Storage.AzureBlob) + case "external": + return validate.Struct(config.Storage.External) default: return fmt.Errorf("storage type %s is unknown", config.StorageType) } diff --git a/pkg/config/external.go b/pkg/config/external.go new file mode 100644 index 000000000..a3f70b7aa --- /dev/null +++ b/pkg/config/external.go @@ -0,0 +1,6 @@ +package config + +// External specifies configuration for an external http storage +type External struct { + URL string `validate:"required" envconfig:"ATHENS_EXTERNAL_STORAGE_URL"` +} diff --git a/pkg/config/storage.go b/pkg/config/storage.go index 8bd150ca0..749977bd8 100644 --- a/pkg/config/storage.go +++ b/pkg/config/storage.go @@ -8,4 +8,5 @@ type StorageConfig struct { Mongo *MongoConfig S3 *S3Config AzureBlob *AzureBlobConfig + External *External } diff --git a/pkg/storage/compliance/tests.go b/pkg/storage/compliance/tests.go index 29f57cdb5..1e0835a73 100644 --- a/pkg/storage/compliance/tests.go +++ b/pkg/storage/compliance/tests.go @@ -33,7 +33,7 @@ func RunTests(t *testing.T, b storage.Backend, clearBackend func() error) { // returns a KindNotFound error when asking for // non existing modules. func testNotFound(t *testing.T, b storage.Backend) { - mod, ver := "xxx", "yyy" + mod, ver := "github.com/gomods/athens", "yyy" ctx := context.Background() err := b.Delete(ctx, mod, ver) @@ -106,7 +106,7 @@ func testListSuffix(t *testing.T, b storage.Backend) { func testList(t *testing.T, b storage.Backend) { ctx := context.Background() - modname := "listMod" + modname := "github.com/gomods/athens" versions := []string{"v1.1.0", "v1.2.0", "v1.3.0"} for _, version := range versions { mock := getMockModule() @@ -133,7 +133,7 @@ func testList(t *testing.T, b storage.Backend) { // testGet saves and retrieves a module successfully. func testGet(t *testing.T, b storage.Backend) { ctx := context.Background() - modname := "getTestModule" + modname := "github.com/gomods/athens" ver := "v1.2.3" mock := getMockModule() zipBts, _ := ioutil.ReadAll(mock.Zip) @@ -146,7 +146,7 @@ func testGet(t *testing.T, b storage.Backend) { mod, err := b.GoMod(ctx, modname, ver) require.NoError(t, err) - require.Equal(t, mock.Mod, mod) + require.Equal(t, string(mock.Mod), string(mod)) zip, err := b.Zip(ctx, modname, ver) require.NoError(t, err) @@ -157,7 +157,7 @@ func testGet(t *testing.T, b storage.Backend) { func testExists(t *testing.T, b storage.Backend) { ctx := context.Background() - modname := "getTestModule" + modname := "github.com/gomods/athens" ver := "v1.2.3" mock := getMockModule() zipBts, _ := ioutil.ReadAll(mock.Zip) @@ -171,7 +171,7 @@ func testExists(t *testing.T, b storage.Backend) { func testShouldNotExist(t *testing.T, b storage.Backend) { ctx := context.Background() - mod := "shouldNotExist" + mod := "github.com/gomods/shouldNotExist" ver := "v1.2.3-pre.1" mock := getMockModule() zipBts, _ := ioutil.ReadAll(mock.Zip) @@ -193,7 +193,7 @@ func testShouldNotExist(t *testing.T, b storage.Backend) { // afterwards. func testDelete(t *testing.T, b storage.Backend) { ctx := context.Background() - modname := "deleteModule" + modname := "github.com/gomods/athens" version := fmt.Sprintf("%s%d", "delete", rand.Int()) mock := getMockModule() diff --git a/pkg/storage/external/client.go b/pkg/storage/external/client.go new file mode 100644 index 000000000..b131e8124 --- /dev/null +++ b/pkg/storage/external/client.go @@ -0,0 +1,183 @@ +package external + +import ( + "bufio" + "context" + "fmt" + "io" + "io/ioutil" + "mime/multipart" + "net/http" + "strings" + + "github.com/gomods/athens/pkg/errors" + "github.com/gomods/athens/pkg/storage" + "golang.org/x/mod/module" +) + +type service struct { + url string + c *http.Client +} + +// NewClient returns an external storage client +func NewClient(url string, c *http.Client) storage.Backend { + if c == nil { + c = &http.Client{} + } + url = strings.TrimSuffix(url, "/") + return &service{url, c} +} + +func (s *service) List(ctx context.Context, mod string) ([]string, error) { + const op errors.Op = "external.List" + body, err := s.getRequest(ctx, mod, "list", "") + if err != nil { + return nil, errors.E(op, err) + } + list := []string{} + scnr := bufio.NewScanner(body) + for scnr.Scan() { + list = append(list, scnr.Text()) + } + if scnr.Err() != nil { + return nil, errors.E(op, scnr.Err()) + } + return list, nil +} + +func (s *service) Info(ctx context.Context, mod, ver string) ([]byte, error) { + const op errors.Op = "external.Info" + body, err := s.getRequest(ctx, mod, ver, "info") + if err != nil { + return nil, errors.E(op, err) + } + info, err := ioutil.ReadAll(body) + if err != nil { + return nil, errors.E(op, err) + } + return info, nil +} + +func (s *service) GoMod(ctx context.Context, mod, ver string) ([]byte, error) { + const op errors.Op = "external.GoMod" + body, err := s.getRequest(ctx, mod, ver, "mod") + if err != nil { + return nil, errors.E(op, err) + } + modFile, err := ioutil.ReadAll(body) + if err != nil { + return nil, errors.E(op, err) + } + return modFile, nil +} + +func (s *service) Zip(ctx context.Context, mod, ver string) (io.ReadCloser, error) { + const op errors.Op = "external.Zip" + body, err := s.getRequest(ctx, mod, ver, "zip") + if err != nil { + return nil, errors.E(op, err) + } + return body, nil +} + +func (s *service) Save(ctx context.Context, mod, ver string, modFile []byte, zip io.Reader, info []byte) error { + const op errors.Op = "external.Save" + var err error + mod, err = module.EscapePath(mod) + if err != nil { + panic(err) + } + url := s.url + "/" + mod + "/@v/" + ver + ".save" + pr, pw := io.Pipe() + mw := multipart.NewWriter(pw) + go func() { + err := upload(mw, modFile, info, zip) + pw.CloseWithError(err) + }() + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, pr) + if err != nil { + return errors.E(op, err) + } + req.Header.Add("Content-Type", mw.FormDataContentType()) + resp, err := s.c.Do(req) + if err != nil { + return errors.E(op, err) + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + bts, _ := ioutil.ReadAll(resp.Body) + return errors.E(op, fmt.Errorf("unexpected status code: %v - body: %s", resp.StatusCode, bts), resp.StatusCode) + } + return nil +} + +func (s *service) Delete(ctx context.Context, mod, ver string) error { + const op errors.Op = "external.Delete" + body, err := s.doRequest(ctx, "DELETE", mod, ver, "delete") + if err != nil { + return errors.E(op, err) + } + defer body.Close() + return nil +} + +func upload(mw *multipart.Writer, mod, info []byte, zip io.Reader) error { + defer mw.Close() + infoW, err := mw.CreateFormFile("mod.info", "mod.info") + if err != nil { + return fmt.Errorf("error creating info file: %v", err) + } + _, err = infoW.Write(info) + if err != nil { + return fmt.Errorf("error writing info file: %v", err) + } + modW, err := mw.CreateFormFile("mod.mod", "mod.mod") + if err != nil { + return fmt.Errorf("error creating mod file: %v", err) + } + _, err = modW.Write(mod) + if err != nil { + return fmt.Errorf("error writing mod file: %v", err) + } + zipW, err := mw.CreateFormFile("mod.zip", "mod.zip") + if err != nil { + return fmt.Errorf("error creating zip file: %v", err) + } + _, err = io.Copy(zipW, zip) + if err != nil { + return fmt.Errorf("error writing zip file: %v", err) + } + return nil +} + +func (s *service) getRequest(ctx context.Context, mod, ver, ext string) (io.ReadCloser, error) { + return s.doRequest(ctx, "GET", mod, ver, ext) +} + +func (s *service) doRequest(ctx context.Context, method, mod, ver, ext string) (io.ReadCloser, error) { + const op errors.Op = "external.doRequest" + var err error + mod, err = module.EscapePath(mod) + if err != nil { + return nil, errors.E(op, err) + } + url := s.url + "/" + mod + "/@v/" + ver + if ext != "" { + url += "." + ext + } + req, err := http.NewRequestWithContext(ctx, method, url, nil) + if err != nil { + return nil, errors.E(op, err) + } + resp, err := s.c.Do(req) + if err != nil { + return nil, errors.E(op, err) + } + if resp.StatusCode != 200 { + body, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + return nil, errors.E(op, fmt.Errorf("none 200 status code: %v - body: %s", resp.StatusCode, body), resp.StatusCode) + } + return resp.Body, nil +} diff --git a/pkg/storage/external/external_test.go b/pkg/storage/external/external_test.go new file mode 100644 index 000000000..0378c52b6 --- /dev/null +++ b/pkg/storage/external/external_test.go @@ -0,0 +1,22 @@ +package external + +import ( + "net/http/httptest" + "testing" + + "github.com/gomods/athens/pkg/storage/compliance" + "github.com/gomods/athens/pkg/storage/mem" +) + +func TestExternal(t *testing.T) { + strg, err := mem.NewStorage() + if err != nil { + t.Fatal(err) + } + handler := NewServer(strg) + srv := httptest.NewServer(handler) + defer srv.Close() + externalStrg := NewClient(srv.URL, nil) + clear := strg.(interface{ Clear() error }).Clear + compliance.RunTests(t, externalStrg, clear) +} diff --git a/pkg/storage/external/server.go b/pkg/storage/external/server.go new file mode 100644 index 000000000..8b345f8ca --- /dev/null +++ b/pkg/storage/external/server.go @@ -0,0 +1,133 @@ +package external + +import ( + "fmt" + "io" + "io/ioutil" + "net/http" + "strings" + + "github.com/gomods/athens/pkg/download" + "github.com/gomods/athens/pkg/errors" + "github.com/gomods/athens/pkg/paths" + "github.com/gomods/athens/pkg/storage" + "github.com/gorilla/mux" + "golang.org/x/mod/zip" +) + +// NewServer takes a storage.Backend implementation of your +// choice, and returns a new http.Handler that Athens can +// reach out to for storage operations +func NewServer(strg storage.Backend) http.Handler { + r := mux.NewRouter() + r.HandleFunc(download.PathList, func(w http.ResponseWriter, r *http.Request) { + mod := mux.Vars(r)["module"] + list, err := strg.List(r.Context(), mod) + if err != nil { + http.Error(w, err.Error(), errors.Kind(err)) + return + } + fmt.Fprintf(w, "%s", strings.Join(list, "\n")) + }).Methods(http.MethodGet) + r.HandleFunc(download.PathVersionInfo, func(w http.ResponseWriter, r *http.Request) { + params, err := paths.GetAllParams(r) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + info, err := strg.Info(r.Context(), params.Module, params.Version) + if err != nil { + http.Error(w, err.Error(), errors.Kind(err)) + return + } + w.Write(info) + }).Methods(http.MethodGet) + r.HandleFunc(download.PathVersionModule, func(w http.ResponseWriter, r *http.Request) { + params, err := paths.GetAllParams(r) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + mod, err := strg.GoMod(r.Context(), params.Module, params.Version) + if err != nil { + http.Error(w, err.Error(), errors.Kind(err)) + return + } + w.Write(mod) + }).Methods(http.MethodGet) + r.HandleFunc(download.PathVersionZip, func(w http.ResponseWriter, r *http.Request) { + params, err := paths.GetAllParams(r) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + zip, err := strg.Zip(r.Context(), params.Module, params.Version) + if err != nil { + http.Error(w, err.Error(), errors.Kind(err)) + return + } + defer zip.Close() + io.Copy(w, zip) + }).Methods(http.MethodGet) + r.HandleFunc("/{module:.+}/@v/{version}.save", func(w http.ResponseWriter, r *http.Request) { + params, err := paths.GetAllParams(r) + if err != nil { + fmt.Println("REALLY?", err) + http.Error(w, err.Error(), 400) + return + } + err = r.ParseMultipartForm(zip.MaxZipFile + zip.MaxGoMod) + if err != nil { + fmt.Printf("parse: %v\n", err) + http.Error(w, err.Error(), 400) + return + } + infoFile, _, err := r.FormFile("mod.info") + if err != nil { + http.Error(w, err.Error(), 400) + return + } + defer infoFile.Close() + info, err := ioutil.ReadAll(infoFile) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + modReader, _, err := r.FormFile("mod.mod") + if err != nil { + http.Error(w, err.Error(), 400) + return + } + defer modReader.Close() + modFile, err := ioutil.ReadAll(modReader) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + modZ, _, err := r.FormFile("mod.zip") + if err != nil { + http.Error(w, err.Error(), 400) + return + } + defer modZ.Close() + err = strg.Save(r.Context(), params.Module, params.Version, modFile, modZ, info) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + }).Methods(http.MethodPost) + + r.HandleFunc("/{module:.+}/@v/{version}.delete", func(w http.ResponseWriter, r *http.Request) { + params, err := paths.GetAllParams(r) + if err != nil { + http.Error(w, err.Error(), 400) + return + } + err = strg.Delete(r.Context(), params.Module, params.Version) + if err != nil { + http.Error(w, err.Error(), errors.Kind(err)) + return + } + }).Methods(http.MethodDelete) + return r +} diff --git a/pkg/storage/fs/fs.go b/pkg/storage/fs/fs.go index 9f753ff60..bfc3bbf83 100644 --- a/pkg/storage/fs/fs.go +++ b/pkg/storage/fs/fs.go @@ -2,6 +2,7 @@ package fs import ( "fmt" + "os" "path/filepath" "github.com/gomods/athens/pkg/errors" @@ -37,3 +38,10 @@ func NewStorage(rootDir string, filesystem afero.Fs) (storage.Backend, error) { } return &storageImpl{rootDir: rootDir, filesystem: filesystem}, nil } + +func (s *storageImpl) Clear() error { + if err := s.filesystem.RemoveAll(s.rootDir); err != nil { + return err + } + return s.filesystem.Mkdir(s.rootDir, os.ModeDir|os.ModePerm) +} diff --git a/pkg/storage/fs/fs_test.go b/pkg/storage/fs/fs_test.go index 32c9d204b..c9a2da71a 100644 --- a/pkg/storage/fs/fs_test.go +++ b/pkg/storage/fs/fs_test.go @@ -1,7 +1,6 @@ package fs import ( - "os" "testing" "github.com/gomods/athens/pkg/storage/compliance" @@ -12,27 +11,20 @@ import ( func TestBackend(t *testing.T) { fs := afero.NewMemMapFs() b := getStorage(t, fs) - compliance.RunTests(t, b, b.clear) + compliance.RunTests(t, b, b.Clear) fs.RemoveAll(b.rootDir) } func BenchmarkBackend(b *testing.B) { fs := afero.NewOsFs() backend := getStorage(b, fs) - compliance.RunBenchmarks(b, backend, backend.clear) + compliance.RunBenchmarks(b, backend, backend.Clear) fs.RemoveAll(backend.rootDir) } func BenchmarkMemory(b *testing.B) { backend := getStorage(b, afero.NewMemMapFs()) - compliance.RunBenchmarks(b, backend, backend.clear) -} - -func (s *storageImpl) clear() error { - if err := s.filesystem.RemoveAll(s.rootDir); err != nil { - return err - } - return s.filesystem.Mkdir(s.rootDir, os.ModeDir|os.ModePerm) + compliance.RunBenchmarks(b, backend, backend.Clear) } func getStorage(tb testing.TB, fs afero.Fs) *storageImpl {