Skip to content

Commit

Permalink
Merge pull request #1 from tablelandnetwork/bcalza/resources
Browse files Browse the repository at this point in the history
changes for supporting big files
  • Loading branch information
brunocalza authored Feb 9, 2024
2 parents 2d81af5 + 771bee6 commit 29d0e14
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 38 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ run:
@HTTP_PORT=8081 \
PRIVATEKEY=$(shell cat ${pk_file}) \
PROOF=$(shell cat ${proof_file} | xxd -p | tr -d '\n') \
SPACEID=did:key:z6MkfKyf5T9keTQqNpYpSPasLi7WSvzy1SmT5gvYZSPEVLpp \
go run .
.PHONY: run

Expand Down
3 changes: 2 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ import (

type config struct {
PrivateKey string `default:""`
SpaceID string `default:""`
Proof string `default:""`
HTTP struct {
Port string `default:"8080"`
}

Log struct {
Human bool `default:"false"`
Debug bool `default:"false"`
}
TmpDir string `default:"/tmp"`
}

func initConfig() (*config, error) {
Expand Down
32 changes: 21 additions & 11 deletions handlers.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package main

import (
"bufio"
"encoding/hex"
"encoding/json"
"io"
"net/http"
"os"

"golang.org/x/exp/slog"
)

// Handlers groups a bunch of HTTP handlers.
type Handlers struct {
uploader *Uploader
tmpDir string
}

// UploadResponse ...
Expand All @@ -27,19 +29,26 @@ func (h *Handlers) Health(rw http.ResponseWriter, _ *http.Request) {

// Upload handles POST /api/v1/upload.
func (h *Handlers) Upload(rw http.ResponseWriter, r *http.Request) {
f, _, err := r.FormFile("file")
reader, err := r.MultipartReader()
if err != nil {
slog.Error("form file", err)
rw.WriteHeader(http.StatusInternalServerError)
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}

// parse file field
p, err := reader.NextPart()
if err != nil && err != io.EOF {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}

if p.FormName() != "file" {
http.Error(rw, "file is expected", http.StatusBadRequest)
return
}
defer func() {
if err := f.Close(); err != nil {
slog.Error("close file", err)
}
}()

result, err := h.uploader.Upload(r.Context(), f)
buf := bufio.NewReader(p)
result, err := h.uploader.Upload(r.Context(), buf)
if err != nil {
slog.Error("file upload", err)
rw.WriteHeader(http.StatusInternalServerError)
Expand All @@ -66,12 +75,13 @@ func initHandlers(cfg *config) (*Handlers, error) {
return nil, err
}

uploader, err := NewUploader(cfg.PrivateKey, proof, os.TempDir())
uploader, err := NewUploader(cfg.SpaceID, cfg.PrivateKey, proof, cfg.TmpDir)
if err != nil {
return nil, err
}

return &Handlers{
uploader: uploader,
tmpDir: cfg.TmpDir,
}, nil
}
32 changes: 29 additions & 3 deletions k8s/base/api.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: ephemeral-ssd
provisioner: pd.csi.storage.gke.io
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true
parameters:
type: pd-standard
---
apiVersion: v1
kind: Service
metadata:
Expand Down Expand Up @@ -49,8 +59,8 @@ spec:
name: configmap-basin-w3s
resources:
requests:
cpu: 250m
memory: 512Mi
cpu: 500m
memory: 1Gi
readinessProbe:
httpGet:
path: /api/v1/health
Expand All @@ -64,4 +74,20 @@ spec:
port: 8080
initialDelaySeconds: 15
periodSeconds: 20
failureThreshold: 3
failureThreshold: 3
volumeMounts:
- mountPath: "/tmp"
name: tmp-volume
volumes:
- name: tmp-volume
ephemeral:
volumeClaimTemplate:
metadata:
labels:
type: tmp-volume
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: "ephemeral-ssd"
resources:
requests:
storage: 20Gi
3 changes: 2 additions & 1 deletion k8s/staging/configBasinW3s.env
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
HTTP_PORT=8080
HTTP_PORT=8080
SPACEID=did:key:z6MkfKyf5T9keTQqNpYpSPasLi7WSvzy1SmT5gvYZSPEVLpp
6 changes: 3 additions & 3 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ func TestUploadApi(t *testing.T) {
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)

defer require.NoError(t, res.Body.Close())
out, err := io.ReadAll(res.Body)
require.NoError(t, err)
require.NoError(t, res.Body.Close())

var r UploadResponse

err = json.Unmarshal(out, &r)
require.NoError(t, err)

require.Equal(t, "bafkreibz5pyalh2oqppwsm4lf3czqzcg6ockskzwik5lte5qtrxihqerdy", r.Root)
require.Equal(t, "bagbaieramjiiu3p4ufmfz2jugnoqfrdptzwmaw466oxyd2afxs6qaclpy72q", r.Shard)
require.Equal(t, "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", r.Root)
require.Equal(t, "bagbaierakdtubdzo53sy6crqkmmwdhomjse3vj5yijkvbopbwt66zqbangpa", r.Shard)
}
40 changes: 21 additions & 19 deletions uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,8 @@ import (
"github.com/web3-storage/go-w3up/client"
"github.com/web3-storage/go-w3up/cmd/util"
w3sdelegation "github.com/web3-storage/go-w3up/delegation"
"golang.org/x/exp/slog"
)

// SpaceID is the id of a Web3 Storage space.
const SpaceID = "did:key:z6Mkv4YhtLqTKWis8KfLWGhUEcHFPYgH97BrCZia7xsUxMWj"

// w3s interface to make it easier to mock w3s.
type w3s interface {
upload(cid.Cid, string) (cid.Cid, error)
Expand All @@ -54,8 +50,8 @@ type UploadResult struct {
}

// NewUploader returns a new uploader.
func NewUploader(sk string, proofBytes []byte, tmpDir string) (*Uploader, error) {
client, err := newW3sclient(sk, proofBytes)
func NewUploader(spaceID string, sk string, proofBytes []byte, tmpDir string) (*Uploader, error) {
client, err := newW3sclient(spaceID, sk, proofBytes)
if err != nil {
return nil, fmt.Errorf("creating new w3s client: %s", err)
}
Expand All @@ -66,14 +62,14 @@ func NewUploader(sk string, proofBytes []byte, tmpDir string) (*Uploader, error)
}

// Upload uploads the content of a io.Reader.
func (u *Uploader) Upload(ctx context.Context, r io.Reader) (UploadResult, error) {
func (u *Uploader) Upload(ctx context.Context, r io.Reader) (_ UploadResult, err error) {
dest, err := u.saveTmp(r)
if err != nil {
return UploadResult{}, fmt.Errorf("failed saving into tmp: %s", err)
}
defer func() {
if err := u.removeTmp(dest); err != nil {
slog.Error("failed to remove tmp file", err)
if cErr := u.removeTmp(dest); err == nil {
err = cErr
}
}()

Expand All @@ -93,7 +89,7 @@ func (u *Uploader) Upload(ctx context.Context, r io.Reader) (UploadResult, error
}, nil
}

func (u *Uploader) saveTmp(r io.Reader) (string, error) {
func (u *Uploader) saveTmp(r io.Reader) (_ string, err error) {
randBytes := make([]byte, 16)
_, _ = rand.Read(randBytes)
dest := filepath.Join(u.tmpDir, hex.EncodeToString(randBytes))
Expand All @@ -103,8 +99,9 @@ func (u *Uploader) saveTmp(r io.Reader) (string, error) {
return "", err
}
defer func() {
if err := f.Close(); err != nil {
slog.Error("close file", err)
// Close file and override return error type if it is nil.
if cerr := f.Close(); err == nil {
err = cerr
}
}()

Expand Down Expand Up @@ -163,7 +160,7 @@ func (*Uploader) removeTmp(dest string) error {
return nil
}

func writeFile(ctx context.Context, bs *blockstore.ReadWrite, path string) (cid.Cid, uint64, error) {
func writeFile(ctx context.Context, bs *blockstore.ReadWrite, path string) (_ cid.Cid, sz uint64, err error) {
ls := cidlink.DefaultLinkSystem()
ls.TrustedStorage = true
ls.StorageReadOpener = func(_ ipld.LinkContext, l ipld.Link) (io.Reader, error) {
Expand Down Expand Up @@ -200,8 +197,9 @@ func writeFile(ctx context.Context, bs *blockstore.ReadWrite, path string) (cid.
return cid.Undef, 0, err
}
defer func() {
if err := f.Close(); err != nil {
slog.Error("close file", err)
// Close file and override return error type if it is nil.
if cerr := f.Close(); err == nil {
err = cerr
}
}()

Expand All @@ -223,7 +221,7 @@ type w3sclient struct {
proof delegation.Delegation
}

func newW3sclient(sk string, proofBytes []byte) (*w3sclient, error) {
func newW3sclient(spaceID string, sk string, proofBytes []byte) (*w3sclient, error) {
// private key to sign UCAN invocations with
issuer, err := signer.Parse(sk)
if err != nil {
Expand All @@ -236,7 +234,7 @@ func newW3sclient(sk string, proofBytes []byte) (*w3sclient, error) {
return nil, fmt.Errorf("failed to extract proof: %s", err)
}

space, err := did.Parse(SpaceID)
space, err := did.Parse(spaceID)
if err != nil {
return nil, fmt.Errorf("failed to parse space id: %s", err)
}
Expand All @@ -248,8 +246,8 @@ func newW3sclient(sk string, proofBytes []byte) (*w3sclient, error) {
}, nil
}

func (c *w3sclient) upload(root cid.Cid, dest string) (cid.Cid, error) {
// no need to close the file here, because the http client will do.
func (c *w3sclient) upload(root cid.Cid, dest string) (_ cid.Cid, err error) {
// no need to close the file because the http client is doing that
f, err := os.Open(fmt.Sprintf("%s.car", dest))
if err != nil {
return cid.Undef, err
Expand Down Expand Up @@ -278,6 +276,10 @@ func (c *w3sclient) upload(root cid.Cid, dest string) (cid.Cid, error) {
return cid.Undef, err
}

if rcpt.Out().Error() != nil {
return cid.Undef, fmt.Errorf(rcpt.Out().Error().Message)
}

if rcpt.Out().Ok().Status == "upload" {
_, err := f.Seek(0, io.SeekStart)
if err != nil {
Expand Down

0 comments on commit 29d0e14

Please sign in to comment.