From 94d15b3eda6b7768810684a4db4eabb1b77884f6 Mon Sep 17 00:00:00 2001 From: Connor Gillespie Date: Thu, 18 Feb 2021 09:53:46 +1000 Subject: [PATCH] Resolve #5, implement nice-to-haves --- README.md | 4 ++ go.mod | 10 +-- go.sum | 4 ++ hook.go | 199 +++++++++++++++++++++++---------------------------- hook_test.go | 7 +- 5 files changed, 107 insertions(+), 117 deletions(-) diff --git a/README.md b/README.md index ca15d28..6d556fa 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,9 @@ # Cloud Watch Logs hook for Logrus [![godoc reference](https://godoc.org/github.com/kdar/logrus-cloudwatchlogs?status.png)](https://godoc.org/github.com/kdar/logrus-cloudwatchlogs) +This fork of logrus-cloudwatchlogs: + +* Resolves https://github.com/kdar/logrus-cloudwatchlogs/issues/5. +* Implements nice-to-have methods `Hook.WithFields()` and `Hook.WithFormatter()`. Use this hook to send your [Logrus](https://github.com/sirupsen/logrus) logs to Amazon's [Cloud Watch Logs](https://aws.amazon.com/cloudwatch/details/#log-monitoring). diff --git a/go.mod b/go.mod index 6648bb8..50de0f3 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,11 @@ -module github.com/kdar/logrus-cloudwatchlogs +module github.com/gillepsi/logrus-cloudwatchlogs go 1.13 require ( - github.com/aws/aws-sdk-go v1.30.7 // indirect - github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect - github.com/sirupsen/logrus v1.5.0 // indirect + github.com/aws/aws-sdk-go v1.30.7 + github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 + github.com/kdar/gtest v0.0.0-20171003232747-b20da4453579 + github.com/sirupsen/logrus v1.5.0 + github.com/smartystreets/assertions v1.2.0 ) diff --git a/go.sum b/go.sum index b4a5622..a42aaa8 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2 github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= +github.com/kdar/gtest v0.0.0-20171003232747-b20da4453579 h1:L+qix2kCiUHMDJRHgQF7EKZGCJjFnkemozm7smeKg5s= +github.com/kdar/gtest v0.0.0-20171003232747-b20da4453579/go.mod h1:536LC8OTUEct5Z16El02WNFqiBENncHy291DyyWd8Hs= github.com/kdar/logrus-cloudwatchlogs v0.0.0-20191029193323-96a456d8a358 h1:kRMfwIztgvM7klvW0yXnns58WcUI/CPGpaB7l+Wl+nE= github.com/kdar/logrus-cloudwatchlogs v0.0.0-20191029193323-96a456d8a358/go.mod h1:ofZiSRdosqysKOk+xr2jFgr+ZYXFXmlrttnm9Xg5Emo= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -14,6 +16,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.5.0 h1:1N5EYkVAPEywqZRJd7cwnRtCb6xJx7NH3T3WUTF980Q= github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo= +github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= +github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= diff --git a/hook.go b/hook.go index a2857d9..fd22c64 100644 --- a/hook.go +++ b/hook.go @@ -2,7 +2,6 @@ package logrus_cloudwatchlogs import ( "fmt" - "io" "os" "sync" "time" @@ -23,45 +22,14 @@ type Hook struct { m sync.Mutex ch chan *cloudwatchlogs.InputLogEvent err *error + fields logrus.Fields + formatter logrus.Formatter } -func NewHookWithDuration(groupName, streamName string, sess *session.Session, batchFrequency time.Duration) (*Hook, error) { - return NewBatchingHook(groupName, streamName, sess, batchFrequency) -} - -func NewHook(groupName, streamName string, sess *session.Session) (*Hook, error) { - return NewBatchingHook(groupName, streamName, sess, 0) -} - -func (h *Hook) getOrCreateCloudWatchLogGroup() (*cloudwatchlogs.DescribeLogStreamsOutput, error) { - resp, err := h.svc.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{ - LogGroupName: aws.String(h.groupName), - LogStreamNamePrefix: aws.String(h.streamName), - }) - - if err != nil { - if aerr, ok := err.(awserr.Error); ok { - switch aerr.Code() { - case cloudwatchlogs.ErrCodeResourceNotFoundException: - _, err = h.svc.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ - LogGroupName: aws.String(h.groupName), - }) - if err != nil { - return nil, err - } - return h.getOrCreateCloudWatchLogGroup() - default: - return nil, err - } - } else { - return nil, err - } - } - return resp, nil - -} - -func NewBatchingHook(groupName, streamName string, sess *session.Session, batchFrequency time.Duration) (*Hook, error) { +// NewHook returns a new CloudWatch hook. +// +// CloudWatch log events are sent in batches on an interval, batchFrequency. Pass 0 to sent events immediately. +func NewHook(groupName, streamName string, sess *session.Session, batchFrequency time.Duration) (*Hook, error) { h := &Hook{ svc: cloudwatchlogs.New(sess), groupName: groupName, @@ -75,7 +43,7 @@ func NewBatchingHook(groupName, streamName string, sess *session.Session, batchF if batchFrequency > 0 { h.ch = make(chan *cloudwatchlogs.InputLogEvent, 10000) - go h.putBatches(time.Tick(batchFrequency)) + go h.putBatches(time.NewTicker(batchFrequency).C) } // grab the next sequence token @@ -96,8 +64,34 @@ func NewBatchingHook(groupName, streamName string, sess *session.Session, batchF return h, nil } +// WithFields includes the given fields to log entries sent to CloudWatch. +func (h *Hook) WithFields(fields logrus.Fields) *Hook { + h.fields = fields + return h +} + +// WithFormatter uses the given formatter to format log entries sent to CloudWatch. +func (h *Hook) WithFormatter(formatter logrus.Formatter) *Hook { + h.formatter = formatter + return h +} + +// Fire sends the given entry to CloudWatch. func (h *Hook) Fire(entry *logrus.Entry) error { - line, err := entry.String() + if h.fields != nil { + entry = entry.WithFields(h.fields) + } + + var line string + var err error + if h.formatter != nil { + var b []byte + b, err = h.formatter.Format(entry) + line = string(b) + } else { + line, err = entry.String() + } + if err != nil { fmt.Fprintf(os.Stderr, "Unable to read entry, %v", err) return err @@ -122,6 +116,53 @@ func (h *Hook) Fire(entry *logrus.Entry) error { } } +// Write sends the given bytes to CloudWatch. +func (h *Hook) Write(p []byte) (n int, err error) { + event := &cloudwatchlogs.InputLogEvent{ + Message: aws.String(string(p)), + Timestamp: aws.Int64(int64(time.Nanosecond) * time.Now().UnixNano() / int64(time.Millisecond)), + } + + // Batching hook - send event via channel + if h.ch != nil { + h.ch <- event + if h.err != nil { + lastErr := h.err + h.err = nil + return 0, fmt.Errorf("%v", *lastErr) + } + return len(p), nil + } + + // Synchronous hook - send event immediately + h.sendBatch([]*cloudwatchlogs.InputLogEvent{event}) + return len(p), nil +} + +func (h *Hook) getOrCreateCloudWatchLogGroup() (*cloudwatchlogs.DescribeLogStreamsOutput, error) { + resp, err := h.svc.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{ + LogGroupName: aws.String(h.groupName), + LogStreamNamePrefix: aws.String(h.streamName), + }) + + if err == nil { + return resp, nil + } + + aerr, ok := err.(awserr.Error) + if ok && aerr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException { + _, err = h.svc.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ + LogGroupName: aws.String(h.groupName), + }) + if err != nil { + return nil, err + } + return h.getOrCreateCloudWatchLogGroup() + } + + return nil, err +} + func (h *Hook) putBatches(ticker <-chan time.Time) { var batch []*cloudwatchlogs.InputLogEvent size := 0 @@ -151,53 +192,25 @@ func (h *Hook) sendBatch(batch []*cloudwatchlogs.InputLogEvent) { if len(batch) == 0 { return } + params := &cloudwatchlogs.PutLogEventsInput{ LogEvents: batch, LogGroupName: aws.String(h.groupName), LogStreamName: aws.String(h.streamName), SequenceToken: h.nextSequenceToken, } + resp, err := h.svc.PutLogEvents(params) - if err != nil { - h.err = &err - } else { + if err == nil { h.nextSequenceToken = resp.NextSequenceToken - } -} - -func (h *Hook) Write(p []byte) (n int, err error) { - event := &cloudwatchlogs.InputLogEvent{ - Message: aws.String(string(p)), - Timestamp: aws.Int64(int64(time.Nanosecond) * time.Now().UnixNano() / int64(time.Millisecond)), - } - - if h.ch != nil { - h.ch <- event - if h.err != nil { - lastErr := h.err - h.err = nil - return 0, fmt.Errorf("%v", *lastErr) - } - return len(p), nil + return } - h.m.Lock() - defer h.m.Unlock() - - params := &cloudwatchlogs.PutLogEventsInput{ - LogEvents: []*cloudwatchlogs.InputLogEvent{event}, - LogGroupName: aws.String(h.groupName), - LogStreamName: aws.String(h.streamName), - SequenceToken: h.nextSequenceToken, - } - resp, err := h.svc.PutLogEvents(params) - if err != nil { - return 0, err + h.err = &err + if aerr, ok := err.(*cloudwatchlogs.InvalidSequenceTokenException); ok { + h.nextSequenceToken = aerr.ExpectedSequenceToken + h.sendBatch(batch) } - - h.nextSequenceToken = resp.NextSequenceToken - - return len(p), nil } func (h *Hook) Levels() []logrus.Level { @@ -210,37 +223,3 @@ func (h *Hook) Levels() []logrus.Level { logrus.DebugLevel, } } - -// WriterHook is a hook that just outputs to an io.Writer. -// This is useful because our formatter outputs the file -// and line where it was called, and the callstack for a hook -// is different from the callstack for just writing to logrus.Logger.Out. -type WriterHook struct { - w io.Writer -} - -func NewWriterHook(w io.Writer) *WriterHook { - return &WriterHook{w: w} -} - -func (h *WriterHook) Fire(entry *logrus.Entry) error { - line, err := entry.String() - if err != nil { - fmt.Fprintf(os.Stderr, "Unable to read entry, %v", err) - return err - } - - _, err = h.w.Write([]byte(line)) - return err -} - -func (h *WriterHook) Levels() []logrus.Level { - return []logrus.Level{ - logrus.PanicLevel, - logrus.FatalLevel, - logrus.ErrorLevel, - logrus.WarnLevel, - logrus.InfoLevel, - logrus.DebugLevel, - } -} diff --git a/hook_test.go b/hook_test.go index 4079680..f2e0f98 100644 --- a/hook_test.go +++ b/hook_test.go @@ -9,6 +9,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" "github.com/sirupsen/logrus" "github.com/smartystreets/assertions" "github.com/smartystreets/assertions/should" @@ -39,7 +40,7 @@ func TestHook(t *testing.T) { cred := credentials.NewStaticCredentials(key, secret, "") cfg := aws.NewConfig().WithRegion("us-east-1").WithCredentials(cred) - hook, err := NewHook(group, stream, cfg) + hook, err := NewHook(group, stream, session.New(cfg), 0) a.So(err, should.BeNil) a.So(hook, should.NotBeNil) @@ -78,7 +79,7 @@ func TestConcurrentHook(t *testing.T) { cred := credentials.NewStaticCredentials(key, secret, "") cfg := aws.NewConfig().WithRegion("us-east-1").WithCredentials(cred) - hook, err := NewHook(group, stream, cfg) + hook, err := NewHook(group, stream, session.New(cfg), 0) a.So(err, should.BeNil) a.So(hook, should.NotBeNil) @@ -123,7 +124,7 @@ func TestBatching(t *testing.T) { cred := credentials.NewStaticCredentials(key, secret, "") cfg := aws.NewConfig().WithRegion("us-east-1").WithCredentials(cred) - hook, err := NewBatchingHook(group, stream, cfg, 100*time.Millisecond) + hook, err := NewHook(group, stream, session.New(cfg), 100*time.Millisecond) a.So(err, should.BeNil) a.So(hook, should.NotBeNil)