Skip to content

Commit

Permalink
Merge pull request #97 from singchia/feat/adjust-server-delegate
Browse files Browse the repository at this point in the history
dialouge: add WriteWait for block write back ack message
  • Loading branch information
singchia authored May 26, 2024
2 parents ca1dca0 + a64d167 commit d8792fb
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 21 deletions.
30 changes: 29 additions & 1 deletion application/mock/dialogue_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 6 additions & 8 deletions application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func newStream(end *End, cn conn.Conn, dg multiplexer.Dialogue, opts *opts) *str
failedCh: make(chan packet.Packet),
dlReadChList: list.New(),
dlWriteChList: list.New(),
writeInCh: make(chan packet.Packet),
writeInCh: make(chan packet.Packet, 32),
closeCh: make(chan struct{}),
}
go sm.handlePkt()
Expand Down Expand Up @@ -237,7 +237,8 @@ func (sm *stream) handleInMessagePacket(pkt *packet.MessagePacket) iodefine.IORe
select {
case sm.messageCh <- pkt:
default:
// TODO ack error back
pkt := sm.pf.NewMessageAckPacketWithSessionID(sm.dg.DialogueID(), pkt.ID(), iodefine.ErrIOBufferFull)
sm.dg.WriteWait(pkt)
return iodefine.IODiscard
}
return iodefine.IOSuccess
Expand Down Expand Up @@ -331,7 +332,7 @@ func (sm *stream) handleInRequestPacket(pkt *packet.RequestPacket) iodefine.IORe
// no rpc found, return to call error, note that this error is not set to response error
err := fmt.Errorf("no such rpc: %s", method)
rspPkt := sm.pf.NewResponsePacket(pkt.ID(), []byte(method), nil, err)
err = sm.dg.Write(rspPkt)
err = sm.dg.WriteWait(rspPkt)
if err != nil {
sm.log.Debugf("write no such rpc response packet err: %s, clientID: %d, dialogueID: %d, packetID: %d, packetType: %s, method: %s",
err, sm.cn.ClientID(), sm.dg.DialogueID(), pkt.ID(), pkt.Type().String(), method)
Expand Down Expand Up @@ -439,11 +440,8 @@ func (sm *stream) handleOutMessagePacket(pkt *packet.MessagePacket) iodefine.IOR
}

func (sm *stream) handleOutMessageAckPacket(pkt *packet.MessageAckPacket) iodefine.IORet {
err := sm.dg.Write(pkt)
err := sm.dg.WriteWait(pkt)
if err != nil {
if err == io.ErrShortBuffer {
return iodefine.IODiscard
}
sm.log.Debugf("write message ack packet err: %s, clientID: %d, dialogueID: %d, packetID: %d, packetType: %s",
err, sm.cn.ClientID(), sm.dg.DialogueID(), pkt.ID(), pkt.Type().String())
return iodefine.IOErr
Expand Down Expand Up @@ -507,7 +505,7 @@ func (sm *stream) doRPC(pkt *packet.RequestPacket, rpc methodRPC, method string,
sm.rpcMtx.Unlock()

rspPkt := sm.pf.NewResponsePacket(pkt.ID(), []byte(req.method), rsp.data, rsp.err)
err := sm.dg.Write(rspPkt)
err := sm.dg.WriteWait(rspPkt)
if err != nil {
// Write error, the response cannot be delivered, so should be debuged
sm.log.Debugf("write response packet err: %s, clientID: %d, dialogueID: %d, packetID: %d, packetType: %s, method: %s",
Expand Down
11 changes: 6 additions & 5 deletions client/end_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

type RetryEnd struct {
opts *RetryEndOptions
opts *EndOptions
*delegate.UnimplementedDelegate

end unsafe.Pointer
Expand All @@ -36,10 +36,11 @@ type RetryEnd struct {
hijackRPC geminio.HijackRPC
}

func NewRetryEndWithDialer(dialer Dialer, opts ...*RetryEndOptions) (geminio.End, error) {
// RetryEnd with retry connection infinity
func NewRetryEndWithDialer(dialer Dialer, opts ...*EndOptions) (geminio.End, error) {
// options
eo := MergeRetryEndOptions(opts...)
initRetryEndOptions(eo)
eo := MergeEndOptions(opts...)
initEndOptions(eo)
ok := int32(1)
re := &RetryEnd{
opts: eo,
Expand Down Expand Up @@ -72,7 +73,7 @@ ERR:
}

func (re *RetryEnd) getEnd() (*clientEnd, error) {
end, err := NewEndWithDialer(re.dialer, re.opts.EndOptions)
end, err := NewEndWithDialer(re.dialer, re.opts)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions conn/conn_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,9 @@ func OptionClientConnBufferSize(read, write int) ClientConnOption {
return func(cc *ClientConn) error {
if read != -1 {
cc.readOutSize = read
cc.readInSize = read
}
if write != -1 {
cc.writeInSize = write
cc.writeOutSize = write
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion examples/messager/client/messager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func main() {
dialer := func() (net.Conn, error) {
return net.Dial(*network, *address)
}
opt := client.NewRetryEndOptions()
opt := client.NewEndOptions()
opt.SetLog(log)
end, err := client.NewRetryEndWithDialer(dialer, opt)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion examples/mq/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {
fc := &FakeClient{
UnimplementedDelegate: &delegate.UnimplementedDelegate{},
}
opt := client.NewRetryEndOptions()
opt := client.NewEndOptions()
opt.SetLog(log)
opt.SetWaitRemoteRPCs("claim")
opt.SetDelegate(fc)
Expand Down
2 changes: 1 addition & 1 deletion examples/mq/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func main() {
fc := &FakeClient{
UnimplementedDelegate: &delegate.UnimplementedDelegate{},
}
opt := client.NewRetryEndOptions()
opt := client.NewEndOptions()
opt.SetLog(glog)
opt.SetWaitRemoteRPCs("claim")
opt.SetDelegate(fc)
Expand Down
14 changes: 12 additions & 2 deletions multiplexer/dialogue.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,10 @@ func OptionDialogueNegotiatingID(negotiatingID uint64, dialogueIDPeersCall bool)
func OptionDialogueBufferSize(read, write int) DialogueOption {
return func(dg *dialogue) {
if read != -1 {
dg.readInSize = read
dg.readOutSize = read
}
if write != -1 {
dg.writeInSize = write
dg.writeOutSize = write
}
}
}
Expand Down Expand Up @@ -222,6 +220,18 @@ func (dg *dialogue) Write(pkt packet.Packet) error {
return nil
}

func (dg *dialogue) WriteWait(pkt packet.Packet) error {
dg.mtx.RLock()
defer dg.mtx.RUnlock()

if !dg.dialogueOK {
return io.EOF
}
pkt.(packet.SessionAbove).SetSessionID(dg.dialogueID)
dg.writeInCh <- pkt
return nil
}

func (dg *dialogue) Read() (packet.Packet, error) {
pkt, ok := <-dg.readOutCh
if !ok {
Expand Down
1 change: 1 addition & 0 deletions multiplexer/multiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Reader interface {

type Writer interface {
Write(pkt packet.Packet) error
WriteWait(pkt packet.Packet) error
}

type Closer interface {
Expand Down

0 comments on commit d8792fb

Please sign in to comment.