Skip to content

Commit

Permalink
[filebeat] Add a configuration option for TCP/UDP network type (#40623)
Browse files Browse the repository at this point in the history
* Add a configuration option for TCP/UDP network type
  • Loading branch information
aleksmaus authored Aug 27, 2024
1 parent 76ea160 commit 56a17e0
Show file tree
Hide file tree
Showing 11 changed files with 239 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Added `ignore_empty_values` flag in `decode_cef` Filebeat processor. {pull}40268[40268]
- Bump version of elastic/toutoumomoma to remove internal forks of stdlib debug packages. {pull}40325[40325]
- Refactor x-pack/filebeat/input/websocket for generalisation. {pull}40308[40308]
- Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623]

==== Deprecated

Expand Down
6 changes: 6 additions & 0 deletions filebeat/docs/inputs/input-common-tcp-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ The maximum size of the message received over TCP. The default is `20MiB`.

The host and TCP port to listen on for event streams.

[float]
[id="{beatname_lc}-input-{type}-tcp-network"]
==== `network`

The network type. Acceptable values are: "tcp" (default), "tcp4", "tcp6"

[float]
[id="{beatname_lc}-input-{type}-tcp-framing"]
==== `framing`
Expand Down
6 changes: 6 additions & 0 deletions filebeat/docs/inputs/input-common-udp-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ The maximum size of the message received over UDP. The default is `10KiB`.

The host and UDP port to listen on for event streams.

[float]
[id="{beatname_lc}-input-{type}-udp-network"]
==== `network`

The network type. Acceptable values are: "udp" (default), "udp4", "udp6"

[float]
[id="{beatname_lc}-input-{type}-udp-read-buffer"]
==== `read_buffer`
Expand Down
20 changes: 19 additions & 1 deletion filebeat/inputsource/tcp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package tcp

import (
"errors"
"fmt"
"time"

Expand All @@ -35,12 +36,29 @@ type Config struct {
MaxMessageSize cfgtype.ByteSize `config:"max_message_size" validate:"nonzero,positive"`
MaxConnections int `config:"max_connections"`
TLS *tlscommon.ServerConfig `config:"ssl"`
Network string `config:"network"`
}

const (
networkTCP = "tcp"
networkTCP4 = "tcp4"
networkTCP6 = "tcp6"
)

var (
ErrInvalidNetwork = errors.New("invalid network value")
ErrMissingHostPort = errors.New("need to specify the host using the `host:port` syntax")
)

// Validate validates the Config option for the tcp input.
func (c *Config) Validate() error {
if len(c.Host) == 0 {
return fmt.Errorf("need to specify the host using the `host:port` syntax")
return ErrMissingHostPort
}
switch c.Network {
case "", networkTCP, networkTCP4, networkTCP6:
default:
return fmt.Errorf("%w: %s, expected: %v or %v or %v ", ErrInvalidNetwork, c.Network, networkTCP, networkTCP4, networkTCP6)
}
return nil
}
74 changes: 74 additions & 0 deletions filebeat/inputsource/tcp/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package tcp

import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)

