Skip to content

Commit

Permalink
Add udp transport to socket plugin (#101)
Browse files Browse the repository at this point in the history
* Add udp transport to socket plugin

* Make unix socket default

* Fix linting issues

* Incorporate PR comment suggestions

* Fix unit test config after previous changes
  • Loading branch information
vyzigold authored Oct 12, 2022
1 parent 8216de3 commit 35b0d26
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 9 deletions.
68 changes: 60 additions & 8 deletions plugins/transport/socket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net"
"os"
"strings"
"time"

"github.com/infrawatch/apputils/logging"
Expand All @@ -17,6 +18,8 @@ import (

const (
maxBufferSize = 65535
udp = "udp"
unix = "unix"
)

var (
Expand All @@ -31,7 +34,9 @@ func rate() int64 {
}

type configT struct {
Path string `validate:"required"`
Path string `validate:"required_without=Socketaddr"`
Type string
Socketaddr string `validate:"required_without=Path"`
DumpMessages struct {
Enabled bool
Path string
Expand Down Expand Up @@ -69,9 +74,7 @@ type Socket struct {
dumpFile *os.File
}

// Run implements type Transport
func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {

func (s *Socket) initUnixSocket() *net.UnixConn {
var laddr net.UnixAddr
laddr.Name = s.conf.Path
laddr.Net = "unixgram"
Expand All @@ -80,17 +83,49 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
pc, err := net.ListenUnixgram("unixgram", &laddr)
if err != nil {
s.logger.Errorf(err, "failed to bind unix socket %s", laddr.Name)
return
return nil
}
// create socket file if it does not exist
skt, err := pc.File()
if err != nil {
s.logger.Errorf(err, "failed to retrieve file handle for %s", laddr.Name)
return
return nil
}
skt.Close()

s.logger.Infof("socket listening on %s", laddr.Name)

return pc
}

func (s *Socket) initUDPSocket() *net.UDPConn {
addr, err := net.ResolveUDPAddr(udp, s.conf.Socketaddr)
if err != nil {
s.logger.Errorf(err, "failed to resolve udp address: %s", s.conf.Socketaddr)
return nil
}
pc, err := net.ListenUDP(udp, addr)
if err != nil {
s.logger.Errorf(err, "failed to bind udp socket to addr: %s", s.conf.Socketaddr)
return nil
}

s.logger.Infof("socket listening on %s", s.conf.Socketaddr)

return pc
}

// Run implements type Transport
func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
var pc net.Conn
if s.conf.Type == udp {
pc = s.initUDPSocket()
} else {
pc = s.initUnixSocket()
}
if pc == nil {
s.logger.Errorf(nil, "Failed to initialize socket transport plugin")
}
go func(maxBuffSize int64) {
msgBuffer := make([]byte, maxBuffSize)
for {
Expand Down Expand Up @@ -136,14 +171,16 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
}
Done:
pc.Close()
os.Remove(s.conf.Path)
if s.conf.Type == unix {
os.Remove(s.conf.Path)
}
s.dumpFile.Close()
s.logger.Infof("exited")
}

// Listen ...
func (s *Socket) Listen(e data.Event) {
fmt.Printf("Received event: %v\n", e)
fmt.Printf("received event: %v\n", e)
}

// Config load configurations
Expand All @@ -155,6 +192,7 @@ func (s *Socket) Config(c []byte) error {
}{
Path: "/dev/stdout",
},
Type: unix,
}

err := config.ParseConfig(bytes.NewReader(c), &s.conf)
Expand All @@ -171,6 +209,20 @@ func (s *Socket) Config(c []byte) error {
s.dumpBuf = bufio.NewWriter(s.dumpFile)
}

s.conf.Type = strings.ToLower(s.conf.Type)
if s.conf.Type != unix && s.conf.Type != udp {
return fmt.Errorf("unable to determine socket type from configuration file. Should be either \"unix\" or \"udp\", received: %s",
s.conf.Type)
}

if s.conf.Type == unix && s.conf.Path == "" {
return fmt.Errorf("the path configuration option is required when using unix socket type")
}

if s.conf.Type == udp && s.conf.Socketaddr == "" {
return fmt.Errorf("the socketaddr configuration option is required when using udp socket type")
}

return nil
}

Expand Down
55 changes: 54 additions & 1 deletion plugins/transport/socket/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

const regularBuffSize = 16384

func TestSocketTransport(t *testing.T) {
func TestUnixSocketTransport(t *testing.T) {
tmpdir, err := ioutil.TempDir(".", "socket_test_tmp")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
Expand Down Expand Up @@ -81,3 +81,56 @@ func TestSocketTransport(t *testing.T) {
wskt.Close()
})
}

func TestUdpSocketTransport(t *testing.T) {
tmpdir, err := ioutil.TempDir(".", "socket_test_tmp")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)

logpath := path.Join(tmpdir, "test.log")
logger, err := logging.NewLogger(logging.DEBUG, logpath)
require.NoError(t, err)

trans := Socket{
conf: configT{
Socketaddr: "127.0.0.1:8642",
Type: "udp",
},
logger: &logWrapper{
l: logger,
},
}

t.Run("test large message transport", func(t *testing.T) {
msg := make([]byte, regularBuffSize)
addition := "wubba lubba dub dub"
for i := 0; i < regularBuffSize; i++ {
msg[i] = byte('X')
}
msg[regularBuffSize-1] = byte('$')
msg = append(msg, []byte(addition)...)

// verify transport
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
go trans.Run(ctx, func(mess []byte) {
wg.Add(1)
strmsg := string(mess)
assert.Equal(t, regularBuffSize+len(addition), len(strmsg)) // we received whole message
assert.Equal(t, addition, strmsg[len(strmsg)-len(addition):]) // and the out-of-band part is correct
wg.Done()
}, make(chan bool))

// write to socket
addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:8642")
require.NoError(t, err)
wskt, err := net.DialUDP("udp", nil, addr)
require.NoError(t, err)
_, err = wskt.Write(msg)
require.NoError(t, err)

cancel()
wg.Wait()
wskt.Close()
})
}

0 comments on commit 35b0d26

Please sign in to comment.