Skip to content

Commit

Permalink
formatting improved with selectors and text output (#14)
Browse files Browse the repository at this point in the history
* Improve format registration
* less interleaving (json does not require importing protobuf)
* generic text renderer
* escape for strings in text/json formatter
  • Loading branch information
lspgn authored Jun 11, 2021
1 parent 9bdcdf0 commit e52a053
Show file tree
Hide file tree
Showing 8 changed files with 417 additions and 382 deletions.
7 changes: 4 additions & 3 deletions cmd/enricher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (

// import various formatters
"github.com/netsampler/goflow2/format"
"github.com/netsampler/goflow2/format/json"
"github.com/netsampler/goflow2/format/common"
_ "github.com/netsampler/goflow2/format/json"
_ "github.com/netsampler/goflow2/format/protobuf"

// import various transports
Expand Down Expand Up @@ -85,8 +86,8 @@ func MapFlow(dbAsn, dbCountry *geoip2.Reader, msg *flowmessage.FlowMessageExt) {
}

func init() {
json.AddJSONField("SrcCountry", json.FORMAT_TYPE_STRING)
json.AddJSONField("DstCountry", json.FORMAT_TYPE_STRING)
common.AddTextField("SrcCountry", common.FORMAT_TYPE_STRING)
common.AddTextField("DstCountry", common.FORMAT_TYPE_STRING)
}

func main() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/goflow2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/netsampler/goflow2/format"
_ "github.com/netsampler/goflow2/format/json"
_ "github.com/netsampler/goflow2/format/protobuf"
_ "github.com/netsampler/goflow2/format/text"

// import various transports
"github.com/netsampler/goflow2/transport"
Expand All @@ -40,7 +41,6 @@ var (

Format = flag.String("format", "json", fmt.Sprintf("Choose the format (available: %s)", strings.Join(format.GetFormats(), ", ")))
Transport = flag.String("transport", "file", fmt.Sprintf("Choose the transport (available: %s)", strings.Join(transport.GetTransports(), ", ")))
//FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf")

MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address")
MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path")
Expand Down
56 changes: 56 additions & 0 deletions format/common/hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package common

import (
"flag"
"fmt"
"reflect"
"strings"
"sync"
)

var (
fieldsVar string
fields []string // Hashing fields

hashDeclared bool
hashDeclaredLock = &sync.Mutex{}
)

func HashFlag() {
hashDeclaredLock.Lock()
defer hashDeclaredLock.Unlock()

if hashDeclared {
return
}
hashDeclared = true
flag.StringVar(&fieldsVar, "format.hash", "SamplerAddress", "List of fields to do hashing, separated by commas")

}

func ManualHashInit() error {
fields = strings.Split(fieldsVar, ",")
return nil
}

func HashProtoLocal(msg interface{}) string {
return HashProto(fields, msg)
}

func HashProto(fields []string, msg interface{}) string {
var keyStr string

if msg != nil {
vfm := reflect.ValueOf(msg)
vfm = reflect.Indirect(vfm)

for _, kf := range fields {
fieldValue := vfm.FieldByName(kf)
if fieldValue.IsValid() {
keyStr += fmt.Sprintf("%v-", fieldValue)
}
}
}

return keyStr
}
38 changes: 38 additions & 0 deletions format/common/selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package common

import (
"flag"
"strings"
"sync"
)

var (
selectorVar string
selector []string // Hashing fields
selectorMap = make(map[string]bool)

selectorDeclared bool
selectorDeclaredLock = &sync.Mutex{}
)

func SelectorFlag() {
selectorDeclaredLock.Lock()
defer selectorDeclaredLock.Unlock()

if selectorDeclared {
return
}
selectorDeclared = true
flag.StringVar(&selectorVar, "format.selector", "", "List of fields to do keep in output")
}

func ManualSelectorInit() error {
if selectorVar == "" {
return nil
}
selector = strings.Split(selectorVar, ",")
for _, v := range selector {
selectorMap[v] = true
}
return nil
}
263 changes: 263 additions & 0 deletions format/common/text.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
package common

import (
"encoding/binary"
"fmt"
"github.com/golang/protobuf/proto"
"net"
"reflect"
"strings"
)

const (
FORMAT_TYPE_UNKNOWN = iota
FORMAT_TYPE_STRING_FUNC
FORMAT_TYPE_STRING
FORMAT_TYPE_INTEGER
FORMAT_TYPE_IP
FORMAT_TYPE_MAC
)

var (
EtypeName = map[uint32]string{
0x806: "ARP",
0x800: "IPv4",
0x86dd: "IPv6",
}
ProtoName = map[uint32]string{
1: "ICMP",
6: "TCP",
17: "UDP",
58: "ICMPv6",
}
IcmpTypeName = map[uint32]string{
0: "EchoReply",
3: "DestinationUnreachable",
8: "Echo",
9: "RouterAdvertisement",
10: "RouterSolicitation",
11: "TimeExceeded",
}
Icmp6TypeName = map[uint32]string{
1: "DestinationUnreachable",
2: "PacketTooBig",
3: "TimeExceeded",
128: "EchoRequest",
129: "EchoReply",
133: "RouterSolicitation",
134: "RouterAdvertisement",
}

TextFields = []string{
"Type",
"TimeReceived",
"SequenceNum",
"SamplingRate",
"SamplerAddress",
"TimeFlowStart",
"TimeFlowEnd",
"Bytes",
"Packets",
"SrcAddr",
"DstAddr",
"Etype",
"Proto",
"SrcPort",
"DstPort",
"InIf",
"OutIf",
"SrcMac",
"DstMac",
"SrcVlan",
"DstVlan",
"VlanId",
"IngressVrfID",
"EgressVrfID",
"IPTos",
"ForwardingStatus",
"IPTTL",
"TCPFlags",
"IcmpType",
"IcmpCode",
"IPv6FlowLabel",
"FragmentId",
"FragmentOffset",
"BiFlowDirection",
"SrcAS",
"DstAS",
"NextHop",
"NextHopAS",
"SrcNet",
"DstNet",
}
TextFieldsTypes = []int{
FORMAT_TYPE_STRING_FUNC,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_IP,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_IP,
FORMAT_TYPE_IP,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_MAC,
FORMAT_TYPE_MAC,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_IP,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
FORMAT_TYPE_INTEGER,
}
RenderExtras = []string{
"EtypeName",
"ProtoName",
"IcmpName",
}
RenderExtraCall = []RenderExtraFunction{
RenderExtraFunctionEtypeName,
RenderExtraFunctionProtoName,
RenderExtraFunctionIcmpName,
}
)

func AddTextField(name string, jtype int) {
TextFields = append(TextFields, name)
TextFieldsTypes = append(TextFieldsTypes, jtype)
}

type RenderExtraFunction func(proto.Message) string

func RenderExtraFetchNumbers(msg proto.Message, fields []string) []uint64 {
vfm := reflect.ValueOf(msg)
vfm = reflect.Indirect(vfm)

values := make([]uint64, len(fields))
for i, kf := range fields {
fieldValue := vfm.FieldByName(kf)
if fieldValue.IsValid() {
values[i] = fieldValue.Uint()
}
}

return values
}

func RenderExtraFunctionEtypeName(msg proto.Message) string {
num := RenderExtraFetchNumbers(msg, []string{"Etype"})
return EtypeName[uint32(num[0])]
}

func RenderExtraFunctionProtoName(msg proto.Message) string {
num := RenderExtraFetchNumbers(msg, []string{"Proto"})
return ProtoName[uint32(num[0])]
}
func RenderExtraFunctionIcmpName(msg proto.Message) string {
num := RenderExtraFetchNumbers(msg, []string{"Proto", "IcmpCode", "IcmpType"})
return IcmpCodeType(uint32(num[0]), uint32(num[1]), uint32(num[2]))
}

func IcmpCodeType(proto, icmpCode, icmpType uint32) string {
if proto == 1 {
return IcmpTypeName[icmpType]
} else if proto == 58 {
return Icmp6TypeName[icmpType]
}
return ""
}

func RenderIP(addr []byte) string {
if addr == nil || (len(addr) != 4 && len(addr) != 16) {
return ""
}

return net.IP(addr).String()
}

func FormatMessageReflectText(msg proto.Message, ext string) string {
return FormatMessageReflectCustom(msg, ext, "", " ", "=", false)
}

func FormatMessageReflectJSON(msg proto.Message, ext string) string {
return fmt.Sprintf("{%s}", FormatMessageReflectCustom(msg, ext, "\"", ",", ":", true))
}

func FormatMessageReflectCustom(msg proto.Message, ext, quotes, sep, sign string, null bool) string {
fstr := make([]string, len(TextFields)+len(RenderExtras))

vfm := reflect.ValueOf(msg)
vfm = reflect.Indirect(vfm)

var i int
for j, kf := range TextFields {
fieldValue := vfm.FieldByName(kf)
if fieldValue.IsValid() {

switch TextFieldsTypes[j] {
case FORMAT_TYPE_STRING_FUNC:
strMethod := fieldValue.MethodByName("String").Call([]reflect.Value{})
fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, kf, quotes, sign, strMethod[0].String())
case FORMAT_TYPE_STRING:
fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, kf, quotes, sign, fieldValue.String())
case FORMAT_TYPE_INTEGER:
fstr[i] = fmt.Sprintf("%s%s%s%s%d", quotes, kf, quotes, sign, fieldValue.Uint())
case FORMAT_TYPE_IP:
ip := fieldValue.Bytes()
fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, kf, quotes, sign, RenderIP(ip))
case FORMAT_TYPE_MAC:
mac := make([]byte, 8)
binary.BigEndian.PutUint64(mac, fieldValue.Uint())
fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, kf, quotes, sign, net.HardwareAddr(mac[2:]).String())
default:
if null {
fstr[i] = fmt.Sprintf("%s%s%s%snull", quotes, kf, quotes, sign)
}
}

} else {
if null {
fstr[i] = fmt.Sprintf("%s%s%s%snull", quotes, kf, quotes, sign)
}
}
if len(selectorMap) == 0 || selectorMap[kf] {
i++
}

}

for j, e := range RenderExtras {
fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, e, quotes, sign, RenderExtraCall[j](msg))
if len(selectorMap) == 0 || selectorMap[e] {
i++
}
}

if len(selectorMap) > 0 {
fstr = fstr[0:i]
}

return strings.Join(fstr, sep)
}
Loading

0 comments on commit e52a053

Please sign in to comment.