Skip to content

Commit

Permalink
Merge pull request #52 from vinted/v2.39.0_for_thanos_new_streamer
Browse files Browse the repository at this point in the history
remote: implement compact writer
  • Loading branch information
GiedriusS authored Jan 26, 2023
2 parents bac8c28 + 2949a5f commit 8a66b6d
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 46 deletions.
71 changes: 39 additions & 32 deletions prompb/remote.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions prompb/remote.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ message ReadRequest {
// Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"
// Content-Encoding: ""
STREAMED_XOR_CHUNKS = 1;

// It is like the previous one but series are maximally compacted into frames.
// This significantly improves throughput.
// Response headers:
// Content-Type: "application/x-compact-protobuf; proto=prometheus.ChunkedReadResponse"
// Content-Encoding: ""
COMPACT_XOR_CHUNKS = 2;
}

// accepted_response_types allows negotiating the content type of the response.
Expand Down
30 changes: 20 additions & 10 deletions storage/remote/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.R
supported := map[prompb.ReadRequest_ResponseType]struct{}{
prompb.ReadRequest_SAMPLES: {},
prompb.ReadRequest_STREAMED_XOR_CHUNKS: {},
prompb.ReadRequest_COMPACT_XOR_CHUNKS: {},
}

for _, resType := range accepted {
Expand All @@ -193,10 +194,12 @@ func StreamChunkedReadResponses(
sortedExternalLabels []prompb.Label,
maxBytesInFrame int,
marshalPool *sync.Pool,
) (storage.Warnings, error) {
freeImmediately bool,
) (storage.Warnings, func(), error) {
var (
chks []prompb.Chunk
lbls []prompb.Label
chks []prompb.Chunk
lbls []prompb.Label
returnSlices []*[]byte
)

for ss.Next() {
Expand All @@ -217,7 +220,7 @@ func StreamChunkedReadResponses(
chk := iter.At()

if chk.Chunk == nil {
return ss.Warnings(), fmt.Errorf("StreamChunkedReadResponses: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref)
return ss.Warnings(), nil, fmt.Errorf("StreamChunkedReadResponses: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref)
}

// Cut the chunk.
Expand All @@ -244,23 +247,30 @@ func StreamChunkedReadResponses(

b, err := resp.PooledMarshal(marshalPool)
if err != nil {
return ss.Warnings(), fmt.Errorf("marshal ChunkedReadResponse: %w", err)
return ss.Warnings(), nil, fmt.Errorf("marshal ChunkedReadResponse: %w", err)
}

if _, err := stream.Write(b); err != nil {
return ss.Warnings(), fmt.Errorf("write to stream: %w", err)
return ss.Warnings(), nil, fmt.Errorf("write to stream: %w", err)
}

// We immediately flush the Write() so it is safe to return to the pool.
marshalPool.Put(&b)
if freeImmediately {
marshalPool.Put(&b)
} else {
returnSlices = append(returnSlices, &b)
}
chks = chks[:0]
frameBytesLeft = maxDataLength
}
if err := iter.Err(); err != nil {
return ss.Warnings(), err
return ss.Warnings(), nil, err
}
}
return ss.Warnings(), ss.Err()
return ss.Warnings(), func() {
for _, rs := range returnSlices {
marshalPool.Put(rs)
}
}, ss.Err()
}

// MergeLabels merges two sets of sorted proto labels, preferring those in
Expand Down
89 changes: 89 additions & 0 deletions storage/remote/compact_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
"encoding/binary"
"fmt"
"hash"
"hash/crc32"
"io"
"net/http"
)

type ChunkedCompactWriter struct {
writer io.Writer
flusher http.Flusher

crc32 hash.Hash32
writeBuf []byte
}

// NewChunkedCompactWriter constructs a ChunkedCompactWriter.
func NewChunkedCompactWriter(writeBuf []byte, w io.Writer, f http.Flusher) *ChunkedCompactWriter {
return &ChunkedCompactWriter{writeBuf: writeBuf[:0], writer: w, flusher: f, crc32: crc32.New(castagnoliTable)}
}

func (w *ChunkedCompactWriter) Close() error {
if len(w.writeBuf) == 0 {
return nil
}

n, err := w.writer.Write(w.writeBuf)
if err != nil {
return err
}
if n != len(w.writeBuf) {
return fmt.Errorf("short write: wrote %v but buffer is %v", n, len(w.writeBuf))
}

w.flusher.Flush()
return nil
}

func (w *ChunkedCompactWriter) Write(b []byte) (int, error) {
if len(b) == 0 {
return 0, nil
}

// len(b) + crc32 + binary.MaxVarintLen64
requiredSpaceBytes := len(b) + 32/8 + binary.MaxVarintLen64

leftSpaceBytes := cap(w.writeBuf) - len(w.writeBuf)

if len(w.writeBuf) > 0 && leftSpaceBytes < requiredSpaceBytes {
n, err := w.writer.Write(w.writeBuf)
if err != nil {
return n, err
}
if n != len(w.writeBuf) {
return n, fmt.Errorf("short write: wrote %v but buf is %v", n, len(w.writeBuf))
}
w.flusher.Flush()
w.writeBuf = w.writeBuf[:0]
}

var buf [binary.MaxVarintLen64]byte
v := binary.PutUvarint(buf[:], uint64(len(b)))
w.writeBuf = append(w.writeBuf, buf[:v]...)

w.crc32.Reset()
if _, err := w.crc32.Write(b); err != nil {
return 0, err
}
w.writeBuf = binary.BigEndian.AppendUint32(w.writeBuf, w.crc32.Sum32())
w.writeBuf = append(w.writeBuf, b...)

return len(b), nil
}
Loading

0 comments on commit 8a66b6d

Please sign in to comment.