Skip to content

Commit

Permalink
create uds server and create base packet server class
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelli321 committed Dec 1, 2024
1 parent 4b45285 commit d86e716
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 60 deletions.
64 changes: 64 additions & 0 deletions receiver/statsdreceiver/internal/transport/packet_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"

import (
"errors"
"net"

"go.opentelemetry.io/collector/consumer"
)

type packetServer struct {
packetConn net.PacketConn
transport Transport
}

// ListenAndServe starts the server ready to receive metrics.
func (u *packetServer) ListenAndServe(
nextConsumer consumer.Metrics,
reporter Reporter,
transferChan chan<- Metric,
) error {
if nextConsumer == nil || reporter == nil {
return errNilListenAndServeParameters
}

buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6)
for {
n, addr, err := u.packetConn.ReadFrom(buf)
if n > 0 {
u.handlePacket(n, buf, addr, transferChan)
}
if err != nil {
reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v",
u.transport,
u.packetConn.LocalAddr(),
err)
var netErr net.Error
if errors.As(err, &netErr) {
if netErr.Timeout() {
continue
}
}
return err
}
}
}

// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream.
func (u *packetServer) handlePacket(
numBytes int,
data []byte,
addr net.Addr,
transferChan chan<- Metric,
) {
splitPacket := NewSplitBytes(data[:numBytes], '\n')
for splitPacket.Next() {
chunk := splitPacket.Chunk()
if len(chunk) > 0 {
transferChan <- Metric{string(chunk), addr}
}
}
}
7 changes: 5 additions & 2 deletions receiver/statsdreceiver/internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
TCP Transport = "tcp"
TCP4 Transport = "tcp4"
TCP6 Transport = "tcp6"
UDS Transport = "unixgram"
)

// NewTransport creates a Transport based on the transport string or returns an empty Transport.
Expand All @@ -31,14 +32,16 @@ func NewTransport(ts string) Transport {
return trans
case TCP, TCP4, TCP6:
return trans
case UDS:
return trans
}
return Transport("")
}

// String casts the transport to a String if the Transport is supported. Return an empty Transport overwise.
func (trans Transport) String() string {
switch trans {
case UDP, UDP4, UDP6, TCP, TCP4, TCP6:
case UDP, UDP4, UDP6, TCP, TCP4, TCP6, UDS:
return string(trans)
}
return ""
Expand All @@ -47,7 +50,7 @@ func (trans Transport) String() string {
// IsPacketTransport returns true if the transport is packet based.
func (trans Transport) IsPacketTransport() bool {
switch trans {
case UDP, UDP4, UDP6:
case UDP, UDP4, UDP6, UDS:
return true
}
return false
Expand Down
60 changes: 5 additions & 55 deletions receiver/statsdreceiver/internal/transport/udp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@
package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"

import (
"errors"
"fmt"
"net"

"go.opentelemetry.io/collector/consumer"
)

type udpServer struct {
packetConn net.PacketConn
transport Transport
packetServer
}

// Ensure that Server is implemented on UDP Server.
Expand All @@ -31,60 +27,14 @@ func NewUDPServer(transport Transport, address string) (Server, error) {
}

return &udpServer{
packetConn: conn,
transport: transport,
packetServer: packetServer{
packetConn: conn,
transport: transport,
},
}, nil
}

// ListenAndServe starts the server ready to receive metrics.
func (u *udpServer) ListenAndServe(
nextConsumer consumer.Metrics,
reporter Reporter,
transferChan chan<- Metric,
) error {
if nextConsumer == nil || reporter == nil {
return errNilListenAndServeParameters
}

buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6)
for {
n, addr, err := u.packetConn.ReadFrom(buf)
if n > 0 {
u.handlePacket(n, buf, addr, transferChan)
}
if err != nil {
reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v",
u.transport,
u.packetConn.LocalAddr(),
err)
var netErr net.Error
if errors.As(err, &netErr) {
if netErr.Timeout() {
continue
}
}
return err
}
}
}

// Close closes the server.
func (u *udpServer) Close() error {
return u.packetConn.Close()
}

// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream.
func (u *udpServer) handlePacket(
numBytes int,
data []byte,
addr net.Addr,
transferChan chan<- Metric,
) {
splitPacket := NewSplitBytes(data[:numBytes], '\n')
for splitPacket.Next() {
chunk := splitPacket.Chunk()
if len(chunk) > 0 {
transferChan <- Metric{string(chunk), addr}
}
}
}
46 changes: 46 additions & 0 deletions receiver/statsdreceiver/internal/transport/uds_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"

import (
"fmt"
"net"
"os"
)

type udsServer struct {
packetServer
}

// Ensure that Server is implemented on UDS Server.
var _ (Server) = (*udsServer)(nil)

// NewUDSServer creates a transport.Server using Unixgram as its transport.
func NewUDSServer(transport Transport, socketPath string) (Server, error) {
if !transport.IsPacketTransport() {
return nil, fmt.Errorf("NewUDSServer with %s: %w", transport.String(), ErrUnsupportedPacketTransport)
}

if _, err := os.Stat(socketPath); err == nil {
os.Remove(socketPath)
}

conn, err := net.ListenPacket(transport.String(), socketPath)
if err != nil {
return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err)
}

return &udsServer{
packetServer: packetServer{
packetConn: conn,
transport: transport,
},
}, nil
}

// Close closes the server.
func (u *udsServer) Close() error {
os.Remove(u.packetConn.LocalAddr().String())
return u.packetConn.Close()
}
12 changes: 9 additions & 3 deletions receiver/statsdreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,21 @@ func newReceiver(
config Config,
nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {
trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))

if config.NetAddr.Endpoint == "" {
config.NetAddr.Endpoint = "localhost:8125"
if trans == transport.UDS {
config.NetAddr.Endpoint = "/var/run/statsd-receiver.sock"
} else {
config.NetAddr.Endpoint = "localhost:8125"
}
}

rep, err := newReporter(set)
if err != nil {
return nil, err
}

trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
LongLivedCtx: true,
ReceiverID: set.ID,
Expand All @@ -80,13 +85,14 @@ func newReceiver(
}

func buildTransportServer(config Config) (transport.Server, error) {
// TODO: Add unix socket transport implementations
trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
switch trans {
case transport.UDP, transport.UDP4, transport.UDP6:
return transport.NewUDPServer(trans, config.NetAddr.Endpoint)
case transport.TCP, transport.TCP4, transport.TCP6:
return transport.NewTCPServer(trans, config.NetAddr.Endpoint)
case transport.UDS:
return transport.NewUDSServer(trans, config.NetAddr.Endpoint)
}

return nil, fmt.Errorf("unsupported transport %q", string(config.NetAddr.Transport))
Expand Down

0 comments on commit d86e716

Please sign in to comment.