refactor: [WIP] Add instrumentations when routing logs to the log driver #116
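This change instruments the log-routing path with two package-level byte counters, bytesReadFromSrc and bytesSentToDst, which are updated atomically as bytes are read from the container pipes and sent to the log driver, plus a per-container ticker goroutine that reports and resets the counters every minute until it is signalled to stop. Below is a minimal, standalone sketch of that pattern for context; the names, the interval argument, and the plain log.Printf standing in for debug.SendEventsToLog are illustrative assumptions, not the shim-loggers code itself.

// Sketch of the instrumentation pattern in this change: two shared byte
// counters updated atomically on the read/send paths, plus a ticker goroutine
// that reports and resets them until told to stop. Names are illustrative.
package main

import (
	"log"
	"sync"
	"sync/atomic"
	"time"
)

var (
	bytesReadFromSrc uint64 // bytes read from the source within the current interval
	bytesSentToDst   uint64 // bytes sent to the destination within the current interval
)

// traceLogRouting periodically logs the counters and resets them, returning
// when it receives a stop signal.
func traceLogRouting(containerID string, interval time.Duration, stop <-chan bool) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			read := atomic.SwapUint64(&bytesReadFromSrc, 0)
			sent := atomic.SwapUint64(&bytesSentToDst, 0)
			log.Printf("[%s] read %d bytes, sent %d bytes in the last interval", containerID, read, sent)
		case <-stop:
			log.Printf("[%s] read %d bytes, sent %d bytes since the last tick; stopping",
				containerID, atomic.LoadUint64(&bytesReadFromSrc), atomic.LoadUint64(&bytesSentToDst))
			return
		}
	}
}

func main() {
	var wg sync.WaitGroup
	stop := make(chan bool, 1) // buffered so the sender never blocks
	wg.Add(1)
	go func() {
		defer wg.Done()
		traceLogRouting("example-container", 100*time.Millisecond, stop)
	}()

	// The log-routing paths would update the counters like this.
	atomic.AddUint64(&bytesReadFromSrc, 128) // e.g. after reading from a container pipe
	atomic.AddUint64(&bytesSentToDst, 120)   // e.g. after sending a line to the log driver

	time.Sleep(250 * time.Millisecond) // let a couple of ticks fire
	stop <- true
	wg.Wait()
}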

Closed · wants to merge 2 commits
logger/buffered_logger.go (16 additions, 0 deletions)
@@ -95,6 +95,21 @@ func (bl *bufferedLogger) Start(
return err
}

// Reset the shared byte counters and start the goroutine that periodically reports how many bytes were routed.
// Plain assignment is safe here because the tracing and routing goroutines have not started yet.
var logWG sync.WaitGroup
logWG.Add(1)
stopTracingLogRoutingChan := make(chan bool, 1)
bytesReadFromSrc = 0
bytesSentToDst = 0
go func() {
startTracingLogRouting(bl.containerID, stopTracingLogRoutingChan)
logWG.Done()
}()
// On return, signal the tracing goroutine to stop and wait for it to exit.
defer func() {
debug.SendEventsToLog(DaemonName, "Sending signal to stop the ticker.", debug.DEBUG, 0)
stopTracingLogRoutingChan <- true
logWG.Wait()
}()

errGroup, ctx := errgroup.WithContext(ctx)
// Start the goroutine of underlying log driver to consume logs from ring buffer and
// send logs to destination when there's any.
@@ -110,6 +125,7 @@ func (bl *bufferedLogger) Start(
pipe := p

errGroup.Go(func() error {
debug.SendEventsToLog(DaemonName, fmt.Sprintf("Reading logs from pipe %s", source), debug.DEBUG, 0)
logErr := bl.saveLogMessagesToRingBuffer(ctx, pipe, source)
if logErr != nil {
err := fmt.Errorf("failed to send logs from pipe %s: %w", source, logErr)
logger/common.go (65 additions, 2 deletions)
@@ -13,13 +13,14 @@ import (
"fmt"
"io"
"math"
"sync"
"sync/atomic"
"time"

"github.com/aws/shim-loggers-for-containerd/debug"

dockerlogger "github.com/docker/docker/daemon/logger"

types "github.com/docker/docker/api/types/backend"
dockerlogger "github.com/docker/docker/daemon/logger"
"golang.org/x/sync/errgroup"
)

@@ -45,6 +46,16 @@ const (
// from Docker, reference:
// https://github.com/moby/moby/blob/19.03/daemon/logger/copier.go#L21
DefaultBufSizeInBytes = 16 * 1024

traceLogRoutingInterval = 1 * time.Minute
)

var (
// bytesReadFromSrc is the number of bytes read from the source (all container pipes) within a given time interval.
bytesReadFromSrc uint64
// bytesSentToDst is the number of bytes sent to the destination (the corresponding log driver) within a given
// time interval.
bytesSentToDst uint64
)

// GlobalArgs contains the essential arguments required for initializing the logger.
@@ -169,6 +180,21 @@ func (l *Logger) Start(
return err
}

// Reset the shared byte counters and start the goroutine that periodically reports how many bytes were routed.
var logWG sync.WaitGroup
logWG.Add(1)
stopTracingLogRoutingChan := make(chan bool, 1)
atomic.StoreUint64(&bytesReadFromSrc, 0)
atomic.StoreUint64(&bytesSentToDst, 0)
go func() {
startTracingLogRouting(l.Info.ContainerID, stopTracingLogRoutingChan)
logWG.Done()
}()
// On return, signal the tracing goroutine to stop and wait for it to exit.
defer func() {
debug.SendEventsToLog(l.Info.ContainerID, "Sending signal to stop the ticker.", debug.DEBUG, 0)
stopTracingLogRoutingChan <- true
logWG.Wait()
}()

errGroup, ctx := errgroup.WithContext(ctx)
for pn, p := range pipeNameToPipe {
// Copy pn and p to new variables source and pipe, accordingly.
@@ -271,6 +297,7 @@ func (l *Logger) Read(
if err != nil {
return err
}

// If container pipe is closed and no bytes left in our buffer, directly return.
if eof && bytesInBuffer == 0 {
return nil
@@ -304,6 +331,7 @@ func (l *Logger) Read(
if err != nil {
return err
}
atomic.AddUint64(&bytesSentToDst, uint64(len(curLine)))
// Since we have found a newline symbol, it means this line has ended.
// Reset flags.
isFirstPartial = true
@@ -349,6 +377,7 @@ func (l *Logger) Read(
return err
}

atomic.AddUint64(&bytesSentToDst, uint64(len(curLine)))
// reset head and bytesInBuffer
head = 0
bytesInBuffer = 0
@@ -376,6 +405,39 @@ func (l *Logger) Read(
}
}


// startTracingLogRouting emits a log entry every minute reporting how many bytes were read from the source
// (container pipes) and how many bytes were sent to the destination (the log driver) during that interval.
func startTracingLogRouting(containerID string, stop chan bool) {
ticker := time.NewTicker(traceLogRoutingInterval)
debug.SendEventsToLog(containerID, "Starting the ticker...", debug.DEBUG, 0)
for {
select {
case <-ticker.C:
// This ticker goroutine reads and resets bytesReadFromSrc/bytesSentToDst, while the goroutines routing
// logs write to the same variables. Atomic operations avoid data races between the two.
previousBytesReadFromSrc := atomic.SwapUint64(&bytesReadFromSrc, 0)
previousBytesSentToDst := atomic.SwapUint64(&bytesSentToDst, 0)
debug.SendEventsToLog(
containerID,
fmt.Sprintf("Within the last minute, read %d bytes from the source " +
"and sent %d bytes to the destination",
previousBytesReadFromSrc, previousBytesSentToDst),
debug.DEBUG,
0)
case <-stop:
debug.SendEventsToLog(containerID,
fmt.Sprintf("Read %d bytes from the source and sent %d bytes to the destination since the last tick",
atomic.LoadUint64(&bytesReadFromSrc), atomic.LoadUint64(&bytesSentToDst)),
debug.DEBUG, 0)
ticker.Stop()
debug.SendEventsToLog(containerID, "Stopped the ticker...", debug.DEBUG, 0)
return
}
}
}

// generateRandomID is based on Docker
// GenerateRandomID: https://github.com/moby/moby/blob/bca8d9f2ce0d63e1490692917cde6273bc288bad/pkg/stringid/stringid.go#L40
// with the simplification that we don't need to worry about guaranteeing the string isn't all 0 - 9
@@ -407,6 +469,7 @@ func readFromContainerPipe(pipe io.Reader, buf []byte, bytesInBuffer, maxReadByt
// Pipe is closed, set flag to true.
eof = true
}
atomic.AddUint64(&bytesReadFromSrc, uint64(readBytesFromPipe))
bytesInBuffer += readBytesFromPipe
}

logger/common_test.go (51 additions, 0 deletions)
@@ -13,6 +13,7 @@ import (
"encoding/json"
"fmt"
"os"
"sync/atomic"
"testing"
"time"

@@ -207,3 +208,53 @@ func TestNewInfo(t *testing.T) {
info := NewInfo(testContainerID, testContainerName, WithConfig(config))
require.Equal(t, config, info.Config)
}

// TestTracingLogRouting verifies that the expected number of bytes is read from the source (the containers' pipe
// files) and that the expected number of bytes is sent to the destination (the log driver).
func TestTracingLogRouting(t *testing.T) {
// Create a tmp file used to mock the IO pipe that the logger reads log
// messages from.
tmpIOSource, err := os.CreateTemp("", "")
require.NoError(t, err)
defer os.Remove(tmpIOSource.Name()) //nolint:errcheck // testing only
var (
testStdout bytes.Buffer
testStderr bytes.Buffer
)
// Write test input to the stdout and stderr buffers that stand in for the container pipes.
inputForStdout := "1234567890\n"
countOfNewLinesForStdout := 1
_, err = testStdout.WriteString(inputForStdout)
require.NoError(t, err)
inputForStderr := "123 456 789 0\n123 456 789 0\n123 456 789 0\n"
countOfNewLinesForStderr := 3
_, err = testStderr.WriteString(inputForStderr)
require.NoError(t, err)
// Create a tmp file used inside the customized dummy Log function as the
// destination that the logger sends log messages to.
tmpDest, err := os.CreateTemp("", "")
require.NoError(t, err)
defer os.Remove(tmpDest.Name()) //nolint:errcheck // testing only
logDestinationFileName = tmpDest.Name()

l := &Logger{
Info: &dockerlogger.Info{},
Stream: &dummyClient{t},
bufferSizeInBytes: DefaultBufSizeInBytes,
maxReadBytes: defaultMaxReadBytes,
Stdout: &testStdout,
Stderr: &testStderr,
}
err = l.Start(
context.TODO(),
&dummyCleanupTime,
func() error { return nil },
)
require.NoError(t, err)

require.Equal(t, uint64(len(inputForStdout)+len(inputForStderr)), atomic.LoadUint64(&bytesReadFromSrc))
// Exclude the newline characters because they are removed when the logs are sent to the log driver.
require.Equal(t,
uint64(len(inputForStdout)+len(inputForStderr)-countOfNewLinesForStdout-countOfNewLinesForStderr),
atomic.LoadUint64(&bytesSentToDst))
}