Skip to content

Commit

Permalink
fix: Return to read immediately after a successful read.
Browse files Browse the repository at this point in the history
Copying the filestream, now pipe, dgram, and socket streams return to read
again immediately after a successful read, so that we don't wait.

This is now obvious the problem in #685 and using the bandwidth-delay-product
we can see that a 250ms pause between reads of 4096B and 128KiB matches the
results seen.

Before:
```
jaq% time ./mtail -logs - -progs examples/rsyncd.mtail < internal/mtail/testdata/rsyncd.log
0.01s user 0.01s system 7% cpu 0.264 total
```

After:
```
jaq% time ./mtail -logs - -progs examples/rsyncd.mtail < internal/mtail/testdata/rsyncd.log
0.01s user 0.03s system 102% cpu 0.041 total
```

Thanks to @rideliner for the hint.
  • Loading branch information
jaqx0r committed Jul 5, 2024
1 parent b8df61a commit b01ee8c
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 33 deletions.
8 changes: 7 additions & 1 deletion internal/tailer/logstream/dgramstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
ds.lastReadTime = time.Now()
ds.mu.Unlock()
ds.staleTimer = time.AfterFunc(time.Hour*24, ds.cancel)

// No error implies more to read, so restart the loop.
if err == nil && ctx.Err() == nil {
continue
}
}

if err != nil && IsEndOrCancel(err) {
Expand All @@ -125,12 +130,13 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
glog.V(2).Infof("stream(%s:%s): waiting", ds.scheme, ds.address)
select {
case <-ctx.Done():
// Exit after next read attempt.
// We may have started waiting here when the stop signal
// arrives, but since that wait the file may have been
// written to. The file is not technically yet at EOF so
// we need to go back and try one more read. We'll exit
// the stream in the zero byte handler above.
glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ds.scheme, ds.address)
glog.V(2).Infof("stream(%s): context cancelled, exiting after next zero byte read", ds.scheme, ds.address)
case <-waker.Wake():
// sleep until next Wake()
glog.V(2).Infof("stream(%s:%s): Wake received", ds.scheme, ds.address)
Expand Down
8 changes: 2 additions & 6 deletions internal/tailer/logstream/dgramstream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// Stream is not shut down with cancel in this test
defer cancel()
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

sockName := scheme + "://" + addr
ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled)
Expand All @@ -59,8 +59,6 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
_, err = s.Write([]byte("1\n"))
testutil.FatalIfErr(t, err)

awaken(0, 0) // sync past read

// "Close" the socket by sending zero bytes, which in oneshot mode tells the stream to act as if we're done.
_, err = s.Write([]byte{})
testutil.FatalIfErr(t, err)
Expand Down Expand Up @@ -94,7 +92,7 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
}

ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

sockName := scheme + "://" + addr
ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled)
Expand All @@ -111,8 +109,6 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
_, err = s.Write([]byte("1\n"))
testutil.FatalIfErr(t, err)

awaken(0, 0) // Synchronise past read.

cancel() // This cancellation should cause the stream to shut down.
wg.Wait()

Expand Down
14 changes: 7 additions & 7 deletions internal/tailer/logstream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
fs.lastReadTime = time.Now()
fs.mu.Unlock()
fs.staleTimer = time.AfterFunc(time.Hour*24, fs.cancel)

// No error implies there is more to read so restart the loop.
if err == nil && ctx.Err() == nil {
continue
}
}

if err != nil && err != io.EOF {
Expand Down Expand Up @@ -206,12 +211,6 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
}
}

// No error implies there is more to read in this file so go
// straight back to read unless it looks like context is Done.
if err == nil && ctx.Err() == nil {
continue
}

Sleep:
// If we get here it's because we've stalled. First test to see if it's
// time to exit.
Expand Down Expand Up @@ -243,6 +242,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
glog.V(2).Infof("stream(%s): waiting", fs.pathname)
select {
case <-ctx.Done():
// Exit after next read attempt.
// We may have started waiting here when the cancellation
// arrives, but since that wait the file may have been
// written to. The file is not technically yet at EOF so
Expand All @@ -251,7 +251,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
// could argue exiting immediately is less surprising.
// Assumption is that this doesn't make a difference in
// production.
glog.V(2).Infof("stream(%s): Cancelled after next read", fs.pathname)
glog.V(2).Infof("stream(%s): context cancelled, exiting after next read timeout", fs.pathname)
case <-waker.Wake():
// sleep until next Wake()
glog.V(2).Infof("stream(%s): Wake received", fs.pathname)
Expand Down
14 changes: 8 additions & 6 deletions internal/tailer/logstream/pipestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
}
logCloses.Add(ps.pathname, 1)
close(ps.lines)
ps.cancel()
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
SetReadDeadlineOnDone(ctx, fd)

