Skip to content

Commit

Permalink
Register partial transaction in aws extn
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar committed Nov 17, 2022
1 parent 4d07ba8 commit 2f6e0df
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 34 deletions.
69 changes: 35 additions & 34 deletions modelwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type modelWriter struct {
// writeTransaction encodes tx as JSON to the buffer, and then resets tx.
func (w *modelWriter) writeTransaction(tx *Transaction, td *TransactionData) {
var modelTx model.Transaction
w.buildModelTransaction(&modelTx, tx, td)
BuildModelTransaction(&modelTx, tx, td)
w.json.RawString(`{"transaction":`)
modelTx.MarshalFastJSON(&w.json)
w.json.RawByte('}')
Expand Down Expand Up @@ -103,39 +103,6 @@ func (w *modelWriter) writeMetrics(m *Metrics) {
m.reset()
}

func (w *modelWriter) buildModelTransaction(out *model.Transaction, tx *Transaction, td *TransactionData) {
out.ID = model.SpanID(tx.traceContext.Span)
out.TraceID = model.TraceID(tx.traceContext.Trace)
sampled := tx.traceContext.Options.Recorded()
if !sampled {
out.Sampled = &notSampled
}
if tx.traceContext.State.haveSampleRate {
out.SampleRate = &tx.traceContext.State.sampleRate
}

out.ParentID = model.SpanID(tx.parentID)
out.Name = truncateString(td.Name)
out.Type = truncateString(td.Type)
out.Result = truncateString(td.Result)
out.Outcome = normalizeOutcome(td.Outcome)
out.Timestamp = model.Time(td.timestamp.UTC())
out.Duration = td.Duration.Seconds() * 1000
out.SpanCount.Started = td.spansCreated
out.SpanCount.Dropped = td.spansDropped
out.OTel = td.Context.otel
for _, sl := range td.links {
out.Links = append(out.Links, model.SpanLink{TraceID: model.TraceID(sl.Trace), SpanID: model.SpanID(sl.Span)})
}
if dss := buildDroppedSpansStats(td.droppedSpansStats); len(dss) > 0 {
out.DroppedSpansStats = dss
}

if sampled {
out.Context = td.Context.build()
}
}

func (w *modelWriter) buildModelSpan(out *model.Span, span *Span, sd *SpanData) {
w.modelStacktrace = w.modelStacktrace[:0]
out.ID = model.SpanID(span.traceContext.Span)
Expand Down Expand Up @@ -258,6 +225,40 @@ func (w *modelWriter) buildModelError(out *model.Error, e *ErrorData) {
out.Culprit = truncateString(out.Culprit)
}

// BuildModelTransaction converts apm transaction to model transaction
func BuildModelTransaction(out *model.Transaction, tx *Transaction, td *TransactionData) {
out.ID = model.SpanID(tx.traceContext.Span)
out.TraceID = model.TraceID(tx.traceContext.Trace)
sampled := tx.traceContext.Options.Recorded()
if !sampled {
out.Sampled = &notSampled
}
if tx.traceContext.State.haveSampleRate {
out.SampleRate = &tx.traceContext.State.sampleRate
}

out.ParentID = model.SpanID(tx.parentID)
out.Name = truncateString(td.Name)
out.Type = truncateString(td.Type)
out.Result = truncateString(td.Result)
out.Outcome = normalizeOutcome(td.Outcome)
out.Timestamp = model.Time(td.timestamp.UTC())
out.Duration = td.Duration.Seconds() * 1000
out.SpanCount.Started = td.spansCreated
out.SpanCount.Dropped = td.spansDropped
out.OTel = td.Context.otel
for _, sl := range td.links {
out.Links = append(out.Links, model.SpanLink{TraceID: model.TraceID(sl.Trace), SpanID: model.SpanID(sl.Span)})
}
if dss := buildDroppedSpansStats(td.droppedSpansStats); len(dss) > 0 {
out.DroppedSpansStats = dss
}

if sampled {
out.Context = td.Context.build()
}
}

func stacktraceCulprit(frames []model.StacktraceFrame) string {
for _, frame := range frames {
if !frame.LibraryFrame {
Expand Down
1 change: 1 addition & 0 deletions module/apmlambda/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module go.elastic.co/apm/module/apmlambda/v2
require (
github.com/aws/aws-lambda-go v1.8.0
go.elastic.co/apm/v2 v2.1.0
go.elastic.co/fastjson v1.1.0
)

replace go.elastic.co/apm/v2 => ../..
Expand Down
43 changes: 43 additions & 0 deletions module/apmlambda/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package apmlambda // import "go.elastic.co/apm/module/apmlambda/v2"

import (
"bytes"
"log"
"net"
"net/http"
"net/rpc"
"os"
"unicode/utf8"
Expand All @@ -28,7 +30,9 @@ import (
"github.com/aws/aws-lambda-go/lambdacontext"

"go.elastic.co/apm/v2"
"go.elastic.co/apm/v2/model"
"go.elastic.co/apm/v2/stacktrace"
"go.elastic.co/fastjson"
)

const (
Expand All @@ -51,6 +55,10 @@ var (
Request string `json:"request,omitempty"`
Response string `json:"response,omitempty"`
}

jsonw fastjson.Writer

ignoreTxnRegistration bool
)

func init() {
Expand All @@ -72,6 +80,17 @@ func (f *Function) Ping(req *messages.PingRequest, response *messages.PingRespon
return f.client.Call("Function.Ping", req, response)
}

func createPartialTransactionJSON(apmTx *apm.Transaction, w *fastjson.Writer) error {
var tx model.Transaction
apm.BuildModelTransaction(&tx, apmTx, apmTx.TransactionData)
w.RawString(`{"transaction":`)
if err := tx.MarshalFastJSON(w); err != nil {
return err
}
w.RawByte('}')
return nil
}

// Invoke invokes the Lambda function. This is our main trace point.
func (f *Function) Invoke(req *messages.InvokeRequest, response *messages.InvokeResponse) error {
tx := f.tracer.StartTransaction(lambdacontext.FunctionName, "function")
Expand All @@ -92,6 +111,30 @@ func (f *Function) Invoke(req *messages.InvokeRequest, response *messages.Invoke
lambdaContext.Request = formatPayload(req.Payload)
lambdaContext.Response = ""

if !ignoreTxnRegistration {
defer jsonw.Reset()
if err := createPartialTransactionJSON(tx, &jsonw); err != nil {
log.Printf("failed to create partial transaction for registration: %v", err)
} else {
resp, err := http.Post(
// TODO: @lahsivjar better way to get base URI
"http://localhost:8200/register/transaction",
"application/vnd.elastic.apm.transaction+json",
bytes.NewReader(jsonw.Bytes()),
)
// Don't attempt registration for next invocations if network
// error or the registration endpoint is not found.
if err != nil || resp.StatusCode == 404 {
ignoreTxnRegistration = true
}
if err != nil {
log.Printf("failed to register transaction, req failed with error: %v", err)
}
if resp.StatusCode/100 != 2 {
log.Printf("failed to register transaction, req failed with status code: %d", resp.StatusCode)
}
}
}
err := f.client.Call("Function.Invoke", req, response)
if err != nil {
e := f.tracer.NewError(err)
Expand Down
36 changes: 36 additions & 0 deletions module/apmlambda/lambda_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apmlambda

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"go.elastic.co/apm/v2"
"go.elastic.co/fastjson"
)

func TestCreatePartialTransactionJSON(t *testing.T) {
var w fastjson.Writer
tx := apm.DefaultTracer().StartTransaction("test", "function")
defer tx.End()
assert.NoError(t, createPartialTransactionJSON(tx, &w))
assert.True(t, json.Valid(w.Bytes()))
assert.Equal(t, "test", string(w.Bytes()))
}

0 comments on commit 2f6e0df

Please sign in to comment.