Skip to content

Commit

Permalink
rpc: Support binary request messages
Browse files Browse the repository at this point in the history
This feature will be useful for possible optimizations of protocol
communication.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Jan 24, 2024
1 parent e248b50 commit b87efd1
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 2 deletions.
14 changes: 14 additions & 0 deletions rpc/client/call_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ type CallOption func(*callParameters)

type callParameters struct {
ctx context.Context

allowBinarySendingOnly bool
}

func defaultCallParameters() *callParameters {
Expand All @@ -27,3 +29,15 @@ func WithContext(ctx context.Context) CallOption {
prm.ctx = ctx
}
}

// AllowBinarySendingOnly allows only [MessageWriter.WriteMessage] method's
// arguments that are convertible to binary gRPC messages ([]byte). For example,
// [BinaryMessage] may be used for such write. By default, only arguments
// convertible to [proto.Message] may be used. Use this option when binary
// message transmission is needed. Note that only [proto.Message] convertible
// response messages are supported even with this option.
func AllowBinarySendingOnly() CallOption {
return func(prm *callParameters) {
prm.allowBinarySendingOnly = true

Check warning on line 41 in rpc/client/call_options.go

View check run for this annotation

Codecov / codecov/patch

rpc/client/call_options.go#L39-L41

Added lines #L39 - L41 were not covered by tests
}
}
30 changes: 29 additions & 1 deletion rpc/client/init.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"fmt"
"io"

"github.com/nspcc-dev/neofs-api-go/v2/rpc/common"
Expand Down Expand Up @@ -67,12 +68,39 @@ func (g rwGRPC) WriteMessage(m message.Message) error {
return g.MessageReadWriter.WriteMessage(m.ToGRPCMessage())
}

// BinaryMessage represents binary [message.Message] that can be used with
// [AllowBinarySendingOnly] option.
type BinaryMessage []byte

func (x BinaryMessage) ToGRPCMessage() grpc.Message {
return []byte(x)

Check warning on line 76 in rpc/client/init.go

View check run for this annotation

Codecov / codecov/patch

rpc/client/init.go#L75-L76

Added lines #L75 - L76 were not covered by tests
}

func (x BinaryMessage) FromGRPCMessage(m grpc.Message) error {
bMsg, ok := m.([]byte)
if ok {
copy(x, bMsg)
return nil

Check warning on line 83 in rpc/client/init.go

View check run for this annotation

Codecov / codecov/patch

rpc/client/init.go#L79-L83

Added lines #L79 - L83 were not covered by tests
}

return fmt.Errorf("message is not of type %T", bMsg)

Check warning on line 86 in rpc/client/init.go

View check run for this annotation

Codecov / codecov/patch

rpc/client/init.go#L86

Added line #L86 was not covered by tests
}

func (c *Client) initGRPC(info common.CallMethodInfo, prm *callParameters) (MessageReadWriter, error) {
if err := c.createGRPCClient(prm.ctx); err != nil {
return nil, err
}

rw, err := c.gRPCClient.Init(info, grpc.WithContext(prm.ctx))
var grpcCallOpts []grpc.CallOption
ctxCallOpt := grpc.WithContext(prm.ctx)

Check warning on line 95 in rpc/client/init.go

View check run for this annotation

Codecov / codecov/patch

rpc/client/init.go#L94-L95

Added lines #L94 - L95 were not covered by tests

if prm.allowBinarySendingOnly {
grpcCallOpts = []grpc.CallOption{ctxCallOpt, grpc.AllowBinarySendingOnly()}
} else {
grpcCallOpts = []grpc.CallOption{ctxCallOpt}

Check warning on line 100 in rpc/client/init.go

View check run for this annotation

Codecov / codecov/patch

rpc/client/init.go#L97-L100

Added lines #L97 - L100 were not covered by tests
}

rw, err := c.gRPCClient.Init(info, grpcCallOpts...)

Check warning on line 103 in rpc/client/init.go

View check run for this annotation

Codecov / codecov/patch

rpc/client/init.go#L103

Added line #L103 was not covered by tests
if err != nil {
return nil, err
}
Expand Down
13 changes: 13 additions & 0 deletions rpc/grpc/call_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ type CallOption func(*callParameters)

type callParameters struct {
ctx context.Context

allowBinarySendingOnly bool
}

func defaultCallParameters() *callParameters {
Expand All @@ -23,3 +25,14 @@ func WithContext(ctx context.Context) CallOption {
prm.ctx = ctx
}
}

// AllowBinarySendingOnly allows to pass []byte argument only to
// [MessageReadWriter.WriteMessage] method. By default, only [proto.Message]
// instances may be used. Use this option when binary message transmission is
// needed. Note that only [proto.Message] response messages are supported even
// with this option.
func AllowBinarySendingOnly() CallOption {
return func(prm *callParameters) {
prm.allowBinarySendingOnly = true
}
}
30 changes: 29 additions & 1 deletion rpc/grpc/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package grpc

import (
"context"
"fmt"
"io"
"time"

"github.com/nspcc-dev/neofs-api-go/v2/rpc/common"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
)

// Message represents raw gRPC message.
Expand Down Expand Up @@ -69,6 +72,26 @@ func (w *streamWrapper) withTimeout(closure func() error) error {
}
}

type onlyBinarySendingCodec struct{}

func (x onlyBinarySendingCodec) Name() string {
// may be any non-empty, conflicts are unlikely to arise
return "neofs_binary_sender"
}

func (x onlyBinarySendingCodec) Marshal(msg interface{}) ([]byte, error) {
bMsg, ok := msg.([]byte)
if ok {
return bMsg, nil
}

return nil, fmt.Errorf("message is not of type %T", bMsg)
}

func (x onlyBinarySendingCodec) Unmarshal(raw []byte, msg interface{}) error {
return encoding.GetCodec(proto.Name).Unmarshal(raw, msg)
}

// Init initiates a messaging session within the RPC configured by options.
func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageReadWriter, error) {
prm := defaultCallParameters()
Expand All @@ -77,12 +100,17 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe
opt(prm)
}

var grpcCallOpts []grpc.CallOption
if prm.allowBinarySendingOnly {
grpcCallOpts = []grpc.CallOption{grpc.ForceCodec(onlyBinarySendingCodec{})}
}

ctx, cancel := context.WithCancel(prm.ctx)
stream, err := c.con.NewStream(ctx, &grpc.StreamDesc{
StreamName: info.Name,
ServerStreams: info.ServerStream(),
ClientStreams: info.ClientStream(),
}, toMethodName(info))
}, toMethodName(info), grpcCallOpts...)
if err != nil {
cancel()
return nil, err
Expand Down

0 comments on commit b87efd1

Please sign in to comment.