for {
Expand All @@ -105,6 +104,11 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
ps.lastReadTime = time.Now()
ps.mu.Unlock()
ps.staleTimer = time.AfterFunc(time.Hour*24, ps.cancel)

// No error implies there is more to read so restart the loop.
if err == nil && ctx.Err() == nil {
continue
}
}

// Test to see if we should exit.
Expand All @@ -120,10 +124,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
glog.V(2).Infof("stream(%s): waiting", ps.pathname)
select {
case <-ctx.Done():
// Exit immediately; cancelled context is going to cause the
// next read to be interrupted and exit, so don't bother going
// around the loop again.
return
// Exit after next read attempt.
glog.V(2).Infof("stream(%s): context cancelled, exiting after next read timeout", ps.pathname)
case <-waker.Wake():
// sleep until next Wake()
glog.V(2).Infof("stream(%s): Wake received", ps.pathname)
Expand Down
10 changes: 4 additions & 6 deletions internal/tailer/logstream/pipestream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) {
testutil.FatalIfErr(t, unix.Mkfifo(name, 0o666))

ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe)
testutil.FatalIfErr(t, err)
Expand All @@ -87,9 +87,6 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) {

testutil.WriteString(t, f, "1\n")

// Avoid a race with cancellation if we can synchronise with waker.Wake()
awaken(0, 0)

cancel() // Cancellation here should cause the stream to shut down.
wg.Wait()

Expand Down Expand Up @@ -155,7 +152,7 @@ func TestPipeStreamReadStdin(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// The stream is not shut down by cancel in this test.
defer cancel()
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

ps, err := logstream.New(ctx, &wg, waker, "-", logstream.OneShotDisabled)
testutil.FatalIfErr(t, err)
Expand All @@ -165,7 +162,8 @@ func TestPipeStreamReadStdin(t *testing.T) {
}
checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines())

awaken(0, 0)
// Give the stream a chance to wake and read
time.Sleep(10 * time.Millisecond)

testutil.FatalIfErr(t, f.Close())

Expand Down
7 changes: 6 additions & 1 deletion internal/tailer/logstream/socketstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake
ss.lastReadTime = time.Now()
ss.mu.Unlock()
ss.staleTimer = time.AfterFunc(time.Hour*24, ss.cancel)

// No error implies more to read, so restart the loop.
if err == nil && ctx.Err() == nil {
continue
}
}

if err != nil && IsEndOrCancel(err) {
Expand All @@ -150,7 +155,7 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake
glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address)
select {
case <-ctx.Done():
// Cancelled context will cause the next read to be interrupted and exit.
// Exit after next read attempt.
glog.V(2).Infof("stream(%s:%s): context cancelled, exiting after next read timeout", ss.scheme, ss.address)
case <-waker.Wake():
// sleep until next Wake()
Expand Down
8 changes: 2 additions & 6 deletions internal/tailer/logstream/socketstream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// The stream is not shut down with cancel in this test.
defer cancel()
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

sockName := scheme + "://" + addr
ss, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled)
Expand All @@ -57,8 +57,6 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) {
_, err = s.Write([]byte("1\n"))
testutil.FatalIfErr(t, err)

awaken(0, 0) // Sync past read

// Close the socket to signal to the socketStream to shut down.
testutil.FatalIfErr(t, s.Close())

Expand Down Expand Up @@ -91,7 +89,7 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) {
}

ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

sockName := scheme + "://" + addr
ss, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled)
Expand All @@ -108,8 +106,6 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) {
_, err = s.Write([]byte("1\n"))
testutil.FatalIfErr(t, err)

awaken(0, 0) // Sync past read to ensure we read

cancel() // This cancellation should cause the stream to shut down immediately.
wg.Wait()

Expand Down

0 comments on commit b01ee8c

Please sign in to comment.