Skip to content

Commit

Permalink
fast prometheus unpacker
Browse files Browse the repository at this point in the history
  • Loading branch information
lomik committed May 31, 2019
1 parent 5e131de commit 9a1158a
Show file tree
Hide file tree
Showing 6 changed files with 595 additions and 3 deletions.
34 changes: 34 additions & 0 deletions helper/RowBinary/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,32 @@ func (wb *WriteBuffer) WriteBytes(p []byte) {
wb.Used += copy(wb.Body[wb.Used:], p)
}

func (wb *WriteBuffer) WriteTagged(tagged []string) {
l := len(tagged) - 1
if l < 0 {
return
}
for i := 0; i < len(tagged); i++ {
l += len(tagged[i])
}
wb.Used += binary.PutUvarint(wb.Body[wb.Used:], uint64(l))
for i := 0; i < len(tagged); i++ {
if i > 0 {
if i == 1 {
wb.Body[wb.Used] = '?'
} else {
if i%2 == 1 {
wb.Body[wb.Used] = '&'
} else {
wb.Body[wb.Used] = '='
}
}
wb.Used++
}
wb.Used += copy(wb.Body[wb.Used:], tagged[i])
}
}

func (wb *WriteBuffer) WriteString(s string) {
wb.Used += binary.PutUvarint(wb.Body[wb.Used:], uint64(len(s)))
wb.Used += copy(wb.Body[wb.Used:], []byte(s))
Expand Down Expand Up @@ -135,6 +161,14 @@ func (wb *WriteBuffer) WriteGraphitePoint(name []byte, value float64, timestamp
wb.WriteUint32(version)
}

func (wb *WriteBuffer) WriteGraphitePointTagged(labels []string, value float64, timestamp uint32, version uint32) {
wb.WriteTagged(labels)
wb.WriteFloat64(value)
wb.WriteUint32(timestamp)
wb.WriteUint16(TimestampToDays(timestamp))
wb.WriteUint32(version)
}

func (wb *WriteBuffer) CanWriteGraphitePoint(metricLen int) bool {
// required (maxvarint{5}, name{metricLen}, value{8}, timestamp{4}, days(date){2}, version{4})
return (WriteBufferSize - wb.Used) > (metricLen + 23)
Expand Down
28 changes: 28 additions & 0 deletions helper/RowBinary/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,34 @@ func (w *Writer) WritePoint(metric string, value float64, timestamp int64) {
w.pointsWritten++
}

func (w *Writer) WritePointTagged(metric []string, value float64, timestamp int64) {
if w.wb == nil {
w.wb = GetWriteBuffer()
}
l := len(metric) - 1
for i := 0; i < len(metric); i++ {
l += len(metric[i])
}
if !w.wb.CanWriteGraphitePoint(l) {
w.Flush()
if l > WriteBufferSize-50 {
w.writeErrors++
return
// return fmt.Error("metric too long (%d bytes)", len(name))
}
w.wb = GetWriteBuffer()
}

w.wb.WriteGraphitePointTagged(
metric,
value,
uint32(timestamp),
w.now,
)

w.pointsWritten++
}

func (w *Writer) PointsWritten() uint32 {
return w.pointsWritten
}
Expand Down
118 changes: 118 additions & 0 deletions helper/pb/pb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package pb

import (
"errors"
"math"
)

var ErrorTruncated = errors.New("Message truncated")
var ErrorUnknownWireType = errors.New("Unknown wire type")

func WireType(p []byte) (byte, []byte, error) {
for i := 0; i < len(p); i++ {
if p[i]&0x80 == 0 { // last byte of varint
return p[0] & 0x07, p[i+1:], nil
}
}
return 0, p, ErrorTruncated
}

func Uint64(p []byte) (uint64, []byte, error) {
var ret uint64
for i := 0; i < len(p); i++ {
ret += uint64(p[i]&0x7f) << (7 * uint(i))
if p[i]&0x80 == 0 { // last byte of varint
return ret, p[i+1:], nil
}
}
return 0, p, ErrorTruncated
}

func Int64(p []byte) (int64, []byte, error) {
var ret int64
for i := 0; i < len(p); i++ {
ret += int64(p[i]&0x7f) << (7 * uint(i))
if p[i]&0x80 == 0 { // last byte of varint
return ret, p[i+1:], nil
}
}
return 0, p, ErrorTruncated
}

func Double(p []byte) (float64, []byte, error) {
if len(p) < 8 {
return 0, p, ErrorTruncated
}

u := uint64(p[0]) | (uint64(p[1]) << 8) | (uint64(p[2]) << 16) | (uint64(p[3]) << 24) |
(uint64(p[4]) << 32) | (uint64(p[5]) << 40) | (uint64(p[6]) << 48) | (uint64(p[7]) << 56)

return math.Float64frombits(u), p[8:], nil
}

func SkipVarint(p []byte) ([]byte, error) {
for i := 0; i < len(p); i++ {
if p[i]&0x80 == 0 { // last byte of varint
return p[i+1:], nil
}
}
return p, ErrorTruncated
}

func Bytes(p []byte) ([]byte, []byte, error) {
if len(p) < 1 {
return nil, p, ErrorTruncated
}

if p[0] < 128 { // single byte varint
l := int(p[0])
p = p[1:]
if len(p) < l {
return nil, p, ErrorTruncated
}

return p[:l], p[l:], nil
}

l64, p, err := Uint64(p)
if err != nil {
return nil, p, err
}

l := int(l64)

if len(p) < l {
return nil, p, ErrorTruncated
}

return p[:l], p[l:], nil
}

func Skip(p []byte) ([]byte, error) {
var wt byte
var err error
wt, p, err = WireType(p)
if err != nil {
return p, err
}

switch wt {
case 0: // Varint
return SkipVarint(p)
case 1: // 64-bit
if len(p) < 8 {
return p, ErrorTruncated
}
return p[8:], nil
case 2: // Length-delimited
_, p, err = Bytes(p)
return p, err
case 5: // 32-bit
if len(p) < 4 {
return p, ErrorTruncated
}
return p[4:], nil
default:
return p, ErrorUnknownWireType
}
}
146 changes: 146 additions & 0 deletions receiver/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,161 @@ import (
"go.uber.org/zap"

"github.com/lomik/carbon-clickhouse/helper/RowBinary"
"github.com/lomik/carbon-clickhouse/helper/pb"
"github.com/lomik/carbon-clickhouse/helper/prompb"
"github.com/lomik/carbon-clickhouse/helper/tags"
)

var nameLabel = []byte("\n\b__name__\x12")

type PrometheusRemoteWrite struct {
Base
listener *net.TCPListener
}

func (rcv *PrometheusRemoteWrite) unpackFast(ctx context.Context, bufBody []byte) error {

b := bufBody
var err error
var ts []byte
var sample []byte

metricBuffer := newPrometheusMetricBuffer()

var metric []string
var samplesOffset int

var value float64
var timestamp int64

writer := RowBinary.NewWriter(ctx, rcv.writeChan)

TimeSeriesLoop:
for len(b) > 0 {
if b[0] != 0x0a { // repeated prometheus.TimeSeries timeseries = 1;
if b, err = pb.Skip(b); err != nil {
break TimeSeriesLoop
}
continue TimeSeriesLoop
}

if ts, b, err = pb.Bytes(b[1:]); err != nil {
break TimeSeriesLoop
}

if metric, samplesOffset, err = metricBuffer.timeSeries(ts); err != nil {
break TimeSeriesLoop
}

ts = ts[samplesOffset:]
SamplesLoop:
for len(ts) > 0 {
if ts[0] != 0x12 { // repeated Sample samples = 2;
if ts, err = pb.Skip(ts); err != nil {
break TimeSeriesLoop
}
continue SamplesLoop
}

if sample, ts, err = pb.Bytes(ts[1:]); err != nil {
break TimeSeriesLoop
}

timestamp = 0
value = 0

for len(sample) > 0 {
switch sample[0] {
case 0x09: // double value = 1;
if value, sample, err = pb.Double(sample[1:]); err != nil {
break TimeSeriesLoop
}
case 0x10: // int64 timestamp = 2;
if timestamp, sample, err = pb.Int64(sample[1:]); err != nil {
break TimeSeriesLoop
}
default:
if sample, err = pb.Skip(sample); err != nil {
break TimeSeriesLoop
}
}
}

if math.IsNaN(value) {
continue SamplesLoop
}

if rcv.isDropString("", writer.Now(), uint32(timestamp/1000), value) {
continue
}

writer.WritePointTagged(metric, value, timestamp/1000)
}
}

if err != nil {
return err
}

writer.Flush()

if samplesCount := writer.PointsWritten(); samplesCount > 0 {
atomic.AddUint64(&rcv.stat.samplesReceived, uint64(samplesCount))
}

if writeErrors := writer.WriteErrors(); writeErrors > 0 {
atomic.AddUint64(&rcv.stat.errors, uint64(writeErrors))
}

return nil
}

func (rcv *PrometheusRemoteWrite) unpackDefault(ctx context.Context, bufBody []byte) error {
var req prompb.WriteRequest
if err := proto.Unmarshal(bufBody, &req); err != nil {
return err
}

writer := RowBinary.NewWriter(ctx, rcv.writeChan)

series := req.GetTimeseries()
for i := 0; i < len(series); i++ {
metric, err := tags.Prometheus(series[i].GetLabels())

if err != nil {
return err
}

samples := series[i].GetSamples()

for j := 0; j < len(samples); j++ {
if samples[j] == nil {
continue
}
if math.IsNaN(samples[j].Value) {
continue
}
if rcv.isDropString(metric, writer.Now(), uint32(samples[j].Timestamp/1000), samples[j].Value) {
continue
}

writer.WritePoint(metric, samples[j].Value, samples[j].Timestamp/1000)
}
}

writer.Flush()

if samplesCount := writer.PointsWritten(); samplesCount > 0 {
atomic.AddUint64(&rcv.stat.samplesReceived, uint64(samplesCount))
}

if writeErrors := writer.WriteErrors(); writeErrors > 0 {
atomic.AddUint64(&rcv.stat.errors, uint64(writeErrors))
}

return nil
}

func (rcv *PrometheusRemoteWrite) ServeHTTP(w http.ResponseWriter, r *http.Request) {
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
Expand Down
Loading

0 comments on commit 9a1158a

Please sign in to comment.