From a620147a23d06b7275c6bf4d91010a6d852c0262 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Mon, 3 Oct 2022 18:37:41 +0300 Subject: [PATCH] avoid background goroutine --- client.go | 106 +++++++++++++++++++++++++----------------------------- 1 file changed, 48 insertions(+), 58 deletions(-) diff --git a/client.go b/client.go index 42add0a..37b7b5b 100644 --- a/client.go +++ b/client.go @@ -3,16 +3,13 @@ package eventkit import ( "bytes" "compress/zlib" - "context" "net" "sync" "time" "github.com/gogo/protobuf/proto" - "golang.org/x/sync/errgroup" "github.com/jtolio/eventkit/pb" - "github.com/jtolio/eventkit/utils" ) const ( @@ -37,8 +34,11 @@ type UDPClient struct { CompressionLevel int FlushInterval time.Duration - initOnce sync.Once - submitQueue chan *Event + mu sync.Mutex + closed bool + active sync.WaitGroup + pending *outgoingPacket + flush chan struct{} } func NewUDPClient(application, version, instance, addr string) *UDPClient { @@ -56,12 +56,6 @@ func NewUDPClient(application, version, instance, addr string) *UDPClient { return c } -func (c *UDPClient) init() { - c.initOnce.Do(func() { - c.submitQueue = make(chan *Event, c.QueueDepth) - }) -} - type outgoingPacket struct { buf bytes.Buffer zl *zlib.Writer @@ -153,47 +147,52 @@ func (op *outgoingPacket) addEvent(ev *Event) (full bool) { return (op.written + trailerSize) > op.maxUncompressed } -func (c *UDPClient) Run(ctx context.Context) { - c.init() +func (c *UDPClient) Submit(event *Event) { + c.mu.Lock() + defer c.mu.Unlock() - ticker := utils.NewJitteredTicker(c.FlushInterval) - var background errgroup.Group - defer func() { _ = background.Wait() }() - background.Go(func() error { - ticker.Run(ctx) - return nil - }) + if c.closed { + return + } - p := c.newOutgoingPacket() - - sendAndReset := func() { - _ = c.send(p, c.Addr) - p = c.newOutgoingPacket() - } - - for { - select { - case em := <-c.submitQueue: - if p.addEvent(em) { - sendAndReset() - } - case <-ticker.C: - if p.events > 0 { - sendAndReset() - } - case <-ctx.Done(): - left := len(c.submitQueue) - for i := 0; i < left; i++ { - if p.addEvent(<-c.submitQueue) { - sendAndReset() - } - } - if p.events > 0 { - _ = c.send(p, c.Addr) - } - return - } + if c.pending == nil { + c.pending = c.newOutgoingPacket() + c.flush = make(chan struct{}, 1) + c.active.Add(1) + go c.waitForFull(c.pending, c.flush) + } + + if c.pending.addEvent(event) { + c.flush <- struct{}{} + c.pending, c.flush = nil, nil + } +} + +func (c *UDPClient) Close() { + c.mu.Lock() + c.closed = true + c.mu.Unlock() + + c.active.Wait() +} + +func (c *UDPClient) waitForFull(active *outgoingPacket, work chan struct{}) { + defer c.active.Done() + + tick := time.NewTimer(c.FlushInterval) // TODO: jitter + select { + case <-work: + case <-tick.C: } + tick.Stop() + + c.mu.Lock() + if c.pending == active { + c.pending = nil + } + c.mu.Unlock() + + c.send(active, c.Addr) } func (c *UDPClient) send(packet *outgoingPacket, addr string) (err error) { @@ -215,12 +214,3 @@ func (c *UDPClient) send(packet *outgoingPacket, addr string) (err error) { _, _, err = conn.WriteMsgUDP(packet.finalize(), nil, nil) return err } - -func (c *UDPClient) Submit(event *Event) { - c.init() - - select { - case c.submitQueue <- event: - default: - } -}