new conveyor in aggregator

hrissan committed Jan 9, 2025
1 parent 1acbab6 commit 8543897
Showing 5 changed files with 86 additions and 12 deletions.
16 changes: 13 additions & 3 deletions cmd/statshouse/statshouse.go
@@ -201,19 +201,27 @@ func runMain() int {
 		argv.cacheDir = filepath.Dir(argv.diskCacheFilename)
 	}
 	var dc *pcache.DiskCache // We support working without touching disk (on readonly filesystems, in stateless containers, etc)
+	var fpmcagg *os.File
 	if argv.cacheDir != "" {
 		var err error
 		if dc, err = pcache.OpenDiskCache(filepath.Join(argv.cacheDir, "mapping_cache.sqlite3"), pcache.DefaultTxDuration); err != nil {
 			logErr.Printf("failed to open disk cache: %v", err)
 			return 1
 		}
+		fpmcagg, err = os.OpenFile(filepath.Join(argv.cacheDir, "mappings-agg.cache"), os.O_CREATE|os.O_RDWR, 0666)
+		if err != nil {
+			logErr.Printf("failed to open mappings cache file: %v", err)
+			return 1
+		}
+		defer fpmcagg.Close()
 		// we do not want to delay shutdown for saving cache
 		// defer func() {
 		// 	if err := dc.Close(); err != nil {
 		// 		logErr.Printf("failed to close disk cache: %v", err)
 		// 	}
 		// }()
 	}
+	mcagg := pcache.LoadMappingsCacheFile(fpmcagg, argv.configAggregator.MappingCacheSize, argv.configAggregator.MappingCacheTTL)

 	argv.configAgent.AggregatorAddresses = strings.Split(argv.aggAddr, ",")

@@ -234,7 +242,7 @@ func runMain() int {
 		}
 		mainAgent(aesPwd, dc)
 	case "aggregator":
-		mainAggregator(aesPwd, dc)
+		mainAggregator(aesPwd, dc, mcagg)
 	case "ingress_proxy":
 		if len(argv.configAgent.AggregatorAddresses) != 3 {
 			logErr.Printf("-agg-addr must contain comma-separated list of 3 aggregators (1 shard is recommended)")
@@ -550,7 +558,7 @@ loop:
 	return 0
 }

-func mainAggregator(aesPwd string, dc *pcache.DiskCache) int {
+func mainAggregator(aesPwd string, dc *pcache.DiskCache, mcagg *pcache.MappingsCache) int {
 	startDiscCacheTime := time.Now() // we only have disk cache before. Be careful when redesigning
 	if err := aggregator.ValidateConfigAggregator(argv.configAggregator); err != nil {
 		logErr.Printf("%s", err)
@@ -563,7 +571,7 @@ func mainAggregator(aesPwd string, dc *pcache.DiskCache) int {
 		logErr.Printf("--agg-addr to listen must be specified")
 		return 1
 	}
-	agg, err := aggregator.MakeAggregator(dc, argv.cacheDir, argv.aggAddr, aesPwd, argv.configAggregator, argv.customHostName, argv.logLevel == "trace")
+	agg, err := aggregator.MakeAggregator(dc, mcagg, argv.cacheDir, argv.aggAddr, aesPwd, argv.configAggregator, argv.customHostName, argv.logLevel == "trace")
 	if err != nil {
 		logErr.Printf("%v", err)
 		return 1
Expand Down Expand Up @@ -600,6 +608,8 @@ loop:
logOk.Printf("4. Waiting RPC clients to receive responses and disconnect...")
agg.WaitRPCServer(10 * time.Second)
shutdownInfo.StopRPCServer = shutdownInfoDuration(&now).Nanoseconds()
_ = mcagg.Save()
shutdownInfo.SaveMappings = shutdownInfoDuration(&now).Nanoseconds()
shutdownInfo.FinishShutdownTime = now.UnixNano()
shutdownInfoSave(argv.cacheDir, shutdownInfo)
logOk.Printf("Bye")
Expand Down
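
The lifecycle wired up above is: open (and create if absent) the cache file only when a cache dir is configured, let pcache.LoadMappingsCacheFile accept a nil file (the process then runs with a purely in-memory cache), and save explicitly during shutdown instead of in a defer, so the save is accounted for in shutdownInfo and cannot delay exit past it. Below is a minimal, self-contained sketch of that pattern; it is not the commit's code, and loadCache/save are hypothetical stand-ins for the pcache API, not its real signatures.

package main

import (
	"log"
	"os"
	"path/filepath"
)

// cache stands in for pcache.MappingsCache; the real type also enforces
// a size limit and a TTL, which this sketch ignores.
type cache struct{ f *os.File }

// loadCache mirrors the contract LoadMappingsCacheFile appears to have in the
// diff: a nil file is accepted and yields a purely in-memory cache.
func loadCache(f *os.File, maxSize int64, ttlSeconds int) *cache {
	return &cache{f: f}
}

// save writes the cache back to its file; without a backing file it is a no-op.
func (c *cache) save() error {
	if c.f == nil {
		return nil
	}
	// ... serialize mappings into c.f here ...
	return c.f.Sync()
}

func main() {
	var f *os.File
	if cacheDir := os.Getenv("CACHE_DIR"); cacheDir != "" { // stand-in for argv.cacheDir
		var err error
		f, err = os.OpenFile(filepath.Join(cacheDir, "mappings-agg.cache"), os.O_CREATE|os.O_RDWR, 0666)
		if err != nil {
			log.Fatalf("failed to open mappings cache file: %v", err)
		}
		defer f.Close()
	}
	c := loadCache(f, 1<<30, 86400*7) // size/TTL as in the aggregator defaults below

	// ... run the server ...

	_ = c.save() // explicit save at shutdown; deliberately not deferred at startup
}
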
26 changes: 18 additions & 8 deletions internal/agent/config.go
@@ -40,6 +40,9 @@ type Config struct {
 	SkipShards         int // if cluster is extended, first shard might be almost full, so we can skip them for some time.
 	BuiltinNewSharding bool

+	MappingCacheSize int64
+	MappingCacheTTL  int
+
 	// "remote write" was never used (so never tested) and was dropped
 	RemoteWriteEnabled bool
 	RemoteWriteAddr    string
@@ -68,14 +71,18 @@ func DefaultConfig() Config {
 		SaveSecondsImmediately: false,
 		StatsHouseEnv:          "production",
 		BuiltinNewSharding:     false, // false by default because agent deploy is slow, should be enabled after full deploy and then removed
-		RemoteWriteEnabled:           false,
-		RemoteWriteAddr:              ":13380",
-		RemoteWritePath:              "/write",
-		AutoCreate:                   true,
-		DisableRemoteConfig:          false,
-		DisableNoSampleAgent:         false,
-		HardwareMetricResolution:     5,
-		HardwareSlowMetricResolution: 15,
+
+		MappingCacheSize: 100 << 20,
+		MappingCacheTTL:  86400,
+
+		RemoteWriteEnabled:           false,
+		RemoteWriteAddr:              ":13380",
+		RemoteWritePath:              "/write",
+		AutoCreate:                   true,
+		DisableRemoteConfig:          false,
+		DisableNoSampleAgent:         false,
+		HardwareMetricResolution:     5,
+		HardwareSlowMetricResolution: 15,
 	}
 }

@@ -94,6 +101,9 @@ func (c *Config) Bind(f *flag.FlagSet, d Config, legacyVerb bool) {
 	f.BoolVar(&c.SaveSecondsImmediately, "save-seconds-immediately", d.SaveSecondsImmediately, "Save data to disk as soon as second is ready. When false, data is saved after first unsuccessful send.")
 	f.StringVar(&c.StatsHouseEnv, "statshouse-env", d.StatsHouseEnv, "Fill key0 with this value in built-in statistics. 'production', 'staging1', 'staging2', 'staging3' values are allowed.")

+	f.Int64Var(&c.MappingCacheSize, "mappings-cache-size", d.MappingCacheSize, "Mappings cache size both in memory and on disk.")
+	f.IntVar(&c.MappingCacheTTL, "mappings-cache-ttl", d.MappingCacheTTL, "Mappings cache item TTL since last used.")
+
 	f.BoolVar(&c.RemoteWriteEnabled, "remote-write-enabled", d.RemoteWriteEnabled, "Serve prometheus remote write endpoint (deprecated).")
 	f.StringVar(&c.RemoteWriteAddr, "remote-write-addr", d.RemoteWriteAddr, "Prometheus remote write listen address (deprecated).")
 	f.StringVar(&c.RemoteWritePath, "remote-write-path", d.RemoteWritePath, "Prometheus remote write path (deprecated).")
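
The agent's new knobs follow this file's existing Bind convention: a defaults value supplies fallbacks and flags override fields in place. A self-contained sketch of that convention follows; the type and function names here are illustrative, not the repo's.

package main

import (
	"flag"
	"fmt"
	"os"
)

// config is illustrative, not the repo's agent.Config.
type config struct {
	MappingCacheSize int64
	MappingCacheTTL  int
}

func defaultConfig() config {
	return config{
		MappingCacheSize: 100 << 20, // 100 MiB
		MappingCacheTTL:  86400,     // one day, in seconds
	}
}

// bind wires flags to fields with a second config value supplying defaults,
// mirroring how Config.Bind is structured in the diff.
func (c *config) bind(f *flag.FlagSet, d config) {
	f.Int64Var(&c.MappingCacheSize, "mappings-cache-size", d.MappingCacheSize, "Mappings cache size both in memory and on disk.")
	f.IntVar(&c.MappingCacheTTL, "mappings-cache-ttl", d.MappingCacheTTL, "Mappings cache item TTL since last used.")
}

func main() {
	var c config
	fs := flag.NewFlagSet("agent", flag.ExitOnError)
	c.bind(fs, defaultConfig())
	_ = fs.Parse(os.Args[1:]) // e.g. -mappings-cache-size=209715200
	fmt.Printf("mappings cache: %d bytes, TTL %ds\n", c.MappingCacheSize, c.MappingCacheTTL)
}

With this shape, a change to the defaults automatically propagates to both the flag fallback values and the help output.
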
7 changes: 6 additions & 1 deletion internal/aggregator/aggregator.go
@@ -109,6 +109,8 @@ type (
 		metricStorage  *metajournal.MetricsStorage
 		testConnection *TestConnection
 		tagsMapper     *TagsMapper
+		tagsMapper2    *tagsMapper2
+		mcagg          *pcache.MappingsCache

 		scrape     *scrapeServer
 		autoCreate *autoCreate
@@ -133,7 +135,7 @@ func (b *aggregatorBucket) CancelHijack(hctx *rpc.HandlerContext) {
 }

 // aggregator is also run in this method
-func MakeAggregator(dc *pcache.DiskCache, storageDir string, listenAddr string, aesPwd string, config ConfigAggregator, hostName string, logTrace bool) (*Aggregator, error) {
+func MakeAggregator(dc *pcache.DiskCache, mcagg *pcache.MappingsCache, storageDir string, listenAddr string, aesPwd string, config ConfigAggregator, hostName string, logTrace bool) (*Aggregator, error) {
 	if dc == nil { // TODO - make sure aggregator works without cache dir?
 		return nil, fmt.Errorf("aggregator cannot run without -cache-dir for now")
 	}
Expand Down Expand Up @@ -219,6 +221,7 @@ func MakeAggregator(dc *pcache.DiskCache, storageDir string, listenAddr string,
buildArchTag: format.GetBuildArchKey(runtime.GOARCH),
addresses: addresses,
tagMappingBootstrapResponse: tagMappingBootstrapResponse,
mcagg: mcagg,
}
errNoAutoCreate := &rpc.Error{Code: data_model.RPCErrorNoAutoCreate}
a.h = tlstatshouse.Handler{
Expand Down Expand Up @@ -302,6 +305,7 @@ func MakeAggregator(dc *pcache.DiskCache, storageDir string, listenAddr string,

a.testConnection = MakeTestConnection()
a.tagsMapper = NewTagsMapper(a, a.sh2, a.metricStorage, dc, metricMetaLoader, a.config.Cluster)
a.tagsMapper2 = NewTagsMapper2(a, a.sh2, a.metricStorage, metricMetaLoader, a.config.Cluster)

a.aggregatorHost = a.tagsMapper.mapTagAtStartup(a.hostName, format.BuiltinMetricNameBudgetAggregatorHost)

@@ -316,6 +320,7 @@ func MakeAggregator(dc *pcache.DiskCache, storageDir string, listenAddr string,
 	a.insertsSema = semaphore.NewWeighted(a.insertsSemaSize)
 	_ = a.insertsSema.Acquire(context.Background(), a.insertsSemaSize)

+	go a.tagsMapper2.goRun()
 	go a.goTicker()
 	for i := 0; i < a.config.RecentInserters; i++ {
 		go a.goInsert(a.insertsSema, a.cancelInsertsCtx, a.bucketsToSend, i)
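
tagsMapper2 itself is not part of this commit's visible hunks, but the wiring above (handlers feed it via AddUnknownTags, a single goRun goroutine is started at construction) implies a producer/consumer conveyor. The sketch below is hypothetical: the names, the batch type, and the slice-plus-kick-channel queue are invented stand-ins for whatever the real mapper uses.

package main

import (
	"fmt"
	"sync"
	"time"
)

// mappingExtra stands in for format.CreateMappingExtra.
type mappingExtra struct{ metricID int32 }

type batch struct {
	tags map[string]mappingExtra
	time uint32
}

// tagsMapper2 sketch: request handlers enqueue batches of unknown tags,
// and a single background goroutine drains them.
type tagsMapper2 struct {
	mu    sync.Mutex
	queue []batch
	kick  chan struct{}
}

func newTagsMapper2() *tagsMapper2 {
	return &tagsMapper2{kick: make(chan struct{}, 1)}
}

// AddUnknownTags is called from the hot path, so it must be cheap and never block.
func (m *tagsMapper2) AddUnknownTags(tags map[string]mappingExtra, t uint32) {
	m.mu.Lock()
	m.queue = append(m.queue, batch{tags: tags, time: t})
	m.mu.Unlock()
	select {
	case m.kick <- struct{}{}: // wake the conveyor
	default: // already awake
	}
}

// goRun is the conveyor: it takes whatever accumulated and creates mappings
// (here it just prints; the real mapper would talk to the metadata service).
func (m *tagsMapper2) goRun() {
	for range m.kick {
		m.mu.Lock()
		work := m.queue
		m.queue = nil
		m.mu.Unlock()
		for _, b := range work {
			for tag, extra := range b.tags {
				fmt.Printf("mapping %q (metric %d) for second %d\n", tag, extra.metricID, b.time)
			}
		}
	}
}

func main() {
	m := newTagsMapper2()
	go m.goRun()
	m.AddUnknownTags(map[string]mappingExtra{"user-vasya": {metricID: 123}}, 1736380800)
	time.Sleep(100 * time.Millisecond) // demo only: give the conveyor time to drain
}
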
43 changes: 43 additions & 0 deletions internal/aggregator/aggregator_handlers.go
@@ -200,6 +200,7 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 	nowUnix := uint32(now.Unix())
 	receiveDelay := now.Sub(time.Unix(int64(args.Time), 0)).Seconds()
 	// All hosts must be valid and non-empty
+	hostName := string(args.Header.HostName) // allocate once
 	hostTagId := a.tagsMapper.mapOrFlood(now, args.Header.HostName, format.BuiltinMetricNameBudgetHost, false)
 	ownerTagId := a.tagsMapper.mapOrFlood(now, args.Header.Owner, format.BuiltinMetricNameBudgetOwner, false)
 	if ownerTagId == 0 {
@@ -351,6 +352,9 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 	lockedShard := -1
 	var newKeys []data_model.Key
 	var usedMetrics []int32
+	unknownTags := map[string]format.CreateMappingExtra{}
+	mappingHits := 0
+	mappingMisses := 0

 	// We do not want to decompress under lock, so we decompress before ifs, then rarely throw away decompressed data.

Expand Down Expand Up @@ -412,6 +416,43 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
}
}
s := aggBucket.lockShard(&lockedShard, sID)
// If agents send lots of strings, this loop is non-trivial amount of work.
// May be, if mappingHits + mappingMisses > some limit, we should simply copy strings to STags
for i, str := range item.Skeys {
if len(str) == 0 {
continue
}
if m, ok := a.mcagg.GetValueBytes(aggBucket.time, str); ok {
mappingHits++
k.Tags[i] = m
if len(resp.Mappings) < maxSendKnownTagsToAgent {
// we could send the same mapping, but we do not want deduplicate here
resp.Mappings = append(resp.Mappings, tlstatshouse.Mapping{
Str: string(str),
Value: m,
})
}
continue
}
mappingMisses++
astr := string(str) // allocate here
if len(unknownTags) < maxUknownTagsInBucket {
if _, ok := unknownTags[astr]; !ok { // TODO - benchmark if checking before adding is faster or slower
unknownTags[astr] = format.CreateMappingExtra{
MetricID: k.Metric,
TagIDKey: int32(i + format.TagIDShift),
ClientEnv: k.Tags[0],
AgentEnv: agentEnv,
Route: route,
BuildArch: buildArch,
HostName: hostName,
Host: hostTagId,
}
}
}
k.SetSTag(i, astr)
}

mi, created := s.GetOrCreateMultiItem(&k, data_model.AggregatorStringTopCapacity, nil)
mi.MergeWithTLMultiItem(rng, &item, hostTagId)
if created {
@@ -423,6 +464,8 @@ func (a *Aggregator) handleSendSourceBucketAny(hctx *rpc.HandlerContext, args tl
 	}
 	aggBucket.lockShard(&lockedShard, -1)

+	a.tagsMapper2.AddUnknownTags(unknownTags, aggBucket.time)
+
 	aggBucket.mu.Lock()

 	if aggBucket.usedMetrics == nil {
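
This loop is the aggregator-side heart of the new conveyor: each string tag is looked up in the shared mappings cache; a hit is written into the key's integer tags and echoed back to the sending agent (up to maxSendKnownTagsToAgent), while a miss keeps the string tag on the key and is collected into a bounded map for the background mapper. Below is a condensed, self-contained sketch of the same control flow; the plain map stands in for *pcache.MappingsCache and the limits are invented values.

package main

import "fmt"

const (
	maxKnownTagsToClient = 4 // stand-in for maxSendKnownTagsToAgent
	maxUnknownTags       = 4 // stand-in for the per-bucket unknown-tags limit
)

func main() {
	cache := map[string]int32{"env-prod": 101, "dc-west": 102} // stand-in for the mappings cache
	skeys := []string{"env-prod", "", "user-misha", "dc-west", "user-vasya"}

	intTags := make([]int32, len(skeys))  // mapped tags (k.Tags)
	strTags := make([]string, len(skeys)) // unmapped leftovers (k.SetSTag)
	var toClient []string                 // mappings echoed back to the agent
	unknown := map[string]struct{}{}      // collected for the background mapper
	hits, misses := 0, 0

	for i, s := range skeys {
		if s == "" {
			continue // empty tags are neither mapped nor collected
		}
		if m, ok := cache[s]; ok {
			hits++
			intTags[i] = m
			if len(toClient) < maxKnownTagsToClient {
				toClient = append(toClient, s) // duplicates allowed: cheaper than deduplicating here
			}
			continue
		}
		misses++
		if len(unknown) < maxUnknownTags {
			unknown[s] = struct{}{} // bounded per bucket, so one chatty agent cannot flood the mapper
		}
		strTags[i] = s // keep the string until a mapping exists
	}
	fmt.Println(hits, misses, toClient, unknown, intTags, strTags)
}

Bounding both the echoed mappings and the unknown-tag map keeps a single bucket from inflating either the response or the mapping conveyor's queue.
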
6 changes: 6 additions & 0 deletions internal/aggregator/config.go
@@ -23,6 +23,8 @@ type ConfigAggregatorRemote struct {
 	DenyOldAgents    bool
 	MirrorChWrite    bool
 	WriteToV3First   bool
+	MappingCacheSize int64
+	MappingCacheTTL  int
 }

 type ConfigAggregator struct {
@@ -77,6 +79,8 @@ func DefaultConfigAggregator() ConfigAggregator {
 			DenyOldAgents:    true,
 			MirrorChWrite:    true,
 			WriteToV3First:   false,
+			MappingCacheSize: 1 << 30,
+			MappingCacheTTL:  86400 * 7,
 		},
 	}
 }
@@ -91,6 +95,8 @@ func (c *ConfigAggregatorRemote) Bind(f *flag.FlagSet, d ConfigAggregatorRemote,
 	f.BoolVar(&c.DenyOldAgents, "deny-old-agents", d.DenyOldAgents, "Statshouse will ignore data from outdated agents")
 	f.BoolVar(&c.MirrorChWrite, "mirror-ch-writes", d.MirrorChWrite, "Write metrics into both v3 and v2 tables")
 	f.BoolVar(&c.WriteToV3First, "write-to-v3-first", d.WriteToV3First, "Write metrics into v3 table first")
+	f.Int64Var(&c.MappingCacheSize, "mappings-cache-size-agg", d.MappingCacheSize, "Mappings cache size both in memory and on disk for aggregator.")
+	f.IntVar(&c.MappingCacheTTL, "mappings-cache-ttl-agg", d.MappingCacheTTL, "Mappings cache item TTL since last used for aggregator.")
 }
 }

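Worth noting: the aggregator's defaults (1 GiB cache, 7-day TTL) are larger than the agent's (100 MiB, 1-day TTL) set in internal/agent/config.go above, and its flags carry an -agg suffix, presumably so both flag sets can coexist when bound into a shared flag namespace.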
