kafka replay speed: improve fetching tracing #9361
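The core of the change: each fetch result now carries an explicit trace span that starts when the result is queued for ordered hand-off and finishes when a consumer picks it up, so traces show how long fetched records sat waiting. The sketch below is not the Mimir code; it only illustrates that start-on-enqueue / finish-on-dequeue span pattern with opentracing-go, and the batch type, queue channel, and startWaiting/finishWaiting names are made up for the example.

// Hedged sketch of the "waiting for consumption" span pattern (illustrative names).
package main

import (
	"context"
	"fmt"

	"github.com/opentracing/opentracing-go"
)

type batch struct {
	ctx      context.Context
	records  []string
	waitSpan opentracing.Span
}

// startWaiting opens a span when the batch becomes ready and is handed to the queue.
func (b *batch) startWaiting() {
	b.waitSpan, b.ctx = opentracing.StartSpanFromContext(b.ctx, "batch.waitingForConsumption")
}

// finishWaiting closes the span once a consumer picks the batch up,
// so the span's duration is the time spent buffered.
func (b *batch) finishWaiting() {
	b.waitSpan.Finish()
}

func main() {
	queue := make(chan *batch, 1)

	b := &batch{ctx: context.Background(), records: []string{"r1", "r2"}}
	b.startWaiting() // fetcher side: result is ready, waiting begins
	queue <- b

	got := <-queue
	got.finishWaiting() // consumer side: waiting ends, span records the delay
	fmt.Println("consumed", len(got.records), "records")
}

With no tracer registered, opentracing-go falls back to its no-op tracer, so the sketch runs as-is; in Mimir the span would be exported through whatever tracer is configured.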

6 changes: 3 additions & 3 deletions development/mimir-ingest-storage/docker-compose.jsonnet
@@ -50,9 +50,9 @@ std.manifestYamlDoc({
'-ingester.ring.prefix=exclusive-prefix',
'-ingest-storage.kafka.consume-from-position-at-startup=start',
'-ingest-storage.kafka.consume-from-timestamp-at-startup=0',
- '-ingest-storage.kafka.replay-shards=2',
- '-ingest-storage.kafka.batch-size=150',
- '-ingest-storage.kafka.replay-concurrency=4',
+ '-ingest-storage.kafka.ingestion-concurrency=2',
+ '-ingest-storage.kafka.ingestion-concurrency-batch-size=150',
+ '-ingest-storage.kafka.fetch-concurrency=4',
'-ingest-storage.kafka.records-per-fetch=6000',
],
extraVolumes: ['.data-mimir-write-zone-c-61:/data:delegated'],
2 changes: 1 addition & 1 deletion development/mimir-ingest-storage/docker-compose.yml
@@ -322,7 +322,7 @@
"command":
- "sh"
- "-c"
- - "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=start -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.replay-shards=2 -ingest-storage.kafka.batch-size=150 -ingest-storage.kafka.replay-concurrency=4 -ingest-storage.kafka.records-per-fetch=6000"
+ - "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=start -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.ingestion-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150 -ingest-storage.kafka.fetch-concurrency=4 -ingest-storage.kafka.records-per-fetch=6000"
"depends_on":
"kafka_1":
"condition": "service_healthy"
32 changes: 18 additions & 14 deletions pkg/storage/ingest/reader.go
@@ -7,7 +7,6 @@ import (
"context"
"fmt"
"math"
- "os"
"strconv"
"sync"
"time"
@@ -799,10 +798,12 @@ type fetchResult struct {
kgo.FetchPartition
ctx context.Context
fetchedBytes int
+
+ waitingToBePickedUpFromOrderedFetchesSpan opentracing.Span
}

- func (fr fetchResult) logCompletedFetch(fetchStartTime time.Time, w fetchWant) {
- var logger log.Logger = spanlogger.FromContext(fr.ctx, log.NewLogfmtLogger(os.Stderr))
+ func (fr *fetchResult) logCompletedFetch(fetchStartTime time.Time, w fetchWant) {
+ var logger log.Logger = spanlogger.FromContext(fr.ctx, log.NewNopLogger())

msg := "fetched records"
if fr.Err != nil {
Expand Down Expand Up @@ -835,11 +836,12 @@ func (fr fetchResult) logCompletedFetch(fetchStartTime time.Time, w fetchWant) {
)
}

- func (fr fetchResult) logOrderedFetch() {
- if fr.ctx == nil {
- return
- }
- spanlogger.FromContext(fr.ctx, log.NewNopLogger()).DebugLog("msg", "fetch result is enqueued for consuming")
+ func (fr *fetchResult) startWaitingForConsumption() {
+ fr.waitingToBePickedUpFromOrderedFetchesSpan, fr.ctx = opentracing.StartSpanFromContext(fr.ctx, "fetchResult.waitingForConsumption")
+ }
+
+ func (fr *fetchResult) finishWaitingForConsumption() {
+ fr.waitingToBePickedUpFromOrderedFetchesSpan.Finish()
}

func newEmptyFetchResult(ctx context.Context, err error) fetchResult {
@@ -1059,6 +1061,9 @@ func (r *concurrentFetchers) parseFetchResponse(ctx context.Context, startOffset
rawPartitionResp := resp.Topics[0].Partitions[0]
partition, _ := kgo.ProcessRespPartition(parseOptions, &rawPartitionResp, observeMetrics)
partition.EachRecord(r.tracer.OnFetchRecordBuffered)
+ partition.EachRecord(func(r *kgo.Record) {
+ spanlogger.FromContext(r.Context, log.NewNopLogger()).DebugLog("msg", "received record")
+ })

fetchedBytes := len(rawPartitionResp.RecordBatches)
if !r.trackCompressedBytes {
@@ -1115,17 +1120,17 @@ func (r *concurrentFetchers) runFetcher(ctx context.Context, fetchersWg *sync.Wa

for w := range wants {
// Start new span for each fetchWant. We want to record the lifecycle of a single record from being fetched to being ingested.
- wantSpan, ctx := opentracing.StartSpanFromContext(ctx, "concurrentFetcher.fetch")
+ wantSpan, ctx := spanlogger.NewWithLogger(ctx, logger, "concurrentFetcher.fetch")
wantSpan.SetTag("start_offset", w.startOffset)
wantSpan.SetTag("end_offset", w.endOffset)

for attempt := 0; errBackoff.Ongoing() && w.endOffset > w.startOffset; attempt++ {
- attemptSpan, ctx := opentracing.StartSpanFromContext(ctx, "concurrentFetcher.fetch.attempt")
+ attemptSpan, ctx := spanlogger.NewWithLogger(ctx, wantSpan, "concurrentFetcher.fetch.attempt")
attemptSpan.SetTag("attempt", attempt)

f := r.fetchSingle(ctx, w)
if f.Err != nil {
- w = handleKafkaFetchErr(f.Err, w, errBackoff, newRecordsProducedBackoff, r.startOffsets, r.client, spanlogger.FromContext(ctx, logger))
+ w = handleKafkaFetchErr(f.Err, w, errBackoff, newRecordsProducedBackoff, r.startOffsets, r.client, attemptSpan)
}
if len(f.Records) == 0 {
// Typically if we had an error, then there wouldn't be any records.
@@ -1141,8 +1146,7 @@ func (r *concurrentFetchers) runFetcher(ctx context.Context, fetchersWg *sync.Wa
errBackoff.Reset()
newRecordsProducedBackoff.Reset()

- // Propagate the span context to consuming the records.
- f.ctx = ctx
+ f.startWaitingForConsumption()
select {
case w.result <- f:
case <-ctx.Done():
@@ -1208,10 +1212,10 @@ func (r *concurrentFetchers) runFetchers(ctx context.Context, startOffset int64)
}
nextFetch = nextFetch.UpdateBytesPerRecord(result.fetchedBytes, len(result.Records))
bufferedResult = result
- bufferedResult.logOrderedFetch()
readyBufferedResults = r.orderedFetches

case readyBufferedResults <- bufferedResult:
+ bufferedResult.finishWaitingForConsumption()
readyBufferedResults = nil
bufferedResult = fetchResult{}
}
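
A side note on the runFetchers hunk above: the select enables and disables its send case through the nil-channel trick, where readyBufferedResults stays nil until there is a buffered result and is reset to nil after the hand-off, because a send or receive on a nil channel never proceeds. Below is a hedged, self-contained sketch of that gating pattern; the in/out/recv/ready names are illustrative and this is not the Mimir code.

// Hedged sketch of gating select cases with nil channels (illustrative names).
package main

import "fmt"

func main() {
	in := make(chan int)
	out := make(chan int)

	// Producer: emit a few values, then signal completion by closing in.
	go func() {
		for i := 1; i <= 3; i++ {
			in <- i
		}
		close(in)
	}()

	// Forwarder: hold at most one buffered value, toggling which select case
	// is active by swapping the real channels and nil into recv/ready.
	go func() {
		var buffered int
		recv := in         // non-nil: willing to take a new value
		var ready chan int // nil: nothing buffered yet, send case disabled
		for recv != nil || ready != nil {
			select {
			case v, ok := <-recv:
				if !ok {
					recv = nil // producer is done; stop receiving
					continue
				}
				buffered = v
				recv = nil  // don't overwrite the buffered value
				ready = out // enable the send case
			case ready <- buffered:
				ready = nil // hand-off done; disable sending
				recv = in   // resume receiving
			}
		}
		close(out)
	}()

	for v := range out {
		fmt.Println("consumed", v)
	}
}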