diff --git a/application/mock/dialogue_mock.go b/application/mock/dialogue_mock.go index f93b802..dfcf862 100644 --- a/application/mock/dialogue_mock.go +++ b/application/mock/dialogue_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: ../../multiplexer/multiplexer.go +// Source: ../multiplexer/multiplexer.go // Package mock_multiplexer is a generated GoMock package. package mock @@ -211,6 +211,20 @@ func (mr *MockWriterMockRecorder) Write(pkt interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockWriter)(nil).Write), pkt) } +// WriteWait mocks base method. +func (m *MockWriter) WriteWait(pkt packet.Packet) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteWait", pkt) + ret0, _ := ret[0].(error) + return ret0 +} + +// WriteWait indicates an expected call of WriteWait. +func (mr *MockWriterMockRecorder) WriteWait(pkt interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteWait", reflect.TypeOf((*MockWriter)(nil).WriteWait), pkt) +} + // MockCloser is a mock of Closer interface. type MockCloser struct { ctrl *gomock.Controller @@ -500,3 +514,17 @@ func (mr *MockDialogueMockRecorder) Write(pkt interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockDialogue)(nil).Write), pkt) } + +// WriteWait mocks base method. +func (m *MockDialogue) WriteWait(pkt packet.Packet) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteWait", pkt) + ret0, _ := ret[0].(error) + return ret0 +} + +// WriteWait indicates an expected call of WriteWait. +func (mr *MockDialogueMockRecorder) WriteWait(pkt interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteWait", reflect.TypeOf((*MockDialogue)(nil).WriteWait), pkt) +} diff --git a/application/stream.go b/application/stream.go index 9a7bb90..e6a730c 100644 --- a/application/stream.go +++ b/application/stream.go @@ -108,7 +108,7 @@ func newStream(end *End, cn conn.Conn, dg multiplexer.Dialogue, opts *opts) *str failedCh: make(chan packet.Packet), dlReadChList: list.New(), dlWriteChList: list.New(), - writeInCh: make(chan packet.Packet), + writeInCh: make(chan packet.Packet, 32), closeCh: make(chan struct{}), } go sm.handlePkt() @@ -237,7 +237,8 @@ func (sm *stream) handleInMessagePacket(pkt *packet.MessagePacket) iodefine.IORe select { case sm.messageCh <- pkt: default: - // TODO ack error back + pkt := sm.pf.NewMessageAckPacketWithSessionID(sm.dg.DialogueID(), pkt.ID(), iodefine.ErrIOBufferFull) + sm.dg.WriteWait(pkt) return iodefine.IODiscard } return iodefine.IOSuccess @@ -331,7 +332,7 @@ func (sm *stream) handleInRequestPacket(pkt *packet.RequestPacket) iodefine.IORe // no rpc found, return to call error, note that this error is not set to response error err := fmt.Errorf("no such rpc: %s", method) rspPkt := sm.pf.NewResponsePacket(pkt.ID(), []byte(method), nil, err) - err = sm.dg.Write(rspPkt) + err = sm.dg.WriteWait(rspPkt) if err != nil { sm.log.Debugf("write no such rpc response packet err: %s, clientID: %d, dialogueID: %d, packetID: %d, packetType: %s, method: %s", err, sm.cn.ClientID(), sm.dg.DialogueID(), pkt.ID(), pkt.Type().String(), method) @@ -439,11 +440,8 @@ func (sm *stream) handleOutMessagePacket(pkt *packet.MessagePacket) iodefine.IOR } func (sm *stream) handleOutMessageAckPacket(pkt *packet.MessageAckPacket) iodefine.IORet { - err := sm.dg.Write(pkt) + err := sm.dg.WriteWait(pkt) if err != nil { - if err == io.ErrShortBuffer { - return iodefine.IODiscard - } sm.log.Debugf("write message ack packet err: %s, clientID: %d, dialogueID: %d, packetID: %d, packetType: %s", err, sm.cn.ClientID(), sm.dg.DialogueID(), pkt.ID(), pkt.Type().String()) return iodefine.IOErr @@ -507,7 +505,7 @@ func (sm *stream) doRPC(pkt *packet.RequestPacket, rpc methodRPC, method string, sm.rpcMtx.Unlock() rspPkt := sm.pf.NewResponsePacket(pkt.ID(), []byte(req.method), rsp.data, rsp.err) - err := sm.dg.Write(rspPkt) + err := sm.dg.WriteWait(rspPkt) if err != nil { // Write error, the response cannot be delivered, so should be debuged sm.log.Debugf("write response packet err: %s, clientID: %d, dialogueID: %d, packetID: %d, packetType: %s, method: %s", diff --git a/client/end_retry.go b/client/end_retry.go index 2b3e9db..87364a1 100644 --- a/client/end_retry.go +++ b/client/end_retry.go @@ -16,7 +16,7 @@ import ( ) type RetryEnd struct { - opts *RetryEndOptions + opts *EndOptions *delegate.UnimplementedDelegate end unsafe.Pointer @@ -36,10 +36,11 @@ type RetryEnd struct { hijackRPC geminio.HijackRPC } -func NewRetryEndWithDialer(dialer Dialer, opts ...*RetryEndOptions) (geminio.End, error) { +// RetryEnd with retry connection infinity +func NewRetryEndWithDialer(dialer Dialer, opts ...*EndOptions) (geminio.End, error) { // options - eo := MergeRetryEndOptions(opts...) - initRetryEndOptions(eo) + eo := MergeEndOptions(opts...) + initEndOptions(eo) ok := int32(1) re := &RetryEnd{ opts: eo, @@ -72,7 +73,7 @@ ERR: } func (re *RetryEnd) getEnd() (*clientEnd, error) { - end, err := NewEndWithDialer(re.dialer, re.opts.EndOptions) + end, err := NewEndWithDialer(re.dialer, re.opts) if err != nil { return nil, err } diff --git a/conn/conn_client.go b/conn/conn_client.go index 4e138fa..494d2e0 100644 --- a/conn/conn_client.go +++ b/conn/conn_client.go @@ -76,11 +76,9 @@ func OptionClientConnBufferSize(read, write int) ClientConnOption { return func(cc *ClientConn) error { if read != -1 { cc.readOutSize = read - cc.readInSize = read } if write != -1 { cc.writeInSize = write - cc.writeOutSize = write } return nil } diff --git a/examples/messager/client/messager_client.go b/examples/messager/client/messager_client.go index 9bd6f8e..276d9c8 100644 --- a/examples/messager/client/messager_client.go +++ b/examples/messager/client/messager_client.go @@ -40,7 +40,7 @@ func main() { dialer := func() (net.Conn, error) { return net.Dial(*network, *address) } - opt := client.NewRetryEndOptions() + opt := client.NewEndOptions() opt.SetLog(log) end, err := client.NewRetryEndWithDialer(dialer, opt) if err != nil { diff --git a/examples/mq/consumer/consumer.go b/examples/mq/consumer/consumer.go index 9bcc25a..4db929b 100644 --- a/examples/mq/consumer/consumer.go +++ b/examples/mq/consumer/consumer.go @@ -72,7 +72,7 @@ func main() { fc := &FakeClient{ UnimplementedDelegate: &delegate.UnimplementedDelegate{}, } - opt := client.NewRetryEndOptions() + opt := client.NewEndOptions() opt.SetLog(log) opt.SetWaitRemoteRPCs("claim") opt.SetDelegate(fc) diff --git a/examples/mq/producer/producer.go b/examples/mq/producer/producer.go index 55db542..0e79997 100644 --- a/examples/mq/producer/producer.go +++ b/examples/mq/producer/producer.go @@ -78,7 +78,7 @@ func main() { fc := &FakeClient{ UnimplementedDelegate: &delegate.UnimplementedDelegate{}, } - opt := client.NewRetryEndOptions() + opt := client.NewEndOptions() opt.SetLog(glog) opt.SetWaitRemoteRPCs("claim") opt.SetDelegate(fc) diff --git a/multiplexer/dialogue.go b/multiplexer/dialogue.go index ef3b486..e6cec86 100644 --- a/multiplexer/dialogue.go +++ b/multiplexer/dialogue.go @@ -128,12 +128,10 @@ func OptionDialogueNegotiatingID(negotiatingID uint64, dialogueIDPeersCall bool) func OptionDialogueBufferSize(read, write int) DialogueOption { return func(dg *dialogue) { if read != -1 { - dg.readInSize = read dg.readOutSize = read } if write != -1 { dg.writeInSize = write - dg.writeOutSize = write } } } @@ -222,6 +220,18 @@ func (dg *dialogue) Write(pkt packet.Packet) error { return nil } +func (dg *dialogue) WriteWait(pkt packet.Packet) error { + dg.mtx.RLock() + defer dg.mtx.RUnlock() + + if !dg.dialogueOK { + return io.EOF + } + pkt.(packet.SessionAbove).SetSessionID(dg.dialogueID) + dg.writeInCh <- pkt + return nil +} + func (dg *dialogue) Read() (packet.Packet, error) { pkt, ok := <-dg.readOutCh if !ok { diff --git a/multiplexer/multiplexer.go b/multiplexer/multiplexer.go index d83adab..4506ebe 100644 --- a/multiplexer/multiplexer.go +++ b/multiplexer/multiplexer.go @@ -34,6 +34,7 @@ type Reader interface { type Writer interface { Write(pkt packet.Packet) error + WriteWait(pkt packet.Packet) error } type Closer interface {