Skip to content

Commit

Permalink
[chore] [exporter/signalfx] Remove dead code in correlation logic (#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax authored Feb 6, 2024
1 parent 06203fa commit 633aea2
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 55 deletions.
61 changes: 10 additions & 51 deletions exporter/signalfxexporter/internal/apm/tracetracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package tracetracker // import "github.com/open-telemetry/opentelemetry-collecto
import (
"context"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -29,16 +28,11 @@ var DefaultDimsToSyncSource = map[string]string{
// spans passed through ProcessSpans. It supports expiry of service names if
// they are not seen for a certain amount of time.
type ActiveServiceTracker struct {
dpCacheLock sync.Mutex

log log.Logger

// hostIDDims is the map of key/values discovered by the agent that identify the host
hostIDDims map[string]string

// sendTraceHostCorrelationMetrics turns metric emission on and off
sendTraceHostCorrelationMetrics bool

// hostServiceCache is a cache of services associated with the host
hostServiceCache *TimeoutCache

Expand All @@ -56,9 +50,6 @@ type ActiveServiceTracker struct {
// for more information
tenantEmptyEnvironmentCache *TimeoutCache

// cache of service names to generate datapoints for
dpCache map[string]struct{}

timeNow func() time.Time

// correlationClient is the client used for updating infrastructure correlation properties
Expand All @@ -72,21 +63,6 @@ type ActiveServiceTracker struct {
dimsToSyncSource map[string]string
}

// addServiceToDPCache creates a datapoint for the given service in the dpCache.
func (a *ActiveServiceTracker) addServiceToDPCache(service string) {
a.dpCacheLock.Lock()
defer a.dpCacheLock.Unlock()

a.dpCache[service] = struct{}{}
}

// removeServiceFromDPCache removes the datapoint for the given service from the dpCache
func (a *ActiveServiceTracker) removeServiceFromDPCache(service string) {
a.dpCacheLock.Lock()
delete(a.dpCache, service)
a.dpCacheLock.Unlock()
}

// LoadHostIDDimCorrelations asynchronously retrieves all known correlations from the backend
// for all known hostIDDims. This allows the agent to timeout and manage correlation
// deletions on restart.
Expand All @@ -101,11 +77,6 @@ func (a *ActiveServiceTracker) LoadHostIDDimCorrelations() {
// Note that only the value is set for the host service cache because we only track services for the host
// therefore there we don't need to include the dim key and value on the cache key
if isNew := a.hostServiceCache.UpdateOrCreate(&CacheKey{value: service}, a.timeNow()); isNew {
if a.sendTraceHostCorrelationMetrics {
// create datapoint for service
a.addServiceToDPCache(service)
}

a.log.WithFields(log.Fields{"service": service}).Debug("Tracking service name from trace span")
}
}
Expand All @@ -129,22 +100,19 @@ func New(
timeout time.Duration,
correlationClient correlations.CorrelationClient,
hostIDDims map[string]string,
sendTraceHostCorrelationMetrics bool,
dimsToSyncSource map[string]string,
) *ActiveServiceTracker {
a := &ActiveServiceTracker{
log: log,
hostIDDims: hostIDDims,
hostServiceCache: NewTimeoutCache(timeout),
hostEnvironmentCache: NewTimeoutCache(timeout),
tenantServiceCache: NewTimeoutCache(timeout),
tenantEnvironmentCache: NewTimeoutCache(timeout),
tenantEmptyEnvironmentCache: NewTimeoutCache(timeout),
dpCache: make(map[string]struct{}),
correlationClient: correlationClient,
sendTraceHostCorrelationMetrics: sendTraceHostCorrelationMetrics,
timeNow: time.Now,
dimsToSyncSource: dimsToSyncSource,
log: log,
hostIDDims: hostIDDims,
hostServiceCache: NewTimeoutCache(timeout),
hostEnvironmentCache: NewTimeoutCache(timeout),
tenantServiceCache: NewTimeoutCache(timeout),
tenantEnvironmentCache: NewTimeoutCache(timeout),
tenantEmptyEnvironmentCache: NewTimeoutCache(timeout),
correlationClient: correlationClient,
timeNow: time.Now,
dimsToSyncSource: dimsToSyncSource,
}
a.LoadHostIDDimCorrelations()

Expand Down Expand Up @@ -258,11 +226,6 @@ func (a *ActiveServiceTracker) processService(span Span, now time.Time) {
}
}

if a.sendTraceHostCorrelationMetrics {
// create datapoint for service
a.addServiceToDPCache(service)
}

a.log.WithFields(log.Fields{"service": service}).Debug("Tracking service name from trace span")
}

Expand Down Expand Up @@ -308,10 +271,6 @@ func (a *ActiveServiceTracker) Purge() {
a.hostServiceCache.Delete(purged)
})
}
// remove host/service correlation metric from tracker
if a.sendTraceHostCorrelationMetrics {
a.removeServiceFromDPCache(purged.value)
}

a.log.WithFields(log.Fields{"serviceName": purged.value}).Debug("No longer tracking service name from trace span")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestExpiration(t *testing.T) {
correlationClient := &correlationTestClient{}

hostIDDims := map[string]string{"host": "test", "AWSUniqueId": "randomAWSUniqueId"}
a := New(log.Nil, 5*time.Minute, correlationClient, hostIDDims, true, DefaultDimsToSyncSource)
a := New(log.Nil, 5*time.Minute, correlationClient, hostIDDims, DefaultDimsToSyncSource)
setTime(a, time.Unix(100, 0))

a.AddSpansGeneric(context.Background(), fakeSpanList{
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestCorrelationEmptyEnvironment(t *testing.T) {
hostIDDims := map[string]string{"host": "test", "AWSUniqueId": "randomAWSUniqueId"}
wg.Add(len(hostIDDims))
containerLevelIDDims := map[string]string{"kubernetes_pod_uid": "testk8sPodUID", "container_id": "testContainerID"}
a := New(log.Nil, 5*time.Minute, correlationClient, hostIDDims, true, DefaultDimsToSyncSource)
a := New(log.Nil, 5*time.Minute, correlationClient, hostIDDims, DefaultDimsToSyncSource)
wg.Wait() // wait for the initial fetch of hostIDDims to complete

a.AddSpansGeneric(context.Background(), fakeSpanList{
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestCorrelationUpdates(t *testing.T) {
hostIDDims := map[string]string{"host": "test", "AWSUniqueId": "randomAWSUniqueId"}
wg.Add(len(hostIDDims))
containerLevelIDDims := map[string]string{"kubernetes_pod_uid": "testk8sPodUID", "container_id": "testContainerID"}
a := New(log.Nil, 5*time.Minute, correlationClient, hostIDDims, true, DefaultDimsToSyncSource)
a := New(log.Nil, 5*time.Minute, correlationClient, hostIDDims, DefaultDimsToSyncSource)
wg.Wait()
assert.Equal(t, int64(1), a.hostServiceCache.ActiveCount, "activeServiceCount is not properly tracked")
assert.Equal(t, int64(1), a.hostEnvironmentCache.ActiveCount, "activeEnvironmentCount is not properly tracked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func (cor *Tracker) AddSpans(ctx context.Context, traces ptrace.Traces) error {
map[string]string{
hostDimension: hostID.ID,
},
false,
cor.cfg.SyncAttributes)

cor.pTicker = &timeutils.PolicyTicker{OnTickFunc: cor.traceTracker.Purge}
Expand Down

0 comments on commit 633aea2

Please sign in to comment.