Skip to content

Commit

Permalink
Fix concurrent conflicts in elasticsearch (#53) (#58)
Browse files Browse the repository at this point in the history
This patch makes use of concurrent package to fix conflicting writes/reads
to/from ES buffer when bulk index is configured

Cherry picked from commit e61141a

Co-authored-by: Martin Mágr <[email protected]>
  • Loading branch information
leifmadsen and paramite authored Jun 29, 2021
1 parent 4e516c5 commit a37c100
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
18 changes: 10 additions & 8 deletions plugins/application/elasticsearch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"

"github.com/infrawatch/sg-core/pkg/concurrent"
"github.com/infrawatch/sg-core/plugins/application/elasticsearch/pkg/lib"
)

Expand Down Expand Up @@ -51,15 +52,15 @@ type Elasticsearch struct {
configuration *lib.AppConfig
logger *logging.Logger
client *lib.Client
buffer map[string][]string
buffer *concurrent.Map
dump chan esIndex
}

// New constructor
func New(logger *logging.Logger) application.Application {
return &Elasticsearch{
logger: logger,
buffer: make(map[string][]string),
buffer: concurrent.NewMap(),
dump: make(chan esIndex, 100),
}
}
Expand Down Expand Up @@ -88,19 +89,20 @@ func (es *Elasticsearch) ReceiveEvent(event data.Event) {
// buffer or index record
var recordList []string
if es.configuration.BufferSize > 1 {
if _, ok := es.buffer[event.Index]; !ok {
es.buffer[event.Index] = make([]string, 0, es.configuration.BufferSize)
if !es.buffer.Contains(event.Index) {
es.buffer.Set(event.Index, make([]string, 0, es.configuration.BufferSize))
}

es.buffer[event.Index] = append(es.buffer[event.Index], record)
if len(es.buffer[event.Index]) < es.configuration.BufferSize {
recordList = (es.buffer.Get(event.Index)).([]string)
recordList = append(recordList, record)
if len(recordList) < es.configuration.BufferSize {
// buffer is not full, don't send
es.logger.Metadata(logging.Metadata{"plugin": appname, "record": record})
es.logger.Debug("buffering record")
es.buffer.Set(event.Index, recordList)
return
}
recordList = es.buffer[event.Index]
delete(es.buffer, event.Index)
es.buffer.Delete(event.Index)
} else {
recordList = []string{record}
}
Expand Down
5 changes: 3 additions & 2 deletions plugins/application/elasticsearch/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"

"github.com/infrawatch/apputils/logging"
"github.com/infrawatch/sg-core/pkg/concurrent"
"github.com/infrawatch/sg-core/pkg/data"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -232,7 +233,7 @@ func TestElasticsearchApp(t *testing.T) {
results := make(chan esIndex, len(eventCases))
app := &Elasticsearch{
logger: logger,
buffer: make(map[string][]string),
buffer: concurrent.NewMap(),
dump: results,
}
err := app.Config([]byte(testConf))
Expand All @@ -254,7 +255,7 @@ func TestElasticsearchApp(t *testing.T) {
results := make(chan esIndex, len(logCases))
app := &Elasticsearch{
logger: logger,
buffer: make(map[string][]string),
buffer: concurrent.NewMap(),
dump: results,
}
err := app.Config([]byte(testConf))
Expand Down

0 comments on commit a37c100

Please sign in to comment.