Skip to content

Commit

Permalink
enable fast parser
Browse files Browse the repository at this point in the history
  • Loading branch information
lomik committed May 31, 2019
1 parent 9a1158a commit 14f83a3
Showing 1 changed file with 3 additions and 41 deletions.
44 changes: 3 additions & 41 deletions receiver/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,49 +182,11 @@ func (rcv *PrometheusRemoteWrite) ServeHTTP(w http.ResponseWriter, r *http.Reque
return
}

var req prompb.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
err = rcv.unpackFast(r.Context(), reqBuf)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

writer := RowBinary.NewWriter(r.Context(), rcv.writeChan)

series := req.GetTimeseries()
for i := 0; i < len(series); i++ {
metric, err := tags.Prometheus(series[i].GetLabels())

if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

samples := series[i].GetSamples()

for j := 0; j < len(samples); j++ {
if samples[j] == nil {
continue
}
if math.IsNaN(samples[j].Value) {
continue
}
if rcv.isDropString(metric, writer.Now(), uint32(samples[j].Timestamp/1000), samples[j].Value) {
continue
}

writer.WritePoint(metric, samples[j].Value, samples[j].Timestamp/1000)
}
}

writer.Flush()

if samplesCount := writer.PointsWritten(); samplesCount > 0 {
atomic.AddUint64(&rcv.stat.samplesReceived, uint64(samplesCount))
}

if writeErrors := writer.WriteErrors(); writeErrors > 0 {
atomic.AddUint64(&rcv.stat.errors, uint64(writeErrors))
}
}

// Addr returns binded socket address. For bind port 0 in tests
Expand Down

0 comments on commit 14f83a3

Please sign in to comment.