Skip to content

Commit

Permalink
avoid background goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
egonelbre committed Oct 3, 2022
1 parent 13588e6 commit a620147
Showing 1 changed file with 48 additions and 58 deletions.
106 changes: 48 additions & 58 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@ package eventkit
import (
"bytes"
"compress/zlib"
"context"
"net"
"sync"
"time"

"github.com/gogo/protobuf/proto"
"golang.org/x/sync/errgroup"

"github.com/jtolio/eventkit/pb"
"github.com/jtolio/eventkit/utils"
)

const (
Expand All @@ -37,8 +34,11 @@ type UDPClient struct {
CompressionLevel int
FlushInterval time.Duration

initOnce sync.Once
submitQueue chan *Event
mu sync.Mutex
closed bool
active sync.WaitGroup
pending *outgoingPacket
flush chan struct{}
}

func NewUDPClient(application, version, instance, addr string) *UDPClient {
Expand All @@ -56,12 +56,6 @@ func NewUDPClient(application, version, instance, addr string) *UDPClient {
return c
}

func (c *UDPClient) init() {
c.initOnce.Do(func() {
c.submitQueue = make(chan *Event, c.QueueDepth)
})
}

type outgoingPacket struct {
buf bytes.Buffer
zl *zlib.Writer
Expand Down Expand Up @@ -153,47 +147,52 @@ func (op *outgoingPacket) addEvent(ev *Event) (full bool) {
return (op.written + trailerSize) > op.maxUncompressed
}

func (c *UDPClient) Run(ctx context.Context) {
c.init()
func (c *UDPClient) Submit(event *Event) {
c.mu.Lock()
defer c.mu.Unlock()

ticker := utils.NewJitteredTicker(c.FlushInterval)
var background errgroup.Group
defer func() { _ = background.Wait() }()
background.Go(func() error {
ticker.Run(ctx)
return nil
})
if c.closed {
return
}

p := c.newOutgoingPacket()

sendAndReset := func() {
_ = c.send(p, c.Addr)
p = c.newOutgoingPacket()
}

for {
select {
case em := <-c.submitQueue:
if p.addEvent(em) {
sendAndReset()
}
case <-ticker.C:
if p.events > 0 {
sendAndReset()
}
case <-ctx.Done():
left := len(c.submitQueue)
for i := 0; i < left; i++ {
if p.addEvent(<-c.submitQueue) {
sendAndReset()
}
}
if p.events > 0 {
_ = c.send(p, c.Addr)
}
return
}
if c.pending == nil {
c.pending = c.newOutgoingPacket()
c.flush = make(chan struct{}, 1)
c.active.Add(1)
go c.waitForFull(c.pending, c.flush)
}

if c.pending.addEvent(event) {
c.flush <- struct{}{}
c.pending, c.flush = nil, nil
}
}

func (c *UDPClient) Close() {
c.mu.Lock()
c.closed = true
c.mu.Unlock()

c.active.Wait()
}

func (c *UDPClient) waitForFull(active *outgoingPacket, work chan struct{}) {
defer c.active.Done()

tick := time.NewTimer(c.FlushInterval) // TODO: jitter
select {
case <-work:
case <-tick.C:
}
tick.Stop()

c.mu.Lock()
if c.pending == active {
c.pending = nil
}
c.mu.Unlock()

c.send(active, c.Addr)
}

func (c *UDPClient) send(packet *outgoingPacket, addr string) (err error) {
Expand All @@ -215,12 +214,3 @@ func (c *UDPClient) send(packet *outgoingPacket, addr string) (err error) {
_, _, err = conn.WriteMsgUDP(packet.finalize(), nil, nil)
return err
}

func (c *UDPClient) Submit(event *Event) {
c.init()

select {
case c.submitQueue <- event:
default:
}
}

0 comments on commit a620147

Please sign in to comment.