Skip to content

Commit

Permalink
Merge pull request #952 from mirackara/cloudentities
Browse files Browse the repository at this point in the history
Cloud Services Entity Relationship Changes
  • Loading branch information
nr-swilloughby authored Oct 10, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
2 parents 65528e1 + 7543b05 commit ead5d74
Showing 9 changed files with 551 additions and 52 deletions.
3 changes: 2 additions & 1 deletion v3/integrations/nramqp/examples/consumer/main.go
Original file line number Diff line number Diff line change
@@ -32,7 +32,8 @@ func main() {

nrApp.WaitForConnection(time.Second * 5)

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
amqpURL := "amqp://guest:guest@localhost:5672/"
conn, err := amqp.Dial(amqpURL)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

10 changes: 7 additions & 3 deletions v3/integrations/nramqp/examples/publisher/main.go
Original file line number Diff line number Diff line change
@@ -40,13 +40,15 @@ type amqpServer struct {
ch *amqp.Channel
exchange string
routingKey string
url string
}

func NewServer(channel *amqp.Channel, exchangeName, routingKeyName string) *amqpServer {
func NewServer(channel *amqp.Channel, exchangeName, routingKeyName string, url string) *amqpServer {
return &amqpServer{
channel,
exchangeName,
routingKeyName,
url,
}
}

@@ -65,6 +67,7 @@ func (serv *amqpServer) publishPlainTxtMessage(w http.ResponseWriter, r *http.Re
ctx,
serv.exchange, // exchange
serv.routingKey, // routing key
serv.url, // url
false, // mandatory
false, // immediate
amqp.Publishing{
@@ -94,7 +97,8 @@ func main() {

nrApp.WaitForConnection(time.Second * 5)

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
amqpURL := "amqp://guest:guest@localhost:5672/"
conn, err := amqp.Dial(amqpURL)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

@@ -112,7 +116,7 @@ func main() {
)
failOnError(err, "Failed to declare a queue")

server := NewServer(ch, "", q.Name)
server := NewServer(ch, "", q.Name, amqpURL)

http.HandleFunc(newrelic.WrapHandleFunc(nrApp, "/", server.index))
http.HandleFunc(newrelic.WrapHandleFunc(nrApp, "/message", server.publishPlainTxtMessage))
43 changes: 35 additions & 8 deletions v3/integrations/nramqp/nramqp.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package nramqp

import (
"context"
"strings"

amqp "github.com/rabbitmq/amqp091-go"

@@ -16,7 +17,7 @@ const (

func init() { internal.TrackUsage("integration", "messagebroker", "nramqp") }

func creatProducerSegment(exchange, key string) *newrelic.MessageProducerSegment {
func createProducerSegment(exchange, key string) *newrelic.MessageProducerSegment {
s := newrelic.MessageProducerSegment{
Library: RabbitMQLibrary,
DestinationName: "Default",
@@ -33,13 +34,34 @@ func creatProducerSegment(exchange, key string) *newrelic.MessageProducerSegment
return &s
}

func GetHostAndPortFromURL(url string) (string, string) {
// url is of format amqp://user:password@host:port or amqp://host:port
var hostPortPart string

// extract the part after "@" symbol, if present
if parts := strings.Split(url, "@"); len(parts) == 2 {
hostPortPart = parts[1]
} else {
// assume the whole url after "amqp://" is the host:port part
hostPortPart = strings.TrimPrefix(url, "amqp://")
}

// split the host:port part
strippedURL := strings.Split(hostPortPart, ":")
if len(strippedURL) != 2 {
return "", ""
}
return strippedURL[0], strippedURL[1]
}

// PublishedWithContext looks for a newrelic transaction in the context object, and if found, creates a message producer segment.
// It will also inject distributed tracing headers into the message.
func PublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
func PublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key, url string, mandatory, immediate bool, msg amqp.Publishing) error {
host, port := GetHostAndPortFromURL(url)
txn := newrelic.FromContext(ctx)
if txn != nil {
// generate message broker segment
s := creatProducerSegment(exchange, key)
s := createProducerSegment(exchange, key)

// capture telemetry for AMQP producer
if msg.Headers != nil && len(msg.Headers) > 0 {
@@ -49,15 +71,18 @@ func PublishWithContext(ch *amqp.Channel, ctx context.Context, exchange, key str
}
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageHeaders, hdrStr)
}
s.StartTime = txn.StartSegmentNow()

// inject DT headers into headers object
msg.Headers = injectDtHeaders(txn, msg.Headers)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeSpanKind, "producer")
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeServerAddress, host)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeServerPort, port)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageDestinationName, exchange)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageRoutingKey, key)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageCorrelationID, msg.CorrelationId)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageReplyTo, msg.ReplyTo)

// inject DT headers into headers object
msg.Headers = injectDtHeaders(txn, msg.Headers)

s.StartTime = txn.StartSegmentNow()
err := ch.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
s.End()
return err
@@ -91,8 +116,10 @@ func Consume(app *newrelic.Application, ch *amqp.Channel, queue, consumer string
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageHeaders, hdrStr, nil)
}
}

