Skip to content

Commit

Permalink
Add WithContext methods to the Fluent struct including handling of co…
Browse files Browse the repository at this point in the history
…ntext deadlines

Signed-off-by: Arno Geurts <[email protected]>
  • Loading branch information
arnogeurts-sqills committed Sep 14, 2020
1 parent 89c0329 commit 5e1b54c
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 36 deletions.
143 changes: 107 additions & 36 deletions fluent/fluent.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fluent

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -78,17 +79,24 @@ func NewErrUnknownNetwork(network string) error {
}

type msgToSend struct {
ctx context.Context
data []byte
ack string
}

type bufferInput struct {
msg *msgToSend
result chan<- error
}

type Fluent struct {
Config

dialer dialer
stopRunning chan bool
pending chan *msgToSend
pending chan bufferInput
wg sync.WaitGroup
resultPool sync.Pool

muconn sync.Mutex
conn net.Conn
Expand All @@ -108,6 +116,10 @@ type dialer interface {
Dial(string, string) (net.Conn, error)
}

type dialerWithContext interface {
DialContext(context.Context, string, string) (net.Conn, error)
}

func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
if config.FluentNetwork == "" {
config.FluentNetwork = defaultNetwork
Expand Down Expand Up @@ -140,22 +152,24 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
config.Async = config.Async || config.AsyncConnect
}

if config.Async {
f = &Fluent{
Config: config,
dialer: d,
pending: make(chan *msgToSend, config.BufferLimit),
}
f.wg.Add(1)
go f.run()
} else {
f = &Fluent{
Config: config,
dialer: d,
f = &Fluent{
Config: config,
dialer: d,
pending: make(chan bufferInput, config.BufferLimit),
resultPool: sync.Pool{
New: func() interface{} {
return make(chan error, 1)
},
},
}
if !config.Async {
if err = f.connect(context.Background()); err != nil {
return
}
err = f.connect()
}

f.wg.Add(1)
go f.run()
return
}

Expand Down Expand Up @@ -185,17 +199,25 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
// f.Post("tag_name", structData)
//
func (f *Fluent) Post(tag string, message interface{}) error {
return f.PostWithContext(context.Background(), tag, message)
}

func (f *Fluent) PostWithContext(ctx context.Context, tag string, message interface{}) error {
timeNow := time.Now()
return f.PostWithTime(tag, timeNow, message)
return f.PostWithTimeAndContext(ctx, tag, timeNow, message)
}

func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) error {
return f.PostWithTimeAndContext(context.Background(), tag, tm, message)
}

func (f *Fluent) PostWithTimeAndContext(ctx context.Context, tag string, tm time.Time, message interface{}) error {
if len(f.TagPrefix) > 0 {
tag = f.TagPrefix + "." + tag
}

if m, ok := message.(msgp.Marshaler); ok {
return f.EncodeAndPostData(tag, tm, m)
return f.EncodeAndPostDataWithContext(ctx, tag, tm, m)
}

msg := reflect.ValueOf(message)
Expand All @@ -215,7 +237,7 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
}
kv[name] = msg.FieldByIndex(field.Index).Interface()
}
return f.EncodeAndPostData(tag, tm, kv)
return f.EncodeAndPostDataWithContext(ctx, tag, tm, kv)
}

if msgtype.Kind() != reflect.Map {
Expand All @@ -229,13 +251,17 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
kv[k.String()] = msg.MapIndex(k).Interface()
}

return f.EncodeAndPostData(tag, tm, kv)
return f.EncodeAndPostDataWithContext(ctx, tag, tm, kv)
}

func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error {
return f.EncodeAndPostDataWithContext(context.Background(), tag, tm, message)
}

func (f *Fluent) EncodeAndPostDataWithContext(ctx context.Context, tag string, tm time.Time, message interface{}) error {
var msg *msgToSend
var err error
if msg, err = f.EncodeData(tag, tm, message); err != nil {
if msg, err = f.EncodeDataWithContext(ctx, tag, tm, message); err != nil {
return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err)
}
return f.postRawData(msg)
Expand All @@ -251,7 +277,7 @@ func (f *Fluent) postRawData(msg *msgToSend) error {
return f.appendBuffer(msg)
}
// Synchronous write
return f.write(msg)
return f.appendBufferBlocking(msg)
}

// For sending forward protocol adopted JSON
Expand Down Expand Up @@ -296,8 +322,12 @@ func getUniqueID(timeUnix int64) (string, error) {
}

func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) {
return f.EncodeDataWithContext(context.Background(), tag, tm, message)
}

