Skip to content

Commit

Permalink
Merge pull request #121 from vinted/v2.48.1+vinted_patches
Browse files Browse the repository at this point in the history
*: pull in vinted patches
  • Loading branch information
GiedriusS authored Dec 19, 2023
2 parents 6389421 + e9f884b commit 163f77d
Show file tree
Hide file tree
Showing 10 changed files with 305 additions and 58 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/buf-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,3 @@ jobs:
- uses: bufbuild/buf-lint-action@bd48f53224baaaf0fc55de9a913e7680ca6dbea4 # v1.0.3
with:
input: 'prompb'
- uses: bufbuild/buf-breaking-action@f47418c81c00bfd65394628385593542f64db477 # v1.1.2
with:
input: 'prompb'
against: 'https://github.com/prometheus/prometheus.git#branch=main,ref=HEAD,subdir=prompb'
73 changes: 41 additions & 32 deletions prompb/remote.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions prompb/remote.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ message ReadRequest {
// Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"
// Content-Encoding: ""
STREAMED_XOR_CHUNKS = 1;

// It is like the previous one but series are maximally compacted into frames.
// This significantly improves throughput.
// Response headers:
// Content-Type: "application/x-compact-protobuf; proto=prometheus.ChunkedReadResponse"
// Content-Encoding: ""
COMPACT_XOR_CHUNKS = 2;
}

// accepted_response_types allows negotiating the content type of the response.
Expand Down
32 changes: 21 additions & 11 deletions storage/remote/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.R
supported := map[prompb.ReadRequest_ResponseType]struct{}{
prompb.ReadRequest_SAMPLES: {},
prompb.ReadRequest_STREAMED_XOR_CHUNKS: {},
prompb.ReadRequest_COMPACT_XOR_CHUNKS: {},
}