integrationsupport.AddAgentAttribute(txn, newrelic.AttributeSpanKind, "consumer", nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageQueueName, queue, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageDestinationName, queue, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessagingDestinationPublishName, delivery.Exchange, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageRoutingKey, delivery.RoutingKey, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageCorrelationID, delivery.CorrelationId, nil)
integrationsupport.AddAgentAttribute(txn, newrelic.AttributeMessageReplyTo, delivery.ReplyTo, nil)
56 changes: 54 additions & 2 deletions v3/integrations/nramqp/nramqp_test.go
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ func BenchmarkCreateProducerSegment(b *testing.B) {
b.ReportAllocs()

for i := 0; i < b.N; i++ {
creatProducerSegment("exchange", "key")
createProducerSegment("exchange", "key")
}
}

@@ -66,7 +66,7 @@ func TestCreateProducerSegment(t *testing.T) {
}

for _, test := range tests {
s := creatProducerSegment(test.exchange, test.key)
s := createProducerSegment(test.exchange, test.key)
if s.DestinationName != test.expect.DestinationName {
t.Errorf("expected destination name %s, got %s", test.expect.DestinationName, s.DestinationName)
}
@@ -76,3 +76,55 @@ func TestCreateProducerSegment(t *testing.T) {
}

}

func TestHostAndPortParsing(t *testing.T) {
app := createTestApp()
txn := app.StartTransaction("test")
defer txn.End()

type testObject struct {
url string
expectHost string
expectPort string
}

tests := []testObject{
{
"amqp://user:password@host:port",
"host",
"port",
},
{
"amqp://host:port",
"host",
"port",
},
{
"aaa://host:port",
"",
"",
},

{
"amqp://user:password@host",
"",
"",
},
{
"amqp://user:password@host:port:extra",
"",
"",
},
}

for _, test := range tests {
host, port := GetHostAndPortFromURL(test.url)
if host != test.expectHost {
t.Errorf("expected host %s, got %s", test.expectHost, host)
}
if port != test.expectPort {
t.Errorf("expected port %s, got %s", test.expectPort, port)
}
}

}
42 changes: 35 additions & 7 deletions v3/integrations/nrawssdk-v2/go.mod
Original file line number Diff line number Diff line change
@@ -2,17 +2,45 @@ module github.com/newrelic/go-agent/v3/integrations/nrawssdk-v2

// As of May 2021, the aws-sdk-go-v2 go.mod file uses 1.15:
// https://github.com/aws/aws-sdk-go-v2/blob/master/go.mod
go 1.20
go 1.21

toolchain go1.21.0

require (
github.com/aws/aws-sdk-go-v2 v1.16.15
github.com/aws/aws-sdk-go-v2/config v1.17.6
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.17.0
github.com/aws/aws-sdk-go-v2/service/lambda v1.24.5
github.com/aws/aws-sdk-go-v2/service/s3 v1.27.10
github.com/aws/smithy-go v1.13.3
github.com/aws/aws-sdk-go-v2 v1.30.4
github.com/aws/aws-sdk-go-v2/config v1.27.31
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.34.6
github.com/aws/aws-sdk-go-v2/service/lambda v1.58.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.61.0
github.com/aws/aws-sdk-go-v2/service/sqs v1.34.6
github.com/aws/smithy-go v1.20.4
github.com/newrelic/go-agent/v3 v3.33.1
)

