Skip to content

Commit

Permalink
feat(fs/qingstor): Added gbk encoding support (#335)
Browse files Browse the repository at this point in the history
  • Loading branch information
abyss-w authored Aug 2, 2024
1 parent 4f96083 commit a32bdfc
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 18 deletions.
8 changes: 8 additions & 0 deletions constants/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ const (
TaskIgnoreExistingMD5Sum = "md5sum"
)

// Constants for task encoding config.
// These are the lowercase character-set names that the endpoint
// encoding/decoding options are validated against.
const (
// GBK selects the GBK simplified Chinese character set.
GBK = "gbk"
// HZGB2312 is spelled "gb2312" in configuration.
// NOTE(review): the endpoints resolve this name to x/text's
// simplifiedchinese.HZGB2312 codec, which is the 7-bit HZ encoding rather
// than EUC-CN GB2312 — confirm this is what users of "gb2312" expect.
HZGB2312 = "gb2312"
// Big5 selects the Big5 traditional Chinese character set.
Big5 = "big5"
// Windows1252 is spelled "cp1252" in configuration.
Windows1252 = "cp1252"
)

// Constants for object types.
const (
ObjectTypeDirectory = "directory"
Expand Down
15 changes: 12 additions & 3 deletions endpoint/fs/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ func (c *Client) Name(ctx context.Context) (name string) {

// Read implement source.Read
func (c *Client) Read(ctx context.Context, p string, _ bool) (r io.Reader, err error) {
cp := filepath.Join(c.AbsPath, p)
cp, err := c.Encode(filepath.Join(c.AbsPath, p))
if err != nil {
return
}

r, err = os.Open(cp)
if err != nil {
Expand All @@ -29,7 +32,10 @@ func (c *Client) Read(ctx context.Context, p string, _ bool) (r io.Reader, err e
func (c *Client) ReadRange(
ctx context.Context, p string, offset, size int64,
) (r io.Reader, err error) {
cp := filepath.Join(c.AbsPath, p)
cp, err := c.Encode(filepath.Join(c.AbsPath, p))
if err != nil {
return
}

f, err := os.Open(cp)
if err != nil {
Expand All @@ -42,7 +48,10 @@ func (c *Client) ReadRange(

// Stat implement source.Stat and destination.Stat
func (c *Client) Stat(ctx context.Context, p string, _ bool) (o *model.SingleObject, err error) {
cp := filepath.Join(c.AbsPath, p)
cp, err := c.Encode(filepath.Join(c.AbsPath, p))
if err != nil {
return
}

fi, err := os.Stat(cp)
if err != nil {
Expand Down
67 changes: 66 additions & 1 deletion endpoint/fs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@ package fs

import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/charmap"
"golang.org/x/text/encoding/simplifiedchinese"
"golang.org/x/text/encoding/traditionalchinese"
"golang.org/x/text/transform"
"io/ioutil"
"path/filepath"
"strings"

"gopkg.in/yaml.v2"

Expand All @@ -19,7 +28,23 @@ type Client struct {

// Options is the struct for fs options
type Options struct {
	// EnableLinkFollow is loaded from the "enable_link_follow" option.
	EnableLinkFollow bool `yaml:"enable_link_follow"`
	// Encoding names the character set that paths are converted to before
	// they are handed to the local file system. An empty value disables the
	// conversion.
	Encoding string `yaml:"encoding"`
}

// Check validates the options loaded for the fs endpoint.
//
// Encoding may be empty (no conversion) or one of the encoding names declared
// in package constants. The name is compared case-insensitively because the
// converter lowercases it before resolving the codec; a case-sensitive check
// here would reject spellings such as "GBK" that the converter handles.
//
// It logs the offending value and returns constants.ErrTaskInvalid when
// Encoding is anything else.
func (o *Options) Check() error {
	switch strings.ToLower(o.Encoding) {
	case "", constants.GBK, constants.HZGB2312, constants.Big5, constants.Windows1252:
		return nil
	default:
		logrus.Errorf("%s is not a valid value for task encoding", o.Encoding)
		return constants.ErrTaskInvalid
	}
}

// New will create a Fs.
Expand Down Expand Up @@ -53,6 +78,46 @@ func New(ctx context.Context, et uint8) (c *Client, err error) {
return
}

err = opt.Check()
if err != nil {
return
}

c.Options = opt
return
}

// Encode converts key into the character set configured in
// c.Options.Encoding. When no encoding is configured the key is returned
// unchanged. On conversion failure it returns an empty string and the error.
func (c *Client) Encode(key string) (string, error) {
	name := c.Options.Encoding
	if name == "" {
		return key, nil
	}

	encoded, err := encode(key, name)
	if err != nil {
		return "", err
	}
	return encoded, nil
}

// encode converts input into the character set named by encodingName and
// returns the resulting bytes as a string.
//
// The name is matched case-insensitively against the encoding constants. An
// unknown name, or a failure while transforming the input, yields an empty
// string and a non-nil error.
//
// NOTE(review): "gb2312" resolves to simplifiedchinese.HZGB2312, the 7-bit HZ
// encoding, not EUC-CN GB2312 — confirm this is intended.
func encode(input, encodingName string) (string, error) {
	var codec encoding.Encoding
	switch name := strings.ToLower(encodingName); name {
	case constants.GBK:
		codec = simplifiedchinese.GBK
	case constants.HZGB2312:
		codec = simplifiedchinese.HZGB2312
	case constants.Big5:
		codec = traditionalchinese.Big5
	case constants.Windows1252:
		codec = charmap.Windows1252
	default:
		return "", fmt.Errorf("unsupported encoding: %s", encodingName)
	}

	// Stream the input through the codec's encoder and collect the output.
	src := strings.NewReader(input)
	out, err := ioutil.ReadAll(transform.NewReader(src, codec.NewEncoder()))
	if err != nil {
		return "", err
	}
	return string(out), nil
}
5 changes: 4 additions & 1 deletion endpoint/fs/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ func (c *Client) Delete(ctx context.Context, p string) (err error) {

// Write implement destination.Write
func (c *Client) Write(ctx context.Context, p string, _ int64, r io.Reader, _ bool, _ map[string]string) (err error) {
cp := filepath.Join(c.AbsPath, p)
cp, err := c.Encode(filepath.Join(c.AbsPath, p))
if err != nil {
return
}

_, err = os.Stat(filepath.Dir(cp))
if os.IsNotExist(err) {
Expand Down
15 changes: 12 additions & 3 deletions endpoint/qingstor/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ func (c *Client) Read(ctx context.Context, p string, isDir bool) (r io.Reader, e
if isDir {
return nil, nil
}
cp := utils.Join(c.Path, p)
cp, err := c.Decode(utils.Join(c.Path, p))
if err != nil {
return
}

resp, err := c.client.GetObject(cp, nil)
if err != nil {
Expand All @@ -39,7 +42,10 @@ func (c *Client) Read(ctx context.Context, p string, isDir bool) (r io.Reader, e
func (c *Client) ReadRange(
ctx context.Context, p string, offset, size int64,
) (r io.Reader, err error) {
cp := utils.Join(c.Path, p)
cp, err := c.Decode(utils.Join(c.Path, p))
if err != nil {
return
}

resp, err := c.client.GetObject(cp, &service.GetObjectInput{
Range: convert.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)),
Expand All @@ -54,7 +60,10 @@ func (c *Client) ReadRange(

// Stat implement source.Stat and destination.Stat
func (c *Client) Stat(ctx context.Context, p string, isDir bool) (o *model.SingleObject, err error) {
cp := utils.Join(c.Path, p)
cp, err := c.Decode(utils.Join(c.Path, p))
if err != nil {
return
}
if isDir {
cp += "/"
}
Expand Down
64 changes: 64 additions & 0 deletions endpoint/qingstor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,20 @@ package qingstor

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"

"github.com/qingstor/qingstor-sdk-go/v4/config"
"github.com/qingstor/qingstor-sdk-go/v4/service"
"github.com/sirupsen/logrus"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/charmap"
"golang.org/x/text/encoding/simplifiedchinese"
"golang.org/x/text/encoding/traditionalchinese"
"golang.org/x/text/transform"
"gopkg.in/yaml.v2"

"github.com/yunify/qscamel/constants"
Expand All @@ -29,6 +37,7 @@ type Client struct {
StorageClass string `yaml:"storage_class"`
DisableURICleaning bool `yaml:"disable_uri_cleaning"`
EnableVirtualStyle bool `yaml:"enable_virtual_style"`
Decoding string `yaml:"decoding" msgpack:"d"`

// Whether to migrate custom metadata
UserDefineMeta bool `yaml:"user_define_meta"`
Expand All @@ -40,6 +49,21 @@ type Client struct {
client *service.Bucket
}

// Check validates the client's Decoding option.
//
// Decoding may be empty (no conversion) or one of the encoding names declared
// in package constants. The name is compared case-insensitively because the
// converter lowercases it before resolving the codec; a case-sensitive check
// here would reject spellings such as "GBK" that the converter handles.
//
// It logs the offending value and returns constants.ErrTaskInvalid when
// Decoding is anything else.
func (c *Client) Check() error {
	switch strings.ToLower(c.Decoding) {
	case "", constants.GBK, constants.HZGB2312, constants.Big5, constants.Windows1252:
		return nil
	default:
		logrus.Errorf("%s is not a valid value for qingstor decoding", c.Decoding)
		return constants.ErrTaskInvalid
	}
}

type TimeoutConfig struct {
ConnectTimeout int64 `yaml:"connect_timeout"`
ReadTimeout int64 `yaml:"read_timeout"`
Expand Down Expand Up @@ -69,6 +93,11 @@ func New(ctx context.Context, et uint8, hc *http.Client) (c *Client, err error)
return
}

err = c.Check()
if err != nil {
return
}

// Set protocol.
if c.Protocol == "" {
c.Protocol = "https"
Expand Down Expand Up @@ -168,3 +197,38 @@ func New(ctx context.Context, et uint8, hc *http.Client) (c *Client, err error)
c.client, _ = qs.Bucket(c.BucketName, c.Zone)
return
}

// Decode converts key from the character set configured in c.Decoding. When
// no decoding is configured the key is returned unchanged. On conversion
// failure it returns an empty string and the error.
func (c *Client) Decode(key string) (string, error) {
	if c.Decoding == "" {
		return key, nil
	}

	decoded, err := decode(key, c.Decoding)
	if err != nil {
		return "", err
	}
	return decoded, nil
}

// decode interprets input as text in the character set named by decodingName
// and returns the decoder's output as a string.
//
// The name is matched case-insensitively against the encoding constants. An
// unknown name, or a failure while transforming the input, yields an empty
// string and a non-nil error.
//
// NOTE(review): "gb2312" resolves to simplifiedchinese.HZGB2312, the 7-bit HZ
// encoding, not EUC-CN GB2312 — confirm this is intended.
func decode(input, decodingName string) (string, error) {
	var codec encoding.Encoding
	switch name := strings.ToLower(decodingName); name {
	case constants.GBK:
		codec = simplifiedchinese.GBK
	case constants.HZGB2312:
		codec = simplifiedchinese.HZGB2312
	case constants.Big5:
		codec = traditionalchinese.Big5
	case constants.Windows1252:
		codec = charmap.Windows1252
	default:
		return "", fmt.Errorf("unsupported decoding: %s", decodingName)
	}

	// Stream the input through the codec's decoder and collect the output.
	src := strings.NewReader(input)
	out, err := ioutil.ReadAll(transform.NewReader(src, codec.NewDecoder()))
	if err != nil {
		return "", err
	}
	return string(out), nil
}
25 changes: 20 additions & 5 deletions endpoint/qingstor/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ func (c *Client) Delete(ctx context.Context, p string) (err error) {

// Write implement destination.Write
func (c *Client) Write(ctx context.Context, p string, size int64, r io.Reader, isDir bool, meta map[string]string) (err error) {
cp := utils.Join(c.Path, p)
cp, err := c.Decode(utils.Join(c.Path, p))
if err != nil {
return
}
var input *service.PutObjectInput
if isDir {
cp += "/"
Expand Down Expand Up @@ -108,7 +111,10 @@ func (c *Client) Partable() bool {

// InitPart implement destination.InitPart
func (c *Client) InitPart(ctx context.Context, p string, size int64, meta map[string]string) (uploadID string, partSize int64, partNumbers int, err error) {
cp := utils.Join(c.Path, p)
cp, err := c.Decode(utils.Join(c.Path, p))
if err != nil {
return
}

input := &service.InitiateMultipartUploadInput{
XQSStorageClass: convert.String(c.StorageClass),
Expand Down Expand Up @@ -153,7 +159,10 @@ func (c *Client) InitPart(ctx context.Context, p string, size int64, meta map[st

// UploadPart implement destination.UploadPart
func (c *Client) UploadPart(ctx context.Context, o *model.PartialObject, r io.Reader) (err error) {
cp := utils.Join(c.Path, o.Key)
cp, err := c.Decode(utils.Join(c.Path, o.Key))
if err != nil {
return
}

_, err = c.client.UploadMultipart(cp, &service.UploadMultipartInput{
// wrap by limitReader to keep body consistent with size
Expand All @@ -172,7 +181,10 @@ func (c *Client) UploadPart(ctx context.Context, o *model.PartialObject, r io.Re
}

func (c *Client) CompleteParts(ctx context.Context, path string, uploadId string, totalNumber int) (err error) {
cp := utils.Join(c.Path, path)
cp, err := c.Decode(utils.Join(c.Path, path))
if err != nil {
return
}

logrus.Infof("Object %s start completing part", path)

Expand All @@ -196,7 +208,10 @@ func (c *Client) CompleteParts(ctx context.Context, path string, uploadId string
}

func (c *Client) AbortUploads(ctx context.Context, path string, uploadId string) (err error) {
cp := utils.Join(c.Path, path)
cp, err := c.Decode(utils.Join(c.Path, path))
if err != nil {
return
}

logrus.Infof("Object %s start abort part", path)

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/upyun/go-sdk v2.1.0+incompatible
github.com/vmihailenco/msgpack v3.3.3+incompatible
go.uber.org/ratelimit v0.2.0
golang.org/x/text v0.3.3
google.golang.org/api v0.32.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3
gopkg.in/yaml.v2 v2.4.0
Expand Down
2 changes: 1 addition & 1 deletion migrate/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"crypto/md5"
"encoding/hex"
"fmt"
"github.com/sirupsen/logrus"
"io"
"strings"
"sync"
"time"

"github.com/sirupsen/logrus"
"github.com/yunify/qscamel/constants"
"github.com/yunify/qscamel/endpoint"
"github.com/yunify/qscamel/model"
Expand Down
6 changes: 2 additions & 4 deletions model/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ type Task struct {
CheckMD5 bool `yaml:"check_md5" msgpack:"cm"`
IgnoreExisting string `yaml:"ignore_existing" msgpack:"ie"`
MultipartBoundarySize int64 `yaml:"multipart_boundary_size" msgpack:"mbs"`
// Format: 2006-01-02 15:04:05
IgnoreBefore string `yaml:"ignore_before" msgpack:"ib"`
IgnoreBefore string `yaml:"ignore_before" msgpack:"ib"` // Format: 2006-01-02 15:04:05
IgnoreBeforeTimestamp int64 `yaml:"-" msgpack:"ibt"`
RateLimit int `yaml:"rate_limit" msgpack:"rl"`
// The number of workers for multipart uploads, default 100.
Workers int `yaml:"workers" msgpack:"wk"`
Workers int `yaml:"workers" msgpack:"wk"` // The number of workers for multipart uploads, default 100.

// Statistical Information
SuccessCount int64 `yaml:"-" msgpack:"sc"`
Expand Down

0 comments on commit a32bdfc

Please sign in to comment.