Skip to content

Commit

Permalink
Resolve kdar#5, implement nice-to-haves
Browse files Browse the repository at this point in the history
  • Loading branch information
gillepsi committed Feb 17, 2021
1 parent facbc54 commit 94d15b3
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 117 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Cloud Watch Logs hook for Logrus [![godoc reference](https://godoc.org/github.com/kdar/logrus-cloudwatchlogs?status.png)](https://godoc.org/github.com/kdar/logrus-cloudwatchlogs)

This fork of logrus-cloudwatchlogs:

* Resolves https://github.com/kdar/logrus-cloudwatchlogs/issues/5.
* Implements nice-to-have methods `Hook.WithFields()` and `Hook.WithFormatter()`.

Use this hook to send your [Logrus](https://github.com/sirupsen/logrus) logs to Amazon's [Cloud Watch Logs](https://aws.amazon.com/cloudwatch/details/#log-monitoring).

Expand Down
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
module github.com/kdar/logrus-cloudwatchlogs
module github.com/gillepsi/logrus-cloudwatchlogs

go 1.13

require (
github.com/aws/aws-sdk-go v1.30.7 // indirect
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
github.com/sirupsen/logrus v1.5.0 // indirect
github.com/aws/aws-sdk-go v1.30.7
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0
github.com/kdar/gtest v0.0.0-20171003232747-b20da4453579
github.com/sirupsen/logrus v1.5.0
github.com/smartystreets/assertions v1.2.0
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
github.com/kdar/gtest v0.0.0-20171003232747-b20da4453579 h1:L+qix2kCiUHMDJRHgQF7EKZGCJjFnkemozm7smeKg5s=
github.com/kdar/gtest v0.0.0-20171003232747-b20da4453579/go.mod h1:536LC8OTUEct5Z16El02WNFqiBENncHy291DyyWd8Hs=
github.com/kdar/logrus-cloudwatchlogs v0.0.0-20191029193323-96a456d8a358 h1:kRMfwIztgvM7klvW0yXnns58WcUI/CPGpaB7l+Wl+nE=
github.com/kdar/logrus-cloudwatchlogs v0.0.0-20191029193323-96a456d8a358/go.mod h1:ofZiSRdosqysKOk+xr2jFgr+ZYXFXmlrttnm9Xg5Emo=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.5.0 h1:1N5EYkVAPEywqZRJd7cwnRtCb6xJx7NH3T3WUTF980Q=
github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo=
github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs=
github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
Expand Down
199 changes: 89 additions & 110 deletions hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package logrus_cloudwatchlogs

import (
"fmt"
"io"
"os"
"sync"
"time"
Expand All @@ -23,45 +22,14 @@ type Hook struct {
m sync.Mutex
ch chan *cloudwatchlogs.InputLogEvent
err *error
fields logrus.Fields
formatter logrus.Formatter
}

func NewHookWithDuration(groupName, streamName string, sess *session.Session, batchFrequency time.Duration) (*Hook, error) {
return NewBatchingHook(groupName, streamName, sess, batchFrequency)
}

func NewHook(groupName, streamName string, sess *session.Session) (*Hook, error) {
return NewBatchingHook(groupName, streamName, sess, 0)
}

func (h *Hook) getOrCreateCloudWatchLogGroup() (*cloudwatchlogs.DescribeLogStreamsOutput, error) {
resp, err := h.svc.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{
LogGroupName: aws.String(h.groupName),
LogStreamNamePrefix: aws.String(h.streamName),
})

if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case cloudwatchlogs.ErrCodeResourceNotFoundException:
_, err = h.svc.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
LogGroupName: aws.String(h.groupName),
})
if err != nil {
return nil, err
}
return h.getOrCreateCloudWatchLogGroup()
default:
return nil, err
}
} else {
return nil, err
}
}
return resp, nil

}

