diff --git a/helper/RowBinary/write_buffer.go b/helper/RowBinary/write_buffer.go
index af74d0dd..acab9282 100644
--- a/helper/RowBinary/write_buffer.go
+++ b/helper/RowBinary/write_buffer.go
@@ -76,6 +76,32 @@ func (wb *WriteBuffer) WriteBytes(p []byte) {
 	wb.Used += copy(wb.Body[wb.Used:], p)
 }
 
+func (wb *WriteBuffer) WriteTagged(tagged []string) {
+	l := len(tagged) - 1
+	if l < 0 {
+		return
+	}
+	for i := 0; i < len(tagged); i++ {
+		l += len(tagged[i])
+	}
+	wb.Used += binary.PutUvarint(wb.Body[wb.Used:], uint64(l))
+	for i := 0; i < len(tagged); i++ {
+		if i > 0 {
+			if i == 1 {
+				wb.Body[wb.Used] = '?'
+			} else {
+				if i%2 == 1 {
+					wb.Body[wb.Used] = '&'
+				} else {
+					wb.Body[wb.Used] = '='
+				}
+			}
+			wb.Used++
+		}
+		wb.Used += copy(wb.Body[wb.Used:], tagged[i])
+	}
+}
+
 func (wb *WriteBuffer) WriteString(s string) {
 	wb.Used += binary.PutUvarint(wb.Body[wb.Used:], uint64(len(s)))
 	wb.Used += copy(wb.Body[wb.Used:], []byte(s))
@@ -135,6 +161,14 @@ func (wb *WriteBuffer) WriteGraphitePoint(name []byte, value float64, timestamp
 	wb.WriteUint32(version)
 }
 
+func (wb *WriteBuffer) WriteGraphitePointTagged(labels []string, value float64, timestamp uint32, version uint32) {
+	wb.WriteTagged(labels)
+	wb.WriteFloat64(value)
+	wb.WriteUint32(timestamp)
+	wb.WriteUint16(TimestampToDays(timestamp))
+	wb.WriteUint32(version)
+}
+
 func (wb *WriteBuffer) CanWriteGraphitePoint(metricLen int) bool {
 	// required (maxvarint{5}, name{metricLen}, value{8}, timestamp{4}, days(date){2}, version{4})
 	return (WriteBufferSize - wb.Used) > (metricLen + 23)
diff --git a/helper/RowBinary/writer.go b/helper/RowBinary/writer.go
index 65040a01..1c5cd34b 100644
--- a/helper/RowBinary/writer.go
+++ b/helper/RowBinary/writer.go
@@ -94,6 +94,34 @@ func (w *Writer) WritePoint(metric string, value float64, timestamp int64) {
 	w.pointsWritten++
 }
 
+func (w *Writer) WritePointTagged(metric []string, value float64, timestamp int64) {
+	if w.wb == nil {
+		w.wb = GetWriteBuffer()
+	}
+	l := len(metric) - 1
+	for i := 0; i < len(metric); i++ {
+		l += len(metric[i])
+	}
+	if !w.wb.CanWriteGraphitePoint(l) {
+		w.Flush()
+		if l > WriteBufferSize-50 {
+			w.writeErrors++
+			return
+			// return fmt.Errorf("metric too long (%d bytes)", l)
+		}
+		w.wb = GetWriteBuffer()
+	}
+
+	w.wb.WriteGraphitePointTagged(
+		metric,
+		value,
+		uint32(timestamp),
+		w.now,
+	)
+
+	w.pointsWritten++
+}
+
 func (w *Writer) PointsWritten() uint32 {
 	return w.pointsWritten
 }
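WriteTagged renders a metric slice of the form ["name", "key1", "value1", "key2", "value2", ...] as "name?key1=value1&key2=value2" behind a uvarint length prefix, so a tagged series is stored in the same single string column as a plain Graphite name. A minimal standalone sketch of that naming convention follows; the taggedPath helper and the example labels are illustrative only, not code from the patch:

package main

import (
	"fmt"
	"net/url"
	"sort"
	"strings"
)

// taggedPath mimics the convention WriteTagged implements: name first,
// then sorted labels joined with '?', '&' and '=' separators.
func taggedPath(name string, labels map[string]string) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var sb strings.Builder
	sb.WriteString(name)
	for i, k := range keys {
		if i == 0 {
			sb.WriteByte('?')
		} else {
			sb.WriteByte('&')
		}
		sb.WriteString(url.QueryEscape(k))
		sb.WriteByte('=')
		sb.WriteString(url.QueryEscape(labels[k]))
	}
	return sb.String()
}

func main() {
	fmt.Println(taggedPath("up", map[string]string{
		"job":      "node",
		"instance": "localhost:9100",
	}))
	// Output: up?instance=localhost%3A9100&job=node
}

WriteGraphitePointTagged then appends the usual RowBinary columns (float64 value, uint32 timestamp, uint16 date, uint32 version) after that string, exactly as WriteGraphitePoint does for untagged names.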
diff --git a/helper/pb/pb.go b/helper/pb/pb.go
new file mode 100644
index 00000000..a2fb70fb
--- /dev/null
+++ b/helper/pb/pb.go
@@ -0,0 +1,118 @@
+package pb
+
+import (
+	"errors"
+	"math"
+)
+
+var ErrorTruncated = errors.New("Message truncated")
+var ErrorUnknownWireType = errors.New("Unknown wire type")
+
+func WireType(p []byte) (byte, []byte, error) {
+	for i := 0; i < len(p); i++ {
+		if p[i]&0x80 == 0 { // last byte of varint
+			return p[0] & 0x07, p[i+1:], nil
+		}
+	}
+	return 0, p, ErrorTruncated
+}
+
+func Uint64(p []byte) (uint64, []byte, error) {
+	var ret uint64
+	for i := 0; i < len(p); i++ {
+		ret += uint64(p[i]&0x7f) << (7 * uint(i))
+		if p[i]&0x80 == 0 { // last byte of varint
+			return ret, p[i+1:], nil
+		}
+	}
+	return 0, p, ErrorTruncated
+}
+
+func Int64(p []byte) (int64, []byte, error) {
+	var ret int64
+	for i := 0; i < len(p); i++ {
+		ret += int64(p[i]&0x7f) << (7 * uint(i))
+		if p[i]&0x80 == 0 { // last byte of varint
+			return ret, p[i+1:], nil
+		}
+	}
+	return 0, p, ErrorTruncated
+}
+
+func Double(p []byte) (float64, []byte, error) {
+	if len(p) < 8 {
+		return 0, p, ErrorTruncated
+	}
+
+	u := uint64(p[0]) | (uint64(p[1]) << 8) | (uint64(p[2]) << 16) | (uint64(p[3]) << 24) |
+		(uint64(p[4]) << 32) | (uint64(p[5]) << 40) | (uint64(p[6]) << 48) | (uint64(p[7]) << 56)
+
+	return math.Float64frombits(u), p[8:], nil
+}
+
+func SkipVarint(p []byte) ([]byte, error) {
+	for i := 0; i < len(p); i++ {
+		if p[i]&0x80 == 0 { // last byte of varint
+			return p[i+1:], nil
+		}
+	}
+	return p, ErrorTruncated
+}
+
+func Bytes(p []byte) ([]byte, []byte, error) {
+	if len(p) < 1 {
+		return nil, p, ErrorTruncated
+	}
+
+	if p[0] < 128 { // single byte varint
+		l := int(p[0])
+		p = p[1:]
+		if len(p) < l {
+			return nil, p, ErrorTruncated
+		}
+
+		return p[:l], p[l:], nil
+	}
+
+	l64, p, err := Uint64(p)
+	if err != nil {
+		return nil, p, err
+	}
+
+	l := int(l64)
+
+	if len(p) < l {
+		return nil, p, ErrorTruncated
+	}
+
+	return p[:l], p[l:], nil
+}
+
+func Skip(p []byte) ([]byte, error) {
+	var wt byte
+	var err error
+	wt, p, err = WireType(p)
+	if err != nil {
+		return p, err
+	}
+
+	switch wt {
+	case 0: // Varint
+		return SkipVarint(p)
+	case 1: // 64-bit
+		if len(p) < 8 {
+			return p, ErrorTruncated
+		}
+		return p[8:], nil
+	case 2: // Length-delimited
+		_, p, err = Bytes(p)
+		return p, err
+	case 5: // 32-bit
+		if len(p) < 4 {
+			return p, ErrorTruncated
+		}
+		return p[4:], nil
+	default:
+		return p, ErrorUnknownWireType
+	}
+}
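helper/pb is a small allocation-free protobuf reader: the caller inspects the key byte itself and only hands the payload to Bytes, Uint64, Int64, Double or Skip. A hedged usage sketch over a hand-built two-field message (the byte literals and printed names are illustrative, not from the patch):

package main

import (
	"fmt"

	"github.com/lomik/carbon-clickhouse/helper/pb"
)

func main() {
	// Hand-built message: field 1 (length-delimited) = "foo", field 2 (varint) = 42.
	msg := []byte{0x0a, 0x03, 'f', 'o', 'o', 0x10, 0x2a}

	var err error
	for len(msg) > 0 {
		switch msg[0] {
		case 0x0a: // field 1, wire type 2
			var s []byte
			if s, msg, err = pb.Bytes(msg[1:]); err != nil {
				panic(err)
			}
			fmt.Printf("field 1: %q\n", s) // "foo"
		case 0x10: // field 2, wire type 0
			var v uint64
			if v, msg, err = pb.Uint64(msg[1:]); err != nil {
				panic(err)
			}
			fmt.Println("field 2:", v) // 42
		default: // unknown field: skip key and payload
			if msg, err = pb.Skip(msg); err != nil {
				panic(err)
			}
		}
	}
}

This is the same pattern unpackFast and prometheus_metric.go use below, just with the prompb field numbers instead of the toy ones here.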
diff --git a/receiver/prometheus.go b/receiver/prometheus.go
index b7467cf3..b36639c2 100644
--- a/receiver/prometheus.go
+++ b/receiver/prometheus.go
@@ -14,15 +14,161 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/lomik/carbon-clickhouse/helper/RowBinary"
+	"github.com/lomik/carbon-clickhouse/helper/pb"
 	"github.com/lomik/carbon-clickhouse/helper/prompb"
 	"github.com/lomik/carbon-clickhouse/helper/tags"
 )
 
+var nameLabel = []byte("\n\b__name__\x12")
+
 type PrometheusRemoteWrite struct {
 	Base
 	listener *net.TCPListener
 }
 
+func (rcv *PrometheusRemoteWrite) unpackFast(ctx context.Context, bufBody []byte) error {
+
+	b := bufBody
+	var err error
+	var ts []byte
+	var sample []byte
+
+	metricBuffer := newPrometheusMetricBuffer()
+
+	var metric []string
+	var samplesOffset int
+
+	var value float64
+	var timestamp int64
+
+	writer := RowBinary.NewWriter(ctx, rcv.writeChan)
+
+TimeSeriesLoop:
+	for len(b) > 0 {
+		if b[0] != 0x0a { // repeated prometheus.TimeSeries timeseries = 1;
+			if b, err = pb.Skip(b); err != nil {
+				break TimeSeriesLoop
+			}
+			continue TimeSeriesLoop
+		}
+
+		if ts, b, err = pb.Bytes(b[1:]); err != nil {
+			break TimeSeriesLoop
+		}
+
+		if metric, samplesOffset, err = metricBuffer.timeSeries(ts); err != nil {
+			break TimeSeriesLoop
+		}
+
+		ts = ts[samplesOffset:]
+	SamplesLoop:
+		for len(ts) > 0 {
+			if ts[0] != 0x12 { // repeated Sample samples = 2;
+				if ts, err = pb.Skip(ts); err != nil {
+					break TimeSeriesLoop
+				}
+				continue SamplesLoop
+			}
+
+			if sample, ts, err = pb.Bytes(ts[1:]); err != nil {
+				break TimeSeriesLoop
+			}
+
+			timestamp = 0
+			value = 0
+
+			for len(sample) > 0 {
+				switch sample[0] {
+				case 0x09: // double value = 1;
+					if value, sample, err = pb.Double(sample[1:]); err != nil {
+						break TimeSeriesLoop
+					}
+				case 0x10: // int64 timestamp = 2;
+					if timestamp, sample, err = pb.Int64(sample[1:]); err != nil {
+						break TimeSeriesLoop
+					}
+				default:
+					if sample, err = pb.Skip(sample); err != nil {
+						break TimeSeriesLoop
+					}
+				}
+			}
+
+			if math.IsNaN(value) {
+				continue SamplesLoop
+			}
+
+			if rcv.isDropString("", writer.Now(), uint32(timestamp/1000), value) {
+				continue
+			}
+
+			writer.WritePointTagged(metric, value, timestamp/1000)
+		}
+	}
+
+	if err != nil {
+		return err
+	}
+
+	writer.Flush()
+
+	if samplesCount := writer.PointsWritten(); samplesCount > 0 {
+		atomic.AddUint64(&rcv.stat.samplesReceived, uint64(samplesCount))
+	}
+
+	if writeErrors := writer.WriteErrors(); writeErrors > 0 {
+		atomic.AddUint64(&rcv.stat.errors, uint64(writeErrors))
+	}
+
+	return nil
+}
+
+func (rcv *PrometheusRemoteWrite) unpackDefault(ctx context.Context, bufBody []byte) error {
+	var req prompb.WriteRequest
+	if err := proto.Unmarshal(bufBody, &req); err != nil {
+		return err
+	}
+
+	writer := RowBinary.NewWriter(ctx, rcv.writeChan)
+
+	series := req.GetTimeseries()
+	for i := 0; i < len(series); i++ {
+		metric, err := tags.Prometheus(series[i].GetLabels())
+
+		if err != nil {
+			return err
+		}
+
+		samples := series[i].GetSamples()
+
+		for j := 0; j < len(samples); j++ {
+			if samples[j] == nil {
+				continue
+			}
+			if math.IsNaN(samples[j].Value) {
+				continue
+			}
+			if rcv.isDropString(metric, writer.Now(), uint32(samples[j].Timestamp/1000), samples[j].Value) {
+				continue
+			}
+
+			writer.WritePoint(metric, samples[j].Value, samples[j].Timestamp/1000)
+		}
+	}
+
+	writer.Flush()
+
+	if samplesCount := writer.PointsWritten(); samplesCount > 0 {
+		atomic.AddUint64(&rcv.stat.samplesReceived, uint64(samplesCount))
+	}
+
+	if writeErrors := writer.WriteErrors(); writeErrors > 0 {
+		atomic.AddUint64(&rcv.stat.errors, uint64(writeErrors))
+	}
+
+	return nil
+}
+
 func (rcv *PrometheusRemoteWrite) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	compressed, err := ioutil.ReadAll(r.Body)
 	if err != nil {
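unpackFast switches on hard-coded key bytes rather than on decoded field names. Those literals follow directly from the protobuf encoding rule key = field_number<<3 | wire_type applied to the prompb schema quoted in the inline comments; a tiny sketch (the key helper is illustrative, not part of the patch) reproduces them:

package main

import "fmt"

// key reproduces the protobuf field key byte: (field_number << 3) | wire_type.
func key(field, wire uint) byte { return byte(field<<3 | wire) }

func main() {
	fmt.Printf("WriteRequest.timeseries (1, length-delimited): 0x%02x\n", key(1, 2)) // 0x0a
	fmt.Printf("TimeSeries.labels       (1, length-delimited): 0x%02x\n", key(1, 2)) // 0x0a
	fmt.Printf("TimeSeries.samples      (2, length-delimited): 0x%02x\n", key(2, 2)) // 0x12
	fmt.Printf("Sample.value            (1, 64-bit):           0x%02x\n", key(1, 1)) // 0x09
	fmt.Printf("Sample.timestamp        (2, varint):           0x%02x\n", key(2, 0)) // 0x10
}

Any field whose key does not match one of these bytes falls through to pb.Skip, so unknown or future prompb fields are tolerated rather than treated as errors.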
diff --git a/receiver/prometheus_metric.go b/receiver/prometheus_metric.go
new file mode 100644
index 00000000..8beea8b8
--- /dev/null
+++ b/receiver/prometheus_metric.go
@@ -0,0 +1,187 @@
+package receiver
+
+import (
+	"bytes"
+	"errors"
+	"net/url"
+
+	"github.com/lomik/carbon-clickhouse/helper/pb"
+)
+
+type prometheusLabel struct {
+	name  []byte
+	value []byte
+}
+
+type prometheusMetricBuffer struct {
+	labels      []prometheusLabel
+	queryEscape map[string]string
+	metric      []string // ["name", "key1", "value1", ...]
+	metricUsed  int
+	// labelsEncoded bytes.Buffer
+}
+
+func newPrometheusMetricBuffer() *prometheusMetricBuffer {
+	return &prometheusMetricBuffer{
+		labels:      make([]prometheusLabel, 16),
+		queryEscape: make(map[string]string),
+		metric:      make([]string, 128),
+	}
+}
+
+func shouldEscapeByte(c byte) bool {
+	// §2.3 Unreserved characters (alphanum)
+	if 'A' <= c && c <= 'Z' || 'a' <= c && c <= 'z' || '0' <= c && c <= '9' {
+		return false
+	}
+
+	switch c {
+	case '-', '_', '.', '~': // §2.3 Unreserved characters (mark)
+		return false
+	}
+
+	// Everything else must be escaped.
+	return true
+}
+
+func shouldQueryEscape(b []byte) bool {
+	for i := 0; i < len(b); i++ {
+		if shouldEscapeByte(b[i]) {
+			return true
+		}
+	}
+
+	return false
+}
+
+func (mb *prometheusMetricBuffer) urlQueryEscape(b []byte) string {
+	bs := unsafeString(b)
+	if !shouldQueryEscape(b) {
+		return bs
+	}
+
+	v, exists := mb.queryEscape[bs]
+	if exists {
+		return v
+	}
+
+	v = url.QueryEscape(bs)
+	mb.queryEscape[bs] = v
+	return v
+}
+
+// timeSeries returns (["metric_name", "key1", "value1", "key2", "value2", ...], firstSamplesOffset, error)
+func (mb *prometheusMetricBuffer) timeSeries(tsBody []byte) ([]string, int, error) {
+	ts := tsBody
+	labelIndex := 0
+	var label []byte
+	var err error
+	var samplesOffset int
+	var firstLabelLen int
+
+	for len(ts) > 0 {
+		switch ts[0] {
+		case 0x0a: // repeated Label labels = 1;
+			if len(mb.labels) < labelIndex+1 {
+				mb.labels = append(mb.labels, prometheusLabel{})
+			}
+			mb.labels[labelIndex].name = nil
+			mb.labels[labelIndex].value = nil
+
+			if label, ts, err = pb.Bytes(ts[1:]); err != nil {
+				return nil, 0, err
+			}
+
+			if firstLabelLen == 0 {
+				firstLabelLen = len(tsBody) - len(ts)
+			}
+
+			for len(label) > 0 {
+				switch label[0] {
+				case 0x0a: // string name = 1;
+					if mb.labels[labelIndex].name, label, err = pb.Bytes(label[1:]); err != nil {
+						return nil, 0, err
+					}
+				case 0x12: // string value = 2;
+					if mb.labels[labelIndex].value, label, err = pb.Bytes(label[1:]); err != nil {
+						return nil, 0, err
+					}
+				default:
+					if label, err = pb.Skip(label); err != nil {
+						return nil, 0, err
+					}
+				}
+			}
+			if mb.labels[labelIndex].name != nil && mb.labels[labelIndex].value != nil {
+				labelIndex++
+			}
+
+			continue
+		case 0x12: // repeated Sample samples = 2;
+			if samplesOffset == 0 {
+				samplesOffset = len(tsBody) - len(ts)
+			}
+		default: // unknown field
+		}
+
+		if ts, err = pb.Skip(ts); err != nil {
+			return nil, 0, err
+		}
+	}
+
+	var nameFound bool
+	for i := 0; i < labelIndex; i++ {
+		if bytes.Equal(mb.labels[i].name, []byte("__name__")) {
+			if i != 0 {
+				mb.labels[0], mb.labels[i] = mb.labels[i], mb.labels[0]
+			}
+			nameFound = true
+			break
+		}
+	}
+
+	if !nameFound {
+		return nil, 0, errors.New("__name__ not found")
+	}
+
+	mb.metric[0] = unsafeString(mb.labels[0].value)
+
+	mb.metricUsed = 1
+
+	// sort.Slice(labels, func(i, j int) bool { return bytes.Compare(labels[i].name, labels[j].name) < 0 })
+	// alloc free sort
+	// check for sort
+
+	sortRequired := false
+	for i := 1; i < labelIndex-1; i++ {
+		if bytes.Compare(mb.labels[i].name, mb.labels[i+1].name) > 0 {
+			sortRequired = true
+			break
+		}
+	}
+
+	if sortRequired {
+		for i := 1; i < labelIndex-1; i++ {
+			for j := i + 1; j < labelIndex; j++ {
+				if bytes.Compare(mb.labels[i].name, mb.labels[j].name) > 0 {
+					mb.labels[i], mb.labels[j] = mb.labels[j], mb.labels[i]
+				}
+			}
+		}
+	}
+
+	if labelIndex*2 > len(mb.metric) {
+		for i := len(mb.metric); i < labelIndex*2; i++ {
+			mb.metric = append(mb.metric, "")
+		}
+	}
+
+	for i := 1; i < labelIndex; i++ {
+		mb.metric[mb.metricUsed] = mb.urlQueryEscape(mb.labels[i].name)
+		mb.metricUsed++
+		mb.metric[mb.metricUsed] = mb.urlQueryEscape(mb.labels[i].value)
+		mb.metricUsed++
+	}
+
+	return mb.metric[:mb.metricUsed], samplesOffset, nil
+}
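The []string that timeSeries returns (name first, then sorted and query-escaped label pairs) is exactly what Writer.WritePointTagged consumes. A hedged end-to-end sketch follows; the channel size, example labels and time source are arbitrary choices for illustration, and it assumes, as the tests below do, that Flush pushes the finished buffer onto the channel passed to NewWriter:

package main

import (
	"context"
	"time"

	"github.com/lomik/carbon-clickhouse/helper/RowBinary"
)

func main() {
	// Buffer size 16 is an arbitrary choice for this sketch.
	ch := make(chan *RowBinary.WriteBuffer, 16)
	w := RowBinary.NewWriter(context.Background(), ch)

	// Rendered as "up?instance=localhost%3A9100&job=node" followed by the
	// usual RowBinary columns: float64 value, uint32 timestamp, uint16 date,
	// uint32 version. The values here are already query-escaped, as
	// timeSeries would have produced them.
	w.WritePointTagged(
		[]string{"up", "instance", "localhost%3A9100", "job", "node"},
		1.0, time.Now().Unix(),
	)
	w.Flush()

	wb := <-ch
	_ = wb.Body[:wb.Used] // RowBinary row ready to be sent to ClickHouse
	wb.Release()
}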
"github.com/lomik/carbon-clickhouse/helper/RowBinary" "github.com/lomik/carbon-clickhouse/helper/prompb" "github.com/stretchr/testify/assert" ) @@ -25,6 +27,39 @@ func TestProm1Unpack(t *testing.T) { assert.Equal(1663, len(req.Timeseries)) } +func TestProm1UnpackFast(t *testing.T) { + assert := assert.New(t) + + compressed, err := base64.StdEncoding.DecodeString(prom1) + assert.NoError(err) + + reqBuf, err := snappy.Decode(nil, compressed) + assert.NoError(err) + + fast := &PrometheusRemoteWrite{} + fast.writeChan = make(chan *RowBinary.WriteBuffer, 1024) + assert.NoError(fast.unpackFast(context.Background(), reqBuf)) + + slow := &PrometheusRemoteWrite{} + slow.writeChan = make(chan *RowBinary.WriteBuffer, 1024) + assert.NoError(slow.unpackDefault(context.Background(), reqBuf)) + + var wbf, wbs *RowBinary.WriteBuffer +chanLoop: + for { + select { + case wbf = <-fast.writeChan: + default: + break chanLoop + } + + wbs = <-slow.writeChan + if !assert.Equal(wbf.Body[:wbf.Used], wbs.Body[:wbs.Used]) { + return + } + } +} + func BenchmarkProm1Snappy(b *testing.B) { assert := assert.New(b) @@ -42,7 +77,7 @@ func BenchmarkProm1Snappy(b *testing.B) { } } -func BenchmarkProm1Unmarshal(b *testing.B) { +func BenchmarkProm1UnpackFast(b *testing.B) { assert := assert.New(b) compressed, err := base64.StdEncoding.DecodeString(prom1) @@ -51,15 +86,59 @@ func BenchmarkProm1Unmarshal(b *testing.B) { reqBuf, err := snappy.Decode(nil, compressed) assert.NoError(err) - var req prompb.WriteRequest + h := &PrometheusRemoteWrite{} + h.writeChan = make(chan *RowBinary.WriteBuffer, 1024) + var wb *RowBinary.WriteBuffer + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err = h.unpackFast(context.Background(), reqBuf) + if err != nil { + b.Fatalf("Unexpected error: %#v", err.Error()) + } + readLoop: + for { + select { + case wb = <-h.writeChan: + wb.Release() + default: + break readLoop + } + } + } +} + +func BenchmarkProm1UnpackSlow(b *testing.B) { + assert := assert.New(b) + + compressed, err := base64.StdEncoding.DecodeString(prom1) + assert.NoError(err) + + reqBuf, err := snappy.Decode(nil, compressed) + assert.NoError(err) + + h := &PrometheusRemoteWrite{} + h.writeChan = make(chan *RowBinary.WriteBuffer, 1024) + var wb *RowBinary.WriteBuffer b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - err = proto.Unmarshal(reqBuf, &req) + err = h.unpackDefault(context.Background(), reqBuf) if err != nil { b.Fatalf("Unexpected error: %#v", err.Error()) } + readLoop: + for { + select { + case wb = <-h.writeChan: + wb.Release() + default: + break readLoop + } + } } }