Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
130158: pkg/util/log: Fix FileSink iteration logic r=arjunmahishi a=arjunmahishi

Recently, buffering was introduced in File Sinks. This was done by creating a `bufferdSink` wrapper on top of the existing `FileSink` type. The iterator function that goes over all the configured file sinks, does not recognise the `bufferSink` wrapper. As a result, the iterator was only returning file sinks that are unbuffered.

This led to issues like the `debug zip` command not being able to fetch logs for a cluster where the buffering is enabled by default in all the file sinks. Refer to CLOUDOPS-10542 for more details

This commit fixes that by handling both types in the iterator function.

Part of: CLOUDOPS-10542
Release note: None

Co-authored-by: Arjun Mahishi <[email protected]>
  • Loading branch information
craig[bot] and arjunmahishi committed Sep 6, 2024
2 parents 950a090 + c4ffa18 commit 0f415ff
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 6 deletions.
8 changes: 6 additions & 2 deletions pkg/util/log/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,12 @@ func (r *sinkInfoRegistry) iter(fn func(l *sinkInfo) error) error {
func (r *sinkInfoRegistry) iterFileSinks(fn func(l *fileSink) error) error {
return r.iter(func(si *sinkInfo) error {
if fs, ok := si.sink.(*fileSink); ok {
if err := fn(fs); err != nil {
return err
return fn(fs)
}

if bs, ok := si.sink.(*bufferedSink); ok {
if fs, ok := bs.child.(*fileSink); ok {
return fn(fs)
}
}
return nil
Expand Down
95 changes: 91 additions & 4 deletions pkg/util/log/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,102 @@ import (
"github.com/stretchr/testify/require"
)

var (
oneMicro = 1 * time.Microsecond
oneByte = logconfig.ByteSize(1)
oneThousandBytes = logconfig.ByteSize(1000)
notBuffered = false
bufferedAddr = "buffered.sink.com"
)

func TestIterFileSinks(t *testing.T) {
defer leaktest.AfterTest(t)()
sc := ScopeWithoutShowLogs(t)
defer sc.Close(t)

// Set up a log config containing a file sink.
cfg := logconfig.DefaultConfig()

cfg.Sinks.FileGroups = map[string]*logconfig.FileSinkConfig{
"unbuffered": {
Channels: logconfig.SelectChannels(channel.OPS),
FileDefaults: logconfig.FileDefaults{},
},
"buffered": {
Channels: logconfig.SelectChannels(channel.DEV),
FileDefaults: logconfig.FileDefaults{
BufferedWrites: &notBuffered,
CommonSinkConfig: logconfig.CommonSinkConfig{
Buffering: logconfig.CommonBufferSinkConfigWrapper{
CommonBufferSinkConfig: logconfig.CommonBufferSinkConfig{
MaxStaleness: &oneMicro,
MaxBufferSize: &oneThousandBytes,
FlushTriggerSize: &oneByte,
},
},
},
},
},
}

// add an HTTP sink to make sure the iteration doesn't pick up all the
// buffered sinks. It should only pick up the file sinks.
cfg.Sinks.HTTPServers = map[string]*logconfig.HTTPSinkConfig{
"buffered-http": {
Channels: logconfig.SelectChannels(channel.OPS),
HTTPDefaults: logconfig.HTTPDefaults{
Address: &bufferedAddr,
CommonSinkConfig: logconfig.CommonSinkConfig{
Buffering: logconfig.CommonBufferSinkConfigWrapper{
CommonBufferSinkConfig: logconfig.CommonBufferSinkConfig{
MaxStaleness: &oneMicro,
MaxBufferSize: &oneThousandBytes,
FlushTriggerSize: &oneByte,
},
},
},
},
},
}

require.NoError(t, cfg.Validate(&sc.logDir))

// Apply the configuration
TestingResetActive()
cleanup, err := ApplyConfig(cfg, nil /* fileSinkMetricsForDir */, nil /* fatalOnLogStall */)
require.NoError(t, err)
defer cleanup()

callMap := map[string]bool{
"logtest-stderr": false,
"logtest-unbuffered": false,
"logtest-buffered": false,
}

fn := func(fs *fileSink) error {
require.NotEqual(
t, fs.nameGenerator.fileNamePrefix,
"logtest-buffered-http", "unexpected fileSink %q", fs.nameGenerator.fileNamePrefix,
)
require.False(
t, callMap[fs.nameGenerator.fileNamePrefix], "fileSink %q was called twice", fs.nameGenerator.fileNamePrefix,
)

callMap[fs.nameGenerator.fileNamePrefix] = true
return nil
}
require.NoError(t, logging.allSinkInfos.iterFileSinks(fn))

for k, v := range callMap {
require.Truef(t, v, "fileSink %q was never called during iteration", k)
}
}

func TestIterHTTPSinks(t *testing.T) {
defer leaktest.AfterTest(t)()
sc := ScopeWithoutShowLogs(t)
defer sc.Close(t)

oneMicro := 1 * time.Microsecond
oneByte := logconfig.ByteSize(1)
oneThousandBytes := logconfig.ByteSize(1000)
bufferedAddr := "buffered.sink.com"
unbufferedAddr := "unbuffered.sink.com"

// Set up a log config containing both buffered and unbuffered HTTP sinks.
Expand Down

0 comments on commit 0f415ff

Please sign in to comment.