Skip to content

Commit

Permalink
*: Add zero part and use it as multipart uploadID
Browse files Browse the repository at this point in the history
Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Jun 13, 2024
1 parent b6283bc commit c699af0
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 67 deletions.
2 changes: 0 additions & 2 deletions api/data/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ type PartInfo struct {
HomoHash []byte
// Elements contain [oid.ID] and size for each element for the current part.
Elements []LinkObjectPayload
// FirstSplitOID contains first object part in the split chain.
FirstSplitOID oid.ID
}

// ToHeaderString form short part representation to use in S3-Completed-Parts header.
Expand Down
14 changes: 6 additions & 8 deletions api/handler/multipart_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strconv"
"time"

"github.com/google/uuid"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
Expand Down Expand Up @@ -101,17 +100,14 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re
return
}

uploadID := uuid.New()
additional := []zap.Field{
zap.String("uploadID", uploadID.String()),
zap.String("Key", reqInfo.ObjectName),
}

p := &layer.CreateMultipartParams{
Info: &layer.UploadInfoParams{
UploadID: uploadID.String(),
Bkt: bktInfo,
Key: reqInfo.ObjectName,
Bkt: bktInfo,
Key: reqInfo.ObjectName,
},
Data: &layer.UploadData{},
}
Expand Down Expand Up @@ -154,7 +150,8 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re
return
}

if err = h.obj.CreateMultipartUpload(r.Context(), p); err != nil {
uploadID, err := h.obj.CreateMultipartUpload(r.Context(), p)
if err != nil {
h.logAndSendError(w, "could create multipart upload", reqInfo, err, additional...)
return
}
Expand All @@ -166,9 +163,10 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re
resp := InitiateMultipartUploadResponse{
Bucket: reqInfo.BucketName,
Key: reqInfo.ObjectName,
UploadID: uploadID.String(),
UploadID: uploadID,
}

additional = append(additional, zap.String("uploadID", uploadID))
if err = api.EncodeToResponse(w, resp); err != nil {
h.logAndSendError(w, "could not encode InitiateMultipartUploadResponse to response", reqInfo, err, additional...)
return
Expand Down
2 changes: 1 addition & 1 deletion api/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ type (

DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject

CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error
CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) (string, error)
CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error)
UploadPart(ctx context.Context, p *UploadPartParams) (string, error)
UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error)
Expand Down
202 changes: 162 additions & 40 deletions api/layer/multipart_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type (
}
)

func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error {
func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) (string, error) {
metaSize := len(p.Header)
if p.Data != nil {
metaSize += len(p.Data.ACLHeaders)
Expand All @@ -150,12 +150,11 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar

ownerPubKey, err := n.OwnerPublicKey(ctx)
if err != nil {
return fmt.Errorf("owner pub key: %w", err)
return "", fmt.Errorf("owner pub key: %w", err)
}

info := &data.MultipartInfo{
Key: p.Info.Key,
UploadID: p.Info.UploadID,
Owner: n.Owner(ctx),
OwnerPubKey: *ownerPubKey,
Created: TimeNow(ctx),
Expand All @@ -179,11 +178,27 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar

if p.Info.Encryption.Enabled() {
if err := addEncryptionHeaders(info.Meta, p.Info.Encryption); err != nil {
return fmt.Errorf("add encryption header: %w", err)
return "", fmt.Errorf("add encryption header: %w", err)
}
}

return n.treeService.CreateMultipartUpload(ctx, p.Info.Bkt, info)
zeroPartInfo, err := n.uploadZeroPart(ctx, info, p.Info)
if err != nil {
return "", fmt.Errorf("upload zero part: %w", err)
}

info.UploadID = zeroPartInfo.UploadID

nodeID, err := n.treeService.CreateMultipartUpload(ctx, p.Info.Bkt, info)
if err != nil {
return "", fmt.Errorf("create multipart upload: %w", err)
}

if err = n.finalizeZeroPart(ctx, p.Info.Bkt, nodeID, zeroPartInfo); err != nil {
return "", fmt.Errorf("finalize zero part: %w", err)
}

return zeroPartInfo.UploadID, nil
}

func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, error) {
Expand Down Expand Up @@ -249,28 +264,28 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf

lastPart, err := n.treeService.GetPartByNumber(ctx, bktInfo, multipartInfo.ID, p.PartNumber-1)
if err != nil {
// if ErrPartListIsEmpty, there is the first part of multipart.
if !errors.Is(err, ErrPartListIsEmpty) {
return nil, fmt.Errorf("getLastPart: %w", err)
}
} else {
// try to restore hash state from the last part.
// the required interface is guaranteed according to the docs, so just cast without checks.
binaryUnmarshaler := multipartHash.(encoding.BinaryUnmarshaler)
if err = binaryUnmarshaler.UnmarshalBinary(lastPart.MultipartHash); err != nil {
return nil, fmt.Errorf("unmarshal previous part hash: %w", err)
}

if tzHash != nil {
binaryUnmarshaler = tzHash.(encoding.BinaryUnmarshaler)
if err = binaryUnmarshaler.UnmarshalBinary(lastPart.HomoHash); err != nil {
return nil, fmt.Errorf("unmarshal previous part homo hash: %w", err)
}
return nil, fmt.Errorf("getLastPart: %w", err)
}

// try to restore hash state from the last part.
// the required interface is guaranteed according to the docs, so just cast without checks.
binaryUnmarshaler := multipartHash.(encoding.BinaryUnmarshaler)
if err = binaryUnmarshaler.UnmarshalBinary(lastPart.MultipartHash); err != nil {
return nil, fmt.Errorf("unmarshal previous part hash: %w", err)
}

if tzHash != nil {
binaryUnmarshaler = tzHash.(encoding.BinaryUnmarshaler)
if err = binaryUnmarshaler.UnmarshalBinary(lastPart.HomoHash); err != nil {
return nil, fmt.Errorf("unmarshal previous part homo hash: %w", err)
}
}

isSetSplitPreviousID = true
splitPreviousID = lastPart.OID
splitFirstID = lastPart.FirstSplitOID
isSetSplitPreviousID = true
splitPreviousID = lastPart.OID

if err = splitFirstID.DecodeString(multipartInfo.UploadID); err != nil {
return nil, fmt.Errorf("failed to decode multipart upload ID: %w", err)
}

var (
Expand Down Expand Up @@ -298,10 +313,6 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
},
}

if lastPart != nil {
splitFirstID = lastPart.FirstSplitOID
}

chunk := n.buffers.Get().(*[]byte)
var totalBytes int
// slice part manually. Simultaneously considering the part is a single object for user.
Expand All @@ -326,10 +337,6 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
return nil, err
}

if splitFirstID.Equals(oid.ID{}) {
splitFirstID = id
}

isSetSplitPreviousID = true
splitPreviousID = id
elements = append(elements, data.LinkObjectPayload{OID: id, Size: uint32(nBts)})
Expand Down Expand Up @@ -367,10 +374,6 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
Elements: elements,
}

