Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: migrate proto timestamp to uint64 #134

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions codec/fullevent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ import (

"github.com/elastic/apm-data/model/modelpb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

func fullEvent(t *testing.B) *modelpb.APMEvent {
return &modelpb.APMEvent{
Timestamp: timestamppb.New(time.Unix(1, 1)),
Timestamp: uint64(time.Second.Nanoseconds()) + 1,
Span: &modelpb.Span{
Message: &modelpb.Message{
Body: "body",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func SetStructValues(in interface{}, values *Values, opts ...SetStructValuesOpti
switch fKind := f.Kind(); fKind {
case reflect.String:
fieldVal = reflect.ValueOf(values.Str)
case reflect.Int, reflect.Int32, reflect.Int64:
case reflect.Int, reflect.Int32, reflect.Int64, reflect.Uint64:
fieldVal = reflect.ValueOf(values.Int).Convert(f.Type())
case reflect.Float64:
fieldVal = reflect.ValueOf(values.Float).Convert(f.Type())
Expand Down
17 changes: 6 additions & 11 deletions input/elasticapm/internal/modeldecoder/nullable/nullable.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,13 @@ func init() {
iter.ReadNil()
case jsoniter.NumberValue:
us := iter.ReadInt64()
s := us / 1000000
ns := (us - (s * 1000000)) * 1000
(*((*TimeMicrosUnix)(ptr))).Val = time.Unix(s, ns).UTC()
(*((*TimeMicrosUnix)(ptr))).Val = us * 1000
(*((*TimeMicrosUnix)(ptr))).isSet = true
case jsoniter.StringValue:
tstr := iter.ReadString()
for _, f := range supportedTSFormats {
if t, err := time.Parse(f, tstr); err == nil {
(*((*TimeMicrosUnix)(ptr))).Val = t.UTC()
(*((*TimeMicrosUnix)(ptr))).Val = t.UnixNano()
(*((*TimeMicrosUnix)(ptr))).isSet = true
return
}
Expand Down Expand Up @@ -304,14 +302,11 @@ func (v *Interface) Reset() {
v.isSet = false
}

type TimeMicrosUnix struct {
Val time.Time
isSet bool
}
type TimeMicrosUnix Int64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be unsigned?


// Set sets the value
func (v *TimeMicrosUnix) Set(val time.Time) {
v.Val = val
func (v *TimeMicrosUnix) Set(val uint64) {
v.Val = int64(val)
v.isSet = true
}

Expand All @@ -323,7 +318,7 @@ func (v *TimeMicrosUnix) IsSet() bool {
// Reset sets the Interface to it's initial state
// where it is not set and has no value
func (v *TimeMicrosUnix) Reset() {
v.Val = time.Time{}
v.Val = 0
v.isSet = false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/elastic/apm-data/model/modelpb"
jsoniter "github.com/json-iterator/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -282,10 +283,10 @@ func TestTimeMicrosUnix(t *testing.T) {
val: "2022-10-17 14:15:30.111 +0000 UTC"},
{name: "valid-with-str-tmz-without-milli", input: `{"tms":"2022-10-17T14:15:30Z"}`, isSet: true,
val: "2022-10-17 14:15:30 +0000 UTC"},
{name: "null", input: `{"tms":null}`, val: time.Time{}.String()},
{name: "null", input: `{"tms":null}`, val: time.Unix(0, 0).UTC().String()},
{name: "invalid-type", input: `{"tms":""}`, fail: true, isSet: true},
{name: "invalid-type", input: `{"tms":123.56}`, fail: true, isSet: true},
{name: "missing", input: `{}`, val: time.Time{}.String()},
{name: "missing", input: `{}`, val: time.Unix(0, 0).UTC().String()},
} {
t.Run(tc.name, func(t *testing.T) {
dec := json.NewDecoder(strings.NewReader(tc.input))
Expand All @@ -295,15 +296,16 @@ func TestTimeMicrosUnix(t *testing.T) {
require.Error(t, err)
} else {
require.NoError(t, err)
val := modelpb.PBTimestampToTime(uint64(testStruct.Tms.Val)).Format("2006-01-02 15:04:05.999 +0000 UTC")
assert.Equal(t, tc.isSet, testStruct.Tms.IsSet())
assert.Equal(t, tc.val, testStruct.Tms.Val.String())
assert.Equal(t, tc.val, val)
}

testStruct.Tms.Reset()
assert.False(t, testStruct.Tms.IsSet())
assert.Zero(t, testStruct.Tms.Val)

testStruct.Tms.Set(time.Now())
testStruct.Tms.Set(modelpb.PBTimestampNow())
assert.True(t, testStruct.Tms.IsSet())
assert.NotZero(t, testStruct.Tms.Val)
})
Expand Down
11 changes: 5 additions & 6 deletions input/elasticapm/internal/modeldecoder/rumv3/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/nullable"
"github.com/elastic/apm-data/model/modelpb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

var (
Expand Down Expand Up @@ -247,8 +246,8 @@ func mapToErrorModel(from *errorEvent, event *modelpb.APMEvent) {
if from.ParentID.IsSet() {
event.ParentId = from.ParentID.Val
}
if !from.Timestamp.Val.IsZero() {
event.Timestamp = timestamppb.New(from.Timestamp.Val)
if from.Timestamp.IsSet() {
event.Timestamp = uint64(from.Timestamp.Val)
}
if from.TraceID.IsSet() {
event.Trace = &modelpb.Trace{
Expand Down Expand Up @@ -628,9 +627,9 @@ func mapToSpanModel(from *span, event *modelpb.APMEvent) {
if from.Start.IsSet() {
// event.Timestamp is initialized to the time the payload was
// received; offset that by "start" milliseconds for RUM.
event.Timestamp = timestamppb.New(event.Timestamp.AsTime().Add(
time.Duration(float64(time.Millisecond) * from.Start.Val),
))
event.Timestamp = event.Timestamp + uint64(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
event.Timestamp = event.Timestamp + uint64(
event.Timestamp += uint64(

time.Duration(float64(time.Millisecond)*from.Start.Val).Nanoseconds(),
)
}
}

Expand Down
17 changes: 8 additions & 9 deletions input/elasticapm/internal/modeldecoder/rumv3/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
Expand All @@ -47,17 +46,17 @@ func TestResetErrorOnRelease(t *testing.T) {

func TestDecodeNestedError(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := time.Now().UTC()
now := modelpb.PBTimestampNow()
eventBase := initializedMetadata()
eventBase.Timestamp = timestamppb.New(now)
eventBase.Timestamp = now
input := modeldecoder.Input{Base: eventBase}
str := `{"e":{"id":"a-b-c","timestamp":1599996822281000,"log":{"mg":"abc"}}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
var batch modelpb.Batch
require.NoError(t, DecodeNestedError(dec, &input, &batch))
require.Len(t, batch, 1)
require.NotNil(t, batch[0].Error)
assert.Equal(t, time.Unix(1599996822, 281000000).UTC(), batch[0].Timestamp.AsTime())
assert.Equal(t, uint64(1599996822281000*1000), batch[0].Timestamp)
assert.Empty(t, cmp.Diff(&modelpb.Error{
Id: "a-b-c",
Log: &modelpb.ErrorLog{
Expand All @@ -72,7 +71,7 @@ func TestDecodeNestedError(t *testing.T) {
dec = decoder.NewJSONDecoder(strings.NewReader(str))
batch = modelpb.Batch{}
require.NoError(t, DecodeNestedError(dec, &input, &batch))
assert.Equal(t, now, batch[0].Timestamp.AsTime())
assert.Equal(t, now, batch[0].Timestamp)

// test decode
err := DecodeNestedError(decoder.NewJSONDecoder(strings.NewReader(`malformed`)), &input, &batch)
Expand Down Expand Up @@ -146,24 +145,24 @@ func TestDecodeMapToErrorModel(t *testing.T) {
}
var input errorEvent
var out1, out2 modelpb.APMEvent
reqTime := time.Now().Add(time.Second).UTC()
out1.Timestamp = timestamppb.New(reqTime)
reqTime := modelpb.PBTimestampAdd(modelpb.PBTimestampNow(), time.Second)
out1.Timestamp = reqTime
defaultVal := modeldecodertest.DefaultValues()
modeldecodertest.SetStructValues(&input, defaultVal)
mapToErrorModel(&input, &out1)
input.Reset()
modeldecodertest.AssertStructValues(t, out1.Error, exceptions, defaultVal)

// leave event timestamp unmodified if eventTime is zero
out1.Timestamp = timestamppb.New(reqTime)
out1.Timestamp = reqTime
modeldecodertest.SetStructValues(&input, defaultVal)
mapToErrorModel(&input, &out1)
input.Reset()
modeldecodertest.AssertStructValues(t, out1.Error, exceptions, defaultVal)

// reuse input model for different event
// ensure memory is not shared by reusing input model
out2.Timestamp = timestamppb.New(reqTime)
out2.Timestamp = reqTime
otherVal := modeldecodertest.NonDefaultValues()
modeldecodertest.SetStructValues(&input, otherVal)
mapToErrorModel(&input, &out2)
Expand Down
6 changes: 3 additions & 3 deletions input/elasticapm/internal/modeldecoder/rumv3/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,6 @@ type contextServiceRuntime struct {

type errorEvent struct {
_ struct{} `validate:"requiredAnyOf=ex;log"`
// Timestamp holds the recorded time of the event, UTC based and formatted
// as microseconds since Unix epoch.
Timestamp nullable.TimeMicrosUnix `json:"timestamp"`
// Log holds additional information added when the error is logged.
Log errorLog `json:"log"`
// Culprit identifies the function call which was the primary perpetrator
Expand All @@ -184,6 +181,9 @@ type errorEvent struct {
Transaction errorTransactionRef `json:"x"`
// Context holds arbitrary contextual information for the event.
Context context `json:"c"`
// Timestamp holds the recorded time of the event, UTC based and formatted
// as microseconds since Unix epoch.
Timestamp nullable.TimeMicrosUnix `json:"timestamp"`
}

type errorException struct {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 13 additions & 14 deletions input/elasticapm/internal/modeldecoder/rumv3/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder"
Expand All @@ -47,9 +46,9 @@ func TestResetTransactionOnRelease(t *testing.T) {

func TestDecodeNestedTransaction(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := time.Now().UTC()
now := modelpb.PBTimestampNow()
eventBase := initializedMetadata()
eventBase.Timestamp = timestamppb.New(now)
eventBase.Timestamp = now
input := modeldecoder.Input{Base: eventBase}
str := `{"x":{"n":"tr-a","d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"y":[{"n":"a","d":10,"t":"http","id":"123","s":20}],"me":[{"sa":{"ysc":{"v":5}},"y":{"t":"span_type","su":"span_subtype"}}]}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
Expand All @@ -62,7 +61,7 @@ func TestDecodeNestedTransaction(t *testing.T) {

assert.Equal(t, "request", batch[0].Transaction.Type)
// fall back to request time
assert.Equal(t, now, batch[0].Timestamp.AsTime())
assert.Equal(t, now, batch[0].Timestamp)

// Ensure nested metricsets are decoded. RUMv3 only sends
// breakdown metrics, so the Metricsets will be empty and
Expand All @@ -78,11 +77,11 @@ func TestDecodeNestedTransaction(t *testing.T) {
Subtype: "span_subtype",
SelfTime: &modelpb.AggregatedDuration{Count: 5},
}, batch[1].Span, protocmp.Transform()))
assert.Equal(t, now, batch[1].Timestamp.AsTime())
assert.Equal(t, now, batch[1].Timestamp)

// ensure nested spans are decoded
start := time.Duration(20 * 1000 * 1000)
assert.Equal(t, now.Add(start), batch[2].Timestamp.AsTime()) // add start to timestamp
assert.Equal(t, now+uint64(start.Nanoseconds()), batch[2].Timestamp) // add start to timestamp
assert.Equal(t, "100", batch[2].Transaction.Id)
assert.Equal(t, "1", batch[2].Trace.Id)
assert.Equal(t, "100", batch[2].ParentId)
Expand All @@ -93,8 +92,8 @@ func TestDecodeNestedTransaction(t *testing.T) {
})

t.Run("decode-marks", func(t *testing.T) {
now := time.Now()
eventBase := modelpb.APMEvent{Timestamp: timestamppb.New(now)}
now := modelpb.PBTimestampNow()
eventBase := modelpb.APMEvent{Timestamp: now}
input := modeldecoder.Input{Base: &eventBase}
str := `{"x":{"d":100,"id":"100","tid":"1","t":"request","yc":{"sd":2},"k":{"a":{"dc":0.1,"di":0.2,"ds":0.3,"de":0.4,"fb":0.5,"fp":0.6,"lp":0.7,"long":0.8},"nt":{"fs":0.1,"ls":0.2,"le":0.3,"cs":0.4,"ce":0.5,"qs":0.6,"rs":0.7,"re":0.8,"dl":0.9,"di":0.11,"ds":0.21,"de":0.31,"dc":0.41,"es":0.51,"ee":6,"long":0.99},"long":{"long":0.1}}}}`
dec := decoder.NewJSONDecoder(strings.NewReader(str))
Expand Down Expand Up @@ -215,16 +214,16 @@ func TestDecodeMapToTransactionModel(t *testing.T) {

var input transaction
var out1, out2 modelpb.APMEvent
reqTime := time.Now().Add(time.Second)
out1.Timestamp = timestamppb.New(reqTime)
reqTime := modelpb.PBTimestampAdd(modelpb.PBTimestampNow(), time.Second)
out1.Timestamp = reqTime
defaultVal := modeldecodertest.DefaultValues()
modeldecodertest.SetStructValues(&input, defaultVal)
mapToTransactionModel(&input, &out1)
input.Reset()
modeldecodertest.AssertStructValues(t, out1.Transaction, exceptions, defaultVal)

// ensure memory is not shared by reusing input model
out2.Timestamp = timestamppb.New(reqTime)
out2.Timestamp = reqTime
otherVal := modeldecodertest.NonDefaultValues()
modeldecodertest.SetStructValues(&input, otherVal)
mapToTransactionModel(&input, &out2)
Expand Down Expand Up @@ -267,16 +266,16 @@ func TestDecodeMapToTransactionModel(t *testing.T) {

var input span
var out1, out2 modelpb.APMEvent
reqTime := time.Now().Add(time.Second)
out1.Timestamp = timestamppb.New(reqTime)
reqTime := modelpb.PBTimestampAdd(modelpb.PBTimestampNow(), time.Second)
out1.Timestamp = reqTime
defaultVal := modeldecodertest.DefaultValues()
modeldecodertest.SetStructValues(&input, defaultVal)
mapToSpanModel(&input, &out1)
input.Reset()
modeldecodertest.AssertStructValues(t, out1.Span, exceptions, defaultVal)

// ensure memory is not shared by reusing input model
out2.Timestamp = timestamppb.New(reqTime)
out2.Timestamp = reqTime
otherVal := modeldecodertest.NonDefaultValues()
modeldecodertest.SetStructValues(&input, otherVal)
mapToSpanModel(&input, &out2)
Expand Down
Loading