From f43bf107471b1310de6d870a2cf16438dbeaad06 Mon Sep 17 00:00:00 2001 From: levimm Date: Thu, 19 Jul 2018 17:41:36 +0800 Subject: [PATCH] use kodo form upload for small content --- registry/storage/driver/kodo/kodo.go | 59 ++++++++++++++------------ registry/storage/driver/kodo/writer.go | 3 +- 2 files changed, 34 insertions(+), 28 deletions(-) diff --git a/registry/storage/driver/kodo/kodo.go b/registry/storage/driver/kodo/kodo.go index 9118fa45b4a..abf3b93652a 100644 --- a/registry/storage/driver/kodo/kodo.go +++ b/registry/storage/driver/kodo/kodo.go @@ -118,13 +118,15 @@ func New(params *DriverParameters) (*Driver, error) { Zone: params.Zone, } - uploader := storage.NewResumeUploader(&cfg) + largeUploader := storage.NewResumeUploader(&cfg) + smallUploader := storage.NewFormUploader(&cfg) bucketManager := storage.NewBucketManager(mac, &cfg) d := &driver{ - params: params, - uploader: uploader, - bucketManager: bucketManager, + params: params, + largeFileUploader: largeUploader, + smallFileUploader: smallUploader, + bucketManager: bucketManager, } return &Driver{ @@ -137,9 +139,10 @@ func New(params *DriverParameters) (*Driver, error) { } type driver struct { - params *DriverParameters - uploader *storage.ResumeUploader - bucketManager *storage.BucketManager + params *DriverParameters + largeFileUploader *storage.ResumeUploader + smallFileUploader *storage.FormUploader + bucketManager *storage.BucketManager } // Name returns the human-readable "name" of the driver, useful in error @@ -230,21 +233,29 @@ func (d *driver) PutContent(ctx context.Context, path string, content []byte) er "path": path, "func": "PutContent", }) - writer, err := d.Writer(ctx, path, false) - if err != nil { - logger.Error("d.Writer:", err) - return err + kodoKey := d.kodoKey(path) + putPolicy := storage.PutPolicy{ + Scope: fmt.Sprintf("%s:%s", BUCKET, kodoKey), //覆盖上传 + Expires: 3600 * 3, //token过期时间 3小时 } - defer writer.Close() + upToken := putPolicy.UploadToken(mac) + + ret := storage.PutRet{} + //设置kodo的reqid,不知道为啥reqid的key需要设置为0 + //没有找到相关文档,我是直接从rpc的代码里找到的 + kodoCtx := context.WithValue(ctx, 0, ctx.Value("trace.id")) + + putExtra := storage.PutExtra{ + OnProgress: func(fsize, uploaded int64) { + logger.Infof("file size: %d, uploaded: %d", fsize, uploaded) + }, + } + err := d.smallFileUploader.Put(kodoCtx, &ret, upToken, kodoKey, bytes.NewReader(content), int64(len(content)), &putExtra) - _, err = io.Copy(writer, bytes.NewReader(content)) if err != nil { - writer.Cancel() - logger.Error("io.Copy:", err) - return err + logger.Error("PutFile:", err) } - logger.Info("ok") - return writer.Commit() + return err } // Writer returns a FileWriter which will store the content written to it @@ -479,7 +490,7 @@ func (d *driver) privateURL(ctx context.Context, path string) string { return privateAccessURL } -func (d *driver) upload(ctx context.Context, localFile, kodoKey string) error { +func (d *driver) upload(ctx context.Context, localFile string, fileSize int64, kodoKey string) (err error) { logger := ReqEntry(ctx).WithFields(logrus.Fields{ "kodoKey": kodoKey, "func": "upload", }) @@ -490,13 +501,6 @@ func (d *driver) upload(ctx context.Context, localFile, kodoKey string) error { } upToken := putPolicy.UploadToken(mac) - fileInfo, err := os.Stat(localFile) - if err != nil { - logger.Error("os.Stat:", err) - return err - } - - fileSize := fileInfo.Size() logger = logger.WithFields(logrus.Fields{ "fileSize": fileSize, "blockCount": storage.BlockCount(fileSize), @@ -512,7 +516,8 @@ func (d *driver) upload(ctx context.Context, localFile, kodoKey string) error { //设置kodo的reqid,不知道为啥reqid的key需要设置为0 //没有找到相关文档,我是直接从rpc的代码里找到的 kodoCtx := context.WithValue(ctx, 0, ctx.Value("trace.id")) - err = d.uploader.PutFile(kodoCtx, &ret, upToken, kodoKey, localFile, &putExtra) + err = d.largeFileUploader.PutFile(kodoCtx, &ret, upToken, kodoKey, localFile, &putExtra) + if err != nil { logger.Error("PutFile:", err) } diff --git a/registry/storage/driver/kodo/writer.go b/registry/storage/driver/kodo/writer.go index 09d7acdeeef..76440e6cea1 100644 --- a/registry/storage/driver/kodo/writer.go +++ b/registry/storage/driver/kodo/writer.go @@ -104,7 +104,8 @@ func (fw *fileWriter) Commit() error { //调用驱动上传文件 localFile := fw.file.Name() - if err := fw.driver.upload(fw.ctx, localFile, fw.kodoKey); err != nil { + fileSize := fw.Size() + if err := fw.driver.upload(fw.ctx, localFile, fileSize, fw.kodoKey); err != nil { return err }