Skip to content

Commit

Permalink
materialize-mongodb: use multiple concurrent load workers
Browse files Browse the repository at this point in the history
Use multiple concurrent load workers to maximize throughput when there are a lot
of load requests to evaluate. This can be a limiting factor particularly when
round-trip latency is high.

The original code was structured to allow for easily adding concurrency to these
requests, but I just hadn't seen a need for it until now, when we have an
example of a task that seems to be getting limited at this point.
  • Loading branch information
williamhbaker committed Jan 22, 2025
1 parent 5108330 commit c3f833b
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions materialize-mongodb/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"math"
"sync"

m "github.com/estuary/connectors/go/protocols/materialize"
pf "github.com/estuary/flow/go/protocols/flow"
Expand All @@ -26,6 +27,8 @@ const (
// https://www.mongodb.com/docs/manual/reference/operator/query/in/#-in
loadBatchSize = 100

concurrentLoadWorkers = 5

// The default batchWriteLimit is 100,000 documents. Practically speaking we will be limited to
// less than that to keep connector memory usage reasonable.
storeBatchSize = 10_000
Expand All @@ -49,12 +52,22 @@ func (t *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
ctx := it.Context()
it.WaitForAcknowledged()

var mu sync.Mutex
lockedAndLoaded := func(binding int, doc json.RawMessage) error {
// Prevent concurrent load workers from interleaving |loaded| responses.
mu.Lock()
defer mu.Unlock()
return loaded(binding, doc)
}

sendBatches := make(chan loadBatch)

group, groupCtx := errgroup.WithContext(ctx)
group.Go(func() error {
return t.loadWorker(groupCtx, loaded, sendBatches)
})
for idx := 0; idx < concurrentLoadWorkers; idx++ {
group.Go(func() error {
return t.loadWorker(groupCtx, lockedAndLoaded, sendBatches)
})
}

sendBatch := func(binding int, batch []string) error {
select {
Expand Down

0 comments on commit c3f833b

Please sign in to comment.