diff --git a/modelwriter.go b/modelwriter.go index 2d7d2825b..cef59c0eb 100644 --- a/modelwriter.go +++ b/modelwriter.go @@ -48,7 +48,7 @@ type modelWriter struct { // writeTransaction encodes tx as JSON to the buffer, and then resets tx. func (w *modelWriter) writeTransaction(tx *Transaction, td *TransactionData) { var modelTx model.Transaction - w.buildModelTransaction(&modelTx, tx, td) + BuildModelTransaction(&modelTx, tx, td) w.json.RawString(`{"transaction":`) modelTx.MarshalFastJSON(&w.json) w.json.RawByte('}') @@ -103,39 +103,6 @@ func (w *modelWriter) writeMetrics(m *Metrics) { m.reset() } -func (w *modelWriter) buildModelTransaction(out *model.Transaction, tx *Transaction, td *TransactionData) { - out.ID = model.SpanID(tx.traceContext.Span) - out.TraceID = model.TraceID(tx.traceContext.Trace) - sampled := tx.traceContext.Options.Recorded() - if !sampled { - out.Sampled = ¬Sampled - } - if tx.traceContext.State.haveSampleRate { - out.SampleRate = &tx.traceContext.State.sampleRate - } - - out.ParentID = model.SpanID(tx.parentID) - out.Name = truncateString(td.Name) - out.Type = truncateString(td.Type) - out.Result = truncateString(td.Result) - out.Outcome = normalizeOutcome(td.Outcome) - out.Timestamp = model.Time(td.timestamp.UTC()) - out.Duration = td.Duration.Seconds() * 1000 - out.SpanCount.Started = td.spansCreated - out.SpanCount.Dropped = td.spansDropped - out.OTel = td.Context.otel - for _, sl := range td.links { - out.Links = append(out.Links, model.SpanLink{TraceID: model.TraceID(sl.Trace), SpanID: model.SpanID(sl.Span)}) - } - if dss := buildDroppedSpansStats(td.droppedSpansStats); len(dss) > 0 { - out.DroppedSpansStats = dss - } - - if sampled { - out.Context = td.Context.build() - } -} - func (w *modelWriter) buildModelSpan(out *model.Span, span *Span, sd *SpanData) { w.modelStacktrace = w.modelStacktrace[:0] out.ID = model.SpanID(span.traceContext.Span) @@ -258,6 +225,40 @@ func (w *modelWriter) buildModelError(out *model.Error, e *ErrorData) { out.Culprit = truncateString(out.Culprit) } +// BuildModelTransaction converts apm transaction to model transaction +func BuildModelTransaction(out *model.Transaction, tx *Transaction, td *TransactionData) { + out.ID = model.SpanID(tx.traceContext.Span) + out.TraceID = model.TraceID(tx.traceContext.Trace) + sampled := tx.traceContext.Options.Recorded() + if !sampled { + out.Sampled = ¬Sampled + } + if tx.traceContext.State.haveSampleRate { + out.SampleRate = &tx.traceContext.State.sampleRate + } + + out.ParentID = model.SpanID(tx.parentID) + out.Name = truncateString(td.Name) + out.Type = truncateString(td.Type) + out.Result = truncateString(td.Result) + out.Outcome = normalizeOutcome(td.Outcome) + out.Timestamp = model.Time(td.timestamp.UTC()) + out.Duration = td.Duration.Seconds() * 1000 + out.SpanCount.Started = td.spansCreated + out.SpanCount.Dropped = td.spansDropped + out.OTel = td.Context.otel + for _, sl := range td.links { + out.Links = append(out.Links, model.SpanLink{TraceID: model.TraceID(sl.Trace), SpanID: model.SpanID(sl.Span)}) + } + if dss := buildDroppedSpansStats(td.droppedSpansStats); len(dss) > 0 { + out.DroppedSpansStats = dss + } + + if sampled { + out.Context = td.Context.build() + } +} + func stacktraceCulprit(frames []model.StacktraceFrame) string { for _, frame := range frames { if !frame.LibraryFrame { diff --git a/module/apmlambda/go.mod b/module/apmlambda/go.mod index d149c0b7a..1cb8ca81a 100644 --- a/module/apmlambda/go.mod +++ b/module/apmlambda/go.mod @@ -3,6 +3,7 @@ module go.elastic.co/apm/module/apmlambda/v2 require ( github.com/aws/aws-lambda-go v1.8.0 go.elastic.co/apm/v2 v2.2.0 + go.elastic.co/fastjson v1.1.0 ) replace go.elastic.co/apm/v2 => ../.. diff --git a/module/apmlambda/lambda.go b/module/apmlambda/lambda.go index 449a84640..0ee5b46c9 100644 --- a/module/apmlambda/lambda.go +++ b/module/apmlambda/lambda.go @@ -18,8 +18,11 @@ package apmlambda // import "go.elastic.co/apm/module/apmlambda/v2" import ( + "bytes" + "fmt" "log" "net" + "net/http" "net/rpc" "os" "unicode/utf8" @@ -28,7 +31,9 @@ import ( "github.com/aws/aws-lambda-go/lambdacontext" "go.elastic.co/apm/v2" + "go.elastic.co/apm/v2/model" "go.elastic.co/apm/v2/stacktrace" + "go.elastic.co/fastjson" ) const ( @@ -51,6 +56,10 @@ var ( Request string `json:"request,omitempty"` Response string `json:"response,omitempty"` } + + jsonw fastjson.Writer + metadataBytes int + ignoreTxnRegistration bool ) func init() { @@ -72,6 +81,17 @@ func (f *Function) Ping(req *messages.PingRequest, response *messages.PingRespon return f.client.Call("Function.Ping", req, response) } +func createPartialTransactionJSON(apmTx *apm.Transaction, w *fastjson.Writer) error { + var tx model.Transaction + apm.BuildModelTransaction(&tx, apmTx, apmTx.TransactionData) + w.RawString(`{"transaction":`) + if err := tx.MarshalFastJSON(w); err != nil { + return err + } + w.RawByte('}') + return nil +} + // Invoke invokes the Lambda function. This is our main trace point. func (f *Function) Invoke(req *messages.InvokeRequest, response *messages.InvokeResponse) error { tx := f.tracer.StartTransaction(lambdacontext.FunctionName, "function") @@ -92,6 +112,9 @@ func (f *Function) Invoke(req *messages.InvokeRequest, response *messages.Invoke lambdaContext.Request = formatPayload(req.Payload) lambdaContext.Response = "" + if err := registerTxn(f.tracer, tx, req.RequestId); err != nil { + log.Printf("failed to register txn: %v", err) + } err := f.client.Call("Function.Invoke", req, response) if err != nil { e := f.tracer.NewError(err) @@ -111,6 +134,49 @@ func (f *Function) Invoke(req *messages.InvokeRequest, response *messages.Invoke return nil } +func registerTxn(tracer *apm.Tracer, tx *apm.Transaction, requestID string) error { + if ignoreTxnRegistration { + return nil + } + + if metadataBytes == 0 { + jsonw.Reset() + mb := tracer.JSONRequestMetadata() + jsonw.RawBytes(tracer.JSONRequestMetadata()) + metadataBytes = len(mb) + } + defer jsonw.Rewind(metadataBytes) + + if err := createPartialTransactionJSON(tx, &jsonw); err != nil { + return fmt.Errorf("failed to create txn registration body: %v", err) + } + req, err := http.NewRequest( + http.MethodPost, + // TODO: @lahsivjar better way to get base URI + "http://localhost:8200/register/transaction", + bytes.NewReader(jsonw.Bytes()), + ) + if err != nil { + return fmt.Errorf("failed to create txn registration request: %v", err) + } + req.Header.Set("Content-Type", "application/vnd.elastic.apm.transaction+ndjson") + req.Header.Set("x-elastic-aws-request-id", requestID) + + resp, err := http.DefaultClient.Do(req) + // Don't attempt registration for next invocations if network + // error or the registration endpoint is not found. + if err != nil || resp.StatusCode == 404 { + ignoreTxnRegistration = true + } + if err != nil { + return fmt.Errorf("failed to register transaction, req failed with error: %v", err) + } + if resp.StatusCode/100 != 2 { + return fmt.Errorf("failed to register transaction, req failed with status code: %d", resp.StatusCode) + } + return nil +} + type invokeResponseError struct { err *messages.InvokeResponse_Error } diff --git a/module/apmlambda/lambda_test.go b/module/apmlambda/lambda_test.go new file mode 100644 index 000000000..f25bf3158 --- /dev/null +++ b/module/apmlambda/lambda_test.go @@ -0,0 +1,36 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package apmlambda + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "go.elastic.co/apm/v2" + "go.elastic.co/fastjson" +) + +func TestCreatePartialTransactionJSON(t *testing.T) { + var w fastjson.Writer + tx := apm.DefaultTracer().StartTransaction("test", "function") + defer tx.End() + assert.NoError(t, createPartialTransactionJSON(tx, &w)) + assert.True(t, json.Valid(w.Bytes())) + assert.Equal(t, "test", string(w.Bytes())) +} diff --git a/tracer.go b/tracer.go index 1b6271559..1628d78b3 100644 --- a/tracer.go +++ b/tracer.go @@ -1241,7 +1241,7 @@ func (t *Tracer) loop() { } sendStreamRequest <- gracePeriod if metadata == nil { - metadata = t.jsonRequestMetadata() + metadata = t.JSONRequestMetadata() } zlibWriter.Reset(&requestBuf) zlibWriter.Write(metadata) @@ -1317,10 +1317,10 @@ func (t *Tracer) loop() { } } -// jsonRequestMetadata returns a JSON-encoded metadata object that features +// JSONRequestMetadata returns a JSON-encoded metadata object that features // at the head of every request body. This is called exactly once, when the // first request is made. -func (t *Tracer) jsonRequestMetadata() []byte { +func (t *Tracer) JSONRequestMetadata() []byte { var json fastjson.Writer json.RawString(`{"metadata":`) t.encodeRequestMetadata(&json)