From 6d660f6d6126e9ebf2a5430e2e4c32491f91e611 Mon Sep 17 00:00:00 2001 From: kruskal <99559985+kruskall@users.noreply.github.com> Date: Mon, 7 Aug 2023 10:20:54 +0200 Subject: [PATCH] feat: update code use new uint64 --- codec/fullevent_test.go | 3 +- .../modeldecodertest/populator.go | 2 +- .../internal/modeldecoder/rumv3/decoder.go | 7 +-- .../internal/modeldecoder/rumv3/error_test.go | 17 +++--- .../modeldecoder/rumv3/transaction_test.go | 29 +++++----- .../internal/modeldecoder/v2/decoder.go | 19 +++---- .../internal/modeldecoder/v2/error_test.go | 9 ++-- .../internal/modeldecoder/v2/log_test.go | 15 +++--- .../modeldecoder/v2/metricset_test.go | 13 ++--- .../internal/modeldecoder/v2/span_test.go | 11 ++-- .../modeldecoder/v2/transaction_test.go | 19 ++++--- input/elasticapm/processor_test.go | 5 +- input/otlp/exceptions_test.go | 7 ++- input/otlp/logs.go | 5 +- input/otlp/logs_test.go | 31 ++++++----- input/otlp/metadata_test.go | 2 +- input/otlp/metrics.go | 5 +- input/otlp/metrics_test.go | 53 +++++++++---------- input/otlp/traces.go | 7 ++- input/otlp/traces_test.go | 14 ++--- model/modeljson/apmevent.pb.json.go | 6 +-- model/modeljson/apmevent.pb.json_test.go | 3 +- model/modeljson/event.pb.json.go | 4 +- model/modeljson/event.pb.json_test.go | 3 +- model/modelpb/timestamp.go | 30 +++++++++++ 25 files changed, 159 insertions(+), 160 deletions(-) create mode 100644 model/modelpb/timestamp.go diff --git a/codec/fullevent_test.go b/codec/fullevent_test.go index bc95548e..fc5574ad 100644 --- a/codec/fullevent_test.go +++ b/codec/fullevent_test.go @@ -23,12 +23,11 @@ import ( "github.com/elastic/apm-data/model/modelpb" "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" ) func fullEvent(t *testing.B) *modelpb.APMEvent { return &modelpb.APMEvent{ - Timestamp: timestamppb.New(time.Unix(1, 1)), + Timestamp: uint64(time.Second.Nanoseconds() + 1), Span: &modelpb.Span{ Message: &modelpb.Message{ Body: "body", diff --git a/input/elasticapm/internal/modeldecoder/modeldecodertest/populator.go b/input/elasticapm/internal/modeldecoder/modeldecodertest/populator.go index d402e969..925edc3b 100644 --- a/input/elasticapm/internal/modeldecoder/modeldecodertest/populator.go +++ b/input/elasticapm/internal/modeldecoder/modeldecodertest/populator.go @@ -122,7 +122,7 @@ func SetStructValues(in interface{}, values *Values, opts ...SetStructValuesOpti switch fKind := f.Kind(); fKind { case reflect.String: fieldVal = reflect.ValueOf(values.Str) - case reflect.Int, reflect.Int32, reflect.Int64: + case reflect.Int, reflect.Int32, reflect.Int64, reflect.Uint64: fieldVal = reflect.ValueOf(values.Int).Convert(f.Type()) case reflect.Float64: fieldVal = reflect.ValueOf(values.Float).Convert(f.Type()) diff --git a/input/elasticapm/internal/modeldecoder/rumv3/decoder.go b/input/elasticapm/internal/modeldecoder/rumv3/decoder.go index bd6e5919..1c748ca3 100644 --- a/input/elasticapm/internal/modeldecoder/rumv3/decoder.go +++ b/input/elasticapm/internal/modeldecoder/rumv3/decoder.go @@ -32,7 +32,6 @@ import ( "github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/nullable" "github.com/elastic/apm-data/model/modelpb" "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" ) var ( @@ -248,7 +247,7 @@ func mapToErrorModel(from *errorEvent, event *modelpb.APMEvent) { event.ParentId = from.ParentID.Val } if !from.Timestamp.Val.IsZero() { - event.Timestamp = timestamppb.New(from.Timestamp.Val) + event.Timestamp = modelpb.FromTime(from.Timestamp.Val) } if from.TraceID.IsSet() { event.Trace = &modelpb.Trace{ @@ -628,9 +627,7 @@ func mapToSpanModel(from *span, event *modelpb.APMEvent) { if from.Start.IsSet() { // event.Timestamp is initialized to the time the payload was // received; offset that by "start" milliseconds for RUM. - event.Timestamp = timestamppb.New(event.Timestamp.AsTime().Add( - time.Duration(float64(time.Millisecond) * from.Start.Val), - )) + event.Timestamp += uint64(time.Duration(float64(time.Millisecond) * from.Start.Val).Nanoseconds()) } } diff --git a/input/elasticapm/internal/modeldecoder/rumv3/error_test.go b/input/elasticapm/internal/modeldecoder/rumv3/error_test.go index 74ef7cbe..2118e1c9 100644 --- a/input/elasticapm/internal/modeldecoder/rumv3/error_test.go +++ b/input/elasticapm/internal/modeldecoder/rumv3/error_test.go @@ -28,7 +28,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/testing/protocmp" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/apm-data/input/elasticapm/internal/decoder" "github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder" @@ -47,9 +46,9 @@ func TestResetErrorOnRelease(t *testing.T) { func TestDecodeNestedError(t *testing.T) { t.Run("decode", func(t *testing.T) { - now := time.Now().UTC() + now := modelpb.FromTime(time.Now()) eventBase := initializedMetadata() - eventBase.Timestamp = timestamppb.New(now) + eventBase.Timestamp = now input := modeldecoder.Input{Base: eventBase} str := `{"e":{"id":"a-b-c","timestamp":1599996822281000,"log":{"mg":"abc"}}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) @@ -57,7 +56,7 @@ func TestDecodeNestedError(t *testing.T) { require.NoError(t, DecodeNestedError(dec, &input, &batch)) require.Len(t, batch, 1) require.NotNil(t, batch[0].Error) - assert.Equal(t, time.Unix(1599996822, 281000000).UTC(), batch[0].Timestamp.AsTime()) + assert.Equal(t, modelpb.FromTime(time.Unix(1599996822, 281000000)), batch[0].Timestamp) assert.Empty(t, cmp.Diff(&modelpb.Error{ Id: "a-b-c", Log: &modelpb.ErrorLog{ @@ -72,7 +71,7 @@ func TestDecodeNestedError(t *testing.T) { dec = decoder.NewJSONDecoder(strings.NewReader(str)) batch = modelpb.Batch{} require.NoError(t, DecodeNestedError(dec, &input, &batch)) - assert.Equal(t, now, batch[0].Timestamp.AsTime()) + assert.Equal(t, now, batch[0].Timestamp) // test decode err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch) @@ -146,8 +145,8 @@ func TestDecodeMapToErrorModel(t *testing.T) { } var input errorEvent var out1, out2 modelpb.APMEvent - reqTime := time.Now().Add(time.Second).UTC() - out1.Timestamp = timestamppb.New(reqTime) + reqTime := modelpb.FromTime(time.Now().Add(time.Second)) + out1.Timestamp = reqTime defaultVal := modeldecodertest.DefaultValues() modeldecodertest.SetStructValues(&input, defaultVal) mapToErrorModel(&input, &out1) @@ -155,7 +154,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { modeldecodertest.AssertStructValues(t, out1.Error, exceptions, defaultVal) // leave event timestamp unmodified if eventTime is zero - out1.Timestamp = timestamppb.New(reqTime) + out1.Timestamp = reqTime modeldecodertest.SetStructValues(&input, defaultVal) mapToErrorModel(&input, &out1) input.Reset() @@ -163,7 +162,7 @@ func TestDecodeMapToErrorModel(t *testing.T) { // reuse input model for different event // ensure memory is not shared by reusing input model - out2.Timestamp = timestamppb.New(reqTime) + out2.Timestamp = reqTime otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) mapToErrorModel(&input, &out2) diff --git a/input/elasticapm/internal/modeldecoder/rumv3/transaction_test.go b/input/elasticapm/internal/modeldecoder/rumv3/transaction_test.go index 0e4c7646..95fd6a60 100644 --- a/input/elasticapm/internal/modeldecoder/rumv3/transaction_test.go +++ b/input/elasticapm/internal/modeldecoder/rumv3/transaction_test.go @@ -28,7 +28,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/testing/protocmp" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/apm-data/input/elasticapm/internal/decoder" "github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder" @@ -47,9 +46,9 @@ func TestResetTransactionOnRelease(t *testing.T) { func TestDecodeNestedTransaction(t *testing.T) { t.Run("decode", func(t *testing.T) { - now := time.Now().UTC() + now := modelpb.FromTime(time.Now()) eventBase := initializedMetadata() - eventBase.Timestamp = timestamppb.New(now) + eventBase.Timestamp = now input := modeldecoder.Input{Base: eventBase} str := `{"x":{"n":"tr-a","d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"y":[{"n":"a","d":10,"t":"http","id":"123","s":20}],"me":[{"sa":{"ysc":{"v":5}},"y":{"t":"span_type","su":"span_subtype"}}]}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) @@ -62,7 +61,7 @@ func TestDecodeNestedTransaction(t *testing.T) { assert.Equal(t, "request", batch[0].Transaction.Type) // fall back to request time - assert.Equal(t, now, batch[0].Timestamp.AsTime()) + assert.Equal(t, now, batch[0].Timestamp) // Ensure nested metricsets are decoded. RUMv3 only sends // breakdown metrics, so the Metricsets will be empty and @@ -78,11 +77,11 @@ func TestDecodeNestedTransaction(t *testing.T) { Subtype: "span_subtype", SelfTime: &modelpb.AggregatedDuration{Count: 5}, }, batch[1].Span, protocmp.Transform())) - assert.Equal(t, now, batch[1].Timestamp.AsTime()) + assert.Equal(t, now, batch[1].Timestamp) // ensure nested spans are decoded - start := time.Duration(20 * 1000 * 1000) - assert.Equal(t, now.Add(start), batch[2].Timestamp.AsTime()) // add start to timestamp + start := uint64(time.Duration(20 * 1000 * 1000).Nanoseconds()) + assert.Equal(t, now+start, batch[2].Timestamp) // add start to timestamp assert.Equal(t, "100", batch[2].Transaction.Id) assert.Equal(t, "1", batch[2].Trace.Id) assert.Equal(t, "100", batch[2].ParentId) @@ -93,8 +92,8 @@ func TestDecodeNestedTransaction(t *testing.T) { }) t.Run("decode-marks", func(t *testing.T) { - now := time.Now() - eventBase := modelpb.APMEvent{Timestamp: timestamppb.New(now)} + now := modelpb.FromTime(time.Now()) + eventBase := modelpb.APMEvent{Timestamp: now} input := modeldecoder.Input{Base: &eventBase} str := `{"x":{"d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"k":{"a":{"dc":0.1,"di":0.2,"ds":0.3,"de":0.4,"fb":0.5,"fp":0.6,"lp":0.7,"long":0.8},"nt":{"fs":0.1,"ls":0.2,"le":0.3,"cs":0.4,"ce":0.5,"qs":0.6,"rs":0.7,"re":0.8,"dl":0.9,"di":0.11,"ds":0.21,"de":0.31,"dc":0.41,"es":0.51,"ee":6,"long":0.99},"long":{"long":0.1}}}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) @@ -215,8 +214,8 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input transaction var out1, out2 modelpb.APMEvent - reqTime := time.Now().Add(time.Second) - out1.Timestamp = timestamppb.New(reqTime) + reqTime := modelpb.FromTime(time.Now().Add(time.Second)) + out1.Timestamp = reqTime defaultVal := modeldecodertest.DefaultValues() modeldecodertest.SetStructValues(&input, defaultVal) mapToTransactionModel(&input, &out1) @@ -224,7 +223,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { modeldecodertest.AssertStructValues(t, out1.Transaction, exceptions, defaultVal) // ensure memory is not shared by reusing input model - out2.Timestamp = timestamppb.New(reqTime) + out2.Timestamp = reqTime otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) mapToTransactionModel(&input, &out2) @@ -267,8 +266,8 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input span var out1, out2 modelpb.APMEvent - reqTime := time.Now().Add(time.Second) - out1.Timestamp = timestamppb.New(reqTime) + reqTime := modelpb.FromTime(time.Now().Add(time.Second)) + out1.Timestamp = reqTime defaultVal := modeldecodertest.DefaultValues() modeldecodertest.SetStructValues(&input, defaultVal) mapToSpanModel(&input, &out1) @@ -276,7 +275,7 @@ func TestDecodeMapToTransactionModel(t *testing.T) { modeldecodertest.AssertStructValues(t, out1.Span, exceptions, defaultVal) // ensure memory is not shared by reusing input model - out2.Timestamp = timestamppb.New(reqTime) + out2.Timestamp = reqTime otherVal := modeldecodertest.NonDefaultValues() modeldecodertest.SetStructValues(&input, otherVal) mapToSpanModel(&input, &out2) diff --git a/input/elasticapm/internal/modeldecoder/v2/decoder.go b/input/elasticapm/internal/modeldecoder/v2/decoder.go index 8b286d3d..7e9796ba 100644 --- a/input/elasticapm/internal/modeldecoder/v2/decoder.go +++ b/input/elasticapm/internal/modeldecoder/v2/decoder.go @@ -37,7 +37,6 @@ import ( "github.com/elastic/apm-data/input/otlp" "github.com/elastic/apm-data/model/modelpb" "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" @@ -470,7 +469,7 @@ func mapToErrorModel(from *errorEvent, event *modelpb.APMEvent) { event.ParentId = from.ParentID.Val } if !from.Timestamp.Val.IsZero() { - event.Timestamp = timestamppb.New(from.Timestamp.Val) + event.Timestamp = modelpb.FromTime(from.Timestamp.Val) } if from.TraceID.IsSet() { event.Trace = &modelpb.Trace{ @@ -731,7 +730,7 @@ func mapToMetricsetModel(from *metricset, event *modelpb.APMEvent) bool { event.Metricset = &modelpb.Metricset{Name: "app"} if !from.Timestamp.Val.IsZero() { - event.Timestamp = timestamppb.New(from.Timestamp.Val) + event.Timestamp = modelpb.FromTime(from.Timestamp.Val) } if len(from.Samples) > 0 { @@ -1179,18 +1178,12 @@ func mapToSpanModel(from *span, event *modelpb.APMEvent) { out.Sync = &val } if !from.Timestamp.Val.IsZero() { - event.Timestamp = timestamppb.New(from.Timestamp.Val) + event.Timestamp = modelpb.FromTime(from.Timestamp.Val) } else if from.Start.IsSet() { // event.Timestamp should have been initialized to the time the // payload was received; offset that by "start" milliseconds for // RUM. - base := time.Time{} - if event.Timestamp != nil { - base = event.Timestamp.AsTime() - } - event.Timestamp = timestamppb.New(base.Add( - time.Duration(float64(time.Millisecond) * from.Start.Val), - )) + event.Timestamp += uint64(time.Duration(float64(time.Millisecond) * from.Start.Val).Nanoseconds()) } if from.TraceID.IsSet() { event.Trace = &modelpb.Trace{ @@ -1411,7 +1404,7 @@ func mapToTransactionModel(from *transaction, event *modelpb.APMEvent) { out.SpanCount.Started = &started } if !from.Timestamp.Val.IsZero() { - event.Timestamp = timestamppb.New(from.Timestamp.Val) + event.Timestamp = modelpb.FromTime(from.Timestamp.Val) } if from.TraceID.IsSet() { event.Trace = &modelpb.Trace{ @@ -1467,7 +1460,7 @@ func mapToLogModel(from *log, event *modelpb.APMEvent) { mapToFAASModel(from.FAAS, event.Faas) } if !from.Timestamp.Val.IsZero() { - event.Timestamp = timestamppb.New(from.Timestamp.Val) + event.Timestamp = modelpb.FromTime(from.Timestamp.Val) } if from.TraceID.IsSet() { event.Trace = &modelpb.Trace{ diff --git a/input/elasticapm/internal/modeldecoder/v2/error_test.go b/input/elasticapm/internal/modeldecoder/v2/error_test.go index 2dfe610d..b471c891 100644 --- a/input/elasticapm/internal/modeldecoder/v2/error_test.go +++ b/input/elasticapm/internal/modeldecoder/v2/error_test.go @@ -28,7 +28,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/testing/protocmp" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/apm-data/input/elasticapm/internal/decoder" "github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder" @@ -47,10 +46,10 @@ func TestResetErrorOnRelease(t *testing.T) { func TestDecodeNestedError(t *testing.T) { t.Run("decode", func(t *testing.T) { - now := time.Now().UTC() + now := modelpb.FromTime(time.Now()) defaultVal := modeldecodertest.DefaultValues() _, eventBase := initializedInputMetadata(defaultVal) - eventBase.Timestamp = timestamppb.New(now) + eventBase.Timestamp = now input := modeldecoder.Input{Base: eventBase} str := `{"error":{"id":"a-b-c","timestamp":1599996822281000,"log":{"message":"abc"}}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) @@ -58,7 +57,7 @@ func TestDecodeNestedError(t *testing.T) { require.NoError(t, DecodeNestedError(dec, &input, &batch)) require.Len(t, batch, 1) require.NotNil(t, batch[0].Error) - assert.Equal(t, time.Unix(1599996822, 281000000).UTC(), batch[0].Timestamp.AsTime()) + assert.Equal(t, modelpb.FromTime(time.Unix(1599996822, 281000000)), batch[0].Timestamp) assert.Empty(t, cmp.Diff(&modelpb.Error{ Id: "a-b-c", Log: &modelpb.ErrorLog{Message: "abc"}, @@ -69,7 +68,7 @@ func TestDecodeNestedError(t *testing.T) { batch = modelpb.Batch{} require.NoError(t, DecodeNestedError(dec, &input, &batch)) // if no timestamp is provided, leave base event time unmodified - assert.Equal(t, now, batch[0].Timestamp.AsTime()) + assert.Equal(t, now, batch[0].Timestamp) err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch) require.Error(t, err) diff --git a/input/elasticapm/internal/modeldecoder/v2/log_test.go b/input/elasticapm/internal/modeldecoder/v2/log_test.go index c002808d..382f805a 100644 --- a/input/elasticapm/internal/modeldecoder/v2/log_test.go +++ b/input/elasticapm/internal/modeldecoder/v2/log_test.go @@ -24,7 +24,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/apm-data/input/elasticapm/internal/decoder" "github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder" @@ -50,7 +49,7 @@ func TestDecodeNestedLog(t *testing.T) { require.NoError(t, DecodeNestedLog(dec, &input, &batch)) require.Len(t, batch, 1) assert.Equal(t, "something happened", batch[0].Message) - assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", batch[0].Timestamp.AsTime().String()) + assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", modelpb.ToTime(batch[0].Timestamp).String()) assert.Equal(t, "trace-id", batch[0].Trace.Id) assert.Equal(t, "transaction-id", batch[0].Transaction.Id) assert.Equal(t, "warn", batch[0].Log.Level) @@ -68,13 +67,13 @@ func TestDecodeNestedLog(t *testing.T) { }) t.Run("withoutTimestamp", func(t *testing.T) { - now := time.Now().UTC() - input := modeldecoder.Input{Base: &modelpb.APMEvent{Timestamp: timestamppb.New(now)}} + now := modelpb.FromTime(time.Now()) + input := modeldecoder.Input{Base: &modelpb.APMEvent{Timestamp: now}} str := `{"log":{"message":"something happened"}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) var batch modelpb.Batch require.NoError(t, DecodeNestedLog(dec, &input, &batch)) - assert.Equal(t, now, batch[0].Timestamp.AsTime()) + assert.Equal(t, now, batch[0].Timestamp) }) t.Run("withError", func(t *testing.T) { @@ -84,7 +83,7 @@ func TestDecodeNestedLog(t *testing.T) { var batch modelpb.Batch require.NoError(t, DecodeNestedLog(dec, &input, &batch)) require.Len(t, batch, 1) - assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", batch[0].Timestamp.AsTime().String()) + assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", modelpb.ToTime(batch[0].Timestamp).String()) assert.Equal(t, "trace-id", batch[0].Trace.Id) assert.Equal(t, "transaction-id", batch[0].Transaction.Id) assert.Equal(t, "error", batch[0].Log.Level) @@ -111,7 +110,7 @@ func TestDecodeNestedLog(t *testing.T) { var batch modelpb.Batch require.NoError(t, DecodeNestedLog(dec, &input, &batch)) require.Len(t, batch, 1) - assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", batch[0].Timestamp.AsTime().String()) + assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", modelpb.ToTime(batch[0].Timestamp).String()) assert.Equal(t, "trace-id", batch[0].Trace.Id) assert.Equal(t, "transaction-id", batch[0].Transaction.Id) assert.Equal(t, "error", batch[0].Log.Level) @@ -138,7 +137,7 @@ func TestDecodeNestedLog(t *testing.T) { var batch modelpb.Batch require.NoError(t, DecodeNestedLog(dec, &input, &batch)) require.Len(t, batch, 1) - assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", batch[0].Timestamp.AsTime().String()) + assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", modelpb.ToTime(batch[0].Timestamp).String()) assert.Equal(t, "trace-id", batch[0].Trace.Id) assert.Equal(t, "transaction-id", batch[0].Transaction.Id) assert.Equal(t, "error", batch[0].Log.Level) diff --git a/input/elasticapm/internal/modeldecoder/v2/metricset_test.go b/input/elasticapm/internal/modeldecoder/v2/metricset_test.go index 617286bc..bb257287 100644 --- a/input/elasticapm/internal/modeldecoder/v2/metricset_test.go +++ b/input/elasticapm/internal/modeldecoder/v2/metricset_test.go @@ -28,7 +28,6 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/apm-data/input/elasticapm/internal/decoder" "github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder" @@ -47,10 +46,10 @@ func TestResetMetricsetOnRelease(t *testing.T) { func TestDecodeNestedMetricset(t *testing.T) { t.Run("decode", func(t *testing.T) { - now := time.Now() + now := modelpb.FromTime(time.Now()) defaultVal := modeldecodertest.DefaultValues() _, eventBase := initializedInputMetadata(defaultVal) - eventBase.Timestamp = timestamppb.New(now) + eventBase.Timestamp = now input := modeldecoder.Input{Base: eventBase} str := `{"metricset":{"timestamp":1599996822281000,"samples":{"a.b":{"value":2048}}}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) @@ -58,7 +57,7 @@ func TestDecodeNestedMetricset(t *testing.T) { require.NoError(t, DecodeNestedMetricset(dec, &input, &batch)) require.Len(t, batch, 1) require.NotNil(t, batch[0].Metricset) - assert.Equal(t, time.Unix(1599996822, 281000000).UTC(), batch[0].Timestamp.AsTime()) + assert.Equal(t, modelpb.FromTime(time.Unix(1599996822, 281000000).UTC()), batch[0].Timestamp) assert.Empty(t, cmp.Diff(&modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{{ @@ -74,7 +73,7 @@ func TestDecodeNestedMetricset(t *testing.T) { require.NoError(t, DecodeNestedMetricset(dec, &input, &batch)) require.Len(t, batch, 1) require.NotNil(t, batch[0].Metricset) - assert.Equal(t, now.UTC(), batch[0].Timestamp.AsTime()) + assert.Equal(t, now, batch[0].Timestamp) // invalid type err := DecodeNestedMetricset(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch) @@ -257,7 +256,6 @@ func TestDecodeMetricsetInternal(t *testing.T) { require.NoError(t, err) assert.Empty(t, cmp.Diff(modelpb.Batch{{ - Timestamp: timestamppb.New(time.Unix(0, 0).UTC()), Metricset: &modelpb.Metricset{ Name: "span_breakdown", }, @@ -311,7 +309,6 @@ func TestDecodeMetricsetServiceName(t *testing.T) { require.NoError(t, err) assert.Empty(t, cmp.Diff(modelpb.Batch{{ - Timestamp: timestamppb.New(time.Unix(0, 0).UTC()), Metricset: &modelpb.Metricset{ Name: "span_breakdown", }, @@ -370,7 +367,6 @@ func TestDecodeMetricsetServiceNameAndVersion(t *testing.T) { require.NoError(t, err) assert.Empty(t, cmp.Diff(modelpb.Batch{{ - Timestamp: timestamppb.New(time.Unix(0, 0).UTC()), Metricset: &modelpb.Metricset{ Name: "span_breakdown", }, @@ -429,7 +425,6 @@ func TestDecodeMetricsetServiceVersion(t *testing.T) { require.NoError(t, err) assert.Empty(t, cmp.Diff(modelpb.Batch{{ - Timestamp: timestamppb.New(time.Unix(0, 0).UTC()), Metricset: &modelpb.Metricset{ Name: "span_breakdown", }, diff --git a/input/elasticapm/internal/modeldecoder/v2/span_test.go b/input/elasticapm/internal/modeldecoder/v2/span_test.go index 4c1e34ee..7ad54fbf 100644 --- a/input/elasticapm/internal/modeldecoder/v2/span_test.go +++ b/input/elasticapm/internal/modeldecoder/v2/span_test.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" "google.golang.org/protobuf/testing/protocmp" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/apm-data/input/elasticapm/internal/decoder" "github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder" @@ -59,7 +58,7 @@ func TestDecodeNestedSpan(t *testing.T) { require.NoError(t, DecodeNestedSpan(dec, &input, &batch)) require.Len(t, batch, 1) require.NotNil(t, batch[0].Span) - assert.Equal(t, time.Time{}.Add(143*time.Millisecond), batch[0].Timestamp.AsTime()) + assert.Equal(t, eventBase.Timestamp+uint64((143*time.Millisecond).Nanoseconds()), batch[0].Timestamp) assert.Equal(t, 100*time.Millisecond, batch[0].Event.Duration.AsDuration()) assert.Equal(t, "parent-123", batch[0].ParentId, protocmp.Transform()) assert.Equal(t, &modelpb.Trace{Id: "trace-ab"}, batch[0].Trace, protocmp.Transform()) @@ -185,17 +184,17 @@ func TestDecodeMapToSpanModel(t *testing.T) { var input span var out modelpb.APMEvent reqTime := time.Now().Add(time.Hour).UTC() - out.Timestamp = timestamppb.New(reqTime) + out.Timestamp = modelpb.FromTime(reqTime) input.Start.Set(20.5) mapToSpanModel(&input, &out) timestamp := reqTime.Add(time.Duration(input.Start.Val * float64(time.Millisecond))).UTC() - assert.Equal(t, timestamp, out.Timestamp.AsTime()) + assert.Equal(t, modelpb.FromTime(timestamp), out.Timestamp) // leave base event timestamp unmodified if neither event timestamp nor start is specified - out = modelpb.APMEvent{Timestamp: timestamppb.New(reqTime)} + out = modelpb.APMEvent{Timestamp: modelpb.FromTime(reqTime)} input.Start.Reset() mapToSpanModel(&input, &out) - assert.Equal(t, reqTime, out.Timestamp.AsTime()) + assert.Equal(t, modelpb.FromTime(reqTime), out.Timestamp) }) t.Run("sample-rate", func(t *testing.T) { diff --git a/input/elasticapm/internal/modeldecoder/v2/transaction_test.go b/input/elasticapm/internal/modeldecoder/v2/transaction_test.go index c03c575c..8747e2b2 100644 --- a/input/elasticapm/internal/modeldecoder/v2/transaction_test.go +++ b/input/elasticapm/internal/modeldecoder/v2/transaction_test.go @@ -33,7 +33,6 @@ import ( "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/structpb" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/apm-data/input/elasticapm/internal/decoder" "github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder" @@ -53,7 +52,7 @@ func TestResetTransactionOnRelease(t *testing.T) { func TestDecodeNestedTransaction(t *testing.T) { t.Run("decode", func(t *testing.T) { - now := time.Now().UTC() + now := modelpb.FromTime(time.Now()) input := modeldecoder.Input{Base: &modelpb.APMEvent{}} str := `{"transaction":{"duration":100,"timestamp":1599996822281000,"id":"100","trace_id":"1","type":"request","span_count":{"started":2}}}` dec := decoder.NewJSONDecoder(strings.NewReader(str)) @@ -63,15 +62,15 @@ func TestDecodeNestedTransaction(t *testing.T) { require.Len(t, batch, 1) require.NotNil(t, batch[0].Transaction) assert.Equal(t, "request", batch[0].Transaction.Type) - assert.Equal(t, "2020-09-13 11:33:42.281 +0000 UTC", batch[0].Timestamp.AsTime().String()) + assert.Equal(t, "2020-09-13 11:33:42.281 +0000 UTC", modelpb.ToTime(batch[0].Timestamp).String()) - input = modeldecoder.Input{Base: &modelpb.APMEvent{Timestamp: timestamppb.New(now)}} + input = modeldecoder.Input{Base: &modelpb.APMEvent{Timestamp: now}} str = `{"transaction":{"duration":100,"id":"100","trace_id":"1","type":"request","span_count":{"started":2}}}` dec = decoder.NewJSONDecoder(strings.NewReader(str)) batch = modelpb.Batch{} require.NoError(t, DecodeNestedTransaction(dec, &input, &batch)) // if no timestamp is provided, fall back to base event timestamp - assert.Equal(t, now, batch[0].Timestamp.AsTime()) + assert.Equal(t, now, batch[0].Timestamp) err := DecodeNestedTransaction(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch) require.Error(t, err) @@ -296,8 +295,8 @@ func TestDecodeMapToTransactionModel(t *testing.T) { var input transaction var out1, out2 modelpb.APMEvent - reqTime := time.Now().Add(time.Second) - out1.Timestamp = timestamppb.New(reqTime) + reqTime := modelpb.FromTime(time.Now().Add(time.Second)) + out1.Timestamp = reqTime defaultVal := modeldecodertest.DefaultValues() modeldecodertest.SetStructValues(&input, defaultVal) input.OTel.Reset() @@ -306,13 +305,13 @@ func TestDecodeMapToTransactionModel(t *testing.T) { modeldecodertest.AssertStructValues(t, out1.Transaction, exceptions, defaultVal) // leave base event timestamp unmodified if event timestamp is unspecified - out1.Timestamp = timestamppb.New(reqTime) + out1.Timestamp = reqTime mapToTransactionModel(&input, &out1) - assert.Equal(t, reqTime.UTC(), out1.Timestamp.AsTime()) + assert.Equal(t, reqTime, out1.Timestamp) input.Reset() // ensure memory is not shared by reusing input model - out2.Timestamp = timestamppb.New(reqTime) + out2.Timestamp = reqTime modeldecodertest.SetStructValues(&input, defaultVal) mapToTransactionModel(&input, &out1) input.Reset() diff --git a/input/elasticapm/processor_test.go b/input/elasticapm/processor_test.go index 24c47f2e..a7aa34e7 100644 --- a/input/elasticapm/processor_test.go +++ b/input/elasticapm/processor_test.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/semaphore" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/apm-data/model/modelpb" ) @@ -292,7 +291,7 @@ func TestHandleStreamBaseEvent(t *testing.T) { requestTimestamp := time.Date(2018, 8, 1, 10, 0, 0, 0, time.UTC) baseEvent := modelpb.APMEvent{ - Timestamp: timestamppb.New(requestTimestamp), + Timestamp: modelpb.FromTime(requestTimestamp), UserAgent: &modelpb.UserAgent{Original: "rum-2.0"}, Source: &modelpb.Source{Ip: modelpb.MustParseIP("192.0.0.1")}, Client: &modelpb.Client{Ip: modelpb.MustParseIP("192.0.0.2")}, // X-Forwarded-For @@ -320,7 +319,7 @@ func TestHandleStreamBaseEvent(t *testing.T) { assert.Equal(t, "rum-2.0", events[0].UserAgent.Original) assert.Equal(t, baseEvent.Source, events[0].Source) assert.Equal(t, baseEvent.Client, events[0].Client) - assert.Equal(t, requestTimestamp.Add(50*time.Millisecond), events[0].Timestamp.AsTime()) // span's start is "50" + assert.Equal(t, modelpb.FromTime(requestTimestamp.Add(50*time.Millisecond)), events[0].Timestamp) // span's start is "50" } func TestLabelLeak(t *testing.T) { diff --git a/input/otlp/exceptions_test.go b/input/otlp/exceptions_test.go index 59fdfc49..f15193fc 100644 --- a/input/otlp/exceptions_test.go +++ b/input/otlp/exceptions_test.go @@ -45,7 +45,6 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/collector/semconv/v1.5.0" "google.golang.org/protobuf/testing/protocmp" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/apm-data/model/modelpb" ) @@ -114,7 +113,7 @@ Caused by: LowLevelException out := cmp.Diff([]*modelpb.APMEvent{{ Service: service, Agent: agent, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Labels: modelpb.Labels{}, NumericLabels: modelpb.NumericLabels{}, Trace: transactionEvent.Trace, @@ -169,7 +168,7 @@ Caused by: LowLevelException }, { Service: service, Agent: agent, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Labels: modelpb.Labels{}, NumericLabels: modelpb.NumericLabels{}, Trace: transactionEvent.Trace, @@ -333,7 +332,7 @@ func TestEncodeSpanEventsNonJavaExceptions(t *testing.T) { out := cmp.Diff(&modelpb.APMEvent{ Service: service, Agent: agent, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Labels: modelpb.Labels{}, NumericLabels: modelpb.NumericLabels{}, Trace: transactionEvent.Trace, diff --git a/input/otlp/logs.go b/input/otlp/logs.go index 1ffca4d4..ff081166 100644 --- a/input/otlp/logs.go +++ b/input/otlp/logs.go @@ -43,7 +43,6 @@ import ( "go.opentelemetry.io/collector/pdata/plog" semconv "go.opentelemetry.io/collector/semconv/v1.5.0" "go.uber.org/zap" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/apm-data/model/modelpb" ) @@ -69,7 +68,7 @@ func (c *Consumer) convertResourceLogs(resourceLogs plog.ResourceLogs, receiveTi resource := resourceLogs.Resource() baseEvent := modelpb.APMEvent{ Event: &modelpb.Event{ - Received: timestamppb.New(receiveTimestamp), + Received: modelpb.FromTime(receiveTimestamp), }, } translateResourceMetadata(resource, &baseEvent) @@ -103,7 +102,7 @@ func (c *Consumer) convertLogRecord( ) *modelpb.APMEvent { event := baseEvent.CloneVT() initEventLabels(event) - event.Timestamp = timestamppb.New(record.Timestamp().AsTime().Add(timeDelta)) + event.Timestamp = modelpb.FromTime(record.Timestamp().AsTime().Add(timeDelta)) event.Event = populateNil(event.Event) event.Event.Severity = uint64(record.SeverityNumber()) event.Log = populateNil(event.Log) diff --git a/input/otlp/logs_test.go b/input/otlp/logs_test.go index 54f843c3..bcb247f4 100644 --- a/input/otlp/logs_test.go +++ b/input/otlp/logs_test.go @@ -47,7 +47,6 @@ import ( "golang.org/x/sync/semaphore" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/testing/protocmp" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/apm-data/input/otlp" "github.com/elastic/apm-data/model/modelpb" @@ -185,8 +184,8 @@ func TestConsumerConsumeLogs(t *testing.T) { panic("already processes batch") } processed = *batch - assert.NotNil(t, processed[0].Timestamp) - processed[0].Timestamp = nil + assert.NotZero(t, processed[0].Timestamp) + processed[0].Timestamp = 0 return nil } consumer := otlp.NewConsumer(otlp.ConsumerConfig{ @@ -195,10 +194,10 @@ func TestConsumerConsumeLogs(t *testing.T) { }) assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs)) - now := time.Now().Unix() + now := modelpb.FromTime(time.Now()) for _, e := range processed { - assert.InDelta(t, now, e.Event.Received.AsTime().Unix(), 2) - e.Event.Received = nil + assert.InDelta(t, now, e.Event.Received, float64((2 * time.Second).Nanoseconds())) + e.Event.Received = 0 } expected := proto.Clone(&commonEvent).(*modelpb.APMEvent) @@ -285,8 +284,8 @@ Caused by: LowLevelException panic("already processes batch") } processed = *batch - assert.NotNil(t, processed[0].Timestamp) - processed[0].Timestamp = nil + assert.NotZero(t, processed[0].Timestamp) + processed[0].Timestamp = 0 return nil } consumer := otlp.NewConsumer(otlp.ConsumerConfig{ @@ -295,16 +294,16 @@ Caused by: LowLevelException }) assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs)) - now := time.Now().Unix() + now := modelpb.FromTime(time.Now()) for _, e := range processed { - assert.InDelta(t, now, e.Event.Received.AsTime().Unix(), 2) - e.Event.Received = nil + assert.InDelta(t, now, e.Event.Received, float64((2 * time.Second).Nanoseconds())) + e.Event.Received = 0 } assert.Len(t, processed, 2) assert.Equal(t, modelpb.Labels{"key0": {Global: true, Value: "zero"}, "key1": {Value: "one"}}, modelpb.Labels(processed[0].Labels)) assert.Empty(t, processed[0].NumericLabels) - processed[1].Timestamp = nil + processed[1].Timestamp = 0 out := cmp.Diff(&modelpb.APMEvent{ Service: &modelpb.Service{ Name: "unknown", @@ -435,8 +434,8 @@ func TestConsumerConsumeOTelEventLogs(t *testing.T) { panic("already processes batch") } processed = *batch - assert.NotNil(t, processed[0].Timestamp) - processed[0].Timestamp = timestamppb.New(time.Time{}) + assert.NotZero(t, processed[0].Timestamp) + processed[0].Timestamp = 0 return nil } consumer := otlp.NewConsumer(otlp.ConsumerConfig{ @@ -478,8 +477,8 @@ func TestConsumerConsumeLogsLabels(t *testing.T) { panic("already processes batch") } processed = *batch - assert.NotNil(t, processed[0].Timestamp) - processed[0].Timestamp = timestamppb.New(time.Time{}) + assert.NotZero(t, processed[0].Timestamp) + processed[0].Timestamp = 0 return nil } consumer := otlp.NewConsumer(otlp.ConsumerConfig{ diff --git a/input/otlp/metadata_test.go b/input/otlp/metadata_test.go index 5ced6db3..52b16aca 100644 --- a/input/otlp/metadata_test.go +++ b/input/otlp/metadata_test.go @@ -299,6 +299,6 @@ func transformResourceMetadata(t *testing.T, resourceAttrs map[string]interface{ (*events)[0].Span = nil (*events)[0].Trace = nil (*events)[0].Event = nil - (*events)[0].Timestamp = nil + (*events)[0].Timestamp = 0 return (*events)[0] } diff --git a/input/otlp/metrics.go b/input/otlp/metrics.go index aff21e0e..b6658e44 100644 --- a/input/otlp/metrics.go +++ b/input/otlp/metrics.go @@ -44,7 +44,6 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/apm-data/model/modelpb" ) @@ -75,7 +74,7 @@ func (c *Consumer) convertMetrics(metrics pmetric.Metrics, receiveTimestamp time func (c *Consumer) convertResourceMetrics(resourceMetrics pmetric.ResourceMetrics, receiveTimestamp time.Time, out *modelpb.Batch) { baseEvent := modelpb.APMEvent{ Event: &modelpb.Event{ - Received: timestamppb.New(receiveTimestamp), + Received: modelpb.FromTime(receiveTimestamp), }, } @@ -107,7 +106,7 @@ func (c *Consumer) convertScopeMetrics( } for key, ms := range ms { event := baseEvent.CloneVT() - event.Timestamp = timestamppb.New(key.timestamp.Add(timeDelta)) + event.Timestamp = modelpb.FromTime(key.timestamp.Add(timeDelta)) metrs := make([]*modelpb.MetricsetSample, 0, len(ms.samples)) for _, s := range ms.samples { metrs = append(metrs, s) diff --git a/input/otlp/metrics_test.go b/input/otlp/metrics_test.go index cbf7efc9..df9122fa 100644 --- a/input/otlp/metrics_test.go +++ b/input/otlp/metrics_test.go @@ -50,7 +50,6 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "golang.org/x/sync/semaphore" "google.golang.org/protobuf/testing/protocmp" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/apm-data/input/otlp" "github.com/elastic/apm-data/model/modelpb" @@ -141,7 +140,7 @@ func TestConsumeMetrics(t *testing.T) { expected := []*modelpb.APMEvent{{ Agent: &agent, Service: &service, - Timestamp: timestamppb.New(timestamp0), + Timestamp: modelpb.FromTime(timestamp0), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -168,7 +167,7 @@ func TestConsumeMetrics(t *testing.T) { }, { Agent: &agent, Service: &service, - Timestamp: timestamppb.New(timestamp1), + Timestamp: modelpb.FromTime(timestamp1), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -179,7 +178,7 @@ func TestConsumeMetrics(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"k": {Value: "v"}}, - Timestamp: timestamppb.New(timestamp1), + Timestamp: modelpb.FromTime(timestamp1), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -191,7 +190,7 @@ func TestConsumeMetrics(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"k": {Value: "v2"}}, - Timestamp: timestamppb.New(timestamp1), + Timestamp: modelpb.FromTime(timestamp1), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -203,7 +202,7 @@ func TestConsumeMetrics(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"k2": {Value: "v"}}, - Timestamp: timestamppb.New(timestamp1), + Timestamp: modelpb.FromTime(timestamp1), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -342,7 +341,7 @@ func TestConsumeMetricsHostCPU(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"state": {Value: "idle"}, "cpu": {Value: "0"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -357,7 +356,7 @@ func TestConsumeMetricsHostCPU(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"state": {Value: "system"}, "cpu": {Value: "0"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -372,7 +371,7 @@ func TestConsumeMetricsHostCPU(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"state": {Value: "user"}, "cpu": {Value: "0"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -387,7 +386,7 @@ func TestConsumeMetricsHostCPU(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"state": {Value: "idle"}, "cpu": {Value: "1"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -402,7 +401,7 @@ func TestConsumeMetricsHostCPU(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"state": {Value: "system"}, "cpu": {Value: "1"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -417,7 +416,7 @@ func TestConsumeMetricsHostCPU(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"state": {Value: "user"}, "cpu": {Value: "1"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -432,7 +431,7 @@ func TestConsumeMetricsHostCPU(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"state": {Value: "idle"}, "cpu": {Value: "2"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -447,7 +446,7 @@ func TestConsumeMetricsHostCPU(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"state": {Value: "system"}, "cpu": {Value: "2"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -462,7 +461,7 @@ func TestConsumeMetricsHostCPU(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"state": {Value: "user"}, "cpu": {Value: "2"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -477,7 +476,7 @@ func TestConsumeMetricsHostCPU(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"state": {Value: "idle"}, "cpu": {Value: "3"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -492,7 +491,7 @@ func TestConsumeMetricsHostCPU(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"state": {Value: "system"}, "cpu": {Value: "3"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -507,7 +506,7 @@ func TestConsumeMetricsHostCPU(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"state": {Value: "user"}, "cpu": {Value: "3"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -552,7 +551,7 @@ func TestConsumeMetricsHostMemory(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"state": {Value: "free"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -567,7 +566,7 @@ func TestConsumeMetricsHostMemory(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"state": {Value: "used"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -631,7 +630,7 @@ func TestConsumeMetrics_JVM(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"type": {Value: "heap"}, "pool": {Value: "G1 Eden Space"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -646,7 +645,7 @@ func TestConsumeMetrics_JVM(t *testing.T) { Agent: &agent, Service: &service, Labels: modelpb.Labels{"action": {Value: "end of minor GC"}, "gc": {Value: "G1 Young Generation"}}, - Timestamp: timestamppb.New(timestamp), + Timestamp: modelpb.FromTime(timestamp), Metricset: &modelpb.Metricset{ Name: "app", Samples: []*modelpb.MetricsetSample{ @@ -675,7 +674,7 @@ func TestConsumeMetricsExportTimestamp(t *testing.T) { // Use a large delta so that we can allow for a significant amount of // delay in the test environment affecting the timestamp adjustment. const timeDelta = time.Hour - const allowedError = 5 // seconds + const allowedError = 5 * time.Second // seconds now := time.Now() exportTimestamp := now.Add(-timeDelta) @@ -695,7 +694,7 @@ func TestConsumeMetricsExportTimestamp(t *testing.T) { events, _ := transformMetrics(t, metrics) require.Len(t, events, 1) - assert.InDelta(t, now.Add(dataPointOffset).Unix(), events[0].Timestamp.AsTime().Unix(), allowedError) + assert.InDelta(t, modelpb.FromTime(now.Add(dataPointOffset)), events[0].Timestamp, float64(allowedError.Nanoseconds())) for _, e := range events { // telemetry.sdk.elastic_export_timestamp should not be sent as a label. @@ -743,10 +742,10 @@ func eventsMatch(t *testing.T, expected []*modelpb.APMEvent, actual []*modelpb.A return strings.Compare(actual[i].String(), actual[j].String()) == -1 }) - now := time.Now().Unix() + now := modelpb.FromTime(time.Now()) for i, e := range actual { - assert.InDelta(t, now, e.Event.Received.AsTime().Unix(), 2) - e.Event.Received = nil + assert.InDelta(t, now, e.Event.Received, float64((2 * time.Second).Nanoseconds())) + e.Event.Received = 0 if expected[i].Event == nil { e.Event = nil } diff --git a/input/otlp/traces.go b/input/otlp/traces.go index 0cdc8eb2..23beef7e 100644 --- a/input/otlp/traces.go +++ b/input/otlp/traces.go @@ -52,7 +52,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/apm-data/model/modelpb" ) @@ -100,7 +99,7 @@ func (c *Consumer) convertResourceSpans( ) { baseEvent := modelpb.APMEvent{ Event: &modelpb.Event{ - Received: timestamppb.New(receiveTimestamp), + Received: modelpb.FromTime(receiveTimestamp), }, } var timeDelta time.Duration @@ -154,7 +153,7 @@ func (c *Consumer) convertSpan( representativeCount := getRepresentativeCountFromTracestateHeader(otelSpan.TraceState().AsRaw()) event := baseEvent.CloneVT() initEventLabels(event) - event.Timestamp = timestamppb.New(startTime.Add(timeDelta)) + event.Timestamp = modelpb.FromTime(startTime.Add(timeDelta)) if id := hexTraceID(otelSpan.TraceID()); id != "" { event.Trace = &modelpb.Trace{ Id: id, @@ -888,7 +887,7 @@ func (c *Consumer) convertSpanEvent( initEventLabels(event) event.Transaction = nil // populate fields as required from parent event.Span = nil // populate fields as required from parent - event.Timestamp = timestamppb.New(spanEvent.Timestamp().AsTime().Add(timeDelta)) + event.Timestamp = modelpb.FromTime(spanEvent.Timestamp().AsTime().Add(timeDelta)) isJaeger := strings.HasPrefix(parent.Agent.Name, "Jaeger") if isJaeger { diff --git a/input/otlp/traces_test.go b/input/otlp/traces_test.go index faec12c2..6e0fdb9b 100644 --- a/input/otlp/traces_test.go +++ b/input/otlp/traces_test.go @@ -774,7 +774,7 @@ func TestConsumeTracesExportTimestamp(t *testing.T) { // Use a large delta so that we can allow for a significant amount of // delay in the test environment affecting the timestamp adjustment. const timeDelta = time.Hour - const allowedError = 5 // seconds + const allowedError = 5 * time.Second // seconds now := time.Now() exportTimestamp := now.Add(-timeDelta) @@ -815,9 +815,9 @@ func TestConsumeTracesExportTimestamp(t *testing.T) { require.Len(t, *batch, 3) // Give some leeway for one event, and check other events' timestamps relative to that one. - assert.InDelta(t, now.Add(transactionOffset).Unix(), (*batch)[0].Timestamp.AsTime().Unix(), allowedError) - assert.Equal(t, spanOffset-transactionOffset, (*batch)[1].Timestamp.AsTime().Sub((*batch)[0].Timestamp.AsTime())) - assert.Equal(t, exceptionOffset-transactionOffset, (*batch)[2].Timestamp.AsTime().Sub((*batch)[0].Timestamp.AsTime())) + assert.InDelta(t, modelpb.FromTime(now.Add(transactionOffset)), (*batch)[0].Timestamp, float64(allowedError.Nanoseconds())) + assert.Equal(t, uint64((spanOffset - transactionOffset).Nanoseconds()), (*batch)[1].Timestamp-(*batch)[0].Timestamp) + assert.Equal(t, uint64((exceptionOffset - transactionOffset).Nanoseconds()), (*batch)[2].Timestamp-(*batch)[0].Timestamp) // Durations should be unaffected. assert.Equal(t, transactionDuration, (*batch)[0].GetEvent().GetDuration().AsDuration()) @@ -831,7 +831,7 @@ func TestConsumeTracesExportTimestamp(t *testing.T) { func TestConsumeTracesEventReceived(t *testing.T) { traces, otelSpans := newTracesSpans() - now := time.Now() + now := modelpb.FromTime(time.Now()) otelSpan1 := otelSpans.Spans().AppendEmpty() otelSpan1.SetTraceID(pcommon.TraceID{1}) @@ -840,8 +840,8 @@ func TestConsumeTracesEventReceived(t *testing.T) { batch := transformTraces(t, traces) require.Len(t, *batch, 1) - const allowedDelta = 2 // seconds - require.InDelta(t, now.Unix(), (*batch)[0].Event.Received.AsTime().Unix(), allowedDelta) + const allowedDelta = 2 * time.Second // seconds + require.InDelta(t, now, (*batch)[0].Event.Received, float64(allowedDelta.Nanoseconds())) } func TestSpanLinks(t *testing.T) { diff --git a/model/modeljson/apmevent.pb.json.go b/model/modeljson/apmevent.pb.json.go index 4fd26663..1a50583a 100644 --- a/model/modeljson/apmevent.pb.json.go +++ b/model/modeljson/apmevent.pb.json.go @@ -51,7 +51,7 @@ func MarshalAPMEvent(e *modelpb.APMEvent, w *fastjson.Writer) error { } doc := modeljson.Document{ - Timestamp: modeljson.Time(e.Timestamp.AsTime()), + Timestamp: modeljson.Time(modelpb.ToTime(e.Timestamp)), Labels: labels, NumericLabels: numericLabels, Message: e.Message, @@ -100,10 +100,10 @@ func MarshalAPMEvent(e *modelpb.APMEvent, w *fastjson.Writer) error { // // TODO(axw) change @timestamp to use date_nanos, and remove this field. var timestampStruct modeljson.Timestamp - if e.Timestamp != nil && !e.Timestamp.AsTime().IsZero() { + if e.Timestamp != 0 { switch e.Type() { case modelpb.TransactionEventType, modelpb.SpanEventType, modelpb.ErrorEventType: - timestampStruct.US = int(e.Timestamp.AsTime().UnixNano() / 1000) + timestampStruct.US = int(e.Timestamp / 1000) doc.TimestampStruct = ×tampStruct } } diff --git a/model/modeljson/apmevent.pb.json_test.go b/model/modeljson/apmevent.pb.json_test.go index 1de5b7e2..1b9cfe4f 100644 --- a/model/modeljson/apmevent.pb.json_test.go +++ b/model/modeljson/apmevent.pb.json_test.go @@ -25,7 +25,6 @@ import ( "github.com/stretchr/testify/require" "go.elastic.co/fastjson" durationpb "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" ) func TestFullEvent(t *testing.T) { @@ -49,7 +48,7 @@ func BenchmarkAPMEventToJSON(b *testing.B) { func fullEvent(t testing.TB) *modelpb.APMEvent { return &modelpb.APMEvent{ - Timestamp: timestamppb.New(time.Unix(1, 1)), + Timestamp: uint64(time.Second.Nanoseconds() + 1), Span: &modelpb.Span{ Message: &modelpb.Message{ Body: "body", diff --git a/model/modeljson/event.pb.json.go b/model/modeljson/event.pb.json.go index 22bfb201..236820b6 100644 --- a/model/modeljson/event.pb.json.go +++ b/model/modeljson/event.pb.json.go @@ -39,7 +39,7 @@ func EventModelJSON(e *modelpb.Event, out *modeljson.Event) { Sum: e.SuccessCount.Sum, } } - if e.Received.IsValid() { - out.Received = modeljson.Time(e.Received.AsTime()) + if e.Received != 0 { + out.Received = modeljson.Time(modelpb.ToTime(e.Received)) } } diff --git a/model/modeljson/event.pb.json_test.go b/model/modeljson/event.pb.json_test.go index 458592ad..6e9f21ce 100644 --- a/model/modeljson/event.pb.json_test.go +++ b/model/modeljson/event.pb.json_test.go @@ -26,7 +26,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" ) const ( @@ -59,7 +58,7 @@ func TestEventToModelJSON(t *testing.T) { }, Duration: durationpb.New(3 * time.Second), Severity: 4, - Received: timestamppb.New(now), + Received: modelpb.FromTime(now), }, expected: &modeljson.Event{ Outcome: "outcome", diff --git a/model/modelpb/timestamp.go b/model/modelpb/timestamp.go new file mode 100644 index 00000000..63c5cab2 --- /dev/null +++ b/model/modelpb/timestamp.go @@ -0,0 +1,30 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package modelpb + +import "time" + +// FromTime converts a time.Time to uint64 nanoseconds since Unix epoch. +func FromTime(t time.Time) uint64 { + return uint64(t.UnixNano()) +} + +// ToTime converts uint64 nanoseconds since Unix epoch to a time.Time. +func ToTime(v uint64) time.Time { + return time.Unix(0, int64(v)).UTC() +}