Skip to content

Commit

Permalink
Merge pull request #116 from chronosphereio/service-ports
Browse files Browse the repository at this point in the history
fix: add cloudflare, prometheus_remote_write and splunk to service ports [sc-115821]
  • Loading branch information
niedbalski authored Nov 5, 2024
2 parents 0f81e09 + 4ca91d1 commit b9c7e2f
Show file tree
Hide file tree
Showing 3 changed files with 293 additions and 160 deletions.
274 changes: 175 additions & 99 deletions service_port.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,154 @@
package fluentbitconfig

import (
"errors"
"net"
"strconv"
"strings"

"github.com/calyptia/go-fluentbit-config/v2/networking"
)

var servicePortDefaultsByPlugin = map[string]servicePortDefaults{
// Inputs.
"collectd": {Port: 25826, Protocol: networking.ProtocolUDP},
"elasticsearch": {Port: 9200, Protocol: networking.ProtocolTCP},
"forward": {Port: 24224, Protocol: networking.ProtocolTCP}, // only if `unix_path` is not set.
"http": {Port: 9880, Protocol: networking.ProtocolTCP},
"mqtt": {Port: 1883, Protocol: networking.ProtocolTCP},
"opentelemetry": {Port: 4318, Protocol: networking.ProtocolTCP},
"statsd": {Port: 8125, Protocol: networking.ProtocolUDP},
"syslog": {Port: 5140, Protocol: networking.ProtocolUDP}, // only if `mode` is not `unix_udp` (default) or `unix_tcp`
"tcp": {Port: 5170, Protocol: networking.ProtocolTCP},
"udp": {Port: 5170, Protocol: networking.ProtocolUDP},
"cloudflare": {Port: 9880, Protocol: networking.ProtocolTCP},

// Outputs.
"prometheus_exporter": {Port: 2021, Protocol: networking.ProtocolTCP},
type ServicePortGetter struct {
EnabledFunc func(p Plugin) bool
PortFunc func(p Plugin) (int32, bool)
ProtocolFunc func(p Plugin) (networking.Protocol, bool)
DefaultPort int32
DefaultProtocol networking.Protocol
}

type servicePortDefaults struct {
Port int
Protocol networking.Protocol
func fromPort(p Plugin) (int32, bool) {
portVal, ok := p.Properties.Get("port")
if !ok {
return 0, false
}

i, ok := int32FromAny(portVal)
return i, ok
}

var inputServicePorts = map[string]ServicePortGetter{
"cloudflare": {
PortFunc: func(p Plugin) (int32, bool) {
addrVal, ok := p.Properties.Get("addr")
if !ok {
return 0, false
}

_, port, err := net.SplitHostPort(stringFromAny(addrVal))
if err != nil {
return 0, false
}

i, err := strconv.ParseInt(port, 10, 32)
if err != nil {
return 0, false
}

return int32(i), true
},
DefaultPort: 9880,
DefaultProtocol: networking.ProtocolTCP,
},
"collectd": {
PortFunc: fromPort,
DefaultPort: 25826,
DefaultProtocol: networking.ProtocolUDP,
},
"elasticsearch": {
PortFunc: fromPort,
DefaultPort: 9200,
DefaultProtocol: networking.ProtocolTCP,
},
"forward": {
EnabledFunc: func(p Plugin) bool {
return !p.Properties.Has("unix_path")
},
PortFunc: fromPort,
DefaultPort: 24224,
DefaultProtocol: networking.ProtocolTCP,
},
"http": {
PortFunc: fromPort,
DefaultPort: 9880,
DefaultProtocol: networking.ProtocolTCP,
},
"mqtt": {
PortFunc: fromPort,
DefaultPort: 1883,
DefaultProtocol: networking.ProtocolTCP,
},
"opentelemetry": {
PortFunc: fromPort,
DefaultPort: 4318,
DefaultProtocol: networking.ProtocolTCP,
},
"prometheus_remote_write": {
PortFunc: fromPort,
DefaultPort: 8080,
DefaultProtocol: networking.ProtocolTCP,
},
"splunk": {
PortFunc: fromPort,
DefaultPort: 8088,
DefaultProtocol: networking.ProtocolTCP,
},
"statsd": {
PortFunc: fromPort,
DefaultPort: 8125,
DefaultProtocol: networking.ProtocolUDP,
},
"syslog": {
EnabledFunc: func(p Plugin) bool {
modeVal, ok := p.Properties.Get("mode")
if !ok {
return false // default mode is unix_udp
}

mode := strings.ToLower(stringFromAny(modeVal))
return mode == "tcp" || mode == "udp"
},
PortFunc: fromPort,
ProtocolFunc: func(p Plugin) (networking.Protocol, bool) {
modeVal, ok := p.Properties.Get("mode")
if !ok {
return networking.Protocol("unknown"), false
}

mode := strings.ToLower(stringFromAny(modeVal))
switch mode {
case "tcp":
return networking.ProtocolTCP, true
case "udp":
return networking.ProtocolUDP, true
}

return networking.Protocol("unknown"), false
},
DefaultPort: 5140,
DefaultProtocol: networking.ProtocolTCP,
},
"tcp": {
PortFunc: fromPort,
DefaultPort: 5170,
DefaultProtocol: networking.ProtocolTCP,
},
"udp": {
PortFunc: fromPort,
DefaultPort: 5170,
DefaultProtocol: networking.ProtocolUDP,
},
}

var outputServicePorts = map[string]ServicePortGetter{
"prometheus_exporter": {
PortFunc: fromPort,
DefaultPort: 2021,
DefaultProtocol: networking.ProtocolTCP,
},
}

type ServicePort struct {
Port int
Port int32
Protocol networking.Protocol
Kind SectionKind
// Plugin is not available for `service`` section kind.
Expand All @@ -43,16 +160,16 @@ type ServicePorts []ServicePort
func (c *Config) ServicePorts() ServicePorts {
var out ServicePorts

enabledVal, ok := c.Service.Get("http_server")
if ok && (enabledVal == true || strings.ToLower(stringFromAny(enabledVal)) == "on") {
httpServerEnabled, ok := c.Service.Get("http_server")
if ok && (httpServerEnabled == true || strings.ToLower(stringFromAny(httpServerEnabled)) == "on") {
portVal, ok := c.Service.Get("http_port")
if !ok {
out = append(out, ServicePort{
Port: 2020,
Protocol: networking.ProtocolTCP,
Kind: SectionKindService,
})
} else if i, ok := intFromAny(portVal); ok {
} else if i, ok := int32FromAny(portVal); ok {
out = append(out, ServicePort{
Port: i,
Protocol: networking.ProtocolTCP,
Expand All @@ -61,90 +178,49 @@ func (c *Config) ServicePorts() ServicePorts {
}
}

lookup := func(kind SectionKind, plugins Plugins) {
for _, plugin := range plugins {
err := ValidateSection(kind, plugin.Properties)
if errors.Is(err, ErrMissingName) {
continue
}

var e *UnknownPluginError
if errors.As(err, &e) {
continue
}

if plugin.Name == "forward" && plugin.Properties.Has("unix_path") {
continue
}
process := func(getters map[string]ServicePortGetter, kind SectionKind, plugin Plugin) {
getter, ok := getters[plugin.Name]
if !ok {
return
}

if plugin.Name == "syslog" {
modeVal, ok := plugin.Properties.Get("mode")
if !ok {
continue
}

if ok {
mode := strings.ToLower(stringFromAny(modeVal))
if mode == "unix_udp" || mode == "unix_tcp" {
continue
}
}
}
if getter.EnabledFunc != nil && !getter.EnabledFunc(plugin) {
return
}

portVal, ok := plugin.Properties.Get("port")
port := getter.DefaultPort
if getter.PortFunc != nil {
var ok bool
port, ok = getter.PortFunc(plugin)
if !ok {
defaults, ok := servicePortDefaultsByPlugin[plugin.Name]
if ok {
plugin := plugin
plugin.Properties = nil
out = append(out, ServicePort{
Port: defaults.Port,
Protocol: defaults.Protocol,
Kind: kind,
Plugin: &plugin,
})
}
port = getter.DefaultPort
}
}

if ok {
port, ok := intFromAny(portVal)
if ok {
var protocol networking.Protocol
if plugin.Name == "syslog" {
modeVal, ok := plugin.Properties.Get("mode")
if ok {
if v := networking.Protocol(strings.ToUpper(stringFromAny(modeVal))); v.OK() {
protocol = v
}
}
}

if protocol == "" {
defaultPort, ok := servicePortDefaultsByPlugin[plugin.Name]
if ok {
protocol = defaultPort.Protocol
}
}

if protocol == "" {
protocol = networking.ProtocolTCP
}

plugin := plugin
plugin.Properties = nil
out = append(out, ServicePort{
Port: port,
Protocol: protocol,
Kind: kind,
Plugin: &plugin,
})
}
protocol := getter.DefaultProtocol
if getter.ProtocolFunc != nil {
var ok bool
protocol, ok = getter.ProtocolFunc(plugin)
if !ok {
protocol = getter.DefaultProtocol
}
}

out = append(out, ServicePort{
Port: port,
Protocol: protocol,
Kind: kind,
Plugin: &plugin,
})
}

for _, input := range c.Pipeline.Inputs {
process(inputServicePorts, SectionKindInput, input)
}

lookup(SectionKindInput, c.Pipeline.Inputs)
lookup(SectionKindOutput, c.Pipeline.Outputs)
for _, output := range c.Pipeline.Outputs {
process(outputServicePorts, SectionKindOutput, output)
}

return out
}
Loading

0 comments on commit b9c7e2f

Please sign in to comment.