Skip to content

Commit

Permalink
feat:add minio fs
Browse files Browse the repository at this point in the history
  • Loading branch information
geebytes committed Jun 28, 2024
1 parent 9528040 commit 405cc74
Show file tree
Hide file tree
Showing 12 changed files with 324 additions and 132 deletions.
6 changes: 4 additions & 2 deletions gateway/formdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type FormDataDecoder struct {
}

func (f *FormDataMarshaler) NewDecoder(r io.Reader) runtime.Decoder {
// log.Printf("new form data decoder")
return &FormDataDecoder{r: r}
}

Expand All @@ -45,14 +46,14 @@ func (f *FormDataDecoder) Decode(v interface{}) error {
}

}

// log.Printf("decode form-data")
// 使用multipart.Reader来解析formData
reader := multipart.NewReader(f.r, f.boundary)
formData, err := reader.ReadForm(32 << 20) // 32MB是formData的最大内存使用
if err != nil && err != io.EOF {
return err
}

// log.Printf("form data file:%v", len(formData.File))
for key, files := range formData.File {
file := files[0]
fd, err := file.Open()
Expand All @@ -68,6 +69,7 @@ func (f *FormDataDecoder) Decode(v interface{}) error {
if _, ok := formData.Value[key]; !ok {
formData.Value[key] = make([]string, 0)
}
// log.Printf("recv file %s, size %d", file.Filename, len(fileBytes))
formData.Value[key] = append(formData.Value[key], string(fileBytes))

}
Expand Down
2 changes: 2 additions & 0 deletions gateway/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
Expand Down Expand Up @@ -385,6 +386,7 @@ func (h *HttpEndpointImpl) RegisterHandlerClient(ctx context.Context, pd Protobu
if req.Header.Get("accept") == "" || req.Header.Get("accept") == "*/*" {
req.Header.Set("accept", "application/json")
}
log.Printf("request content-type:%s", req.Header.Get("content-type"))
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
Expand Down
12 changes: 7 additions & 5 deletions gateway/middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gateway

import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -118,23 +119,23 @@ func IncomingHeadersToMetadata(ctx context.Context, req *http.Request) metadata.
accessKey := md.Get(XAccessKey)
apikey:=md.Get(XApiKey)
author := ""
idType := gosdk.UidType
// idType := gosdk.UidType
if len(xuid) > 0 {
author = xuid[0]
}
if author == "" && len(accessKey) > 0 {
author = accessKey[0]
idType = gosdk.AccessKeyType
// idType = gosdk.AccessKeyType
}
if author == ""&& len(apikey)>0{
author = apikey[0]
idType = gosdk.ApiKeyType
// idType = gosdk.ApiKeyType
}
if author == "" {
return md
}
md.Set(XIdentity, author)
md.Set(XIdentityType, idType)
// md.Set(XIdentity, author)
// md.Set(XIdentityType, idType)

return md
}
Expand Down Expand Up @@ -247,6 +248,7 @@ func HandleErrorWithLogger(logger logger.Logger) runtime.ErrorHandlerFunc {
"status": statusCode,
},
)
fmt.Printf("error type:%T, error:%v", err, err)
if _, ok := metadata.FromIncomingContext(ctx); !ok {
md := IncomingHeadersToMetadata(ctx, req)
ctx = metadata.NewIncomingContext(ctx, md)
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/smartystreets/goconvey v1.8.1
github.com/spark-lence/tiga v0.0.0-20240617161215-fd493c5e06ac
github.com/spf13/cobra v1.8.0
google.golang.org/genproto/googleapis/api v0.0.0-20240610135401-a8a62080eff3
google.golang.org/genproto/googleapis/api v0.0.0-20240624140628-dc46fd24d27d
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.2
)
Expand Down Expand Up @@ -80,7 +80,7 @@ require (
require (
github.com/agiledragon/gomonkey/v2 v2.11.0
github.com/begonia-org/go-loadbalancer v0.0.0-20240519060752-71ca464f0f1a
github.com/begonia-org/go-sdk v0.0.0-20240617104023-3182694e4a5f
github.com/begonia-org/go-sdk v0.0.0-20240628071225-2864d45934ea
github.com/go-git/go-git/v5 v5.11.0
github.com/go-playground/validator/v10 v10.19.0
github.com/gorilla/websocket v1.5.0
Expand Down Expand Up @@ -139,7 +139,7 @@ require (
go.uber.org/zap v1.27.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240610135401-a8a62080eff3 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
)

Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ github.com/begonia-org/go-loadbalancer v0.0.0-20240519060752-71ca464f0f1a h1:Mpw
github.com/begonia-org/go-loadbalancer v0.0.0-20240519060752-71ca464f0f1a/go.mod h1:crPS67sfgmgv47psftwfmTMbmTfdepVm8MPeqApINlI=
github.com/begonia-org/go-sdk v0.0.0-20240617104023-3182694e4a5f h1:1roWosaPjA7vE5CnnF6irDWt05SLD57bsHLun4HYeLU=
github.com/begonia-org/go-sdk v0.0.0-20240617104023-3182694e4a5f/go.mod h1:I70a3fiAADGrOoOC3lv408rFcTRhTwLt3pwr6cQwB4Y=
github.com/begonia-org/go-sdk v0.0.0-20240628071225-2864d45934ea h1:jdDBLZVsGKfmF/V+U4WUrz4Hzd3/62GqMIly9gnSppw=
github.com/begonia-org/go-sdk v0.0.0-20240628071225-2864d45934ea/go.mod h1:I70a3fiAADGrOoOC3lv408rFcTRhTwLt3pwr6cQwB4Y=
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
Expand Down Expand Up @@ -401,8 +403,12 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/api v0.0.0-20240610135401-a8a62080eff3 h1:QW9+G6Fir4VcRXVH8x3LilNAb6cxBGLa6+GM4hRwexE=
google.golang.org/genproto/googleapis/api v0.0.0-20240610135401-a8a62080eff3/go.mod h1:kdrSS/OiLkPrNUpzD4aHgCq2rVuC/YRxok32HXZ4vRE=
google.golang.org/genproto/googleapis/api v0.0.0-20240624140628-dc46fd24d27d h1:Aqf0fiIdUQEj0Gn9mKFFXoQfTTEaNopWpfVyYADxiSg=
google.golang.org/genproto/googleapis/api v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Od4k8V1LQSizPRUK4OzZ7TBE/20k+jPczUDAEyvn69Y=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240610135401-a8a62080eff3 h1:9Xyg6I9IWQZhRVfCWjKK+l6kI0jHcPesVlMnT//aHNo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240610135401-a8a62080eff3/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d h1:k3zyW3BYYR30e8v3x0bTDdE9vpYFjZHK+HcyqkrppWk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
Expand Down
77 changes: 49 additions & 28 deletions internal/biz/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
goErr "errors"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
Expand All @@ -31,9 +30,9 @@ import (

type FileRepo interface {
// mysql
UpsertFile(ctx context.Context, file *api.Files) error
UpsertFile(ctx context.Context, file *api.Files) (bool, error)
DelFile(ctx context.Context, engine, bucket, key string) error
UpsertBucket(ctx context.Context, bucket *api.Buckets) error
UpsertBucket(ctx context.Context, bucket *api.Buckets) (bool, error)
DelBucket(ctx context.Context, bucketId string) error
GetFileById(ctx context.Context, fid string) (*api.Files, error)
List(ctx context.Context, page, pageSize int32, bucket, engine, owner string) ([]*api.Files, error)
Expand All @@ -50,7 +49,7 @@ type FileUsecase interface {
Version(ctx context.Context, bucket, key, authorId string) (string, error)
Download(ctx context.Context, in *api.DownloadRequest, authorId string) ([]byte, error)
Delete(ctx context.Context, in *api.DeleteRequest, authorId string) (*api.DeleteResponse, error)
MakeBucket(ctx context.Context, in *api.MakeBucketRequest,authorId string) (*api.MakeBucketResponse, error)
MakeBucket(ctx context.Context, in *api.MakeBucketRequest, authorId string) (*api.MakeBucketResponse, error)
List(ctx context.Context, in *api.ListFilesRequest, authorId string) ([]*api.Files, error)
GetFileByID(ctx context.Context, fileId string) (*api.Files, error)
}
Expand Down Expand Up @@ -131,14 +130,14 @@ func getSHA256(data []byte) string {
func (f *FileUsecaseImpl) commitFile(dir string, filename string, authorId string, authorEmail string) (commitId string, err error) {
repo, err := git.PlainInit(dir, false)
if err != nil && err != git.ErrRepositoryAlreadyExists {
return "", fmt.Errorf("version plain init err:%w",err)
return "", fmt.Errorf("version plain init err:%w", err)
}

// 如果仓库已存在,则打开它
if err == git.ErrRepositoryAlreadyExists {
repo, err = git.PlainOpen(dir)
if err != nil {
return "", fmt.Errorf("version plain open err:%w",err)
return "", fmt.Errorf("version plain open err:%w", err)
}
}

Expand All @@ -152,7 +151,7 @@ func (f *FileUsecaseImpl) commitFile(dir string, filename string, authorId strin
// 工作树
w, err := repo.Worktree()
if err != nil {
return "", fmt.Errorf("version work tree err:%w",err)
return "", fmt.Errorf("version work tree err:%w", err)
}
defer func() {
if p := recover(); p != nil {
Expand All @@ -166,15 +165,15 @@ func (f *FileUsecaseImpl) commitFile(dir string, filename string, authorId strin
// 添加文件到暂存区
_, err = w.Add(filename)
if err != nil {
return "", fmt.Errorf("add file err:%w",err)
return "", fmt.Errorf("add file err:%w", err)
}

// 创建提交
commit, err := w.Commit(fmt.Sprintf("Add %s", filename), &git.CommitOptions{
Author: author,
})
if err != nil && !goErr.Is(err, git.ErrEmptyCommit) {
return "", fmt.Errorf("commit file err:%w",err)
return "", fmt.Errorf("commit file err:%w", err)
}
// 空提交处理
if goErr.Is(err, git.ErrEmptyCommit) {
Expand All @@ -189,7 +188,7 @@ func (f *FileUsecaseImpl) commitFile(dir string, filename string, authorId strin
obj, err := repo.CommitObject(commit)

if err != nil {
return "", fmt.Errorf("get commit object err:%w",err)
return "", fmt.Errorf("get commit object err:%w", err)
}

return obj.ID().String(), nil
Expand All @@ -216,33 +215,36 @@ func (f *FileUsecaseImpl) checkIn(key string) (string, error) {
}
return key, nil
}
func (f *FileUsecaseImpl) MakeBucket(ctx context.Context, in *api.MakeBucketRequest,authorId string) (*api.MakeBucketResponse, error) {
func (f *FileUsecaseImpl) MakeBucket(ctx context.Context, in *api.MakeBucketRequest, authorId string) (*api.MakeBucketResponse, error) {

saveDir := filepath.Join(f.config.GetUploadDir(), in.Bucket)
if err := os.MkdirAll(saveDir, 0755); err != nil {
return nil, gosdk.NewError(err, int32(common.Code_INTERNAL_ERROR), codes.Internal, "make_bucket")
}
if in.EnableVersion{
_,_=os.Create(filepath.Join(saveDir, ".gitkeep"))
_,err:=f.commitFile(saveDir, ".gitkeep", authorId, "[email protected]")
if in.EnableVersion {
_, _ = os.Create(filepath.Join(saveDir, ".gitkeep"))
_, err := f.commitFile(saveDir, ".gitkeep", authorId, "[email protected]")
if err != nil {
return nil, gosdk.NewError(fmt.Errorf("init bucket version err:%w",err), int32(common.Code_INTERNAL_ERROR), codes.Internal, "commit_file")
return nil, gosdk.NewError(fmt.Errorf("init bucket version err:%w", err), int32(common.Code_INTERNAL_ERROR), codes.Internal, "commit_file")

}
}

return &api.MakeBucketResponse{}, nil
}
func (f *FileUsecaseImpl) checkBucket(bucket string) bool {
saveDir := filepath.Join(f.config.GetUploadDir(), bucket)
return pathExists(saveDir)
}
func (f *FileUsecaseImpl)isEnableVersion(bucket string)bool{
path:=filepath.Join(f.config.GetUploadDir(), bucket)
func (f *FileUsecaseImpl) isEnableVersion(bucket string) bool {
path := filepath.Join(f.config.GetUploadDir(), bucket)
repo, err := git.PlainOpen(path)
if err!=nil||repo==nil{
if err != nil || repo == nil {
return false
}
return true
}

// Upload uploads a file.
//
// The file is saved in the directory specified by the key.
Expand Down Expand Up @@ -315,15 +317,24 @@ func (f *FileUsecaseImpl) Upload(ctx context.Context, in *api.UploadFileRequest,
CreatedAt: timestamppb.Now(),
UpdatedAt: timestamppb.Now(),
}
err = f.repo.UpsertFile(ctx, fileObj)
updated, err := f.repo.UpsertFile(ctx, fileObj)
if err != nil {
return nil, err
}
log.Printf("上传文件:%s成功", filePath)
uid := fileObj.Uid
if updated {
existsObj, err := f.repo.GetFile(ctx, fileObj.Engine, fileObj.Bucket, fileObj.Key)
if err != nil {
return nil, gosdk.NewError(fmt.Errorf("get updated file error:%w",err), int32(common.Code_INTERNAL_ERROR), codes.Internal, "get_file")
}
if existsObj != nil {
uid = existsObj.Uid
}
}
return &api.UploadFileResponse{
Uri: uri,
Version: commitId,
Uid: fileObj.Uid,
Uid: uid,
}, err

}
Expand Down Expand Up @@ -465,7 +476,7 @@ func (f *FileUsecaseImpl) CompleteMultipartUploadFile(ctx context.Context, in *a
if err != nil {
return nil, err
}
if ok := f.checkBucket(in.Bucket); in.Bucket == "" || !ok && in.Engine==api.FileEngine_FILE_ENGINE_LOCAL.String() {
if ok := f.checkBucket(in.Bucket); in.Bucket == "" || !ok && in.Engine == api.FileEngine_FILE_ENGINE_LOCAL.String() {
return nil, gosdk.NewError(pkg.ErrBucketNotFound, int32(api.FileSvrStatus_FILE_INVALIDATE_BUCKET_ERR), codes.NotFound, "bucket_not_found")

}
Expand Down Expand Up @@ -507,7 +518,7 @@ func (f *FileUsecaseImpl) CompleteMultipartUploadFile(ctx context.Context, in *a

}
commit := ""
if f.isEnableVersion(in.Bucket)&& in.Engine==api.FileEngine_FILE_ENGINE_LOCAL.String() {
if f.isEnableVersion(in.Bucket) && in.Engine == api.FileEngine_FILE_ENGINE_LOCAL.String() {
commit, err = f.commitFile(saveDir, filename, authorId, "[email protected]")
if err != nil {
return nil, gosdk.NewError(err, int32(common.Code_INTERNAL_ERROR), codes.Internal, "commit_file")
Expand All @@ -527,17 +538,27 @@ func (f *FileUsecaseImpl) CompleteMultipartUploadFile(ctx context.Context, in *a
CreatedAt: timestamppb.Now(),
UpdatedAt: timestamppb.Now(),
}
log.Printf("insert %s,%s,%s,%s", fileObj.Uid, fileObj.Bucket, fileObj.Key, fileObj.Engine)
err = f.repo.UpsertFile(ctx, fileObj)
// log.Printf("insert %s,%s,%s,%s", fileObj.Uid, fileObj.Bucket, fileObj.Key, fileObj.Engine)
updated, err := f.repo.UpsertFile(ctx, fileObj)
if err != nil {
return nil, gosdk.NewError(err, int32(common.Code_INTERNAL_ERROR), codes.Internal, "upsert_file")
return nil, gosdk.NewError(fmt.Errorf("insert or update file err:%w",err), int32(common.Code_INTERNAL_ERROR), codes.Internal, "upsert_file")
}
uid := fileObj.Uid
if updated {
existsObj, err := f.repo.GetFile(ctx, fileObj.Engine, fileObj.Bucket, fileObj.Key)
if err != nil {
return nil, gosdk.NewError(fmt.Errorf("get updated file error:%w",err), int32(common.Code_INTERNAL_ERROR), codes.Internal, "get_file")
}
if existsObj != nil {
uid = existsObj.Uid
}
}
os.RemoveAll(filepath.Join(f.config.GetUploadDir(), in.UploadId))

return &api.CompleteMultipartUploadResponse{
Uri: uri,
Version: commit,
Uid: fileObj.Uid,
Uid: uid,
}, err
}

Expand Down
Loading

0 comments on commit 405cc74

Please sign in to comment.