diff --git a/sflow/decoder.go b/sflow/decoder.go index fc34b59e..4930704d 100644 --- a/sflow/decoder.go +++ b/sflow/decoder.go @@ -33,9 +33,11 @@ import ( const ( // DataFlowSample defines packet flow sampling DataFlowSample = 1 + DataFlowSampleExpanded = 3 // DataCounterSample defines counter sampling DataCounterSample = 2 + DataCounterSampleExpanded = 4 ) // SFDecoder represents sFlow decoder @@ -120,7 +122,19 @@ func (d *SFDecoder) SFDecode() (*SFDatagram, error) { } datagram.Samples = append(datagram.Samples, d) case DataCounterSample: - d, err := decodeFlowCounter(d.reader) + d, err := decodeFlowCounter(d.reader, false) + if err != nil { + return datagram, err + } + datagram.Counters = append(datagram.Counters, d) + case DataFlowSampleExpanded: + d, err := decodeFlowSampleExpand(d.reader) + if err != nil { + return datagram, err + } + datagram.Samples = append(datagram.Samples, d) + case DataCounterSampleExpanded: + d, err := decodeFlowCounter(d.reader, true) if err != nil { return datagram, err } diff --git a/sflow/flow_counter.go b/sflow/flow_counter.go index eeb172e6..c338dc55 100644 --- a/sflow/flow_counter.go +++ b/sflow/flow_counter.go @@ -146,13 +146,13 @@ type ProcessorCounters struct { // CounterSample represents the periodic sampling or polling of counters associated with a Data Source type CounterSample struct { SequenceNo uint32 - SourceIDType byte + SourceIDType uint32 SourceIDIdx uint32 RecordsNo uint32 Records map[string]Record } -func decodeFlowCounter(r io.ReadSeeker) (*CounterSample, error) { +func decodeFlowCounter(r io.ReadSeeker, expanded bool) (*CounterSample, error) { var ( cs = new(CounterSample) rTypeFormat uint32 @@ -160,7 +160,7 @@ func decodeFlowCounter(r io.ReadSeeker) (*CounterSample, error) { err error ) - if err = cs.unmarshal(r); err != nil { + if err = cs.unmarshal(r, expanded); err != nil { return nil, err } @@ -441,7 +441,7 @@ func (pc *ProcessorCounters) unmarshal(r io.Reader) error { return nil } -func (cs *CounterSample) unmarshal(r io.Reader) error { +func (cs *CounterSample) unmarshal(r io.Reader, expanded bool) error { var err error @@ -449,15 +449,25 @@ func (cs *CounterSample) unmarshal(r io.Reader) error { return err } - if err = read(r, &cs.SourceIDType); err != nil { - return err - } - - buf := make([]byte, 3) - if err = read(r, &buf); err != nil { - return err + if expanded { + if err = read(r, &cs.SourceIDType); err != nil { + return err + } + if err = read(r, &cs.SourceIDIdx); err != nil { + return err + } + } else { + buf := make([]byte, 1) + if err = read(r, &buf); err != nil { + return err + } + cs.SourceIDType = uint32(buf[0]) + buf = make([]byte, 3) + if err = read(r, &buf); err != nil { + return err + } + cs.SourceIDIdx = uint32(buf[2]) | uint32(buf[1])<<8 | uint32(buf[0])<<16 } - cs.SourceIDIdx = uint32(buf[2]) | uint32(buf[1])<<8 | uint32(buf[0])<<16 err = read(r, &cs.RecordsNo) diff --git a/sflow/flow_sample.go b/sflow/flow_sample.go index 0a284673..bfe47b17 100644 --- a/sflow/flow_sample.go +++ b/sflow/flow_sample.go @@ -44,7 +44,7 @@ const ( // FlowSample represents single flow sample type FlowSample struct { SequenceNo uint32 // Incremented with each flow sample - SourceID byte // sfSourceID + SourceID uint32 // sfSourceID SamplingRate uint32 // sfPacketSamplingRate SamplePool uint32 // Total number of packets that could have been sampled Drops uint32 // Number of times a packet was dropped due to lack of resources @@ -54,6 +54,29 @@ type FlowSample struct { Records map[string]Record } +type sflowDataSourceExpand struct{ + sourceIdType uint32; /* sFlowDataSource type */ + sourceIdIndex uint32; /* sFlowDataSource index */ +} + +type interfaceExpand struct{ + format uint32; /* interface format */ + value uint32; /* interface value */ +} + +// FlowSampleExpand represents single flow sample expand +type FlowSampleExpand struct { + SequenceNo uint32 // Incremented with each flow sample + SourceID sflowDataSourceExpand // sfSourceID + SamplingRate uint32 // sfPacketSamplingRate + SamplePool uint32 // Total number of packets that could have been sampled + Drops uint32 // Number of times a packet was dropped due to lack of resources + Input interfaceExpand // SNMP ifIndex of input interface + Output interfaceExpand // SNMP ifIndex of input interface + RecordsNo uint32 // Number of records to follow + Records map[string]Record +} + // SampledHeader represents sampled header type SampledHeader struct { Protocol uint32 // (enum SFLHeader_protocol) @@ -89,11 +112,13 @@ func (fs *FlowSample) unmarshal(r io.ReadSeeker) error { return err } - if err = read(r, &fs.SourceID); err != nil { - return err + buf := make([]byte, 1) + if err = read(r, &buf); err != nil { + return err } + fs.SourceID = uint32(buf[0]) + r.Seek(3, 1) // skip unused bytes - r.Seek(3, 1) // skip counter sample decoding if err = read(r, &fs.SamplingRate); err != nil { return err @@ -120,6 +145,54 @@ func (fs *FlowSample) unmarshal(r io.ReadSeeker) error { return err } +func (fs *FlowSampleExpand) unmarshalExpand(r io.ReadSeeker) error { + var err error + + if err = read(r, &fs.SequenceNo); err != nil { + return err + } + + if err = read(r, &fs.SourceID.sourceIdType); err != nil { + return err + } + + if err = read(r, &fs.SourceID.sourceIdType); err != nil { + return err + } + + if err = read(r, &fs.SamplingRate); err != nil { + return err + } + + if err = read(r, &fs.SamplePool); err != nil { + return err + } + + if err = read(r, &fs.Drops); err != nil { + return err + } + + if err = read(r, &fs.Input.format); err != nil { + return err + } + + if err = read(r, &fs.Input.value); err != nil { + return err + } + + if err = read(r, &fs.Output.format); err != nil { + return err + } + + if err = read(r, &fs.Output.value); err != nil { + return err + } + + err = read(r, &fs.RecordsNo) + + return err +} + func (sh *SampledHeader) unmarshal(r io.Reader) error { var err error @@ -248,6 +321,57 @@ func decodeFlowSample(r io.ReadSeeker) (*FlowSample, error) { return fs, nil } +func decodeFlowSampleExpand(r io.ReadSeeker) (*FlowSampleExpand, error) { + var ( + fs = new(FlowSampleExpand) + rTypeFormat uint32 + rTypeLength uint32 + err error + ) + + if err = fs.unmarshalExpand(r); err != nil { + return nil, err + } + + fs.Records = make(map[string]Record) + + for i := uint32(0); i < fs.RecordsNo; i++ { + if err = read(r, &rTypeFormat); err != nil { + return nil, err + } + if err = read(r, &rTypeLength); err != nil { + return nil, err + } + + switch rTypeFormat { + case SFDataRawHeader: + d, err := decodeSampledHeader(r) + if err != nil { + return fs, err + } + fs.Records["RawHeader"] = d + case SFDataExtSwitch: + d, err := decodeExtSwitchData(r) + if err != nil { + return fs, err + } + + fs.Records["ExtSwitch"] = d + case SFDataExtRouter: + d, err := decodeExtRouterData(r, rTypeLength) + if err != nil { + return fs, err + } + + fs.Records["ExtRouter"] = d + default: + r.Seek(int64(rTypeLength), 1) + } + } + + return fs, nil +} + func decodeSampledHeader(r io.Reader) (*packet.Packet, error) { var ( h = new(SampledHeader)