Skip to content

Commit

Permalink
pkg/storage: add External implementation (#1587)
Browse files Browse the repository at this point in the history
* pkg/storage: add External implementation

* fix conflicts

* use newly instantiated client
  • Loading branch information
marwan-at-work authored Mar 27, 2020
1 parent a36be99 commit 3c4db4c
Show file tree
Hide file tree
Showing 12 changed files with 421 additions and 19 deletions.
7 changes: 7 additions & 0 deletions cmd/proxy/actions/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/storage"
"github.com/gomods/athens/pkg/storage/azureblob"
"github.com/gomods/athens/pkg/storage/external"
"github.com/gomods/athens/pkg/storage/fs"
"github.com/gomods/athens/pkg/storage/gcp"
"github.com/gomods/athens/pkg/storage/mem"
Expand Down Expand Up @@ -60,6 +61,12 @@ func GetStorage(storageType string, storageConfig *config.StorageConfig, timeout
return nil, errors.E(op, "Invalid AzureBlob Storage Configuration")
}
return azureblob.New(storageConfig.AzureBlob, timeout)
case "external":
if storageConfig.External == nil {
return nil, errors.E(op, "Invalid External Storage Configuration")
}
// TODO(marwan-at-work): add client tracing
return external.NewClient(storageConfig.External.URL, nil), nil
default:
return nil, fmt.Errorf("storage type %s is unknown", storageType)
}
Expand Down
10 changes: 9 additions & 1 deletion config.dev.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ RobotsFile = "robots.txt"
Timeout = 300

# StorageType sets the type of storage backend the proxy will use.
# Possible values are memory, disk, mongo, gcp, minio, s3, azureblob
# Possible values are memory, disk, mongo, gcp, minio, s3, azureblob, external
# Defaults to memory
# Env override: ATHENS_STORAGE_TYPE
StorageType = "memory"
Expand Down Expand Up @@ -467,3 +467,11 @@ SingleFlightType = "memory"
# Name of container in the blob storage
# Env override: ATHENS_AZURE_CONTAINER_NAME
ContainerName = "MY_AZURE_BLOB_CONTAINER_NAME"

