diff --git a/pkg/kafka/partition/committer_refactor.go b/pkg/kafka/partition/committer_refactor.go
new file mode 100644
index 000000000000..9873fa75f90f
--- /dev/null
+++ b/pkg/kafka/partition/committer_refactor.go
@@ -0,0 +1,135 @@
+package partition
+
+import (
+	"context"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"go.uber.org/atomic"
+)
+
+type refactoredPartitionCommitter struct {
+	commitRequestsTotal   prometheus.Counter
+	commitRequestsLatency prometheus.Histogram
+	commitFailuresTotal   prometheus.Counter
+	lastCommittedOffset   prometheus.Gauge
+
+	logger     log.Logger
+	reader     ReaderIfc
+	commitFreq time.Duration
+
+	toCommit *atomic.Int64
+	wg       sync.WaitGroup
+	cancel   context.CancelFunc
+}
+
+func newRefactoredCommitter(reader ReaderIfc, commitFreq time.Duration, logger log.Logger, reg prometheus.Registerer) *refactoredPartitionCommitter {
+	c := &refactoredPartitionCommitter{
+		logger:     logger,
+		reader:     reader,
+		commitFreq: commitFreq,
+		commitRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name:        "loki_ingest_storage_reader_offset_commit_requests_total",
+			Help:        "Total number of requests issued to commit the last consumed offset (includes both successful and failed requests).",
+			ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
+		}),
+		commitFailuresTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name:        "loki_ingest_storage_reader_offset_commit_failures_total",
+			Help:        "Total number of failed requests to commit the last consumed offset.",
+			ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
+		}),
+		commitRequestsLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
+			Name:                            "loki_ingest_storage_reader_offset_commit_request_duration_seconds",
+			Help:                            "The duration of requests to commit the last consumed offset.",
+			ConstLabels:                     prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
+			NativeHistogramBucketFactor:     1.1,
+			NativeHistogramMaxBucketNumber:  100,
+			NativeHistogramMinResetDuration: time.Hour,
+			Buckets:                         prometheus.DefBuckets,
+		}),
+		lastCommittedOffset: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+			Name:        "loki_ingest_storage_reader_last_committed_offset",
+			Help:        "The last consumed offset successfully committed by the partition reader. Set to -1 if no offset has been committed yet.",
+			ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
+		}),
+		toCommit: atomic.NewInt64(-1),
+	}
+
+	// Initialize the last committed offset metric to -1 to signal no offset has been committed yet
+	c.lastCommittedOffset.Set(-1)
+
+	if commitFreq > 0 {
+		c.wg.Add(1)
+		ctx, cancel := context.WithCancel(context.Background())
+		c.cancel = cancel
+		go c.autoCommitLoop(ctx)
+	}
+
+	return c
+}
+
+func (c *refactoredPartitionCommitter) autoCommitLoop(ctx context.Context) {
+	defer c.wg.Done()
+	commitTicker := time.NewTicker(c.commitFreq)
+	defer commitTicker.Stop()
+
+	previousOffset := c.toCommit.Load()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-commitTicker.C:
+			currOffset := c.toCommit.Load()
+			if currOffset == previousOffset {
+				continue
+			}
+
+			if err := c.Commit(ctx, currOffset); err == nil {
+				previousOffset = currOffset
+			}
+		}
+	}
+}
+
+func (c *refactoredPartitionCommitter) EnqueueOffset(o int64) {
+	if c.commitFreq > 0 {
+		c.toCommit.Store(o)
+	}
+}
+
+func (c *refactoredPartitionCommitter) Commit(ctx context.Context, offset int64) error {
+	startTime := time.Now()
+	c.commitRequestsTotal.Inc()
+
+	if err := c.reader.Commit(ctx, offset); err != nil {
+		level.Error(c.logger).Log("msg", "failed to commit offset", "err", err, "offset", offset)
+		c.commitFailuresTotal.Inc()
+		c.commitRequestsLatency.Observe(time.Since(startTime).Seconds())
+		return err
+	}
+
+	level.Debug(c.logger).Log("msg", "successfully committed offset", "offset", offset)
+	c.lastCommittedOffset.Set(float64(offset))
+	c.commitRequestsLatency.Observe(time.Since(startTime).Seconds())
+	return nil
+}
+
+func (c *refactoredPartitionCommitter) Stop() {
+	if c.commitFreq <= 0 {
+		return
+	}
+	c.cancel()
+	c.wg.Wait()
+
+	offset := c.toCommit.Load()
+	if offset < 0 {
+		return
+	}
+	// Commit has internal timeouts, so this call shouldn't block for too long
+	_ = c.Commit(context.Background(), offset)
+}
diff --git a/pkg/kafka/partition/reader_refactor.go b/pkg/kafka/partition/reader_refactor.go
new file mode 100644
index 000000000000..ef6edaf64fdc
--- /dev/null
+++ b/pkg/kafka/partition/reader_refactor.go
@@ -0,0 +1,299 @@
+package partition
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"strconv"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/multierror"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/twmb/franz-go/pkg/kadm"
+	"github.com/twmb/franz-go/pkg/kerr"
+	"github.com/twmb/franz-go/pkg/kgo"
+	"github.com/twmb/franz-go/pkg/kmsg"
+)
+
+type SpecialOffset int
+
+const (
+	KafkaStartOffset SpecialOffset = -2
+	KafkaEndOffset   SpecialOffset = -1
+)
+
+type ReaderIfc interface {
+	Client() *kgo.Client
+	Topic() string
+	Partition() int32
+	ConsumerGroup() string
+	FetchLastCommittedOffset(ctx context.Context) (int64, error)
+	FetchPartitionOffset(ctx context.Context, position SpecialOffset) (int64, error)
+	Poll(ctx context.Context) ([]Record, error)
+	Commit(ctx context.Context, offset int64) error
+	// Set the target offset for consumption. Reads will begin from here.
+	SetOffsetForConsumption(offset int64)
+}
+
+// refactoredReaderMetrics contains metrics specific to Kafka reading operations
+type refactoredReaderMetrics struct {
+	recordsPerFetch     prometheus.Histogram
+	fetchesErrors       prometheus.Counter
+	fetchesTotal        prometheus.Counter
+	fetchWaitDuration   prometheus.Histogram
+	receiveDelay        prometheus.Histogram
+	lastCommittedOffset prometheus.Gauge
+}
+
+func newRefactoredReaderMetrics(r prometheus.Registerer, partitionID int32) *refactoredReaderMetrics {
+	return &refactoredReaderMetrics{
+		fetchWaitDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:                        "loki_kafka_reader_fetch_wait_duration_seconds",
+			Help:                        "How long the reader spent waiting for a batch of records from Kafka.",
+			NativeHistogramBucketFactor: 1.1,
+		}),
+		recordsPerFetch: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:    "loki_kafka_reader_records_per_fetch",
+			Help:    "The number of records received in a single fetch operation.",
+			Buckets: prometheus.ExponentialBuckets(1, 2, 15),
+		}),
+		fetchesErrors: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Name: "loki_kafka_reader_fetch_errors_total",
+			Help: "The number of fetch errors encountered.",
+		}),
+		fetchesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Name: "loki_kafka_reader_fetches_total",
+			Help: "Total number of Kafka fetches performed.",
+		}),
+		receiveDelay: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
+			Name:                            "loki_kafka_reader_receive_delay_seconds",
+			Help:                            "Delay between producing a record and receiving it.",
+			NativeHistogramZeroThreshold:    math.Pow(2, -10),
+			NativeHistogramBucketFactor:     1.2,
+			NativeHistogramMaxBucketNumber:  100,
+			NativeHistogramMinResetDuration: 1 * time.Hour,
+			Buckets:                         prometheus.ExponentialBuckets(0.125, 2, 18),
+		}),
+		lastCommittedOffset: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Name:        "loki_ingest_storage_reader_last_committed_offset",
+			Help:        "The last consumed offset successfully committed by the partition reader. Set to -1 if no offset has been committed yet.",
+			ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(partitionID))},
+		}),
+	}
+}
+
+// RefactoredReader provides low-level access to Kafka partition reading operations
+type RefactoredReader struct {
+	client        *kgo.Client
+	topic         string
+	partitionID   int32
+	consumerGroup string
+	metrics       *refactoredReaderMetrics
+	logger        log.Logger
+}
+
+// NewRefactoredReader creates a new RefactoredReader instance
+func NewRefactoredReader(
+	client *kgo.Client,
+	topic string,
+	partitionID int32,
+	consumerGroup string,
+	logger log.Logger,
+	reg prometheus.Registerer,
+) *RefactoredReader {
+	return &RefactoredReader{
+		client:        client,
+		topic:         topic,
+		partitionID:   partitionID,
+		consumerGroup: consumerGroup,
+		metrics:       newRefactoredReaderMetrics(reg, partitionID),
+		logger:        logger,
+	}
+}
+
+// Client returns the underlying Kafka client
+func (r *RefactoredReader) Client() *kgo.Client {
+	return r.client
+}
+
+// Topic returns the topic being read
+func (r *RefactoredReader) Topic() string {
+	return r.topic
+}
+
+// Partition returns the partition being read
+func (r *RefactoredReader) Partition() int32 {
+	return r.partitionID
+}
+
+// ConsumerGroup returns the consumer group
+func (r *RefactoredReader) ConsumerGroup() string {
+	return r.consumerGroup
+}
+
+// FetchLastCommittedOffset retrieves the last committed offset for this partition
+func (r *RefactoredReader) FetchLastCommittedOffset(ctx context.Context) (int64, error) {
+	req := kmsg.NewPtrOffsetFetchRequest()
+	req.Topics = []kmsg.OffsetFetchRequestTopic{{
+		Topic:      r.topic,
+		Partitions: []int32{r.partitionID},
+	}}
+	req.Group = r.consumerGroup
+
+	resps := r.client.RequestSharded(ctx, req)
+
+	// Since we issued a request for only 1 partition, we expect exactly 1 response.
+	if expected, actual := 1, len(resps); actual != expected {
+		return 0, fmt.Errorf("unexpected number of responses: %d", len(resps))
+	}
+
+	// Ensure no error occurred.
+	res := resps[0]
+	if res.Err != nil {
+		return 0, res.Err
+	}
+
+	// Parse the response.
+	fetchRes, ok := res.Resp.(*kmsg.OffsetFetchResponse)
+	if !ok {
+		return 0, errors.New("unexpected response type")
+	}
+
+	if len(fetchRes.Groups) != 1 ||
+		len(fetchRes.Groups[0].Topics) != 1 ||
+		len(fetchRes.Groups[0].Topics[0].Partitions) != 1 {
+		return 0, errors.New("malformed response")
+	}
+
+	partition := fetchRes.Groups[0].Topics[0].Partitions[0]
+	if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
+		return 0, err
+	}
+
+	return partition.Offset, nil
+}
+
+// FetchPartitionOffset retrieves the offset for a specific position
+func (r *RefactoredReader) FetchPartitionOffset(ctx context.Context, position SpecialOffset) (int64, error) {
+	partitionReq := kmsg.NewListOffsetsRequestTopicPartition()
+	partitionReq.Partition = r.partitionID
+	partitionReq.Timestamp = int64(position)
+
+	topicReq := kmsg.NewListOffsetsRequestTopic()
+	topicReq.Topic = r.topic
+	topicReq.Partitions = []kmsg.ListOffsetsRequestTopicPartition{partitionReq}
+
+	req := kmsg.NewPtrListOffsetsRequest()
+	req.IsolationLevel = 0 // 0 means READ_UNCOMMITTED.
+	req.Topics = []kmsg.ListOffsetsRequestTopic{topicReq}
+
+	// Even if we share the same client, other in-flight requests are not canceled once this context is canceled
+	// (or its deadline is exceeded). We've verified it with a unit test.
+	resps := r.client.RequestSharded(ctx, req)
+
+	// Since we issued a request for only 1 partition, we expect exactly 1 response.
+	if len(resps) != 1 {
+		return 0, fmt.Errorf("unexpected number of responses: %d", len(resps))
+	}
+
+	// Ensure no error occurred.
+	res := resps[0]
+	if res.Err != nil {
+		return 0, res.Err
+	}
+
+	listRes, ok := res.Resp.(*kmsg.ListOffsetsResponse)
+	if !ok {
+		return 0, errors.New("unexpected response type")
+	}
+
+	if len(listRes.Topics) != 1 ||
+		len(listRes.Topics[0].Partitions) != 1 {
+		return 0, errors.New("malformed response")
+	}
+
+	partition := listRes.Topics[0].Partitions[0]
+	if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
+		return 0, err
+	}
+
+	return partition.Offset, nil
+}
+
+// Poll retrieves the next batch of records from Kafka
+func (r *RefactoredReader) Poll(ctx context.Context) ([]Record, error) {
+	start := time.Now()
+	fetches := r.client.PollFetches(ctx)
+	r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())
+
+	// Record metrics
+	r.metrics.fetchesTotal.Add(float64(len(fetches)))
+	var numRecords int
+	fetches.EachRecord(func(record *kgo.Record) {
+		numRecords++
+		r.metrics.receiveDelay.Observe(time.Since(record.Timestamp).Seconds())
+	})
+	r.metrics.recordsPerFetch.Observe(float64(numRecords))
+
+	// Handle errors
+	var errs multierror.MultiError
+	fetches.EachError(func(topic string, partition int32, err error) {
+		if errors.Is(err, context.Canceled) {
+			return
+		}
+		errs.Add(fmt.Errorf("topic %q, partition %d: %w", topic, partition, err))
+	})
+	if len(errs) > 0 {
+		r.metrics.fetchesErrors.Add(float64(len(errs)))
+		return nil, fmt.Errorf("fetch errors: %v", errs.Err())
+	}
+
+	// Build records slice
+	records := make([]Record, 0, fetches.NumRecords())
+	fetches.EachRecord(func(rec *kgo.Record) {
+		if rec.Partition != r.partitionID {
+			return
+		}
+		records = append(records, Record{
+			// This context carries the tracing data for this individual record;
+			// kotel populates this data when it fetches the messages.
+			Ctx:      rec.Context,
+			TenantID: string(rec.Key),
+			Content:  rec.Value,
+			Offset:   rec.Offset,
+		})
+	})
+
+	return records, nil
+}
+
+func (r *RefactoredReader) SetOffsetForConsumption(offset int64) {
+	r.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
+		r.topic: {r.partitionID: kgo.NewOffset().At(offset)},
+	})
+}
+
+// Commit commits an offset to the consumer group
+func (r *RefactoredReader) Commit(ctx context.Context, offset int64) error {
+	admin := kadm.NewClient(r.client)
+
+	// Commit the last consumed offset.
+	toCommit := kadm.Offsets{}
+	toCommit.AddOffset(r.topic, r.partitionID, offset, -1)
+
+	committed, err := admin.CommitOffsets(ctx, r.consumerGroup, toCommit)
+	if err != nil {
+		return err
+	} else if !committed.Ok() {
+		return committed.Error()
+	}
+
+	committedOffset, _ := committed.Lookup(r.topic, r.partitionID)
+	level.Debug(r.logger).Log("msg", "last commit offset successfully committed to Kafka", "offset", committedOffset.At)
+	r.metrics.lastCommittedOffset.Set(float64(committedOffset.At))
+	return nil
+}
diff --git a/pkg/kafka/partition/reader_service_refactor.go b/pkg/kafka/partition/reader_service_refactor.go
new file mode 100644
index 000000000000..555e004866c0
--- /dev/null
+++ b/pkg/kafka/partition/reader_service_refactor.go
@@ -0,0 +1,354 @@
+package partition
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/backoff"
+	"github.com/grafana/dskit/services"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+type serviceMetrics struct {
+	phase        *prometheus.GaugeVec
+	receiveDelay *prometheus.HistogramVec
+	partition    *prometheus.GaugeVec
+}
+
+func newServiceMetrics(r prometheus.Registerer) *serviceMetrics {
+	return &serviceMetrics{
+		partition: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
+			Name: "loki_ingest_storage_reader_partition",
+			Help: "The partition ID assigned to this reader.",
+		}, []string{"id"}),
+		phase: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
+			Name: "loki_ingest_storage_reader_phase",
+			Help: "The current phase of the consumer.",
+		}, []string{"phase"}),
+		receiveDelay: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
+			Name: "loki_ingest_storage_reader_receive_delay_seconds",
+			Help: "Delay between producing a record and receiving it in the consumer.",
+		}, []string{"phase"}),
+	}
+}
+
+type ReaderService struct {
+	services.Service
+
+	cfg             ReaderConfig
+	reader          ReaderIfc
+	consumerFactory ConsumerFactory
+	logger          log.Logger
+	metrics         *serviceMetrics
+	committer       *refactoredPartitionCommitter
+
+	lastProcessedOffset int64
+}
+
+type ReaderConfig struct {
+	TargetConsumerLagAtStartup    time.Duration
+	MaxConsumerLagAtStartup       time.Duration
+	ConsumerGroupOffsetCommitFreq time.Duration
+}
+
+func NewReaderService(
+	cfg ReaderConfig,
+	reader ReaderIfc,
+	consumerFactory ConsumerFactory,
+	logger log.Logger,
+	reg prometheus.Registerer,
+) *ReaderService {
+	s := &ReaderService{
+		cfg:                 cfg,
+		reader:              reader,
+		consumerFactory:     consumerFactory,
+		logger:              logger,
+		metrics:             newServiceMetrics(reg),
+		lastProcessedOffset: -1,
+	}
+
+	// Create the committer
+	s.committer = newRefactoredCommitter(reader, cfg.ConsumerGroupOffsetCommitFreq, logger, reg)
+
+	s.Service = services.NewBasicService(s.starting, s.running, nil)
+	return s
+}
+
+func (s *ReaderService) starting(ctx context.Context) error {
+	level.Info(s.logger).Log(
+		"msg", "starting reader service",
+		"partition", s.reader.Partition(),
+		"consumer_group", s.reader.ConsumerGroup(),
+	)
+	s.metrics.reportStarting(s.reader.Partition())
+
+	// Fetch the last committed offset to determine where to start reading
+	lastCommittedOffset, err := s.reader.FetchLastCommittedOffset(ctx)
+	if err != nil {
+		return fmt.Errorf("fetching last committed offset: %w", err)
+	}
+
+	if lastCommittedOffset == int64(KafkaEndOffset) {
+		level.Warn(s.logger).Log(
+			"msg", "no committed offset found for partition, starting from the beginning",
+			"partition", s.reader.Partition(),
+			"consumer_group", s.reader.ConsumerGroup(),
+		)
+		lastCommittedOffset = int64(KafkaStartOffset)
+	}
+
+	if lastCommittedOffset > 0 {
+		lastCommittedOffset++ // We want to begin to read from the next offset, but only if we've previously committed an offset.
+	}
+
+	s.reader.SetOffsetForConsumption(lastCommittedOffset)
+
+	if targetLag, maxLag := s.cfg.TargetConsumerLagAtStartup, s.cfg.MaxConsumerLagAtStartup; targetLag > 0 && maxLag > 0 {
+		consumer, err := s.consumerFactory(s.committer)
+		if err != nil {
+			return fmt.Errorf("creating consumer: %w", err)
+		}
+
+		cancelCtx, cancel := context.WithCancel(ctx)
+		recordsChan := make(chan []Record)
+		wait := consumer.Start(cancelCtx, recordsChan)
+
+		defer func() {
+			close(recordsChan)
+			cancel()
+			wait()
+		}()
+
+		err = s.processNextFetchesUntilTargetOrMaxLagHonored(ctx, targetLag, maxLag, recordsChan)
+		if err != nil {
+			level.Error(s.logger).Log(
+				"msg", "failed to catch up to max lag",
+				"partition", s.reader.Partition(),
+				"consumer_group", s.reader.ConsumerGroup(),
+				"err", err,
+			)
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (s *ReaderService) running(ctx context.Context) error {
+	level.Info(s.logger).Log(
+		"msg", "reader service running",
+		"partition", s.reader.Partition(),
+		"consumer_group", s.reader.ConsumerGroup(),
+	)
+	s.metrics.reportRunning(s.reader.Partition())
+
+	consumer, err := s.consumerFactory(s.committer)
+	if err != nil {
+		return fmt.Errorf("creating consumer: %w", err)
+	}
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	recordsChan := s.startFetchLoop(ctx)
+	wait := consumer.Start(ctx, recordsChan)
+	wait()
+	s.committer.Stop()
+	return nil
+}
+
+// processNextFetchesUntilTargetOrMaxLagHonored processes records from Kafka until at least the max lag is honored.
+// This function makes a best effort to get the lag below targetLag, but it's not guaranteed that it will be
+// reached once this function successfully returns (only maxLag is guaranteed).
+func (s *ReaderService) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Context, targetLag, maxLag time.Duration, recordsChan chan<- []Record) error {
+	logger := log.With(s.logger, "target_lag", targetLag, "max_lag", maxLag)
+	level.Info(logger).Log("msg", "partition reader is starting to consume partition until target and max consumer lag is honored")
+
+	attempts := []func() (time.Duration, error){
+		// First process fetches until at least the max lag is honored.
+		func() (time.Duration, error) {
+			return s.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
+		},
+
+		// If the target lag hasn't been reached with the first attempt (which stops once at least the max lag
+		// is honored) then we try to reach the (lower) target lag within a fixed time (best-effort).
+		// The timeout is equal to the max lag. This is done because we expect at least a 2x replay speed
+		// from Kafka (which means at most it takes 1s to ingest 2s of data): assuming new data is continuously
+		// written to the partition, we give the reader maxLag time to replay the backlog + ingest the new data
+		// written in the meanwhile.
+		func() (time.Duration, error) {
+			timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded)
+			defer cancel()
+
+			return s.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger, recordsChan, time.Since)
+		},
+
+		// If the target lag hasn't been reached with the previous attempt then we'll move on. However,
+		// we still need to guarantee that in the meanwhile the lag didn't increase and max lag is still honored.
+		func() (time.Duration, error) {
+			return s.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
+		},
+	}
+
+	var currLag time.Duration
+	for _, attempt := range attempts {
+		var err error
+
+		currLag, err = attempt()
+		if errors.Is(err, errWaitTargetLagDeadlineExceeded) {
+			continue
+		}
+		if err != nil {
+			return err
+		}
+		if currLag <= targetLag {
+			level.Info(logger).Log(
+				"msg", "partition reader consumed partition and current lag is lower or equal to configured target consumer lag",
+				"last_consumed_offset", s.committer.lastCommittedOffset,
+				"current_lag", currLag,
+			)
+			return nil
+		}
+	}
+
+	level.Warn(logger).Log(
+		"msg", "partition reader consumed partition and current lag is lower than configured max consumer lag but higher than target consumer lag",
+		"last_consumed_offset", s.committer.lastCommittedOffset,
+		"current_lag", currLag,
+	)
+	return nil
+}
+
+func (p *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger, recordsChan chan<- []Record, timeSince func(time.Time) time.Duration) (time.Duration, error) {
+	boff := backoff.New(ctx, backoff.Config{
+		MinBackoff: 100 * time.Millisecond,
+		MaxBackoff: time.Second,
+		MaxRetries: 0, // Retry forever (unless context is canceled / deadline exceeded).
+	})
+	currLag := time.Duration(0)
+
+	for boff.Ongoing() {
+		// Send a direct request to the Kafka backend to fetch the partition start offset.
+		partitionStartOffset, err := p.reader.FetchPartitionOffset(ctx, KafkaStartOffset)
+		if err != nil {
+			level.Warn(logger).Log("msg", "partition reader failed to fetch partition start offset", "err", err)
+			boff.Wait()
+			continue
+		}
+
+		consumerGroupLastCommittedOffset, err := p.reader.FetchLastCommittedOffset(ctx)
+		if err != nil {
+			level.Warn(logger).Log("msg", "partition reader failed to fetch last committed offset", "err", err)
+			boff.Wait()
+			continue
+		}
+
+		// Send a direct request to the Kafka backend to fetch the last produced offset.
+		// We intentionally don't use WaitNextFetchLastProducedOffset() to not introduce further
+		// latency.
+		lastProducedOffsetRequestedAt := time.Now()
+		lastProducedOffset, err := p.reader.FetchPartitionOffset(ctx, KafkaEndOffset)
+		if err != nil {
+			level.Warn(logger).Log("msg", "partition reader failed to fetch last produced offset", "err", err)
+			boff.Wait()
+			continue
+		}
+		lastProducedOffset = lastProducedOffset - 1 // Kafka returns the next empty offset, so we must subtract 1 to get the latest written offset.
+
+		level.Debug(logger).Log("msg", "fetched latest offset information", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset)
+
+		// Ensure there are some records to consume. For example, if the partition has been inactive for a long
+		// time and all its records have been deleted, the partition start offset may be > 0 but there are no
+		// records to actually consume.
+		if partitionStartOffset > lastProducedOffset {
+			level.Info(logger).Log("msg", "partition reader found no records to consume because partition is empty", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset)
+			return 0, nil
+		}
+
+		if consumerGroupLastCommittedOffset == lastProducedOffset {
+			level.Info(logger).Log("msg", "partition reader found no records to consume because it is already up-to-date", "last_committed_offset", consumerGroupLastCommittedOffset, "last_produced_offset", lastProducedOffset)
+			return 0, nil
+		}
+
+		// This message is NOT expected to be logged with a very high rate. In this log we display the last measured
+		// lag. If we don't have it (lag is zero value), then it will not be logged.
+		level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset)
+
+		for boff.Ongoing() {
+			// Continue reading until we reached the desired offset.
+			if lastProducedOffset <= p.lastProcessedOffset {
+				break
+			}
+			if time.Since(lastProducedOffsetRequestedAt) > time.Minute {
+				level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is still consuming records...", "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset)
+			}
+
+			timedCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+			records, err := p.reader.Poll(timedCtx)
+			cancel()
+
+			if err != nil {
+				level.Error(logger).Log("msg", "error polling records", "err", err)
+				continue
+			}
+			if len(records) > 0 {
+				recordsChan <- records
+				p.lastProcessedOffset = records[len(records)-1].Offset
+			}
+		}
+
+		if boff.Err() != nil {
+			return 0, boff.ErrCause()
+		}
+
+		// If it took less than the max desired lag to replay the partition
+		// then we can stop here, otherwise we'll have to redo it.
+		if currLag = timeSince(lastProducedOffsetRequestedAt); currLag <= maxLag {
+			return currLag, nil
+		}
+	}
+
+	return 0, boff.ErrCause()
+}
+
+func (s *ReaderService) startFetchLoop(ctx context.Context) chan []Record {
+	records := make(chan []Record)
+	go func() {
+		defer close(records)
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				res, err := s.reader.Poll(ctx)
+				if err != nil {
+					level.Error(s.logger).Log("msg", "error polling records", "err", err)
+					continue
+				}
+				if len(res) > 0 {
+					records <- res
+					s.lastProcessedOffset = res[len(res)-1].Offset
+				}
+			}
+		}
+	}()
+	return records
+}
+
+func (s *serviceMetrics) reportStarting(partition int32) {
+	s.partition.WithLabelValues(strconv.Itoa(int(partition))).Set(1)
+	s.phase.WithLabelValues(phaseStarting).Set(1)
+	s.phase.WithLabelValues(phaseRunning).Set(0)
+}
+
+func (s *serviceMetrics) reportRunning(partition int32) {
+	s.partition.WithLabelValues(strconv.Itoa(int(partition))).Set(1)
+	s.phase.WithLabelValues(phaseStarting).Set(0)
+	s.phase.WithLabelValues(phaseRunning).Set(1)
+}
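For reviewers, a minimal wiring sketch (not part of this diff) of how the new pieces are intended to compose: a RefactoredReader wraps an existing *kgo.Client, and a ReaderService drives startup catch-up, the run loop, and the committer (created internally from ReaderConfig.ConsumerGroupOffsetCommitFreq). The broker address, topic name, consumer group, import path, and the ConsumerFactory value are assumptions for illustration only.

```go
package main

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/twmb/franz-go/pkg/kgo"

	// Assumed import path for the package touched by this diff.
	"github.com/grafana/loki/v3/pkg/kafka/partition"
)

func main() {
	logger := log.NewNopLogger()
	reg := prometheus.NewRegistry()

	// Assumed: a franz-go client configured for the target cluster; broker and topic are placeholders.
	client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}

	// Low-level reader bound to one topic/partition and a consumer group.
	reader := partition.NewRefactoredReader(client, "loki-ingest", 0, "example-consumer-group", logger, reg)

	// Assumed: a ConsumerFactory supplied by the caller (e.g. the ingester); it must be non-nil in real use.
	var consumerFactory partition.ConsumerFactory

	svc := partition.NewReaderService(partition.ReaderConfig{
		TargetConsumerLagAtStartup:    2 * time.Second,
		MaxConsumerLagAtStartup:       15 * time.Second,
		ConsumerGroupOffsetCommitFreq: 10 * time.Second,
	}, reader, consumerFactory, logger, reg)

	// The service catches up to the configured lag in starting() and then consumes continuously in running().
	if err := services.StartAndAwaitRunning(context.Background(), svc); err != nil {
		panic(err)
	}
}
```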