func TestValidate(t *testing.T) {
type testCfg struct {
name string
cfg Config
wantErr error
}

tests := []testCfg{
{
name: "ok",
cfg: Config{
Host: "localhost:9000",
},
},
{
name: "nohost",
wantErr: ErrMissingHostPort,
},
{
name: "invalidnetwork",
cfg: Config{
Host: "localhost:9000",
Network: "foo",
},
wantErr: ErrInvalidNetwork,
},
}

for _, network := range []string{networkTCP, networkTCP4, networkTCP6} {
tests = append(tests, testCfg{
name: "network_" + network,
cfg: Config{
Host: "localhost:9000",
Network: network,
},
})
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := tc.cfg.Validate()
diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors())
if diff != "" {
t.Fatal(diff)
}
})
}
}
12 changes: 10 additions & 2 deletions filebeat/inputsource/tcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ func New(
func (s *Server) createServer() (net.Listener, error) {
var l net.Listener
var err error
network := s.network()
if s.tlsConfig != nil {
t := s.tlsConfig.BuildServerConfig(s.config.Host)
l, err = tls.Listen("tcp", s.config.Host, t)
l, err = tls.Listen(network, s.config.Host, t)
if err != nil {
return nil, err
}
} else {
l, err = net.Listen("tcp", s.config.Host)
l, err = net.Listen(network, s.config.Host)
if err != nil {
return nil, err
}
Expand All @@ -85,3 +86,10 @@ func (s *Server) createServer() (net.Listener, error) {
}
return l, nil
}

func (s *Server) network() string {
if s.config.Network != "" {
return s.config.Network
}
return networkTCP
}
13 changes: 11 additions & 2 deletions filebeat/inputsource/tcp/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ func TestErrorOnEmptyLineDelimiter(t *testing.T) {
}

func TestReceiveEventsAndMetadata(t *testing.T) {
// Excluding tcp6 for now, since it fails in our CI
for _, network := range []string{networkTCP, networkTCP4} {
testReceiveEventsAndMetadata(t, network)
}
}

func testReceiveEventsAndMetadata(t *testing.T, network string) {
expectedMessages := generateMessages(5, 100)
largeMessages := generateMessages(10, 4096)
extraLargeMessages := generateMessages(2, 65*1024)
Expand Down Expand Up @@ -220,6 +227,7 @@ func TestReceiveEventsAndMetadata(t *testing.T) {
if !assert.NoError(t, err) {
return
}
config.Network = network

splitFunc, err := streaming.SplitFunc(test.framing, test.delimiter)
if !assert.NoError(t, err) {
Expand All @@ -237,7 +245,8 @@ func TestReceiveEventsAndMetadata(t *testing.T) {
}
defer server.Stop()

conn, err := net.Dial("tcp", server.Listener.Listener.Addr().String())
addr := server.Listener.Listener.Addr().String()
conn, err := net.Dial(network, addr)
require.NoError(t, err)
fmt.Fprint(conn, test.messageSent)
conn.Close()
Expand Down Expand Up @@ -294,8 +303,8 @@ func TestReceiveNewEventsConcurrently(t *testing.T) {
for w := 0; w < workers; w++ {
go func() {
conn, err := net.Dial("tcp", server.Listener.Listener.Addr().String())
defer conn.Close()
assert.NoError(t, err)
defer conn.Close()
for _, sample := range samples {
fmt.Fprintln(conn, sample)
}
Expand Down
21 changes: 21 additions & 0 deletions filebeat/inputsource/udp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package udp

import (
"errors"
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/common/cfgtype"
Expand All @@ -29,4 +31,23 @@ type Config struct {
MaxMessageSize cfgtype.ByteSize `config:"max_message_size" validate:"positive,nonzero"`
Timeout time.Duration `config:"timeout"`
ReadBuffer cfgtype.ByteSize `config:"read_buffer" validate:"positive"`
Network string `config:"network"`
}

const (
networkUDP = "udp"
networkUDP4 = "udp4"
networkUDP6 = "udp6"
)

var ErrInvalidNetwork = errors.New("invalid network value")

// Validate validates the Config option for the udp input.
func (c *Config) Validate() error {
switch c.Network {
case "", networkUDP, networkUDP4, networkUDP6:
default:
return fmt.Errorf("%w: %s, expected: %v or %v or %v", ErrInvalidNetwork, c.Network, networkUDP, networkUDP4, networkUDP6)
}
return nil
}
70 changes: 70 additions & 0 deletions filebeat/inputsource/udp/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package udp

import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)

func TestValidate(t *testing.T) {
type testCfg struct {
name string
cfg Config
wantErr error
}

tests := []testCfg{
{
name: "ok",
cfg: Config{
Host: "localhost:8080",
},
},
{
name: "invalidnetwork",
cfg: Config{
Host: "localhost:8080",
Network: "foo",
},
wantErr: ErrInvalidNetwork,
},
}

for _, network := range []string{networkUDP, networkUDP4, networkUDP6} {
tests = append(tests, testCfg{
name: "network_" + network,
cfg: Config{
Host: "localhost:8080",
Network: network,
},
})
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := tc.cfg.Validate()
diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors())
if diff != "" {
t.Fatal(diff)
}
})
}
}
12 changes: 10 additions & 2 deletions filebeat/inputsource/udp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ func New(config *Config, callback inputsource.NetworkFunc) *Server {

func (u *Server) createConn() (net.PacketConn, error) {
var err error
udpAdddr, err := net.ResolveUDPAddr("udp", u.config.Host)
network := u.network()
udpAdddr, err := net.ResolveUDPAddr(network, u.config.Host)
if err != nil {
return nil, err
}
listener, err := net.ListenUDP("udp", udpAdddr)
listener, err := net.ListenUDP(network, udpAdddr)
if err != nil {
return nil, err
}
Expand All @@ -71,3 +72,10 @@ func (u *Server) createConn() (net.PacketConn, error) {

return listener, err
}

func (u *Server) network() string {
if u.config.Network != "" {
return u.config.Network
}
return networkUDP
}
12 changes: 11 additions & 1 deletion filebeat/inputsource/udp/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ type info struct {
}

func TestReceiveEventFromUDP(t *testing.T) {
// Excluding udp6 for now, since it fails in our CI
for _, network := range []string{networkUDP, networkUDP4} {
t.Run(network, func(t *testing.T) {
testReceiveEventFromUDPWithNetwork(t, network)
})
}
}

func testReceiveEventFromUDPWithNetwork(t *testing.T, network string) {
tests := []struct {
name string
message []byte
Expand All @@ -64,6 +73,7 @@ func TestReceiveEventFromUDP(t *testing.T) {
MaxMessageSize: maxMessageSize,
Timeout: timeout,
ReadBuffer: maxSocketSize,
Network: network,
}
fn := func(message []byte, metadata inputsource.NetworkMetadata) {
ch <- info{message: message, mt: metadata}
Expand All @@ -77,7 +87,7 @@ func TestReceiveEventFromUDP(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
conn, err := net.Dial("udp", s.localaddress)
conn, err := net.Dial(s.network(), s.localaddress)
if !assert.NoError(t, err) {
return
}
Expand Down

0 comments on commit 56a17e0

Please sign in to comment.