From 1e2d61fa48c1ffe2f8cfb50eb01a59675f0a9052 Mon Sep 17 00:00:00 2001 From: Victor Luchits Date: Wed, 31 Jul 2024 16:21:17 +0300 Subject: [PATCH] Let the reader and writer shutdown cleanly before closing the server connection --- server.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/server.go b/server.go index 5a9d1f0..e19ee8e 100644 --- a/server.go +++ b/server.go @@ -32,6 +32,7 @@ type IprotoServer struct { firstError error perf PerfCount schemaID uint64 + wg sync.WaitGroup } type IprotoServerOptions struct { @@ -131,7 +132,10 @@ func (s *IprotoServer) Shutdown() error { if s.onShutdown != nil { s.onShutdown(err) } - s.conn.Close() + go func() { + s.wg.Wait() + s.conn.Close() + }() }) return err @@ -166,8 +170,17 @@ func (s *IprotoServer) greet() (err error) { } func (s *IprotoServer) loop() { - go s.read() - go s.write() + s.wg.Add(2) + + go func() { + defer s.wg.Done() + s.read() + }() + + go func() { + defer s.wg.Done() + s.write() + }() } func (s *IprotoServer) read() { @@ -175,6 +188,7 @@ func (s *IprotoServer) read() { var pp *BinaryPacket r := s.reader + var wg sync.WaitGroup READER_LOOP: for { @@ -193,8 +207,10 @@ READER_LOOP: s.perf.NetPacketsIn.Add(1) } + wg.Add(1) go func(pp *BinaryPacket) { packet := &pp.packet + defer wg.Done() err := packet.UnmarshalBinary(pp.body) @@ -244,6 +260,7 @@ READER_LOOP: if err != nil { s.setError(err) } + wg.Wait() s.Shutdown() CLEANUP_LOOP: