diff --git a/receiver/prometheus.go b/receiver/prometheus.go index b36639c2..cd1c84cd 100644 --- a/receiver/prometheus.go +++ b/receiver/prometheus.go @@ -182,49 +182,11 @@ func (rcv *PrometheusRemoteWrite) ServeHTTP(w http.ResponseWriter, r *http.Reque return } - var req prompb.WriteRequest - if err := proto.Unmarshal(reqBuf, &req); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + err = rcv.unpackFast(r.Context(), reqBuf) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } - - writer := RowBinary.NewWriter(r.Context(), rcv.writeChan) - - series := req.GetTimeseries() - for i := 0; i < len(series); i++ { - metric, err := tags.Prometheus(series[i].GetLabels()) - - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - samples := series[i].GetSamples() - - for j := 0; j < len(samples); j++ { - if samples[j] == nil { - continue - } - if math.IsNaN(samples[j].Value) { - continue - } - if rcv.isDropString(metric, writer.Now(), uint32(samples[j].Timestamp/1000), samples[j].Value) { - continue - } - - writer.WritePoint(metric, samples[j].Value, samples[j].Timestamp/1000) - } - } - - writer.Flush() - - if samplesCount := writer.PointsWritten(); samplesCount > 0 { - atomic.AddUint64(&rcv.stat.samplesReceived, uint64(samplesCount)) - } - - if writeErrors := writer.WriteErrors(); writeErrors > 0 { - atomic.AddUint64(&rcv.stat.errors, uint64(writeErrors)) - } } // Addr returns binded socket address. For bind port 0 in tests