diff --git a/pkg/pipeline/decode/decode_protobuf.go b/pkg/pipeline/decode/decode_protobuf.go index 20020c6ed..da3b3075c 100644 --- a/pkg/pipeline/decode/decode_protobuf.go +++ b/pkg/pipeline/decode/decode_protobuf.go @@ -7,6 +7,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" + log "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" ) @@ -17,6 +18,7 @@ type Protobuf struct { } func NewProtobuf() (*Protobuf, error) { + log.Debugf("entering NewProtobuf") return &Protobuf{}, nil } diff --git a/pkg/pipeline/ingest/ingest_grpc.go b/pkg/pipeline/ingest/ingest_grpc.go index c6f6b7712..15de43c11 100644 --- a/pkg/pipeline/ingest/ingest_grpc.go +++ b/pkg/pipeline/ingest/ingest_grpc.go @@ -105,6 +105,7 @@ func instrumentGRPC(m *metrics) grpc2.UnaryServerInterceptor { resp, err = handler(ctx, req) if err != nil { + glog.Errorf("Reporting metric error: %v", err) m.error(fmt.Sprint(status.Code(err))) } diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go index 353112f74..185048780 100644 --- a/pkg/pipeline/ingest/ingest_kafka.go +++ b/pkg/pipeline/ingest/ingest_kafka.go @@ -113,17 +113,19 @@ func (k *ingestKafka) isStopped() bool { } func (k *ingestKafka) processRecordDelay(record config.GenericMap) { - TimeFlowEndInterface, ok := record["TimeFlowEnd"] + TimeFlowEndInterface, ok := record["TimeFlowEndMs"] if !ok { - k.metrics.error("TimeFlowEnd missing") + klog.Errorf("TimeFlowEndMs missing in record %v", record) + k.metrics.error("TimeFlowEndMs missing") return } - TimeFlowEnd, ok := TimeFlowEndInterface.(float64) + TimeFlowEnd, ok := TimeFlowEndInterface.(int64) if !ok { - k.metrics.error("Cannot parse TimeFlowEnd") + klog.Errorf("Cannot parse TimeFlowEndMs of record %v", record) + k.metrics.error("Cannot parse TimeFlowEndMs") return } - delay := time.Since(time.Unix(int64(TimeFlowEnd), 0)).Seconds() + delay := time.Since(time.UnixMilli(TimeFlowEnd)).Seconds() k.metrics.latency.Observe(delay) }