[Storage.External]
# URL is the external storage URL that Athens
# will use to interact with the backend storage layer.
# See https://docs.gomods.io/configuration/storage for implementation
# details.
# Env override: ATHENS_EXTERNAL_STORAGE_URL
URL = ""
40 changes: 40 additions & 0 deletions docs/content/configuration/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ The Athens proxy supports many storage types:
- [Configuration:](#configuration-7)
- [Azure Blob Storage](#azure-blob-storage)
- [Configuration:](#configuration-8)
- [External Storage](#external-storage)
- [Configuration:](#configuration-9)

All of them can be configured using `config.toml` file. You need to set a valid driver in `StorageType` value or you can set it in environment variable `ATHENS_STORAGE_TYPE` on your server.
Also for most of the drivers you need to provide additional configuration data which will be described below.
Expand Down Expand Up @@ -336,6 +338,44 @@ It assumes that you already have the following:
# Env override: ATHENS_AZURE_CONTAINER_NAME
ContainerName = "MY_AZURE_BLOB_CONTAINER_NAME"

## External Storage

External storage lets Athens connect to your own implementation of a storage backend.
All you have to do is implement the (storage.Backend)[https://github.com/gomods/athens/blob/master/pkg/storage/backend.go#L4] interface and run it behind an http server.

Once you implement the backend server, you must then configure Athens to use that storage backend as such:

##### Configuration:
# Env override: ATHENS_STORAGE_TYPE
StorageType = "external"

[Storage]
[Storage.External]
# Env override: ATHENS_EXTERNAL_STORAGE_URL
URL = "http://localhost:9090"

Athens provides a convenience wrapper that lets you implement a storage backend with ease. See the following example:


```golang
package main

import (
"github.com/gomods/athens/pkg/storage"
"github.com/gomods/athens/pkg/storage/external"
)

// TODO: implement storage.Backend
type myCustomStorage struct {
storage.Backend
}

func main() {
handler := external.NewServer(&myCustomStorage{})
http.ListenAndServe(":9090", handler)
}
```

## Running multiple Athens pointed at the same storage

Athens has the ability to run concurrently pointed at the same storage medium, using
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ func validateConfig(config Config) error {
return validate.Struct(config.Storage.S3)
case "azureblob":
return validate.Struct(config.Storage.AzureBlob)
case "external":
return validate.Struct(config.Storage.External)
default:
return fmt.Errorf("storage type %s is unknown", config.StorageType)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/external.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package config

// External specifies configuration for an external http storage
type External struct {
URL string `validate:"required" envconfig:"ATHENS_EXTERNAL_STORAGE_URL"`
}
1 change: 1 addition & 0 deletions pkg/config/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ type StorageConfig struct {
Mongo *MongoConfig
S3 *S3Config
AzureBlob *AzureBlobConfig
External *External
}
14 changes: 7 additions & 7 deletions pkg/storage/compliance/tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func RunTests(t *testing.T, b storage.Backend, clearBackend func() error) {
// returns a KindNotFound error when asking for
// non existing modules.
func testNotFound(t *testing.T, b storage.Backend) {
mod, ver := "xxx", "yyy"
mod, ver := "github.com/gomods/athens", "yyy"
ctx := context.Background()

err := b.Delete(ctx, mod, ver)
Expand Down Expand Up @@ -106,7 +106,7 @@ func testListSuffix(t *testing.T, b storage.Backend) {
func testList(t *testing.T, b storage.Backend) {
ctx := context.Background()

modname := "listMod"
modname := "github.com/gomods/athens"
versions := []string{"v1.1.0", "v1.2.0", "v1.3.0"}
for _, version := range versions {
mock := getMockModule()
Expand All @@ -133,7 +133,7 @@ func testList(t *testing.T, b storage.Backend) {
// testGet saves and retrieves a module successfully.
func testGet(t *testing.T, b storage.Backend) {
ctx := context.Background()
modname := "getTestModule"
modname := "github.com/gomods/athens"
ver := "v1.2.3"
mock := getMockModule()
zipBts, _ := ioutil.ReadAll(mock.Zip)
Expand All @@ -146,7 +146,7 @@ func testGet(t *testing.T, b storage.Backend) {

mod, err := b.GoMod(ctx, modname, ver)
require.NoError(t, err)
require.Equal(t, mock.Mod, mod)
require.Equal(t, string(mock.Mod), string(mod))

zip, err := b.Zip(ctx, modname, ver)
require.NoError(t, err)
Expand All @@ -157,7 +157,7 @@ func testGet(t *testing.T, b storage.Backend) {

func testExists(t *testing.T, b storage.Backend) {
ctx := context.Background()
modname := "getTestModule"
modname := "github.com/gomods/athens"
ver := "v1.2.3"
mock := getMockModule()
zipBts, _ := ioutil.ReadAll(mock.Zip)
Expand All @@ -171,7 +171,7 @@ func testExists(t *testing.T, b storage.Backend) {

func testShouldNotExist(t *testing.T, b storage.Backend) {
ctx := context.Background()
mod := "shouldNotExist"
mod := "github.com/gomods/shouldNotExist"
ver := "v1.2.3-pre.1"
mock := getMockModule()
zipBts, _ := ioutil.ReadAll(mock.Zip)
Expand All @@ -193,7 +193,7 @@ func testShouldNotExist(t *testing.T, b storage.Backend) {
// afterwards.
func testDelete(t *testing.T, b storage.Backend) {
ctx := context.Background()
modname := "deleteModule"
modname := "github.com/gomods/athens"
version := fmt.Sprintf("%s%d", "delete", rand.Int())

mock := getMockModule()
Expand Down
183 changes: 183 additions & 0 deletions pkg/storage/external/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package external

import (
"bufio"
"context"
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"strings"

"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/storage"
"golang.org/x/mod/module"
)

type service struct {
url string
c *http.Client
}

// NewClient returns an external storage client
func NewClient(url string, c *http.Client) storage.Backend {
if c == nil {
c = &http.Client{}
}
url = strings.TrimSuffix(url, "/")
return &service{url, c}
}

func (s *service) List(ctx context.Context, mod string) ([]string, error) {
const op errors.Op = "external.List"
body, err := s.getRequest(ctx, mod, "list", "")
if err != nil {
return nil, errors.E(op, err)
}
list := []string{}
scnr := bufio.NewScanner(body)
for scnr.Scan() {
list = append(list, scnr.Text())
}
if scnr.Err() != nil {
return nil, errors.E(op, scnr.Err())
}
return list, nil
}

func (s *service) Info(ctx context.Context, mod, ver string) ([]byte, error) {
const op errors.Op = "external.Info"
body, err := s.getRequest(ctx, mod, ver, "info")
if err != nil {
return nil, errors.E(op, err)
}
info, err := ioutil.ReadAll(body)
if err != nil {
return nil, errors.E(op, err)
}
return info, nil
}

func (s *service) GoMod(ctx context.Context, mod, ver string) ([]byte, error) {
const op errors.Op = "external.GoMod"
body, err := s.getRequest(ctx, mod, ver, "mod")
if err != nil {
return nil, errors.E(op, err)
}
modFile, err := ioutil.ReadAll(body)
if err != nil {
return nil, errors.E(op, err)
}
return modFile, nil
}

func (s *service) Zip(ctx context.Context, mod, ver string) (io.ReadCloser, error) {
const op errors.Op = "external.Zip"
body, err := s.getRequest(ctx, mod, ver, "zip")
if err != nil {
return nil, errors.E(op, err)
}
return body, nil
}

func (s *service) Save(ctx context.Context, mod, ver string, modFile []byte, zip io.Reader, info []byte) error {
const op errors.Op = "external.Save"
var err error
mod, err = module.EscapePath(mod)
if err != nil {
panic(err)
}
url := s.url + "/" + mod + "/@v/" + ver + ".save"
pr, pw := io.Pipe()
mw := multipart.NewWriter(pw)
go func() {
err := upload(mw, modFile, info, zip)
pw.CloseWithError(err)
}()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, pr)
if err != nil {
return errors.E(op, err)
}
req.Header.Add("Content-Type", mw.FormDataContentType())
resp, err := s.c.Do(req)
if err != nil {
return errors.E(op, err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
bts, _ := ioutil.ReadAll(resp.Body)
return errors.E(op, fmt.Errorf("unexpected status code: %v - body: %s", resp.StatusCode, bts), resp.StatusCode)
}
return nil
}

func (s *service) Delete(ctx context.Context, mod, ver string) error {
const op errors.Op = "external.Delete"
body, err := s.doRequest(ctx, "DELETE", mod, ver, "delete")
if err != nil {
return errors.E(op, err)
}
defer body.Close()
return nil
}

func upload(mw *multipart.Writer, mod, info []byte, zip io.Reader) error {
defer mw.Close()
infoW, err := mw.CreateFormFile("mod.info", "mod.info")
if err != nil {
return fmt.Errorf("error creating info file: %v", err)
}
_, err = infoW.Write(info)
if err != nil {
return fmt.Errorf("error writing info file: %v", err)
}
modW, err := mw.CreateFormFile("mod.mod", "mod.mod")
if err != nil {
return fmt.Errorf("error creating mod file: %v", err)
}
_, err = modW.Write(mod)
if err != nil {
return fmt.Errorf("error writing mod file: %v", err)
}
zipW, err := mw.CreateFormFile("mod.zip", "mod.zip")
if err != nil {
return fmt.Errorf("error creating zip file: %v", err)
}
_, err = io.Copy(zipW, zip)
if err != nil {
return fmt.Errorf("error writing zip file: %v", err)
}
return nil
}

func (s *service) getRequest(ctx context.Context, mod, ver, ext string) (io.ReadCloser, error) {
return s.doRequest(ctx, "GET", mod, ver, ext)
}

func (s *service) doRequest(ctx context.Context, method, mod, ver, ext string) (io.ReadCloser, error) {
const op errors.Op = "external.doRequest"
var err error
mod, err = module.EscapePath(mod)
if err != nil {
return nil, errors.E(op, err)
}
url := s.url + "/" + mod + "/@v/" + ver
if ext != "" {
url += "." + ext
}
req, err := http.NewRequestWithContext(ctx, method, url, nil)
if err != nil {
return nil, errors.E(op, err)
}
resp, err := s.c.Do(req)
if err != nil {
return nil, errors.E(op, err)
}
if resp.StatusCode != 200 {
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
return nil, errors.E(op, fmt.Errorf("none 200 status code: %v - body: %s", resp.StatusCode, body), resp.StatusCode)
}
return resp.Body, nil
}
22 changes: 22 additions & 0 deletions pkg/storage/external/external_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package external

import (
"net/http/httptest"
"testing"

"github.com/gomods/athens/pkg/storage/compliance"
"github.com/gomods/athens/pkg/storage/mem"
)

func TestExternal(t *testing.T) {
strg, err := mem.NewStorage()
if err != nil {
t.Fatal(err)
}
handler := NewServer(strg)
srv := httptest.NewServer(handler)
defer srv.Close()
externalStrg := NewClient(srv.URL, nil)
clear := strg.(interface{ Clear() error }).Clear
compliance.RunTests(t, externalStrg, clear)
}
Loading

0 comments on commit 3c4db4c

Please sign in to comment.