Skip to content

Commit

Permalink
feat: update protobuf models to use uint64 for timestamps (#136)
Browse files Browse the repository at this point in the history
* feat: update protobuf models to use uint64

* feat: regenerate protobuf models

* feat: update code use new uint64

* fix: support larger time range

Co-authored-by: Andrew Wilkins <[email protected]>

---------

Co-authored-by: Andrew Wilkins <[email protected]>
  • Loading branch information
kruskall and axw authored Aug 9, 2023
1 parent 5085cc4 commit a636758
Show file tree
Hide file tree
Showing 31 changed files with 430 additions and 555 deletions.
3 changes: 1 addition & 2 deletions codec/fullevent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ import (

"github.com/elastic/apm-data/model/modelpb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

func fullEvent(t *testing.B) *modelpb.APMEvent {
return &modelpb.APMEvent{
Timestamp: timestamppb.New(time.Unix(1, 1)),
Timestamp: uint64(time.Second.Nanoseconds() + 1),
Span: &modelpb.Span{
Message: &modelpb.Message{
Body: "body",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func SetStructValues(in interface{}, values *Values, opts ...SetStructValuesOpti
switch fKind := f.Kind(); fKind {
case reflect.String:
fieldVal = reflect.ValueOf(values.Str)
case reflect.Int, reflect.Int32, reflect.Int64:
case reflect.Int, reflect.Int32, reflect.Int64, reflect.Uint64:
fieldVal = reflect.ValueOf(values.Int).Convert(f.Type())
case reflect.Float64:
fieldVal = reflect.ValueOf(values.Float).Convert(f.Type())
Expand Down
7 changes: 2 additions & 5 deletions input/elasticapm/internal/modeldecoder/rumv3/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/nullable"
"github.com/elastic/apm-data/model/modelpb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

var (
Expand Down Expand Up @@ -248,7 +247,7 @@ func mapToErrorModel(from *errorEvent, event *modelpb.APMEvent) {
event.ParentId = from.ParentID.Val
}
if !from.Timestamp.Val.IsZero() {
event.Timestamp = timestamppb.New(from.Timestamp.Val)
event.Timestamp = modelpb.FromTime(from.Timestamp.Val)
}
if from.TraceID.IsSet() {
event.Trace = &modelpb.Trace{
Expand Down Expand Up @@ -628,9 +627,7 @@ func mapToSpanModel(from *span, event *modelpb.APMEvent) {
if from.Start.IsSet() {
// event.Timestamp is initialized to the time the payload was
// received; offset that by "start" milliseconds for RUM.
event.Timestamp = timestamppb.New(event.Timestamp.AsTime().Add(
time.Duration(float64(time.Millisecond) * from.Start.Val),
))
event.Timestamp += uint64(time.Duration(float64(time.Millisecond) * from.Start.Val).Nanoseconds())
}
}

Expand Down
17 changes: 8 additions & 9 deletions input/elasticapm/internal/modeldecoder/rumv3/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
Expand All @@ -47,17 +46,17 @@ func TestResetErrorOnRelease(t *testing.T) {

func TestDecodeNestedError(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := time.Now().UTC()
now := modelpb.FromTime(time.Now())
eventBase := initializedMetadata()
eventBase.Timestamp = timestamppb.New(now)
eventBase.Timestamp = now
input := modeldecoder.Input{Base: eventBase}
str := `{"e":{"id":"a-b-c","timestamp":1599996822281000,"log":{"mg":"abc"}}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
var batch modelpb.Batch
require.NoError(t, DecodeNestedError(dec, &input, &batch))
require.Len(t, batch, 1)
require.NotNil(t, batch[0].Error)
assert.Equal(t, time.Unix(1599996822, 281000000).UTC(), batch[0].Timestamp.AsTime())
assert.Equal(t, modelpb.FromTime(time.Unix(1599996822, 281000000)), batch[0].Timestamp)
assert.Empty(t, cmp.Diff(&modelpb.Error{
Id: "a-b-c",
Log: &modelpb.ErrorLog{
Expand All @@ -72,7 +71,7 @@ func TestDecodeNestedError(t *testing.T) {
dec = decoder.NewJSONDecoder(strings.NewReader(str))
batch = modelpb.Batch{}
require.NoError(t, DecodeNestedError(dec, &input, &batch))
assert.Equal(t, now, batch[0].Timestamp.AsTime())
assert.Equal(t, now, batch[0].Timestamp)

// test decode
err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch)
Expand Down Expand Up @@ -146,24 +145,24 @@ func TestDecodeMapToErrorModel(t *testing.T) {
}
var input errorEvent
var out1, out2 modelpb.APMEvent
reqTime := time.Now().Add(time.Second).UTC()
out1.Timestamp = timestamppb.New(reqTime)
reqTime := modelpb.FromTime(time.Now().Add(time.Second))
out1.Timestamp = reqTime
defaultVal := modeldecodertest.DefaultValues()
modeldecodertest.SetStructValues(&input, defaultVal)
mapToErrorModel(&input, &out1)
input.Reset()
modeldecodertest.AssertStructValues(t, out1.Error, exceptions, defaultVal)

// leave event timestamp unmodified if eventTime is zero
out1.Timestamp = timestamppb.New(reqTime)
out1.Timestamp = reqTime
modeldecodertest.SetStructValues(&input, defaultVal)
mapToErrorModel(&input, &out1)
input.Reset()
modeldecodertest.AssertStructValues(t, out1.Error, exceptions, defaultVal)

// reuse input model for different event
// ensure memory is not shared by reusing input model
out2.Timestamp = timestamppb.New(reqTime)
out2.Timestamp = reqTime
otherVal := modeldecodertest.NonDefaultValues()
modeldecodertest.SetStructValues(&input, otherVal)
mapToErrorModel(&input, &out2)
Expand Down
29 changes: 14 additions & 15 deletions input/elasticapm/internal/modeldecoder/rumv3/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
Expand All @@ -47,9 +46,9 @@ func TestResetTransactionOnRelease(t *testing.T) {

func TestDecodeNestedTransaction(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := time.Now().UTC()
now := modelpb.FromTime(time.Now())
eventBase := initializedMetadata()
eventBase.Timestamp = timestamppb.New(now)
eventBase.Timestamp = now
input := modeldecoder.Input{Base: eventBase}
str := `{"x":{"n":"tr-a","d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"y":[{"n":"a","d":10,"t":"http","id":"123","s":20}],"me":[{"sa":{"ysc":{"v":5}},"y":{"t":"span_type","su":"span_subtype"}}]}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
Expand All @@ -62,7 +61,7 @@ func TestDecodeNestedTransaction(t *testing.T) {

assert.Equal(t, "request", batch[0].Transaction.Type)
// fall back to request time
assert.Equal(t, now, batch[0].Timestamp.AsTime())
assert.Equal(t, now, batch[0].Timestamp)

// Ensure nested metricsets are decoded. RUMv3 only sends
// breakdown metrics, so the Metricsets will be empty and
Expand All @@ -78,11 +77,11 @@ func TestDecodeNestedTransaction(t *testing.T) {
Subtype: "span_subtype",
SelfTime: &modelpb.AggregatedDuration{Count: 5},
}, batch[1].Span, protocmp.Transform()))
assert.Equal(t, now, batch[1].Timestamp.AsTime())
assert.Equal(t, now, batch[1].Timestamp)

// ensure nested spans are decoded
start := time.Duration(20 * 1000 * 1000)
assert.Equal(t, now.Add(start), batch[2].Timestamp.AsTime()) // add start to timestamp
start := uint64(time.Duration(20 * 1000 * 1000).Nanoseconds())
assert.Equal(t, now+start, batch[2].Timestamp) // add start to timestamp
assert.Equal(t, "100", batch[2].Transaction.Id)
assert.Equal(t, "1", batch[2].Trace.Id)
assert.Equal(t, "100", batch[2].ParentId)
Expand All @@ -93,8 +92,8 @@ func TestDecodeNestedTransaction(t *testing.T) {
})

t.Run("decode-marks", func(t *testing.T) {
now := time.Now()
eventBase := modelpb.APMEvent{Timestamp: timestamppb.New(now)}
now := modelpb.FromTime(time.Now())
eventBase := modelpb.APMEvent{Timestamp: now}
input := modeldecoder.Input{Base: &eventBase}
str := `{"x":{"d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"k":{"a":{"dc":0.1,"di":0.2,"ds":0.3,"de":0.4,"fb":0.5,"fp":0.6,"lp":0.7,"long":0.8},"nt":{"fs":0.1,"ls":0.2,"le":0.3,"cs":0.4,"ce":0.5,"qs":0.6,"rs":0.7,"re":0.8,"dl":0.9,"di":0.11,"ds":0.21,"de":0.31,"dc":0.41,"es":0.51,"ee":6,"long":0.99},"long":{"long":0.1}}}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
Expand Down Expand Up @@ -215,16 +214,16 @@ func TestDecodeMapToTransactionModel(t *testing.T) {

var input transaction
var out1, out2 modelpb.APMEvent
reqTime := time.Now().Add(time.Second)
out1.Timestamp = timestamppb.New(reqTime)
reqTime := modelpb.FromTime(time.Now().Add(time.Second))
out1.Timestamp = reqTime
defaultVal := modeldecodertest.DefaultValues()
modeldecodertest.SetStructValues(&input, defaultVal)
mapToTransactionModel(&input, &out1)
input.Reset()
modeldecodertest.AssertStructValues(t, out1.Transaction, exceptions, defaultVal)

// ensure memory is not shared by reusing input model
out2.Timestamp = timestamppb.New(reqTime)
out2.Timestamp = reqTime
otherVal := modeldecodertest.NonDefaultValues()
modeldecodertest.SetStructValues(&input, otherVal)
mapToTransactionModel(&input, &out2)
Expand Down Expand Up @@ -267,16 +266,16 @@ func TestDecodeMapToTransactionModel(t *testing.T) {

var input span
var out1, out2 modelpb.APMEvent
reqTime := time.Now().Add(time.Second)
out1.Timestamp = timestamppb.New(reqTime)
reqTime := modelpb.FromTime(time.Now().Add(time.Second))
out1.Timestamp = reqTime
defaultVal := modeldecodertest.DefaultValues()
modeldecodertest.SetStructValues(&input, defaultVal)
mapToSpanModel(&input, &out1)
input.Reset()
modeldecodertest.AssertStructValues(t, out1.Span, exceptions, defaultVal)

// ensure memory is not shared by reusing input model
out2.Timestamp = timestamppb.New(reqTime)
out2.Timestamp = reqTime
otherVal := modeldecodertest.NonDefaultValues()
modeldecodertest.SetStructValues(&input, otherVal)
mapToSpanModel(&input, &out2)
Expand Down
19 changes: 6 additions & 13 deletions input/elasticapm/internal/modeldecoder/v2/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/elastic/apm-data/input/otlp"
"github.com/elastic/apm-data/model/modelpb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -470,7 +469,7 @@ func mapToErrorModel(from *errorEvent, event *modelpb.APMEvent) {
event.ParentId = from.ParentID.Val
}
if !from.Timestamp.Val.IsZero() {
event.Timestamp = timestamppb.New(from.Timestamp.Val)
event.Timestamp = modelpb.FromTime(from.Timestamp.Val)
}
if from.TraceID.IsSet() {
event.Trace = &modelpb.Trace{
Expand Down Expand Up @@ -731,7 +730,7 @@ func mapToMetricsetModel(from *metricset, event *modelpb.APMEvent) bool {
event.Metricset = &modelpb.Metricset{Name: "app"}

if !from.Timestamp.Val.IsZero() {
event.Timestamp = timestamppb.New(from.Timestamp.Val)
event.Timestamp = modelpb.FromTime(from.Timestamp.Val)
}

if len(from.Samples) > 0 {
Expand Down Expand Up @@ -1179,18 +1178,12 @@ func mapToSpanModel(from *span, event *modelpb.APMEvent) {
out.Sync = &val
}
if !from.Timestamp.Val.IsZero() {
event.Timestamp = timestamppb.New(from.Timestamp.Val)
event.Timestamp = modelpb.FromTime(from.Timestamp.Val)
} else if from.Start.IsSet() {
// event.Timestamp should have been initialized to the time the
// payload was received; offset that by "start" milliseconds for
// RUM.
base := time.Time{}
if event.Timestamp != nil {
base = event.Timestamp.AsTime()
}
event.Timestamp = timestamppb.New(base.Add(
time.Duration(float64(time.Millisecond) * from.Start.Val),
))
event.Timestamp += uint64(time.Duration(float64(time.Millisecond) * from.Start.Val).Nanoseconds())
}
if from.TraceID.IsSet() {
event.Trace = &modelpb.Trace{
Expand Down Expand Up @@ -1411,7 +1404,7 @@ func mapToTransactionModel(from *transaction, event *modelpb.APMEvent) {
out.SpanCount.Started = &started
}
if !from.Timestamp.Val.IsZero() {
event.Timestamp = timestamppb.New(from.Timestamp.Val)
event.Timestamp = modelpb.FromTime(from.Timestamp.Val)
}
if from.TraceID.IsSet() {
event.Trace = &modelpb.Trace{
Expand Down Expand Up @@ -1467,7 +1460,7 @@ func mapToLogModel(from *log, event *modelpb.APMEvent) {
mapToFAASModel(from.FAAS, event.Faas)
}
if !from.Timestamp.Val.IsZero() {
event.Timestamp = timestamppb.New(from.Timestamp.Val)
event.Timestamp = modelpb.FromTime(from.Timestamp.Val)
}
if from.TraceID.IsSet() {
event.Trace = &modelpb.Trace{
Expand Down
9 changes: 4 additions & 5 deletions input/elasticapm/internal/modeldecoder/v2/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
Expand All @@ -47,18 +46,18 @@ func TestResetErrorOnRelease(t *testing.T) {

func TestDecodeNestedError(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := time.Now().UTC()
now := modelpb.FromTime(time.Now())
defaultVal := modeldecodertest.DefaultValues()
_, eventBase := initializedInputMetadata(defaultVal)
eventBase.Timestamp = timestamppb.New(now)
eventBase.Timestamp = now
input := modeldecoder.Input{Base: eventBase}
str := `{"error":{"id":"a-b-c","timestamp":1599996822281000,"log":{"message":"abc"}}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
var batch modelpb.Batch
require.NoError(t, DecodeNestedError(dec, &input, &batch))
require.Len(t, batch, 1)
require.NotNil(t, batch[0].Error)
assert.Equal(t, time.Unix(1599996822, 281000000).UTC(), batch[0].Timestamp.AsTime())
assert.Equal(t, modelpb.FromTime(time.Unix(1599996822, 281000000)), batch[0].Timestamp)
assert.Empty(t, cmp.Diff(&modelpb.Error{
Id: "a-b-c",
Log: &modelpb.ErrorLog{Message: "abc"},
Expand All @@ -69,7 +68,7 @@ func TestDecodeNestedError(t *testing.T) {
batch = modelpb.Batch{}
require.NoError(t, DecodeNestedError(dec, &input, &batch))
// if no timestamp is provided, leave base event time unmodified
assert.Equal(t, now, batch[0].Timestamp.AsTime())
assert.Equal(t, now, batch[0].Timestamp)

err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch)
require.Error(t, err)
Expand Down
15 changes: 7 additions & 8 deletions input/elasticapm/internal/modeldecoder/v2/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
Expand All @@ -50,7 +49,7 @@ func TestDecodeNestedLog(t *testing.T) {
require.NoError(t, DecodeNestedLog(dec, &input, &batch))
require.Len(t, batch, 1)
assert.Equal(t, "something happened", batch[0].Message)
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", batch[0].Timestamp.AsTime().String())
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", modelpb.ToTime(batch[0].Timestamp).String())
assert.Equal(t, "trace-id", batch[0].Trace.Id)
assert.Equal(t, "transaction-id", batch[0].Transaction.Id)
assert.Equal(t, "warn", batch[0].Log.Level)
Expand All @@ -68,13 +67,13 @@ func TestDecodeNestedLog(t *testing.T) {
})

t.Run("withoutTimestamp", func(t *testing.T) {
now := time.Now().UTC()
input := modeldecoder.Input{Base: &modelpb.APMEvent{Timestamp: timestamppb.New(now)}}
now := modelpb.FromTime(time.Now())
input := modeldecoder.Input{Base: &modelpb.APMEvent{Timestamp: now}}
str := `{"log":{"message":"something happened"}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
var batch modelpb.Batch
require.NoError(t, DecodeNestedLog(dec, &input, &batch))
assert.Equal(t, now, batch[0].Timestamp.AsTime())
assert.Equal(t, now, batch[0].Timestamp)
})

t.Run("withError", func(t *testing.T) {
Expand All @@ -84,7 +83,7 @@ func TestDecodeNestedLog(t *testing.T) {
var batch modelpb.Batch
require.NoError(t, DecodeNestedLog(dec, &input, &batch))
require.Len(t, batch, 1)
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", batch[0].Timestamp.AsTime().String())
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", modelpb.ToTime(batch[0].Timestamp).String())
assert.Equal(t, "trace-id", batch[0].Trace.Id)
assert.Equal(t, "transaction-id", batch[0].Transaction.Id)
assert.Equal(t, "error", batch[0].Log.Level)
Expand All @@ -111,7 +110,7 @@ func TestDecodeNestedLog(t *testing.T) {
var batch modelpb.Batch
require.NoError(t, DecodeNestedLog(dec, &input, &batch))
require.Len(t, batch, 1)
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", batch[0].Timestamp.AsTime().String())
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", modelpb.ToTime(batch[0].Timestamp).String())
assert.Equal(t, "trace-id", batch[0].Trace.Id)
assert.Equal(t, "transaction-id", batch[0].Transaction.Id)
assert.Equal(t, "error", batch[0].Log.Level)
Expand All @@ -138,7 +137,7 @@ func TestDecodeNestedLog(t *testing.T) {
var batch modelpb.Batch
require.NoError(t, DecodeNestedLog(dec, &input, &batch))
require.Len(t, batch, 1)
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", batch[0].Timestamp.AsTime().String())
assert.Equal(t, "2022-09-08 06:02:51 +0000 UTC", modelpb.ToTime(batch[0].Timestamp).String())
assert.Equal(t, "trace-id", batch[0].Trace.Id)
assert.Equal(t, "transaction-id", batch[0].Transaction.Id)
assert.Equal(t, "error", batch[0].Log.Level)
Expand Down
Loading

0 comments on commit a636758

Please sign in to comment.