Block-builder-scheduler initial structure (#9650)
Adds the initial structure for the block-builder-scheduler, living at pkg/blockbuilder/scheduler, and a new block-builder-scheduler target to the binary. The target currently just connects to Kafka and records per-partition start- and end-offset gauge metrics. In the future it will compute jobs and assign them to block-builder workers.
seizethedave authored Oct 22, 2024
1 parent b27a999 commit 9a55c6f
Showing 6 changed files with 208 additions and 19 deletions.
36 changes: 36 additions & 0 deletions pkg/blockbuilder/scheduler/config.go
@@ -0,0 +1,36 @@
// SPDX-License-Identifier: AGPL-3.0-only

package scheduler

import (
	"flag"
	"fmt"
	"time"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

type Config struct {
	BuilderConsumerGroup   string        `yaml:"builder_consumer_group"`
	SchedulerConsumerGroup string        `yaml:"scheduler_consumer_group"`
	SchedulingInterval     time.Duration `yaml:"kafka_monitor_interval"`

	// Config parameters defined outside the block-builder-scheduler config that are injected dynamically.
	Kafka ingest.KafkaConfig `yaml:"-"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&cfg.BuilderConsumerGroup, "block-builder-scheduler.builder-consumer-group", "block-builder", "The Kafka consumer group used by block-builders.")
	f.StringVar(&cfg.SchedulerConsumerGroup, "block-builder-scheduler.scheduler-consumer-group", "block-builder-scheduler", "The Kafka consumer group used by block-builder-scheduler.")
	f.DurationVar(&cfg.SchedulingInterval, "block-builder-scheduler.scheduling-interval", 20*time.Second, "How frequently to recompute the schedule.")
}

func (cfg *Config) Validate() error {
	if err := cfg.Kafka.Validate(); err != nil {
		return err
	}
	if cfg.SchedulingInterval <= 0 {
		return fmt.Errorf("scheduling interval (%d) must be positive", cfg.SchedulingInterval)
	}
	return nil
}
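
For a quick feel of the new flags, here is a hypothetical sketch (not part of the commit) that parses them into a Config; it assumes only the exported Config and RegisterFlags shown above:

// Hypothetical usage sketch for the flags registered above.
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/mimir/pkg/blockbuilder/scheduler"
)

func main() {
	fs := flag.NewFlagSet("block-builder-scheduler", flag.ExitOnError)
	var cfg scheduler.Config
	cfg.RegisterFlags(fs)
	// Override the 20s default; the other flags keep their defaults.
	_ = fs.Parse([]string{"-block-builder-scheduler.scheduling-interval=30s"})
	fmt.Println(cfg.SchedulingInterval) // 30s
}

Note that Validate consults cfg.Kafka.Validate() before the interval check, so the injected Kafka settings must be valid too.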
33 changes: 33 additions & 0 deletions pkg/blockbuilder/scheduler/metrics.go
@@ -0,0 +1,33 @@
// SPDX-License-Identifier: AGPL-3.0-only

package scheduler

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type schedulerMetrics struct {
	updateScheduleDuration prometheus.Histogram
	partitionStartOffset   *prometheus.GaugeVec
	partitionEndOffset     *prometheus.GaugeVec
}

func newSchedulerMetrics(reg prometheus.Registerer) schedulerMetrics {
	return schedulerMetrics{
		updateScheduleDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
			Name: "cortex_blockbuilder_scheduler_schedule_update_seconds",
			Help: "Time spent updating the schedule.",

			NativeHistogramBucketFactor: 1.1,
		}),
		partitionStartOffset: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "cortex_blockbuilder_scheduler_partition_start_offset",
			Help: "The observed start offset of each partition.",
		}, []string{"partition"}),
		partitionEndOffset: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "cortex_blockbuilder_scheduler_partition_end_offset",
			Help: "The observed end offset of each partition.",
		}, []string{"partition"}),
	}
}
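
Taken together, the two gauges express a per-partition backlog: end offset minus start offset. A hypothetical in-package test sketch (the 100/250 offsets are made-up values; newSchedulerMetrics is the constructor above):

package scheduler

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestPartitionBacklogSketch(t *testing.T) {
	m := newSchedulerMetrics(prometheus.NewRegistry())

	// Pretend the scheduler observed partition 0 spanning offsets [100, 250).
	m.partitionStartOffset.WithLabelValues("0").Set(100)
	m.partitionEndOffset.WithLabelValues("0").Set(250)

	// Backlog is end - start: 150 records not yet consumed.
	start := testutil.ToFloat64(m.partitionStartOffset.WithLabelValues("0"))
	end := testutil.ToFloat64(m.partitionEndOffset.WithLabelValues("0"))
	if got := end - start; got != 150 {
		t.Fatalf("backlog: got %v, want 150", got)
	}
}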
101 changes: 101 additions & 0 deletions pkg/blockbuilder/scheduler/scheduler.go
@@ -0,0 +1,101 @@
// SPDX-License-Identifier: AGPL-3.0-only

package scheduler

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"

	"github.com/grafana/mimir/pkg/storage/ingest"
)

type BlockBuilderScheduler struct {
	services.Service

	kafkaClient *kgo.Client
	cfg         Config
	logger      log.Logger
	register    prometheus.Registerer
	metrics     schedulerMetrics
}

func New(
	cfg Config,
	logger log.Logger,
	reg prometheus.Registerer,
) (*BlockBuilderScheduler, error) {
	s := &BlockBuilderScheduler{
		cfg:      cfg,
		logger:   logger,
		register: reg,
		metrics:  newSchedulerMetrics(reg),
	}
	s.Service = services.NewBasicService(s.starting, s.running, s.stopping)
	return s, nil
}

func (s *BlockBuilderScheduler) starting(context.Context) error {
	kc, err := ingest.NewKafkaReaderClient(
		s.cfg.Kafka,
		ingest.NewKafkaReaderClientMetrics("block-builder-scheduler", s.register),
		s.logger,
		kgo.ConsumerGroup(s.cfg.SchedulerConsumerGroup),
		// The scheduler simply monitors partitions. We don't want it committing offsets.
		kgo.DisableAutoCommit(),
	)
	if err != nil {
		return fmt.Errorf("creating kafka reader: %w", err)
	}
	s.kafkaClient = kc
	return nil
}

func (s *BlockBuilderScheduler) stopping(_ error) error {
	s.kafkaClient.Close()
	return nil
}

func (s *BlockBuilderScheduler) running(ctx context.Context) error {
	updateTick := time.NewTicker(s.cfg.SchedulingInterval)
	defer updateTick.Stop()
	for {
		select {
		case <-updateTick.C:
			s.updateSchedule(ctx)
		case <-ctx.Done():
			return nil
		}
	}
}

func (s *BlockBuilderScheduler) updateSchedule(ctx context.Context) {
	startTime := time.Now()
	// Eventually this will also include job computation. But for now, collect partition data.
	admin := kadm.NewClient(s.kafkaClient)

	startOffsets, err := admin.ListStartOffsets(ctx, s.cfg.Kafka.Topic)
	if err != nil {
		level.Warn(s.logger).Log("msg", "failed to list start offsets", "err", err)
	}
	endOffsets, err := admin.ListEndOffsets(ctx, s.cfg.Kafka.Topic)
	if err != nil {
		level.Warn(s.logger).Log("msg", "failed to list end offsets", "err", err)
	}

	s.metrics.updateScheduleDuration.Observe(time.Since(startTime).Seconds())

	startOffsets.Each(func(o kadm.ListedOffset) {
		s.metrics.partitionStartOffset.WithLabelValues(fmt.Sprint(o.Partition)).Set(float64(o.Offset))
	})
	endOffsets.Each(func(o kadm.ListedOffset) {
		s.metrics.partitionEndOffset.WithLabelValues(fmt.Sprint(o.Partition)).Set(float64(o.Offset))
	})
}
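
Because BlockBuilderScheduler wraps a dskit BasicService (starting sets up the Kafka client, running drives the ticker loop, stopping closes the client), it can be exercised outside Mimir's module system. A hypothetical standalone harness, assuming cfg has been populated the way modules.go does below:

// Hypothetical harness; not part of the commit.
package main

import (
	"context"
	stdlog "log"
	"os"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/mimir/pkg/blockbuilder/scheduler"
)

func main() {
	var cfg scheduler.Config // assume flags and cfg.Kafka are filled in elsewhere
	logger := log.NewLogfmtLogger(os.Stderr)

	sched, err := scheduler.New(cfg, logger, prometheus.DefaultRegisterer)
	if err != nil {
		stdlog.Fatal(err)
	}
	// Runs starting() (Kafka client setup), then running() (the update loop).
	if err := services.StartAndAwaitRunning(context.Background(), sched); err != nil {
		stdlog.Fatal(err)
	}
	// Let it run for a bit, then shut down cleanly via stopping().
	time.Sleep(time.Minute)
	if err := services.StopAndAwaitTerminated(context.Background(), sched); err != nil {
		stdlog.Fatal(err)
	}
}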
40 changes: 22 additions & 18 deletions pkg/mimir/mimir.go
@@ -49,6 +49,7 @@ import (
	alertstorelocal "github.com/grafana/mimir/pkg/alertmanager/alertstore/local"
	"github.com/grafana/mimir/pkg/api"
	"github.com/grafana/mimir/pkg/blockbuilder"
+	blockbuilderscheduler "github.com/grafana/mimir/pkg/blockbuilder/scheduler"
	"github.com/grafana/mimir/pkg/compactor"
	"github.com/grafana/mimir/pkg/continuoustest"
	"github.com/grafana/mimir/pkg/distributor"
@@ -113,24 +114,25 @@ type Config struct {
	PrintConfig     bool   `yaml:"-"`
	ApplicationName string `yaml:"-"`

-	API              api.Config                      `yaml:"api"`
-	Server           server.Config                   `yaml:"server"`
-	Distributor      distributor.Config              `yaml:"distributor"`
-	Querier          querier.Config                  `yaml:"querier"`
-	IngesterClient   client.Config                   `yaml:"ingester_client"`
-	Ingester         ingester.Config                 `yaml:"ingester"`
-	Flusher          flusher.Config                  `yaml:"flusher"`
-	LimitsConfig     validation.Limits               `yaml:"limits"`
-	Worker           querier_worker.Config           `yaml:"frontend_worker"`
-	Frontend         frontend.CombinedFrontendConfig `yaml:"frontend"`
-	IngestStorage    ingest.Config                   `yaml:"ingest_storage"`
-	BlockBuilder     blockbuilder.Config             `yaml:"block_builder" doc:"hidden"`
-	BlocksStorage    tsdb.BlocksStorageConfig        `yaml:"blocks_storage"`
-	Compactor        compactor.Config                `yaml:"compactor"`
-	StoreGateway     storegateway.Config             `yaml:"store_gateway"`
-	TenantFederation tenantfederation.Config         `yaml:"tenant_federation"`
-	ActivityTracker  activitytracker.Config          `yaml:"activity_tracker"`
-	Vault            vault.Config                    `yaml:"vault"`
+	API                   api.Config                      `yaml:"api"`
+	Server                server.Config                   `yaml:"server"`
+	Distributor           distributor.Config              `yaml:"distributor"`
+	Querier               querier.Config                  `yaml:"querier"`
+	IngesterClient        client.Config                   `yaml:"ingester_client"`
+	Ingester              ingester.Config                 `yaml:"ingester"`
+	Flusher               flusher.Config                  `yaml:"flusher"`
+	LimitsConfig          validation.Limits               `yaml:"limits"`
+	Worker                querier_worker.Config           `yaml:"frontend_worker"`
+	Frontend              frontend.CombinedFrontendConfig `yaml:"frontend"`
+	IngestStorage         ingest.Config                   `yaml:"ingest_storage"`
+	BlockBuilder          blockbuilder.Config             `yaml:"block_builder" doc:"hidden"`
+	BlockBuilderScheduler blockbuilderscheduler.Config    `yaml:"block_builder_scheduler" doc:"hidden"`
+	BlocksStorage         tsdb.BlocksStorageConfig        `yaml:"blocks_storage"`
+	Compactor             compactor.Config                `yaml:"compactor"`
+	StoreGateway          storegateway.Config             `yaml:"store_gateway"`
+	TenantFederation      tenantfederation.Config         `yaml:"tenant_federation"`
+	ActivityTracker       activitytracker.Config          `yaml:"activity_tracker"`
+	Vault                 vault.Config                    `yaml:"vault"`

Ruler ruler.Config `yaml:"ruler"`
RulerStorage rulestore.Config `yaml:"ruler_storage"`
@@ -184,6 +186,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
	c.Frontend.RegisterFlags(f, logger)
	c.IngestStorage.RegisterFlags(f)
	c.BlockBuilder.RegisterFlags(f, logger)
+	c.BlockBuilderScheduler.RegisterFlags(f)
	c.BlocksStorage.RegisterFlags(f)
	c.Compactor.RegisterFlags(f, logger)
	c.StoreGateway.RegisterFlags(f, logger)
@@ -733,6 +736,7 @@ type Mimir struct {
	Vault                 *vault.Vault
	UsageStatsReporter    *usagestats.Reporter
	BlockBuilder          *blockbuilder.BlockBuilder
+	BlockBuilderScheduler *blockbuilderscheduler.BlockBuilderScheduler
	ContinuousTestManager *continuoustest.Manager
	BuildInfoHandler      http.Handler
}
15 changes: 15 additions & 0 deletions pkg/mimir/modules.go
@@ -40,6 +40,7 @@ import (
	"github.com/grafana/mimir/pkg/alertmanager/alertstore/bucketclient"
	"github.com/grafana/mimir/pkg/api"
	"github.com/grafana/mimir/pkg/blockbuilder"
+	blockbuilderscheduler "github.com/grafana/mimir/pkg/blockbuilder/scheduler"
	"github.com/grafana/mimir/pkg/compactor"
	"github.com/grafana/mimir/pkg/continuoustest"
	"github.com/grafana/mimir/pkg/distributor"
@@ -102,6 +103,7 @@ const (
	TenantFederation      string = "tenant-federation"
	UsageStats            string = "usage-stats"
	BlockBuilder          string = "block-builder"
+	BlockBuilderScheduler string = "block-builder-scheduler"
	ContinuousTest        string = "continuous-test"
	All                   string = "all"

@@ -1093,6 +1095,17 @@ func (t *Mimir) initBlockBuilder() (_ services.Service, err error) {
	return t.BlockBuilder, nil
}

+func (t *Mimir) initBlockBuilderScheduler() (services.Service, error) {
+	t.Cfg.BlockBuilderScheduler.Kafka = t.Cfg.IngestStorage.KafkaConfig
+
+	s, err := blockbuilderscheduler.New(t.Cfg.BlockBuilderScheduler, util_log.Logger, t.Registerer)
+	if err != nil {
+		return nil, errors.Wrap(err, "block-builder-scheduler init")
+	}
+	t.BlockBuilderScheduler = s
+	return s, nil
+}
+
func (t *Mimir) initContinuousTest() (services.Service, error) {
	client, err := continuoustest.NewClient(t.Cfg.ContinuousTest.Client, util_log.Logger)
	if err != nil {
@@ -1144,6 +1157,7 @@ func (t *Mimir) setupModuleManager() error {
	mm.RegisterModule(TenantFederation, t.initTenantFederation, modules.UserInvisibleModule)
	mm.RegisterModule(UsageStats, t.initUsageStats, modules.UserInvisibleModule)
	mm.RegisterModule(BlockBuilder, t.initBlockBuilder)
+	mm.RegisterModule(BlockBuilderScheduler, t.initBlockBuilderScheduler)
	mm.RegisterModule(ContinuousTest, t.initContinuousTest)
	mm.RegisterModule(Vault, t.initVault, modules.UserInvisibleModule)
	mm.RegisterModule(Write, nil)
@@ -1180,6 +1194,7 @@ func (t *Mimir) setupModuleManager() error {
		StoreGateway:          {API, Overrides, MemberlistKV, Vault},
		TenantFederation:      {Queryable},
		BlockBuilder:          {API, Overrides},
+		BlockBuilderScheduler: {API},
		ContinuousTest:        {API},
		Write:                 {Distributor, Ingester},
		Read:                  {QueryFrontend, Querier},
2 changes: 1 addition & 1 deletion pkg/storage/ingest/fetcher.go
@@ -356,7 +356,7 @@ func (r *concurrentFetchers) recordOrderedFetchTelemetry(f fetchResult, firstRet
	r.metrics.fetchedDiscardedRecordBytes.Add(float64(doubleFetchedBytes))
}

-// fetchSingle attempts to find out the leader leader Kafka broker for a partition and then sends a fetch request to the leader of the fetchWant request and parses the responses
+// fetchSingle attempts to find out the leader Kafka broker for a partition and then sends a fetch request to the leader of the fetchWant request and parses the responses
// fetchSingle returns a fetchResult which may or may not fulfil the entire fetchWant.
// If ctx is cancelled, fetchSingle will return an empty fetchResult without an error.
func (r *concurrentFetchers) fetchSingle(ctx context.Context, fw fetchWant) (fr fetchResult) {