if !splitFirstID.Equals(oid.ID{}) {
partInfo.FirstSplitOID = splitFirstID
}

// encoding hash.Hash state to save it in tree service.
// the required interface is guaranteed according to the docs, so just cast without checks.
binaryMarshaler := multipartHash.(encoding.BinaryMarshaler)
Expand Down Expand Up @@ -417,6 +420,114 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
return objInfo, nil
}

// uploadZeroPart stores an empty "zero" object for the given multipart upload.
// The OID of that object doubles as the multipart upload ID, and the saved
// hash states seed the running multipart hashes for the real parts that follow.
func (n *layer) uploadZeroPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadInfoParams) (*data.PartInfo, error) {
	encInfo := FormEncryptionInfo(multipartInfo.Meta)
	if err := p.Encryption.MatchObjectEncryption(encInfo); err != nil {
		n.log.Warn("mismatched obj encryptionInfo", zap.Error(err))
		return nil, s3errors.GetAPIError(s3errors.ErrInvalidEncryptionParameters)
	}

	bkt := p.Bkt
	creationTime := TimeNow(ctx)
	wholeHash := sha256.New()
	partHash := sha256.New()

	var attrs [][2]string
	if p.Encryption.Enabled() {
		// The decrypted size of an empty payload is always zero.
		attrs = append(attrs, [2]string{AttributeDecryptedSize, "0"})
	}

	var homoHash hash.Hash
	if n.neoFS.IsHomomorphicHashingEnabled() {
		homoHash = tz.New()
	}

	hashes := []hash.Hash{wholeHash, partHash}
	if homoHash != nil {
		hashes = append(hashes, homoHash)
	}

	prm := PrmObjectCreate{
		Container:    bkt.CID,
		Creator:      bkt.Owner,
		Attributes:   attrs,
		CreationTime: creationTime,
		CopiesNumber: multipartInfo.CopiesNumber,
		Multipart: &Multipart{
			MultipartHashes: hashes,
		},
		Payload: bytes.NewBuffer(nil),
	}

	id, _, err := n.objectPutAndHash(ctx, prm, bkt)
	if err != nil {
		return nil, err
	}

	elements := []data.LinkObjectPayload{{OID: id, Size: 0}}

	reqInfo := api.GetReqInfo(ctx)
	n.log.Debug("upload zero part",
		zap.String("reqId", reqInfo.RequestID),
		zap.String("bucket", bkt.Name), zap.Stringer("cid", bkt.CID),
		zap.String("multipart upload", id.String()),
		zap.Int("part number", 0), zap.String("object", p.Key), zap.Stringer("oid", id))

	partInfo := &data.PartInfo{
		Key: p.Key,
		// UploadID equals zero part ID intentionally.
		UploadID: id.String(),
		Number:   0,
		OID:      id,
		Size:     0,
		ETag:     hex.EncodeToString(partHash.Sum(nil)),
		Created:  prm.CreationTime,
		Elements: elements,
	}

	// encoding hash.Hash state to save it in tree service.
	// the required interface is guaranteed according to the docs, so just cast without checks.
	if partInfo.MultipartHash, err = wholeHash.(encoding.BinaryMarshaler).MarshalBinary(); err != nil {
		return nil, fmt.Errorf("marshalBinary: %w", err)
	}

	if homoHash != nil {
		if partInfo.HomoHash, err = homoHash.(encoding.BinaryMarshaler).MarshalBinary(); err != nil {
			return nil, fmt.Errorf("marshalBinary: %w", err)
		}
	}

	return partInfo, nil
}

