diff --git a/client.go b/client.go index 7a21a91..42add0a 100644 --- a/client.go +++ b/client.go @@ -9,6 +9,7 @@ import ( "time" "github.com/gogo/protobuf/proto" + "golang.org/x/sync/errgroup" "github.com/jtolio/eventkit/pb" "github.com/jtolio/eventkit/utils" @@ -156,7 +157,12 @@ func (c *UDPClient) Run(ctx context.Context) { c.init() ticker := utils.NewJitteredTicker(c.FlushInterval) - defer ticker.Stop() + var background errgroup.Group + defer func() { _ = background.Wait() }() + background.Go(func() error { + ticker.Run(ctx) + return nil + }) p := c.newOutgoingPacket() diff --git a/utils/jitter.go b/utils/jitter.go index f5e87cf..d6f2d04 100644 --- a/utils/jitter.go +++ b/utils/jitter.go @@ -1,6 +1,7 @@ package utils import ( + "context" "math/rand" "sync" "time" @@ -14,35 +15,32 @@ type JitteredTicker struct { } func NewJitteredTicker(interval time.Duration) *JitteredTicker { - t := &JitteredTicker{ + return &JitteredTicker{ C: make(chan struct{}, 1), interval: interval, - closed: make(chan struct{}), } - go t.tick() - return t } -func (t *JitteredTicker) tick() { +func (t *JitteredTicker) Run(ctx context.Context) { r := rand.New(rand.NewSource(time.Now().UnixNano())) timer := time.NewTimer(Jitter(r, t.interval)) defer timer.Stop() for { select { - case <-t.closed: + case <-ctx.Done(): return case <-timer.C: - t.C <- struct{}{} + select { + case t.C <- struct{}{}: + case <-ctx.Done(): + return + } timer.Reset(Jitter(r, t.interval)) } } } -func (t *JitteredTicker) Stop() { - t.closeOnce.Do(func() { close(t.closed) }) -} - func Jitter(r *rand.Rand, t time.Duration) time.Duration { nanos := r.NormFloat64()*float64(t/4) + float64(t) if nanos <= 0 {