func (f *Fluent) EncodeDataWithContext(ctx context.Context, tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) {
option := make(map[string]string)
msg = &msgToSend{}
msg = &msgToSend{ctx: ctx}
timeUnix := tm.Unix()
if f.Config.RequestAck {
var err error
Expand Down Expand Up @@ -338,13 +368,37 @@ func (f *Fluent) Close() (err error) {
// appendBuffer appends data to buffer with lock.
func (f *Fluent) appendBuffer(msg *msgToSend) error {
select {
case f.pending <- msg:
case f.pending <- bufferInput{msg: msg}:
default:
return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
}
return nil
}

// appendBufferWithFeedback appends data to buffer and waits for the result
func (f *Fluent) appendBufferBlocking(msg *msgToSend) error {
result := f.resultPool.Get().(chan error)
// write the data to the buffer and block if the buffer is full
select {
case f.pending <- bufferInput{msg: msg, result: result}:
// don't do anything
case <-msg.ctx.Done():
// because the result channel is not used, it can safely be returned to the sync pool.
f.resultPool.Put(result)
return msg.ctx.Err()
}

select {
case err := <-result:
f.resultPool.Put(result)
return err
case <-msg.ctx.Done():
// the context deadline has exceeded, but there is no result yet. So the result channel cannot be returned to
// the pool, as it might be written later.
return msg.ctx.Err()
}
}

// close closes the connection.
func (f *Fluent) close(c net.Conn) {
f.muconn.Lock()
Expand All @@ -356,19 +410,23 @@ func (f *Fluent) close(c net.Conn) {
}

// connect establishes a new connection using the specified transport.
func (f *Fluent) connect() (err error) {
func (f *Fluent) connect(ctx context.Context) (err error) {
var address string
switch f.Config.FluentNetwork {
case "tcp":
f.conn, err = f.dialer.Dial(
f.Config.FluentNetwork,
f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
address = f.Config.FluentHost + ":" + strconv.Itoa(f.Config.FluentPort)
case "unix":
f.conn, err = f.dialer.Dial(
f.Config.FluentNetwork,
f.Config.FluentSocketPath)
address = f.Config.FluentSocketPath
default:
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
return
}
if d, ok := f.dialer.(dialerWithContext); ok {
f.conn, err = d.DialContext(ctx, f.Config.FluentNetwork, address)
} else {
f.conn, err = f.dialer.Dial(f.Config.FluentNetwork, address)
}

return err
}

Expand All @@ -386,7 +444,11 @@ func (f *Fluent) run() {
emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
continue
}
err := f.write(entry)
err := f.write(entry.msg)
if entry.result != nil {
entry.result <- err
continue
}
if err != nil {
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
}
Expand All @@ -413,7 +475,7 @@ func (f *Fluent) write(msg *msgToSend) error {
if c == nil {
f.muconn.Lock()
if f.conn == nil {
err := f.connect()
err := f.connect(msg.ctx)
if err != nil {
f.muconn.Unlock()

Expand All @@ -425,7 +487,13 @@ func (f *Fluent) write(msg *msgToSend) error {
if waitTime > f.Config.MaxRetryWait {
waitTime = f.Config.MaxRetryWait
}
time.Sleep(time.Duration(waitTime) * time.Millisecond)
waitDuration := time.Duration(waitTime) * time.Millisecond
if deadline, hasDeadLine := msg.ctx.Deadline(); hasDeadLine && deadline.Before(time.Now().Add(waitDuration)) {
// the context deadline is within the wait time, so after the sleep the deadline will have been
// exceeded. It is a waste of time to wait on that.
return context.DeadlineExceeded
}
time.Sleep(waitDuration)
continue
}
}
Expand All @@ -435,11 +503,14 @@ func (f *Fluent) write(msg *msgToSend) error {

// We're connected, write msg
t := f.Config.WriteTimeout
var deadline time.Time
if time.Duration(0) < t {
c.SetWriteDeadline(time.Now().Add(t))
} else {
c.SetWriteDeadline(time.Time{})
deadline = time.Now().Add(t)
}
if ctxDeadline, hasDeadline := msg.ctx.Deadline(); hasDeadline && (deadline.IsZero() || ctxDeadline.Before(deadline)) {
deadline = ctxDeadline
}
c.SetWriteDeadline(deadline)
_, err := c.Write(msg.data)
if err != nil {
f.close(c)
Expand Down
54 changes: 54 additions & 0 deletions fluent/fluent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fluent

import (
"bytes"
"context"
"encoding/json"
"errors"
"io/ioutil"
Expand Down Expand Up @@ -449,6 +450,59 @@ func TestPostWithTime(t *testing.T) {
}
}

func TestPostWithTimeAndContext(t *testing.T) {
testcases := map[string]Config{
"with Async": {
Async: true,
MarshalAsJSON: true,
TagPrefix: "acme",
},
"without Async": {
Async: false,
MarshalAsJSON: true,
TagPrefix: "acme",
},
}

for tcname := range testcases {
t.Run(tcname, func(t *testing.T) {
tc := testcases[tcname]
t.Parallel()

d := newTestDialer()
var f *Fluent
defer func() {
if f != nil {
f.Close()
}
}()
deadline := time.Now().Add(1 * time.Second)

go func() {
var err error
if f, err = newWithDialer(tc, d); err != nil {
t.Errorf("Unexpected error: %v", err)
}
ctx, cancelFunc := context.WithDeadline(context.Background(), deadline)
defer cancelFunc()

_ = f.PostWithTimeAndContext(ctx, "tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
_ = f.PostWithTimeAndContext(ctx, "tag_name", time.Unix(1482493050, 0), map[string]string{"fluentd": "is awesome"})
}()

conn := d.waitForNextDialing(true)
assertReceived(t,
conn.waitForNextWrite(true, ""),
"[\"acme.tag_name\",1482493046,{\"foo\":\"bar\"},{}]")

assertReceived(t,
conn.waitForNextWrite(true, ""),
"[\"acme.tag_name\",1482493050,{\"fluentd\":\"is awesome\"},{}]")
assert.Equal(t, conn.writeDeadline, deadline)
})
}
}

func TestReconnectAndResendAfterTransientFailure(t *testing.T) {
testcases := map[string]Config{
"with Async": {
Expand Down

0 comments on commit 5e1b54c

Please sign in to comment.