Skip to content

Commit

Permalink
fix byte vs event logic
Browse files Browse the repository at this point in the history
  • Loading branch information
faec committed May 30, 2024
1 parent 5a158ec commit 24c5564
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
14 changes: 8 additions & 6 deletions libbeat/publisher/queue/memqueue/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,17 @@ type config struct {

const minQueueBytes = 32768
const minQueueEvents = 32
const defaultMaxQueueEvents = 3200

func (c *config) Validate() error {
if c.Bytes != nil && *c.Bytes < minQueueBytes {
return errors.New(fmt.Sprintf("queue byte size must be at least %v", minQueueBytes))
return fmt.Errorf("queue byte size must be at least %v", minQueueBytes)
}
if c.Events != nil && *c.Events < minQueueEvents {
return errors.New(fmt.Sprintf("queue event size must be at least %v", minQueueEvents))
return fmt.Errorf("queue event size must be at least %v", minQueueEvents)
}
if c.Events == nil && c.Bytes == nil {
return errors.New("queue must have an event limit or a byte limit")
if c.Events != nil && c.Bytes != nil {
return errors.New("memory queue can only have an event limit or a byte limit, not both")
}
if c.Events != nil && c.MaxGetEvents > *c.Events {
return errors.New("flush.min_events must be less than events")
Expand All @@ -66,7 +67,7 @@ var defaultConfig = config{
// SettingsForUserConfig unpacks a ucfg config from a Beats queue
// configuration and returns the equivalent memqueue.Settings object.
func SettingsForUserConfig(cfg *c.C) (Settings, error) {
var config config
config := defaultConfig
if cfg != nil {
if err := cfg.Unpack(&config); err != nil {
return Settings{}, fmt.Errorf("couldn't unpack memory queue config: %w", err)
Expand All @@ -76,6 +77,7 @@ func SettingsForUserConfig(cfg *c.C) (Settings, error) {
MaxGetRequest: config.MaxGetEvents,
FlushTimeout: config.FlushTimeout,
}

if config.Events != nil {
result.Events = *config.Events
}
Expand All @@ -84,7 +86,7 @@ func SettingsForUserConfig(cfg *c.C) (Settings, error) {
}
// If no size constraint was given, fall back on the default event cap
if config.Events == nil && config.Bytes == nil {
result.Events = 3200
result.Events = defaultMaxQueueEvents
}
return result, nil
}
13 changes: 6 additions & 7 deletions libbeat/publisher/queue/memqueue/runloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func newRunLoop(broker *broker) *runLoop {
}

eventBufSize := broker.settings.Events
if eventBufSize <= 0 {
if broker.useByteLimits() {
// The queue is using byte limits, start with a buffer of 2^10 and
// we will expand it as needed.
eventBufSize = 1 << 10
Expand Down Expand Up @@ -212,9 +212,9 @@ func (l *runLoop) getRequestShouldBlock(req *getRequest) bool {
// Respond to the given get request without blocking or waiting for more events
func (l *runLoop) handleGetReply(req *getRequest) {
entriesAvailable := l.eventCount - l.consumedEventCount
// backwards compatibility: if all byte bounds are <= 0 then batch size
// backwards compatibility: when using event-based limits, batch size
// can't be more than settings.MaxGetRequest.
if req.byteCount <= 0 && l.broker.settings.Bytes <= 0 {
if l.broker.useByteLimits() {
if entriesAvailable > l.broker.settings.MaxGetRequest {
entriesAvailable = l.broker.settings.MaxGetRequest
}
Expand Down Expand Up @@ -343,10 +343,9 @@ func (l *runLoop) growEventBuffer() {
// Insert the given new event without bounds checks, and report the result
// to the caller via the push request's response channel.
func (l *runLoop) doInsert(req pushRequest) {
maxEvents := l.broker.settings.Events
// If there is no event limit, check if we need to grow the current queue
// buffer to fit the new event.
if maxEvents <= 0 && l.eventCount >= l.buf.size() {
// If using byte limits (no hard limit on event count), check if we need to
// grow the current queue buffer to fit the new event.
if l.broker.useByteLimits() && l.eventCount >= l.buf.size() {
l.growEventBuffer()
}

Expand Down

0 comments on commit 24c5564

Please sign in to comment.