Skip to content

Commit

Permalink
Increase reading buffer size on large messages (#74) (#83)
Browse files Browse the repository at this point in the history
* Increase reading buffer size on large messages  (#74)

When larger than MaxBufferSize messages are being sent, the whole message processing
is failing badly. This can easily happen when socket plugin is used in Ceilometer metrics
or events processing.

Fixes: https://bugzilla.redhat.com/show_bug.cgi?id=2016460
(cherry picked from commit f13e241)

* Ci opstools fix (#85) (#87) (#89)

* Fix centos-opstools repo in CI

* Avoid gpgcheck for centos repo

Co-authored-by: Leif Madsen <[email protected]>
Co-authored-by: Matthias Runge <[email protected]>
(cherry picked from commit 04fc691)
(cherry picked from commit 33d4121)

Co-authored-by: Martin Mágr <[email protected]>

Co-authored-by: Leif Madsen <[email protected]>
  • Loading branch information
paramite and leifmadsen authored Feb 4, 2022
1 parent 33d4121 commit b956a85
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 6 deletions.
25 changes: 19 additions & 6 deletions plugins/transport/socket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"github.com/infrawatch/sg-core/pkg/transport"
)

const maxBufferSize = 16384
const (
maxBufferSize = 65535
)

var (
msgCount int64
Expand Down Expand Up @@ -71,22 +73,27 @@ type Socket struct {
func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {

var laddr net.UnixAddr

laddr.Name = s.conf.Path
laddr.Net = "unixgram"

os.Remove(s.conf.Path)

pc, err := net.ListenUnixgram("unixgram", &laddr)
if err != nil {
s.logger.Errorf(err, "failed to listen on unix socket %s", laddr.Name)
s.logger.Errorf(err, "failed to bind unix socket %s", laddr.Name)
return
}
// create socket file if it does not exist
skt, err := pc.File()
if err != nil {
s.logger.Errorf(err, "failed to retrieve file handle for %s", laddr.Name)
return
}
skt.Close()

s.logger.Infof("socket listening on %s", laddr.Name)
go func(buffSize int64) {
go func(maxBuffSize int64) {
msgBuffer := make([]byte, maxBuffSize)
for {
msgBuffer := make([]byte, buffSize)
n, err := pc.Read(msgBuffer)
if err != nil || n < 1 {
if err != nil {
Expand All @@ -96,6 +103,11 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
return
}

// whole buffer was used, so we are potentially handling larger message
if n == len(msgBuffer) {
s.logger.Warnf("full read buffer used")
}

if s.conf.DumpMessages.Enabled {
_, err := s.dumpBuf.Write(msgBuffer[:n])
if err != nil {
Expand All @@ -107,6 +119,7 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
}
s.dumpBuf.Flush()
}

w(msgBuffer[:n])
msgCount++
}
Expand Down
83 changes: 83 additions & 0 deletions plugins/transport/socket/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package main

import (
"context"
"io/ioutil"
"net"
"os"
"path"
"sync"
"testing"
"time"

"github.com/infrawatch/apputils/logging"
"github.com/stretchr/testify/require"
"gopkg.in/go-playground/assert.v1"
)

const regularBuffSize = 16384

func TestSocketTransport(t *testing.T) {
tmpdir, err := ioutil.TempDir(".", "socket_test_tmp")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)

logpath := path.Join(tmpdir, "test.log")
logger, err := logging.NewLogger(logging.DEBUG, logpath)
require.NoError(t, err)

sktpath := path.Join(tmpdir, "socket")
skt, err := os.OpenFile(sktpath, os.O_RDWR|os.O_CREATE, os.ModeSocket|os.ModePerm)
require.NoError(t, err)
defer skt.Close()

trans := Socket{
conf: configT{
Path: sktpath,
},
logger: &logWrapper{
l: logger,
},
}

t.Run("test large message transport", func(t *testing.T) {
msg := make([]byte, regularBuffSize)
addition := "wubba lubba dub dub"
for i := 0; i < regularBuffSize; i++ {
msg[i] = byte('X')
}
msg[regularBuffSize-1] = byte('$')
msg = append(msg, []byte(addition)...)

// verify transport
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
go trans.Run(ctx, func(mess []byte) {
wg.Add(1)
strmsg := string(mess)
assert.Equal(t, regularBuffSize+len(addition), len(strmsg)) // we received whole message
assert.Equal(t, addition, strmsg[len(strmsg)-len(addition):]) // and the out-of-band part is correct
wg.Done()
}, make(chan bool))

// wait for socket file to be created
for {
stat, err := os.Stat(sktpath)
require.NoError(t, err)
if stat.Mode()&os.ModeType == os.ModeSocket {
break
}
time.Sleep(250 * time.Millisecond)
}

// write to socket
wskt, err := net.DialUnix("unixgram", nil, &net.UnixAddr{Name: sktpath, Net: "unixgram"})
require.NoError(t, err)
_, err = wskt.Write(msg)
require.NoError(t, err)

cancel()
wg.Wait()
wskt.Close()
})
}

0 comments on commit b956a85

Please sign in to comment.