Skip to content

Commit

Permalink
address comment; handle histogram partial append errors
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Jun 4, 2024
1 parent ca0ba27 commit 25d380d
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 83 deletions.
144 changes: 61 additions & 83 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,65 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
firstPartialErr = errFn()
}
}

// handleAppendFailure classifies an error returned by the TSDB appender.
// It returns rollback=false for "soft" per-sample errors: the caller is
// expected to skip the offending sample and keep appending the rest. For
// any unrecognized error it returns rollback=true, signalling the caller
// to roll back the whole append (the error looks like an issue on our side).
//
// Soft errors bump the matching per-cause counter and record the first
// such error via updateFirstPartial, so it can be returned to the
// distributor, which turns it into a 400 for the client.
//
// NOTE(review): the counters, updateFirstPartial, i.limiter and userID are
// all captured from the enclosing Push scope; this closure assumes it runs
// on a single goroutine per request — confirm.
handleAppendFailure = func(err error, timestampMs int64, lbls []cortexpb.LabelAdapter, copiedLabels labels.Labels) (rollback bool) {
// Check if the error is a soft error we can proceed on. If so, we keep track
// of it, so that we can return it back to the distributor, which will return a
// 400 error to the client. The client (Prometheus) will not retry on 400, and
// we actually ingested all samples which haven't failed.
switch cause := errors.Cause(err); {
case errors.Is(cause, storage.ErrOutOfBounds):
sampleOutOfBoundsCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, storage.ErrOutOfOrderSample):
sampleOutOfOrderCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
newValueForTimestampCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, storage.ErrTooOldSample):
sampleTooOldCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
perUserSeriesLimitCount++
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })

case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
perMetricSeriesLimitCount++
updateFirstPartial(func() error {
// copiedLabels (not lbls) is used here so the error message carries the
// already-interned label set for the series that hit the limit.
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
})

// errors.As (not Is) because errMaxSeriesPerLabelSetLimitExceeded is a
// struct error type carrying per-labelset data, not a sentinel value.
case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
perLabelSetSeriesLimitCount++
updateFirstPartial(func() error {
return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
})

// Native-histogram validation errors are treated as soft (bad client data),
// so they must not trigger a rollback of the whole request.
// NOTE(review): unlike the sample cases above, these bump no dedicated
// counter — confirm that is intentional.
case errors.Is(cause, histogram.ErrHistogramSpanNegativeOffset):
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, histogram.ErrHistogramSpansBucketsMismatch):
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, histogram.ErrHistogramNegativeBucketCount):
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, histogram.ErrHistogramCountNotBigEnough):
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, histogram.ErrHistogramCountMismatch):
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

default:
// Unknown cause: assume an internal problem and request a rollback.
rollback = true
}
return
}
)

// Walk the samples, appending them to the users database
Expand Down Expand Up @@ -1121,50 +1180,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

failedSamplesCount++

// Check if the error is a soft error we can proceed on. If so, we keep track
// of it, so that we can return it back to the distributor, which will return a
// 400 error to the client. The client (Prometheus) will not retry on 400, and
// we actually ingested all samples which haven't failed.
switch cause := errors.Cause(err); {
case errors.Is(cause, storage.ErrOutOfBounds):
sampleOutOfBoundsCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case errors.Is(cause, storage.ErrOutOfOrderSample):
sampleOutOfOrderCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
newValueForTimestampCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case errors.Is(cause, storage.ErrTooOldSample):
sampleTooOldCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
continue

case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
perUserSeriesLimitCount++
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
continue

case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
perMetricSeriesLimitCount++
updateFirstPartial(func() error {
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
})
continue
case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
perLabelSetSeriesLimitCount++
updateFirstPartial(func() error {
return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
})
if rollback := handleAppendFailure(err, s.TimestampMs, ts.Labels, copiedLabels); !rollback {
continue
}

// The error looks an issue on our side, so we should rollback
if rollbackErr := app.Rollback(); rollbackErr != nil {
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr)
Expand Down Expand Up @@ -1203,49 +1221,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

failedSamplesCount++

// Check if the error is a soft error we can proceed on. If so, we keep track
// of it, so that we can return it back to the distributor, which will return a
// 400 error to the client. The client (Prometheus) will not retry on 400, and
// we actually ingested all samples which haven't failed.
switch cause := errors.Cause(err); {
case errors.Is(cause, storage.ErrOutOfBounds):
sampleOutOfBoundsCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(hp.TimestampMs), ts.Labels) })
continue

case errors.Is(cause, storage.ErrOutOfOrderSample):
sampleOutOfOrderCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(hp.TimestampMs), ts.Labels) })
continue

case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
newValueForTimestampCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(hp.TimestampMs), ts.Labels) })
continue

case errors.Is(cause, storage.ErrTooOldSample):
sampleTooOldCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(hp.TimestampMs), ts.Labels) })
continue

case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
perUserSeriesLimitCount++
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
continue

case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
perMetricSeriesLimitCount++
updateFirstPartial(func() error {
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
})
continue
case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
updateFirstPartial(func() error {
return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
})
if rollback := handleAppendFailure(err, hp.TimestampMs, ts.Labels, copiedLabels); !rollback {
continue
}

// The error looks an issue on our side, so we should rollback
if rollbackErr := app.Rollback(); rollbackErr != nil {
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr)
Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/user_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func newLabelSetCounter(limiter *Limiter) *labelSetCounter {
}

func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTSDB, metric labels.Labels) error {
labels.Labels{}.String()
return m.limiter.AssertMaxSeriesPerLabelSet(u.userID, metric, func(set validation.MaxSeriesPerLabelSet) (int, error) {
s := m.shards[util.HashFP(model.Fingerprint(set.Hash))%numMetricCounterShards]
s.RLock()
Expand Down

0 comments on commit 25d380d

Please sign in to comment.