Skip to content

Commit

Permalink
rpc: Support synchronous writing to the wire
Browse files Browse the repository at this point in the history
Add `client.SyncWrite` option constructor that makes message
transmission methods to wait for the message to be completely written
out to the underlying connection.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Nov 23, 2023
1 parent 4a3b626 commit 3118b92
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 92 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ require (
// This version uses broken NeoFS API with incompatible signature
// definitions. See fix in https://github.com/nspcc-dev/neofs-api/pull/203
retract v2.12.0

replace google.golang.org/grpc => github.com/cthulhu-rider/grpc-go v0.0.0-20231123095204-ced09a8c28ff
87 changes: 2 additions & 85 deletions go.sum

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions rpc/client/call_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type callParameters struct {
ctx context.Context

allowBinarySendingOnly bool

syncWrite bool
}

func defaultCallParameters() *callParameters {
Expand Down Expand Up @@ -41,3 +43,12 @@ func AllowBinarySendingOnly() CallOption {
prm.allowBinarySendingOnly = true
}
}

// SyncWrite makes each [MessageWriter.WriteMessage] call to wait for message to
// be completely written out to the underlying client network connection. By
// default, the method may return before writing to the wire.
func SyncWrite() CallOption {
return func(prm *callParameters) {
prm.syncWrite = true
}
}
12 changes: 7 additions & 5 deletions rpc/client/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,15 @@ func (c *Client) initGRPC(info common.CallMethodInfo, prm *callParameters) (Mess
return nil, err
}

var grpcCallOpts []grpc.CallOption
ctxCallOpt := grpc.WithContext(prm.ctx)
grpcCallOpts := make([]grpc.CallOption, 1, 3)
grpcCallOpts[0] = grpc.WithContext(prm.ctx)

if prm.allowBinarySendingOnly {
grpcCallOpts = []grpc.CallOption{ctxCallOpt, grpc.AllowBinarySendingOnly()}
} else {
grpcCallOpts = []grpc.CallOption{ctxCallOpt}
grpcCallOpts = append(grpcCallOpts, grpc.AllowBinarySendingOnly())
}

if prm.syncWrite {
grpcCallOpts = append(grpcCallOpts, grpc.SyncWrite())
}

rw, err := c.gRPCClient.Init(info, grpcCallOpts...)
Expand Down
11 changes: 11 additions & 0 deletions rpc/grpc/call_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type callParameters struct {
ctx context.Context

allowBinarySendingOnly bool

syncWrite bool
}

func defaultCallParameters() *callParameters {
Expand All @@ -36,3 +38,12 @@ func AllowBinarySendingOnly() CallOption {
prm.allowBinarySendingOnly = true
}
}

// SyncWrite makes each [MessageReadWriter.WriteMessage] call to wait for
// message to be completely written out to the underlying client network
// connection. By default, the method may return before writing to the wire.
func SyncWrite() CallOption {
return func(prm *callParameters) {
prm.syncWrite = true
}
}
9 changes: 7 additions & 2 deletions rpc/grpc/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,14 @@ func (c *Client) Init(info common.CallMethodInfo, opts ...CallOption) (MessageRe
opt(prm)
}

var grpcCallOpts []grpc.CallOption
grpcCallOpts := make([]grpc.CallOption, 0, 2)

if prm.allowBinarySendingOnly {
grpcCallOpts = []grpc.CallOption{grpc.ForceCodec(onlyBinarySendingCodec{})}
grpcCallOpts = append(grpcCallOpts, grpc.ForceCodec(onlyBinarySendingCodec{}))
}

if prm.syncWrite {
grpcCallOpts = append(grpcCallOpts, grpc.SyncSend())
}

ctx, cancel := context.WithCancel(prm.ctx)
Expand Down

0 comments on commit 3118b92

Please sign in to comment.