// finalizeZeroPart registers the zero part in the tree service under the
// freshly created multipart node. When the registration replaces an older
// part object, that object is removed from storage on a best-effort basis.
func (n *layer) finalizeZeroPart(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, partInfo *data.PartInfo) error {
	oldID, err := n.treeService.AddPart(ctx, bktInfo, nodeID, partInfo)
	switch {
	case errors.Is(err, ErrNoNodeToRemove):
		// Nothing was replaced, so there is nothing to clean up.
		return nil
	case err != nil:
		return err
	}

	// Best effort: a stale object left behind is not fatal, so only log the failure.
	if err = n.objectDelete(ctx, bktInfo, oldID); err != nil {
		n.log.Error("couldn't delete old part object", zap.Error(err),
			zap.String("cnrID", bktInfo.CID.EncodeToString()),
			zap.String("bucket name", bktInfo.Name),
			zap.String("objID", oldID.EncodeToString()))
	}

	return nil
}

func (n *layer) reUploadFollowingParts(ctx context.Context, uploadParams UploadPartParams, partID int, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
parts, err := n.treeService.GetPartsAfter(ctx, bktInfo, multipartInfo.ID, partID)
if err != nil {
Expand Down Expand Up @@ -535,8 +646,21 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
var encMultipartObjectSize uint64
var lastPartID int
var completedPartsHeader strings.Builder
var splitFirstID oid.ID

if err = splitFirstID.DecodeString(multipartInfo.UploadID); err != nil {
return nil, nil, fmt.Errorf("decode splitFirstID from UploadID :%w", err)
}

// +1 is the zero part, it equals to the uploadID.
// +1 is the last part, it will be created later in the code.
var measuredObjects = make([]object.MeasuredObject, 0, len(p.Parts)+1)
var measuredObjects = make([]object.MeasuredObject, 0, len(p.Parts)+2)

// The user knows nothing about the zero part, so we have to add it manually.
var zeroObject object.MeasuredObject
zeroObject.SetObjectID(splitFirstID)
measuredObjects = append(measuredObjects, zeroObject)

for i, part := range p.Parts {
partInfo := partsInfo[part.PartNumber]
if partInfo == nil || part.ETag != partInfo.ETag {
Expand Down Expand Up @@ -580,15 +704,13 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
multipartHash := sha256.New()
var homoHash hash.Hash
var splitPreviousID oid.ID
var splitFirstID oid.ID

if lastPartID > 0 {
lastPart := partsInfo[lastPartID]

if lastPart != nil {
if len(lastPart.MultipartHash) > 0 {
splitPreviousID = lastPart.OID
splitFirstID = lastPart.FirstSplitOID

if len(lastPart.MultipartHash) > 0 {
binaryUnmarshaler := multipartHash.(encoding.BinaryUnmarshaler)
Expand Down
6 changes: 3 additions & 3 deletions api/layer/tree_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,13 @@ func (t *TreeServiceMock) GetAllVersionsByPrefix(_ context.Context, bktInfo *dat
return result, nil
}

func (t *TreeServiceMock) CreateMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error {
func (t *TreeServiceMock) CreateMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) (uint64, error) {
cnrMultipartsMap, ok := t.multiparts[bktInfo.CID.EncodeToString()]
if !ok {
t.multiparts[bktInfo.CID.EncodeToString()] = map[string][]*data.MultipartInfo{
info.Key: {info},
}
return nil
return 0, nil
}

multiparts := cnrMultipartsMap[info.Key]
Expand All @@ -291,7 +291,7 @@ func (t *TreeServiceMock) CreateMultipartUpload(_ context.Context, bktInfo *data
}
cnrMultipartsMap[info.Key] = append(multiparts, info)

return nil
return info.ID, nil
}

func (t *TreeServiceMock) GetMultipartUploadsByPrefix(_ context.Context, _ *data.BucketInfo, _ string) ([]*data.MultipartInfo, error) {
Expand Down
2 changes: 1 addition & 1 deletion api/layer/tree_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type TreeService interface {
PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error
GetLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) (*data.LockInfo, error)

CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) error
CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, info *data.MultipartInfo) (uint64, error)
DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error
GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error)
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
Expand Down
Loading

0 comments on commit c699af0

Please sign in to comment.