Skip to content

Commit

Permalink
new conveyor in aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
hrissan committed Jan 9, 2025
1 parent 8543897 commit 5dee30d
Show file tree
Hide file tree
Showing 3 changed files with 885 additions and 0 deletions.
149 changes: 149 additions & 0 deletions internal/aggregator/tags_mapper2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2022 V Kontakte LLC
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

package aggregator

import (
"context"
"strconv"
"sync"
"time"

"github.com/vkcom/statshouse/internal/agent"
"github.com/vkcom/statshouse/internal/format"
"github.com/vkcom/statshouse/internal/metajournal"
"github.com/vkcom/statshouse/internal/pcache"
"pgregory.net/rand"
)

const maxUknownTagsInBucket = 128 // keep for low at first, then increase gradually
const maxCreateOrLoadTagsPerIteration = 1024 // keep for low at first, then increase gradually
const tagUsedSecondsCounterToCreate = 10 // if used in 10 different seconds, then create
const clearUnknownTagsEverySeconds = 120 // must be relatively large so enough time to collect statistics
const maxSendKnownTagsToAgent = 256

type unknownTag struct { // fits well into cache line
time uint32
secondsCounter uint32
}

type tagsMapper2 struct {
agg *Aggregator
sh2 *agent.Agent
metricStorage *metajournal.MetricsStorage
loader *metajournal.MetricMetaLoader

mu sync.Mutex
unknownTags map[string]unknownTag // collect statistics here
createTags map[string]format.CreateMappingExtra
clearTime time.Time // clear unknownTags periodically (but with random intervals)
}

func NewTagsMapper2(agg *Aggregator, sh2 *agent.Agent, metricStorage *metajournal.MetricsStorage, loader *metajournal.MetricMetaLoader, suffix string) *tagsMapper2 {
ms := &tagsMapper2{
agg: agg,
sh2: sh2,
metricStorage: metricStorage,
loader: loader,
unknownTags: map[string]unknownTag{},
createTags: map[string]format.CreateMappingExtra{},
}
return ms
}

func (ms *tagsMapper2) AddUnknownTags(unknownTags map[string]format.CreateMappingExtra, time uint32) {
ms.mu.Lock()
defer ms.mu.Unlock()
for k, v := range unknownTags {
u := ms.unknownTags[k]
if time > u.time {
u.time = time
u.secondsCounter++
if u.secondsCounter > tagUsedSecondsCounterToCreate {
ms.createTags[k] = v
u.secondsCounter = 0
// we do not delete from ms.unknownTags, because it will be most likely added back immediately,
// but we clear counter, so we will not add to ms.createTags every iteration
}
}
ms.unknownTags[k] = u
}
}

func (ms *tagsMapper2) goRun() {
var pairs []pcache.MappingPair
for { // no reason for graceful shutdown
time.Sleep(500 * time.Millisecond) // arbitrary delay to reduce meta DDOS
createTags := ms.getTagsToCreate()
pairs = pairs[:0]
counter := 0
for k, v := range createTags {
counter++
if counter > maxCreateOrLoadTagsPerIteration {
break // simply forget the rest, will load/create more on the next iteration
}
tag := ms.createTag(k, v)
if tag != 0 && tag != format.TagValueIDMappingFlood && tag != format.TagValueIDDoesNotExist { // last condition is a precaution, must be always true
pairs = append(pairs, pcache.MappingPair{Str: k, Val: tag})
}
}
nowUnix := uint32(time.Now().Unix())
ms.agg.mcagg.AddValues(nowUnix, pairs)
}
}

func (ms *tagsMapper2) createTag(str string, extra format.CreateMappingExtra) int32 {
var metricID int32
metricName := ""
if bm := format.BuiltinMetrics[extra.MetricID]; bm != nil {
metricID = extra.MetricID
metricName = bm.Name
} else if mm := ms.metricStorage.GetMetaMetric(extra.MetricID); mm != nil {
metricID = extra.MetricID
metricName = mm.Name
} else {
metricID = format.BuiltinMetricIDBudgetUnknownMetric
metricName = format.BuiltinMetricNameBudgetUnknownMetric
// Unknown metrics (also loads from caches after initial error, because cache does not store extra). They all share common limit.
// Journal can be stale, while mapping works.
// Explicit metric for this situation allows resetting limit from UI, like any other metric
}
keyValue, c, _, err := ms.loader.GetTagMapping(context.Background(), str, metricName, true)
key := ms.sh2.AggKey(0, format.BuiltinMetricIDAggMappingCreated, [16]int32{extra.ClientEnv, 0, 0, 0, metricID, c, extra.TagIDKey, format.TagValueIDAggMappingCreatedConveyorNew})
meta := format.BuiltinMetricMetaAggMappingCreated
key.WithAgentEnvRouteArch(extra.AgentEnv, extra.Route, extra.BuildArch)
if err != nil {
// TODO - write to actual log from time to time
ms.agg.appendInternalLog("map_tag_new", strconv.Itoa(int(extra.AgentEnv)), "error", str, extra.Metric, strconv.Itoa(int(metricID)), strconv.Itoa(int(extra.TagIDKey)), err.Error())
ms.sh2.AddValueCounterHost(key, 0, 1, extra.Host, meta)
} else {
switch c {
case format.TagValueIDAggMappingCreatedStatusFlood:
// TODO - more efficient flood processing - do not write to log, etc
ms.agg.appendInternalLog("map_tag_new", strconv.Itoa(int(extra.AgentEnv)), "flood", str, extra.Metric, strconv.Itoa(int(metricID)), strconv.Itoa(int(extra.TagIDKey)), extra.HostName)
ms.sh2.AddValueCounterHost(key, 0, 1, extra.Host, meta)
case format.TagValueIDAggMappingCreatedStatusCreated:
ms.agg.appendInternalLog("map_tag_new", strconv.Itoa(int(extra.AgentEnv)), "created", str, extra.Metric, strconv.Itoa(int(metricID)), strconv.Itoa(int(extra.TagIDKey)), strconv.Itoa(int(keyValue)))
// if str is created, it is valid and safe to write
ms.sh2.AddValueCounterHostStringBytes(key, float64(keyValue), 1, extra.Host, []byte(str), meta)
}
}
return keyValue
}

func (ms *tagsMapper2) getTagsToCreate() map[string]format.CreateMappingExtra {
now := time.Now()
ms.mu.Lock()
defer ms.mu.Unlock()
createTags := ms.createTags
ms.createTags = map[string]format.CreateMappingExtra{}
if ms.clearTime.Before(now) {
ms.unknownTags = map[string]unknownTag{} // simply forget everything
sec := clearUnknownTagsEverySeconds + time.Duration(rand.Int63n(clearUnknownTagsEverySeconds))
ms.clearTime = now.Add(time.Second * sec)
}
return createTags
}
Loading

0 comments on commit 5dee30d

Please sign in to comment.