Skip to content

Commit

Permalink
Add statsdreceiver Unixgram Support (#36608)
Browse files Browse the repository at this point in the history
#### Description

Adds `unixgram` transport for the `statsdreceiver`. Additionally,
creates a new `packetServer` base class for both `UDS` and `UDP*`
transport types

#### Link to tracking issue

#21385

#### Testing

Added a unit test

---------

Co-authored-by: Christos Markou <[email protected]>
  • Loading branch information
michaelli321 and ChrsMark authored Dec 20, 2024
1 parent 23306ea commit c208ea2
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 64 deletions.
27 changes: 27 additions & 0 deletions .chloggen/statsdreceiver-uds.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: statsdreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add UDS support to statsdreceiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [21385]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
9 changes: 5 additions & 4 deletions receiver/statsdreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ Use case: it does not support horizontal pool of collectors. Desired work case i

## Configuration

The following settings are required:

- `endpoint` (default = `localhost:8125`): Address and port to listen on.
The Following settings are optional:

- `endpoint`: Address and port to listen on.
- For `udp` and `tcp` based `transport`, this config will default to `localhost:8125`
- For `unixgram` `transport`, this config will default to `/var/run/statsd-receiver.sock`

The Following settings are optional:
- `transport` (default = `udp`): Protocol used by the StatsD server. Currently supported transports can be found in [this file](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/statsdreceiver/internal/transport/transport.go).

- `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server)

Expand Down
9 changes: 9 additions & 0 deletions receiver/statsdreceiver/internal/transport/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ func (s *StatsD) connect() error {
if err != nil {
return err
}
case "unixgram":
unixAddr, err := net.ResolveUnixAddr(s.transport, s.address)
if err != nil {
return err
}
s.conn, err = net.DialUnix(s.transport, nil, unixAddr)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown/unsupported transport: %s", s.transport)
}
Expand Down
84 changes: 84 additions & 0 deletions receiver/statsdreceiver/internal/transport/packet_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"

import (
"errors"
"net"

"go.opentelemetry.io/collector/consumer"
)

type packetServer struct {
packetConn net.PacketConn
transport Transport
}

// ListenAndServe starts the server ready to receive metrics.
func (u *packetServer) ListenAndServe(
nextConsumer consumer.Metrics,
reporter Reporter,
transferChan chan<- Metric,
) error {
if nextConsumer == nil || reporter == nil {
return errNilListenAndServeParameters
}

buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6)
for {
n, addr, err := u.packetConn.ReadFrom(buf)
if addr == nil && u.transport == UDS {
addr = &udsAddr{
network: u.transport.String(),
address: u.packetConn.LocalAddr().String(),
}
}

if n > 0 {
u.handlePacket(n, buf, addr, transferChan)
}
if err != nil {
reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v",
u.transport,
u.packetConn.LocalAddr(),
err)
var netErr net.Error
if errors.As(err, &netErr) {
if netErr.Timeout() {
continue
}
}
return err
}
}
}

// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream.
func (u *packetServer) handlePacket(
numBytes int,
data []byte,
addr net.Addr,
transferChan chan<- Metric,
) {
splitPacket := NewSplitBytes(data[:numBytes], '\n')
for splitPacket.Next() {
chunk := splitPacket.Chunk()
if len(chunk) > 0 {
transferChan <- Metric{string(chunk), addr}
}
}
}

type udsAddr struct {
network string
address string
}

func (u *udsAddr) Network() string {
return u.network
}

func (u *udsAddr) String() string {
return u.address
}
7 changes: 5 additions & 2 deletions receiver/statsdreceiver/internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
TCP Transport = "tcp"
TCP4 Transport = "tcp4"
TCP6 Transport = "tcp6"
UDS Transport = "unixgram"
)

// NewTransport creates a Transport based on the transport string or returns an empty Transport.
Expand All @@ -31,14 +32,16 @@ func NewTransport(ts string) Transport {
return trans
case TCP, TCP4, TCP6:
return trans
case UDS:
return trans
}
return Transport("")
}

// String casts the transport to a String if the Transport is supported. Return an empty Transport overwise.
func (trans Transport) String() string {
switch trans {
case UDP, UDP4, UDP6, TCP, TCP4, TCP6:
case UDP, UDP4, UDP6, TCP, TCP4, TCP6, UDS:
return string(trans)
}
return ""
Expand All @@ -47,7 +50,7 @@ func (trans Transport) String() string {
// IsPacketTransport returns true if the transport is packet based.
func (trans Transport) IsPacketTransport() bool {
switch trans {
case UDP, UDP4, UDP6:
case UDP, UDP4, UDP6, UDS:
return true
}
return false
Expand Down
60 changes: 5 additions & 55 deletions receiver/statsdreceiver/internal/transport/udp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@
package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"

import (
"errors"
"fmt"
"net"

"go.opentelemetry.io/collector/consumer"
)

type udpServer struct {
packetConn net.PacketConn
transport Transport
packetServer
}

// Ensure that Server is implemented on UDP Server.
Expand All @@ -31,60 +27,14 @@ func NewUDPServer(transport Transport, address string) (Server, error) {
}

return &udpServer{
packetConn: conn,
transport: transport,
packetServer: packetServer{
packetConn: conn,
transport: transport,
},
}, nil
}

// ListenAndServe starts the server ready to receive metrics.
func (u *udpServer) ListenAndServe(
nextConsumer consumer.Metrics,
reporter Reporter,
transferChan chan<- Metric,
) error {
if nextConsumer == nil || reporter == nil {
return errNilListenAndServeParameters
}

buf := make([]byte, 65527) // max size for udp packet body (assuming ipv6)
for {
n, addr, err := u.packetConn.ReadFrom(buf)
if n > 0 {
u.handlePacket(n, buf, addr, transferChan)
}
if err != nil {
reporter.OnDebugf("%s Transport (%s) - ReadFrom error: %v",
u.transport,
u.packetConn.LocalAddr(),
err)
var netErr net.Error
if errors.As(err, &netErr) {
if netErr.Timeout() {
continue
}
}
return err
}
}
}

// Close closes the server.
func (u *udpServer) Close() error {
return u.packetConn.Close()
}

// handlePacket is helper that parses the buffer and split it line by line to be parsed upstream.
func (u *udpServer) handlePacket(
numBytes int,
data []byte,
addr net.Addr,
transferChan chan<- Metric,
) {
splitPacket := NewSplitBytes(data[:numBytes], '\n')
for splitPacket.Next() {
chunk := splitPacket.Chunk()
if len(chunk) > 0 {
transferChan <- Metric{string(chunk), addr}
}
}
}
42 changes: 42 additions & 0 deletions receiver/statsdreceiver/internal/transport/uds_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package transport // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"

import (
"fmt"
"net"
"os"
)

type udsServer struct {
packetServer
}

// Ensure that Server is implemented on UDS Server.
var _ (Server) = (*udsServer)(nil)

// NewUDSServer creates a transport.Server using Unixgram as its transport.
func NewUDSServer(transport Transport, socketPath string) (Server, error) {
if !transport.IsPacketTransport() {
return nil, fmt.Errorf("NewUDSServer with %s: %w", transport.String(), ErrUnsupportedPacketTransport)
}

conn, err := net.ListenPacket(transport.String(), socketPath)
if err != nil {
return nil, fmt.Errorf("starting to listen %s socket: %w", transport.String(), err)
}

return &udsServer{
packetServer: packetServer{
packetConn: conn,
transport: transport,
},
}, nil
}

// Close closes the server.
func (u *udsServer) Close() error {
os.Remove(u.packetConn.LocalAddr().String())
return u.packetConn.Close()
}
12 changes: 9 additions & 3 deletions receiver/statsdreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,21 @@ func newReceiver(
config Config,
nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {
trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))

if config.NetAddr.Endpoint == "" {
config.NetAddr.Endpoint = "localhost:8125"
if trans == transport.UDS {
config.NetAddr.Endpoint = "/var/run/statsd-receiver.sock"
} else {
config.NetAddr.Endpoint = "localhost:8125"
}
}

rep, err := newReporter(set)
if err != nil {
return nil, err
}

trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
LongLivedCtx: true,
ReceiverID: set.ID,
Expand All @@ -80,13 +85,14 @@ func newReceiver(
}

func buildTransportServer(config Config) (transport.Server, error) {
// TODO: Add unix socket transport implementations
trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport)))
switch trans {
case transport.UDP, transport.UDP4, transport.UDP6:
return transport.NewUDPServer(trans, config.NetAddr.Endpoint)
case transport.TCP, transport.TCP4, transport.TCP6:
return transport.NewTCPServer(trans, config.NetAddr.Endpoint)
case transport.UDS:
return transport.NewUDSServer(trans, config.NetAddr.Endpoint)
}

return nil, fmt.Errorf("unsupported transport %q", string(config.NetAddr.Transport))
Expand Down
18 changes: 18 additions & 0 deletions receiver/statsdreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,24 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) {
return c
},
},
{
name: "UDS server with 4s interval",
addr: "/tmp/statsd_test.sock",
configFn: func() *Config {
return &Config{
NetAddr: confignet.AddrConfig{
Endpoint: "/tmp/statsd_test.sock",
Transport: confignet.TransportTypeUnixgram,
},
AggregationInterval: 4 * time.Second,
}
},
clientFn: func(t *testing.T, addr string) *client.StatsD {
c, err := client.NewStatsD("unixgram", addr)
require.NoError(t, err)
return c
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit c208ea2

Please sign in to comment.