func NewBatchingHook(groupName, streamName string, sess *session.Session, batchFrequency time.Duration) (*Hook, error) {
// NewHook returns a new CloudWatch hook.
//
// CloudWatch log events are sent in batches on an interval, batchFrequency. Pass 0 to sent events immediately.
func NewHook(groupName, streamName string, sess *session.Session, batchFrequency time.Duration) (*Hook, error) {
h := &Hook{
svc: cloudwatchlogs.New(sess),
groupName: groupName,
Expand All @@ -75,7 +43,7 @@ func NewBatchingHook(groupName, streamName string, sess *session.Session, batchF

if batchFrequency > 0 {
h.ch = make(chan *cloudwatchlogs.InputLogEvent, 10000)
go h.putBatches(time.Tick(batchFrequency))
go h.putBatches(time.NewTicker(batchFrequency).C)
}

// grab the next sequence token
Expand All @@ -96,8 +64,34 @@ func NewBatchingHook(groupName, streamName string, sess *session.Session, batchF
return h, nil
}

// WithFields includes the given fields to log entries sent to CloudWatch.
func (h *Hook) WithFields(fields logrus.Fields) *Hook {
h.fields = fields
return h
}

// WithFormatter uses the given formatter to format log entries sent to CloudWatch.
func (h *Hook) WithFormatter(formatter logrus.Formatter) *Hook {
h.formatter = formatter
return h
}

// Fire sends the given entry to CloudWatch.
func (h *Hook) Fire(entry *logrus.Entry) error {
line, err := entry.String()
if h.fields != nil {
entry = entry.WithFields(h.fields)
}

var line string
var err error
if h.formatter != nil {
var b []byte
b, err = h.formatter.Format(entry)
line = string(b)
} else {
line, err = entry.String()
}

if err != nil {
fmt.Fprintf(os.Stderr, "Unable to read entry, %v", err)
return err
Expand All @@ -122,6 +116,53 @@ func (h *Hook) Fire(entry *logrus.Entry) error {
}
}

// Write sends the given bytes to CloudWatch.
func (h *Hook) Write(p []byte) (n int, err error) {
event := &cloudwatchlogs.InputLogEvent{
Message: aws.String(string(p)),
Timestamp: aws.Int64(int64(time.Nanosecond) * time.Now().UnixNano() / int64(time.Millisecond)),
}

// Batching hook - send event via channel
if h.ch != nil {
h.ch <- event
if h.err != nil {
lastErr := h.err
h.err = nil
return 0, fmt.Errorf("%v", *lastErr)
}
return len(p), nil
}

// Synchronous hook - send event immediately
h.sendBatch([]*cloudwatchlogs.InputLogEvent{event})
return len(p), nil
}

func (h *Hook) getOrCreateCloudWatchLogGroup() (*cloudwatchlogs.DescribeLogStreamsOutput, error) {
resp, err := h.svc.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{
LogGroupName: aws.String(h.groupName),
LogStreamNamePrefix: aws.String(h.streamName),
})

if err == nil {
return resp, nil
}

aerr, ok := err.(awserr.Error)
if ok && aerr.Code() == cloudwatchlogs.ErrCodeResourceNotFoundException {
_, err = h.svc.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
LogGroupName: aws.String(h.groupName),
})
if err != nil {
return nil, err
}
return h.getOrCreateCloudWatchLogGroup()
}

return nil, err
}

func (h *Hook) putBatches(ticker <-chan time.Time) {
var batch []*cloudwatchlogs.InputLogEvent
size := 0
Expand Down Expand Up @@ -151,53 +192,25 @@ func (h *Hook) sendBatch(batch []*cloudwatchlogs.InputLogEvent) {
if len(batch) == 0 {
return
}

params := &cloudwatchlogs.PutLogEventsInput{
LogEvents: batch,
LogGroupName: aws.String(h.groupName),
LogStreamName: aws.String(h.streamName),
SequenceToken: h.nextSequenceToken,
}

resp, err := h.svc.PutLogEvents(params)
if err != nil {
h.err = &err
} else {
if err == nil {
h.nextSequenceToken = resp.NextSequenceToken
}
}

func (h *Hook) Write(p []byte) (n int, err error) {
event := &cloudwatchlogs.InputLogEvent{
Message: aws.String(string(p)),
Timestamp: aws.Int64(int64(time.Nanosecond) * time.Now().UnixNano() / int64(time.Millisecond)),
}

if h.ch != nil {
h.ch <- event
if h.err != nil {
lastErr := h.err
h.err = nil
return 0, fmt.Errorf("%v", *lastErr)
}
return len(p), nil
return
}

h.m.Lock()
defer h.m.Unlock()

params := &cloudwatchlogs.PutLogEventsInput{
LogEvents: []*cloudwatchlogs.InputLogEvent{event},
LogGroupName: aws.String(h.groupName),
LogStreamName: aws.String(h.streamName),
SequenceToken: h.nextSequenceToken,
}
resp, err := h.svc.PutLogEvents(params)
if err != nil {
return 0, err
h.err = &err
if aerr, ok := err.(*cloudwatchlogs.InvalidSequenceTokenException); ok {
h.nextSequenceToken = aerr.ExpectedSequenceToken
h.sendBatch(batch)
}

h.nextSequenceToken = resp.NextSequenceToken

return len(p), nil
}

func (h *Hook) Levels() []logrus.Level {
Expand All @@ -210,37 +223,3 @@ func (h *Hook) Levels() []logrus.Level {
logrus.DebugLevel,
}
}

// WriterHook is a hook that just outputs to an io.Writer.
// This is useful because our formatter outputs the file
// and line where it was called, and the callstack for a hook
// is different from the callstack for just writing to logrus.Logger.Out.
type WriterHook struct {
w io.Writer
}

func NewWriterHook(w io.Writer) *WriterHook {
return &WriterHook{w: w}
}

func (h *WriterHook) Fire(entry *logrus.Entry) error {
line, err := entry.String()
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to read entry, %v", err)
return err
}

_, err = h.w.Write([]byte(line))
return err
}

func (h *WriterHook) Levels() []logrus.Level {
return []logrus.Level{
logrus.PanicLevel,
logrus.FatalLevel,
logrus.ErrorLevel,
logrus.WarnLevel,
logrus.InfoLevel,
logrus.DebugLevel,
}
}
7 changes: 4 additions & 3 deletions hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/sirupsen/logrus"
"github.com/smartystreets/assertions"
"github.com/smartystreets/assertions/should"
Expand Down Expand Up @@ -39,7 +40,7 @@ func TestHook(t *testing.T) {
cred := credentials.NewStaticCredentials(key, secret, "")
cfg := aws.NewConfig().WithRegion("us-east-1").WithCredentials(cred)

hook, err := NewHook(group, stream, cfg)
hook, err := NewHook(group, stream, session.New(cfg), 0)
a.So(err, should.BeNil)
a.So(hook, should.NotBeNil)

Expand Down Expand Up @@ -78,7 +79,7 @@ func TestConcurrentHook(t *testing.T) {
cred := credentials.NewStaticCredentials(key, secret, "")
cfg := aws.NewConfig().WithRegion("us-east-1").WithCredentials(cred)

hook, err := NewHook(group, stream, cfg)
hook, err := NewHook(group, stream, session.New(cfg), 0)
a.So(err, should.BeNil)
a.So(hook, should.NotBeNil)

Expand Down Expand Up @@ -123,7 +124,7 @@ func TestBatching(t *testing.T) {
cred := credentials.NewStaticCredentials(key, secret, "")
cfg := aws.NewConfig().WithRegion("us-east-1").WithCredentials(cred)

hook, err := NewBatchingHook(group, stream, cfg, 100*time.Millisecond)
hook, err := NewHook(group, stream, session.New(cfg), 100*time.Millisecond)
a.So(err, should.BeNil)
a.So(hook, should.NotBeNil)

Expand Down

0 comments on commit 94d15b3

Please sign in to comment.