Skip to content

Commit

Permalink
*: Remove pool's deprecated methods
Browse files Browse the repository at this point in the history
Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Aug 3, 2023
1 parent 93666d5 commit a19fd10
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 70 deletions.
3 changes: 0 additions & 3 deletions api/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,9 @@ func (n *layer) Owner(ctx context.Context) user.ID {
}

func (n *layer) prepareAuthParameters(ctx context.Context, prm *PrmAuth, bktOwner user.ID) {
prm.PrivateKey = &n.gateKey.PrivateKey

if bd, ok := ctx.Value(api.BoxData).(*accessbox.Box); ok && bd != nil && bd.Gate != nil && bd.Gate.BearerToken != nil {
if bktOwner.Equals(bd.Gate.BearerToken.ResolveIssuer()) {
prm.BearerToken = bd.Gate.BearerToken
return
}
}
}
Expand Down
4 changes: 0 additions & 4 deletions api/layer/neofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package layer

import (
"context"
"crypto/ecdsa"
"errors"
"io"
"time"
Expand Down Expand Up @@ -47,9 +46,6 @@ type PrmContainerCreate struct {
type PrmAuth struct {
// Bearer token to be used for the operation. Overlaps PrivateKey. Optional.
BearerToken *bearer.Token

// Private key used for the operation if BearerToken is missing (in this case non-nil).
PrivateKey *ecdsa.PrivateKey
}

// PrmObjectRead groups parameters of NeoFS.ReadObject operation.
Expand Down
110 changes: 47 additions & 63 deletions internal/neofs/neofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,26 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/eacl"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/object/slicer"
"github.com/nspcc-dev/neofs-sdk-go/pool"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/stat"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/nspcc-dev/neofs-sdk-go/waiter"
)

// NeoFS represents virtual connection to the NeoFS network.
// It is used to provide an interface to dependent packages
// which work with NeoFS.
type NeoFS struct {
pool *pool.Pool
await pool.WaitParams
gateSigner user.Signer
}

const (
defaultPollInterval = time.Second // overrides default value from pool
defaultPollTimeout = 120 * time.Second // same as default value from pool
)

// NewNeoFS creates new NeoFS using provided pool.Pool.
func NewNeoFS(p *pool.Pool, signer user.Signer) *NeoFS {
var await pool.WaitParams
await.SetPollInterval(defaultPollInterval)
await.SetTimeout(defaultPollTimeout)

return &NeoFS{
pool: p,
await: await,
gateSigner: signer,
}
}
Expand Down Expand Up @@ -93,7 +84,8 @@ func (x *NeoFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (uin

// Container implements neofs.NeoFS interface method.
func (x *NeoFS) Container(ctx context.Context, idCnr cid.ID) (*container.Container, error) {
res, err := x.pool.GetContainer(ctx, idCnr)
var prm client.PrmContainerGet
res, err := x.pool.ContainerGet(ctx, idCnr, prm)
if err != nil {
return nil, fmt.Errorf("read container via connection pool: %w", err)
}
Expand Down Expand Up @@ -140,15 +132,15 @@ func (x *NeoFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreat
return cid.ID{}, fmt.Errorf("sync container with the network state: %w", err)
}

var prmPut pool.PrmContainerPut
prmPut.SetWaitParams(x.await)

var prmPut client.PrmContainerPut
if prm.SessionToken != nil {
prmPut.WithinSession(*prm.SessionToken)
}

putWaiter := waiter.NewContainerPutWaiter(x.pool, waiter.DefaultPollInterval)

// send request to save the container
idCnr, err := x.pool.PutContainer(ctx, cnr, x.gateSigner, prmPut)
idCnr, err := putWaiter.ContainerPut(ctx, cnr, x.gateSigner, prmPut)
if err != nil {
return cid.ID{}, fmt.Errorf("save container via connection pool: %w", err)
}
Expand All @@ -158,7 +150,8 @@ func (x *NeoFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreat

// UserContainers implements neofs.NeoFS interface method.
func (x *NeoFS) UserContainers(ctx context.Context, id user.ID) ([]cid.ID, error) {
r, err := x.pool.ListContainers(ctx, id)
var prm client.PrmContainerList
r, err := x.pool.ContainerList(ctx, id, prm)
if err != nil {
return nil, fmt.Errorf("list user containers via connection pool: %w", err)
}
Expand All @@ -168,14 +161,13 @@ func (x *NeoFS) UserContainers(ctx context.Context, id user.ID) ([]cid.ID, error

// SetContainerEACL implements neofs.NeoFS interface method.
func (x *NeoFS) SetContainerEACL(ctx context.Context, table eacl.Table, sessionToken *session.Container, signer user.Signer) error {
var prm pool.PrmContainerSetEACL
prm.SetWaitParams(x.await)

var prm client.PrmContainerSetEACL
if sessionToken != nil {
prm.WithinSession(*sessionToken)
}

err := x.pool.SetEACL(ctx, table, signer, prm)
eaclWaiter := waiter.NewContainerSetEACLWaiter(x.pool, waiter.DefaultPollInterval)
err := eaclWaiter.ContainerSetEACL(ctx, table, signer, prm)
if err != nil {
return fmt.Errorf("save eACL via connection pool: %w", err)
}
Expand All @@ -185,7 +177,8 @@ func (x *NeoFS) SetContainerEACL(ctx context.Context, table eacl.Table, sessionT

// ContainerEACL implements neofs.NeoFS interface method.
func (x *NeoFS) ContainerEACL(ctx context.Context, id cid.ID) (*eacl.Table, error) {
res, err := x.pool.GetEACL(ctx, id)
var prm client.PrmContainerEACL
res, err := x.pool.ContainerEACL(ctx, id, prm)
if err != nil {
return nil, fmt.Errorf("read eACL via connection pool: %w", err)
}
Expand All @@ -195,14 +188,13 @@ func (x *NeoFS) ContainerEACL(ctx context.Context, id cid.ID) (*eacl.Table, erro

// DeleteContainer implements neofs.NeoFS interface method.
func (x *NeoFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container, signer user.Signer) error {
var prm pool.PrmContainerDelete
prm.SetWaitParams(x.await)

var prm client.PrmContainerDelete
if token != nil {
prm.SetSessionToken(*token)
prm.WithinSession(*token)
}

err := x.pool.DeleteContainer(ctx, id, signer, prm)
deleteWaiter := waiter.NewContainerDeleteWaiter(x.pool, waiter.DefaultPollInterval)
err := deleteWaiter.ContainerDelete(ctx, id, signer, prm)
if err != nil {
return fmt.Errorf("delete container via connection pool: %w", err)
}
Expand Down Expand Up @@ -258,18 +250,14 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi
objectv2.WriteLock(obj.ToV2(), (objectv2.Lock)(*lock))
}

var prmPut pool.PrmObjectPut
prmPut.SetHeader(*obj)
prmPut.SetPayload(prm.Payload)
prmPut.SetCopiesNumber(prm.CopiesNumber)
var opts slicer.Options
opts.SetCopiesNumber(prm.CopiesNumber)

if prm.BearerToken != nil {
prmPut.UseBearer(*prm.BearerToken)
} else if prm.PrivateKey != nil {
prmPut.UseSigner(user.NewSignerRFC6979(*prm.PrivateKey))
opts.SetBearerToken(*prm.BearerToken)
}

idObj, err := x.pool.PutObject(ctx, prmPut)
idObj, err := slicer.Put(ctx, x.pool, *obj, x.gateSigner, prm.Payload, opts)
if err != nil {
reason, ok := isErrAccessDenied(err)
if ok {
Expand Down Expand Up @@ -300,17 +288,15 @@ func (x payloadReader) Read(p []byte) (int, error) {

// ReadObject implements neofs.NeoFS interface method.
func (x *NeoFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer.ObjectPart, error) {
var prmGet pool.PrmObjectGet
var prmGet client.PrmObjectGet

if prm.BearerToken != nil {
prmGet.UseBearer(*prm.BearerToken)
} else if prm.PrivateKey != nil {
prmGet.UseSigner(user.NewSignerRFC6979(*prm.PrivateKey))
prmGet.WithBearerToken(*prm.BearerToken)
}

if prm.WithHeader {
if prm.WithPayload {
res, err := x.pool.GetObject(ctx, prm.Container, prm.Object, prmGet)
header, res, err := x.pool.ObjectGetInit(ctx, prm.Container, prm.Object, x.gateSigner, prmGet)
if err != nil {
if reason, ok := isErrAccessDenied(err); ok {
return nil, fmt.Errorf("%w: %s", layer.ErrAccessDenied, reason)
Expand All @@ -319,29 +305,27 @@ func (x *NeoFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer
return nil, fmt.Errorf("init full object reading via connection pool: %w", err)
}

defer res.Payload.Close()
defer res.Close()

payload, err := io.ReadAll(res.Payload)
payload, err := io.ReadAll(res)
if err != nil {
return nil, fmt.Errorf("read full object payload: %w", err)
}

res.Header.SetPayload(payload)
header.SetPayload(payload)

return &layer.ObjectPart{
Head: &res.Header,
Head: &header,
}, nil
}

var prmHead pool.PrmObjectHead
var prmHead client.PrmObjectHead

if prm.BearerToken != nil {
prmHead.UseBearer(*prm.BearerToken)
} else if prm.PrivateKey != nil {
prmHead.UseSigner(user.NewSignerRFC6979(*prm.PrivateKey))
prmHead.WithBearerToken(*prm.BearerToken)
}

hdr, err := x.pool.HeadObject(ctx, prm.Container, prm.Object, prmHead)
hdrRes, err := x.pool.ObjectHead(ctx, prm.Container, prm.Object, x.gateSigner, prmHead)
if err != nil {
if reason, ok := isErrAccessDenied(err); ok {
return nil, fmt.Errorf("%w: %s", layer.ErrAccessDenied, reason)
Expand All @@ -350,11 +334,16 @@ func (x *NeoFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer
return nil, fmt.Errorf("read object header via connection pool: %w", err)
}

var hdr object.Object
if !hdrRes.ReadHeader(&hdr) {
return nil, errors.New("header is empty")
}

return &layer.ObjectPart{
Head: &hdr,
}, nil
} else if prm.PayloadRange[0]+prm.PayloadRange[1] == 0 {
res, err := x.pool.GetObject(ctx, prm.Container, prm.Object, prmGet)
_, res, err := x.pool.ObjectGetInit(ctx, prm.Container, prm.Object, x.gateSigner, prmGet)
if err != nil {
if reason, ok := isErrAccessDenied(err); ok {
return nil, fmt.Errorf("%w: %s", layer.ErrAccessDenied, reason)
Expand All @@ -364,19 +353,17 @@ func (x *NeoFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer
}

return &layer.ObjectPart{
Payload: res.Payload,
Payload: res,
}, nil
}

var prmRange pool.PrmObjectRange
var prmRange client.PrmObjectRange

if prm.BearerToken != nil {
prmRange.UseBearer(*prm.BearerToken)
} else if prm.PrivateKey != nil {
prmRange.UseSigner(user.NewSignerRFC6979(*prm.PrivateKey))
prmRange.WithBearerToken(*prm.BearerToken)
}

res, err := x.pool.ObjectRange(ctx, prm.Container, prm.Object, prm.PayloadRange[0], prm.PayloadRange[1], prmRange)
res, err := x.pool.ObjectRangeInit(ctx, prm.Container, prm.Object, prm.PayloadRange[0], prm.PayloadRange[1], x.gateSigner, prmRange)
if err != nil {
if reason, ok := isErrAccessDenied(err); ok {
return nil, fmt.Errorf("%w: %s", layer.ErrAccessDenied, reason)
Expand All @@ -386,22 +373,19 @@ func (x *NeoFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer
}

return &layer.ObjectPart{
Payload: payloadReader{&res},
Payload: payloadReader{res},
}, nil
}

// DeleteObject implements neofs.NeoFS interface method.
func (x *NeoFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) error {
var prmDelete pool.PrmObjectDelete
var prmDelete client.PrmObjectDelete

if prm.BearerToken != nil {
prmDelete.UseBearer(*prm.BearerToken)
}
if prm.PrivateKey != nil {
prmDelete.UseSigner(user.NewSignerRFC6979(*prm.PrivateKey))
prmDelete.WithBearerToken(*prm.BearerToken)
}

err := x.pool.DeleteObject(ctx, prm.Container, prm.Object, prmDelete)
_, err := x.pool.ObjectDelete(ctx, prm.Container, prm.Object, x.gateSigner, prmDelete)
if err != nil {
if reason, ok := isErrAccessDenied(err); ok {
return fmt.Errorf("%w: %s", layer.ErrAccessDenied, reason)
Expand Down

0 comments on commit a19fd10

Please sign in to comment.