Skip to content

Commit

Permalink
materialize-boilerplate: fix logic error when loads are not staged fo…
Browse files Browse the repository at this point in the history
…r evaluation

This fixes a logic error for materializations that do not stage loads for
deferred evaluation & use the extended logging capabilities, either directly or
by turning on debug logging. The prior code did not account for the possibility
of a load response to be sent prior to reading flush, which is what happens with
these kinds of materializations.
  • Loading branch information
williamhbaker committed Jan 22, 2025
1 parent 047ceec commit 5108330
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 5 deletions.
11 changes: 6 additions & 5 deletions materialize-boilerplate/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ func (l *extendedLogger) logAsync(asyncLog func()) (stopLogger func(doneLog func
}

func (l *extendedLogger) handler() func(transactionsEvent) {
var stopLoadLogger func(func())
var stopReadingLoadsLogger func(func())
var stopProcessingLoadsLogger func(func())
var stopStoreLogger func(func())
var stopWaitingForDocsLogger func(func())
var waitingForDocsMu sync.Mutex
Expand Down Expand Up @@ -184,19 +185,19 @@ func (l *extendedLogger) handler() func(transactionsEvent) {
switch event {
case readLoad:
if n := l.readLoads.Add(1); n == 1 {
stopLoadLogger = l.logAsync(l.readingLoadsLogFn(round))
stopReadingLoadsLogger = l.logAsync(l.readingLoadsLogFn(round))
}
case readFlush:
if total := l.readLoads.Swap(0); total != 0 {
stopLoadLogger(l.finishedReadingLoadsLogFn(round, total))
stopReadingLoadsLogger(l.finishedReadingLoadsLogFn(round, total))
}
case sentLoaded:
if n := l.sentLoaded.Add(1); n == 1 {
stopLoadLogger = l.logAsync(l.processingLoadedsLogFn(round))
stopProcessingLoadsLogger = l.logAsync(l.processingLoadedsLogFn(round))
}
case sentFlushed:
if total := l.sentLoaded.Swap(0); total != 0 {
stopLoadLogger(l.finishedProcessingLoadedsLogFn(round, total))
stopProcessingLoadsLogger(l.finishedProcessingLoadedsLogFn(round, total))
}
case readStore:
if n := l.readStores.Add(1); n == 1 {
Expand Down
49 changes: 49 additions & 0 deletions materialize-boilerplate/logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,52 @@ func TestExtendedLoggerWaitingForDocsRace(t *testing.T) {
wg.Wait()
}
}

func TestLoads(t *testing.T) {
for _, tt := range []struct {
name string
do func(func(transactionsEvent))
}{
{
"no load requests",
func(handler func(transactionsEvent)) {
handler(readFlush)
handler(sentFlushed)
},
},
{
"no load results",
func(handler func(transactionsEvent)) {
handler(readLoad)
handler(readFlush)
handler(sentFlushed)
},
},
{
"staged with results",
func(handler func(transactionsEvent)) {
handler(readLoad)
handler(readFlush)
handler(sentLoaded)
handler(sentFlushed)
},
},
{
"not staged with results",
func(handler func(transactionsEvent)) {
handler(readLoad)
handler(sentLoaded)
handler(readFlush)
handler(sentFlushed)
},
},
} {
t.Run(tt.name, func(t *testing.T) {
be := newBindingEvents()
logger := newExtendedLogger(loggerAtLevel{lvl: log.InfoLevel}, be)
handler := logger.handler()
tt.do(handler)
})

}
}

0 comments on commit 5108330

Please sign in to comment.