require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.30 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/grpc v1.56.3 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)

replace github.com/newrelic/go-agent/v3 => ../..
75 changes: 75 additions & 0 deletions v3/integrations/nrawssdk-v2/nrawssdk.go
Original file line number Diff line number Diff line change
@@ -28,9 +28,14 @@ package nrawssdk

import (
"context"
"net/url"
"strconv"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
awsmiddle "github.com/aws/aws-sdk-go-v2/aws/middleware"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/smithy-go/middleware"
smithymiddle "github.com/aws/smithy-go/middleware"
smithyhttp "github.com/aws/smithy-go/transport/http"
"github.com/newrelic/go-agent/v3/internal/integrationsupport"
@@ -41,6 +46,11 @@ type nrMiddleware struct {
txn *newrelic.Transaction
}

// Context key for SQS service queue
type contextKey string

const queueURLKey contextKey = "QueueURL"

type endable interface{ End() }

// See https://aws.github.io/aws-sdk-go-v2/docs/middleware/ for a description of
@@ -88,6 +98,24 @@ func (m nrMiddleware) deserializeMiddleware(stack *smithymiddle.Stack) error {
response, ok := out.RawResponse.(*smithyhttp.Response)

if ok {
if serviceName == "sqs" || serviceName == "SQS" {
if queueURL, ok := ctx.Value(queueURLKey).(string); ok {
parsedURL, err := url.Parse(queueURL)
if err == nil {
// Example URL: https://sqs.{region}.amazonaws.com/{account.id}/{queue.name}
pathParts := strings.Split(parsedURL.Path, "/")
if len(pathParts) >= 3 {
accountID := pathParts[1]
queueName := pathParts[2]
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeCloudAccountID, accountID)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeCloudRegion, region)
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageSystem, "aws_sqs")
integrationsupport.AddAgentSpanAttribute(txn, newrelic.AttributeMessageDestinationName, queueName)
}
}

}
}
// Set additional span attributes
integrationsupport.AddAgentSpanAttribute(txn,
newrelic.AttributeResponseCode, strconv.Itoa(response.StatusCode))
@@ -107,6 +135,51 @@ func (m nrMiddleware) deserializeMiddleware(stack *smithymiddle.Stack) error {
smithymiddle.Before)
}

func (m nrMiddleware) serializeMiddleware(stack *middleware.Stack) error {
return stack.Initialize.Add(middleware.InitializeMiddlewareFunc("NRSerializeMiddleware", func(
ctx context.Context, in middleware.InitializeInput, next middleware.InitializeHandler) (
out middleware.InitializeOutput, metadata middleware.Metadata, err error) {

serviceName := awsmiddle.GetServiceID(ctx)
if serviceName == "sqs" || serviceName == "SQS" {
QueueURL := ""
switch params := in.Parameters.(type) {
case *sqs.SendMessageInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.DeleteQueueInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.ReceiveMessageInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.DeleteMessageInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.ChangeMessageVisibilityInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.ChangeMessageVisibilityBatchInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.DeleteMessageBatchInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.SendMessageBatchInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.PurgeQueueInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.GetQueueAttributesInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.SetQueueAttributesInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.TagQueueInput:
QueueURL = aws.ToString(params.QueueUrl)
case *sqs.UntagQueueInput:
QueueURL = aws.ToString(params.QueueUrl)
default:
QueueURL = ""
}
// Store the QueueURL in the context
ctx = context.WithValue(ctx, queueURLKey, QueueURL)
}
return next.HandleInitialize(ctx, in)
}), middleware.After)
}

// AppendMiddlewares inserts New Relic middleware in the given `apiOptions` for
// the AWS SDK V2 for Go. It must be called only once per AWS configuration.
//
@@ -167,4 +240,6 @@ func (m nrMiddleware) deserializeMiddleware(stack *smithymiddle.Stack) error {
func AppendMiddlewares(apiOptions *[]func(*smithymiddle.Stack) error, txn *newrelic.Transaction) {
m := nrMiddleware{txn: txn}
*apiOptions = append(*apiOptions, m.deserializeMiddleware)
*apiOptions = append(*apiOptions, m.serializeMiddleware)

}
Loading

0 comments on commit ead5d74

Please sign in to comment.