for _, resType := range accepted {
Expand All @@ -225,11 +226,13 @@ func StreamChunkedReadResponses(
sortedExternalLabels []prompb.Label,
maxBytesInFrame int,
marshalPool *sync.Pool,
) (annotations.Annotations, error) {
freeImmediately bool,
) (annotations.Annotations, func(), error) {
var (
chks []prompb.Chunk
lbls []prompb.Label
iter chunks.Iterator
chks []prompb.Chunk
lbls []prompb.Label
iter chunks.Iterator
returnSlices []*[]byte
)

for ss.Next() {
Expand All @@ -250,7 +253,7 @@ func StreamChunkedReadResponses(
chk := iter.At()

if chk.Chunk == nil {
return ss.Warnings(), fmt.Errorf("StreamChunkedReadResponses: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref)
return ss.Warnings(), nil, fmt.Errorf("StreamChunkedReadResponses: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref)
}

// Cut the chunk.
Expand All @@ -277,23 +280,30 @@ func StreamChunkedReadResponses(

b, err := resp.PooledMarshal(marshalPool)
if err != nil {
return ss.Warnings(), fmt.Errorf("marshal ChunkedReadResponse: %w", err)
return ss.Warnings(), nil, fmt.Errorf("marshal ChunkedReadResponse: %w", err)
}

if _, err := stream.Write(b); err != nil {
return ss.Warnings(), fmt.Errorf("write to stream: %w", err)
return ss.Warnings(), nil, fmt.Errorf("write to stream: %w", err)
}

// We immediately flush the Write() so it is safe to return to the pool.
marshalPool.Put(&b)
if freeImmediately {
marshalPool.Put(&b)
} else {
returnSlices = append(returnSlices, &b)
}
chks = chks[:0]
frameBytesLeft = maxDataLength
}
if err := iter.Err(); err != nil {
return ss.Warnings(), err
return ss.Warnings(), nil, err
}
}
return ss.Warnings(), ss.Err()
return ss.Warnings(), func() {
for _, rs := range returnSlices {
marshalPool.Put(rs)
}
}, ss.Err()
}

// MergeLabels merges two sets of sorted proto labels, preferring those in
Expand Down
6 changes: 3 additions & 3 deletions storage/remote/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func TestNegotiateResponseType(t *testing.T) {

_, err = NegotiateResponseType([]prompb.ReadRequest_ResponseType{20})
require.Error(t, err, "expected error due to not supported requested response types")
require.Equal(t, "server does not support any of the requested response types: [20]; supported: map[SAMPLES:{} STREAMED_XOR_CHUNKS:{}]", err.Error())
require.Equal(t, "server does not support any of the requested response types: [20]; supported: map[SAMPLES:{} STREAMED_XOR_CHUNKS:{} COMPACT_XOR_CHUNKS:{}]", err.Error())
}

func TestMergeLabels(t *testing.T) {
Expand Down Expand Up @@ -749,11 +749,11 @@ func TestStreamResponse(t *testing.T) {
}}
css := newMockChunkSeriesSet(testData)
writer := mockWriter{}
warning, err := StreamChunkedReadResponses(&writer, 0,
warning, _, err := StreamChunkedReadResponses(&writer, 0,
css,
nil,
maxBytesInFrame,
&sync.Pool{})
&sync.Pool{}, true)
require.Nil(t, warning)
require.Nil(t, err)
expectData := []*prompb.ChunkedSeries{{
Expand Down
89 changes: 89 additions & 0 deletions storage/remote/compact_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
"encoding/binary"
"fmt"
"hash"
"hash/crc32"
"io"
"net/http"
)

// ChunkedCompactWriter accumulates chunked-protocol frames in an internal
// buffer and writes them to the underlying writer in large batches, flushing
// after each batch. Each frame is encoded as: uvarint payload length,
// big-endian CRC32 (Castagnoli) checksum of the payload, then the payload
// bytes.
//
// It is not safe for concurrent use. Call Close to push out any buffered
// frames before discarding the writer.
type ChunkedCompactWriter struct {
	writer  io.Writer    // destination for batched frames
	flusher http.Flusher // flushed after every batch write

	crc32    hash.Hash32 // reused Castagnoli CRC32 hasher, one frame at a time
	writeBuf []byte      // accumulates encoded frames until near capacity
}

// NewChunkedCompactWriter returns a ChunkedCompactWriter that batches frames
// into writeBuf (reused with its length reset to zero, keeping the existing
// capacity) before handing them to w and flushing f.
func NewChunkedCompactWriter(writeBuf []byte, w io.Writer, f http.Flusher) *ChunkedCompactWriter {
	cw := &ChunkedCompactWriter{
		writer:   w,
		flusher:  f,
		crc32:    crc32.New(castagnoliTable),
		writeBuf: writeBuf[:0],
	}
	return cw
}

// Close writes out any frames still sitting in the internal buffer and
// flushes the underlying writer. It does not close the underlying writer.
//
// The buffer is reset after a successful write, so calling Close a second
// time is a harmless no-op instead of re-sending the buffered bytes.
func (w *ChunkedCompactWriter) Close() error {
	if len(w.writeBuf) == 0 {
		return nil
	}

	n, err := w.writer.Write(w.writeBuf)
	if err != nil {
		return err
	}
	if n != len(w.writeBuf) {
		return fmt.Errorf("short write: wrote %v but buffer is %v", n, len(w.writeBuf))
	}
	// Drop the buffered data so repeated Close calls cannot write it twice.
	w.writeBuf = w.writeBuf[:0]

	w.flusher.Flush()
	return nil
}

// Write encodes b as a single frame and appends it to the internal buffer.
// Frame layout: uvarint payload length, big-endian CRC32 (Castagnoli) of the
// payload, then the payload itself. If the buffer lacks room for the new
// frame, the already-buffered frames are first written to the underlying
// writer and flushed as one batch.
//
// The returned count is len(b) — the payload bytes accepted — not the number
// of bytes physically written downstream. A payload larger than the buffer's
// capacity grows the buffer rather than failing.
func (w *ChunkedCompactWriter) Write(b []byte) (int, error) {
	if len(b) == 0 {
		return 0, nil
	}

	// Worst-case encoded frame size: payload + 4-byte CRC32 + max uvarint.
	requiredSpaceBytes := len(b) + crc32.Size + binary.MaxVarintLen64

	leftSpaceBytes := cap(w.writeBuf) - len(w.writeBuf)

	if len(w.writeBuf) > 0 && leftSpaceBytes < requiredSpaceBytes {
		n, err := w.writer.Write(w.writeBuf)
		if err != nil {
			return n, err
		}
		if n != len(w.writeBuf) {
			return n, fmt.Errorf("short write: wrote %v but buf is %v", n, len(w.writeBuf))
		}
		w.flusher.Flush()
		w.writeBuf = w.writeBuf[:0]
	}

	// Compute the checksum before mutating writeBuf so an error here cannot
	// leave a dangling length prefix (a half-written frame) in the buffer.
	w.crc32.Reset()
	if _, err := w.crc32.Write(b); err != nil {
		return 0, err
	}

	var lenBuf [binary.MaxVarintLen64]byte
	v := binary.PutUvarint(lenBuf[:], uint64(len(b)))
	w.writeBuf = append(w.writeBuf, lenBuf[:v]...)
	w.writeBuf = binary.BigEndian.AppendUint32(w.writeBuf, w.crc32.Sum32())
	w.writeBuf = append(w.writeBuf, b...)

	return len(b), nil
}
Loading

0 comments on commit 163f77d

Please sign in to comment.