Skip to content

Commit

Permalink
mutate: add uncompressed blob size annotation
Browse files Browse the repository at this point in the history
Sometimes it is useful to have a reliable estimate of the space that
would be taken up by a layer when it is unpacked. Since gzip's headers
are only accurate up to 4GiB, larger layers cannot be easily measured
without uncompressing them.

This commit adds a field to the gzip and zstd compressors to store
the bytes read from the stream during compression and uses that to set
an annotation on newly generated layers.

If the noop compressor is used, the annotation is not added, as the
existing "compressed" layer size would be the same.

Signed-off-by: Michael McCracken <[email protected]>
(cherry picked from commit 8f65e8f)
  • Loading branch information
mikemccracken committed Apr 15, 2024
1 parent 3b35cc5 commit db2143f
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 11 deletions.
35 changes: 29 additions & 6 deletions mutate/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type Compressor interface {

// WithOpt applies an option and can be chained.
WithOpt(CompressorOpt) Compressor

// BytesRead returns the number of bytes read from the uncompressed input
// stream at the current time; there is no guarantee that compression has
// completed.
BytesRead() int64
}

// CompressorOpt is a compressor option which can be used to configure a
Expand All @@ -41,6 +45,10 @@ func (nc noopCompressor) MediaTypeSuffix() string {
return ""
}

// BytesRead always returns -1: the no-op compressor does not record how many
// bytes passed through it, so a negative value is reported instead of a count.
func (nc noopCompressor) BytesRead() int64 {
return -1
}

// NoopCompressor provides no compression.
var NoopCompressor Compressor = noopCompressor{}

Expand All @@ -55,22 +63,25 @@ type GzipBlockSize int

// gzipCompressor holds the configuration and bookkeeping for gzip compression.
type gzipCompressor struct {
// blockSize is handed to the gzip writer's SetConcurrency call as its first
// argument when Compress sets up the writer.
blockSize int
// bytesRead is the byte count returned by system.Copy; Compress stores it
// only after the copy from the input reader has finished without error.
bytesRead int64
}

func (gz gzipCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
func (gz *gzipCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
pipeReader, pipeWriter := io.Pipe()

gzw := gzip.NewWriter(pipeWriter)
if err := gzw.SetConcurrency(gz.blockSize, 2*runtime.NumCPU()); err != nil {
return nil, errors.Wrapf(err, "set concurrency level to %v blocks", 2*runtime.NumCPU())
}
go func() {
if _, err := system.Copy(gzw, reader); err != nil {
bytesRead, err := system.Copy(gzw, reader)
if err != nil {
log.Warnf("gzip compress: could not compress layer: %v", err)
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "compressing layer"))
return
}
gz.bytesRead = bytesRead
if err := gzw.Close(); err != nil {
log.Warnf("gzip compress: could not close gzip writer: %v", err)
// #nosec G104
Expand Down Expand Up @@ -100,25 +111,33 @@ func (gz gzipCompressor) WithOpt(opt CompressorOpt) Compressor {
return gz
}

// BytesRead returns the value currently stored in bytesRead. Compress assigns
// that field from its background goroutine once the input has been copied
// successfully, so the result is zero until then.
//
// NOTE(review): the field is written from the Compress goroutine with no
// synchronization visible here — confirm callers only call BytesRead after
// the compressed stream has been fully drained.
func (gz gzipCompressor) BytesRead() int64 {
return gz.bytesRead
}

// ZstdCompressor provides zstd compression.
var ZstdCompressor Compressor = zstdCompressor{}
var ZstdCompressor Compressor = &zstdCompressor{}

type zstdCompressor struct{}
// zstdCompressor holds the bookkeeping for zstd compression.
type zstdCompressor struct {
// bytesRead is the byte count returned by system.Copy; Compress stores it
// only after the copy from the input reader has finished without error.
bytesRead int64
}

func (zs zstdCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
func (zs *zstdCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {

pipeReader, pipeWriter := io.Pipe()
zw, err := zstd.NewWriter(pipeWriter)
if err != nil {
return nil, err
}
go func() {
if _, err := system.Copy(zw, reader); err != nil {
bytesRead, err := system.Copy(zw, reader)
if err != nil {
log.Warnf("zstd compress: could not compress layer: %v", err)
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "compressing layer"))
return
}
zs.bytesRead = bytesRead
if err := zw.Close(); err != nil {
log.Warnf("zstd compress: could not close gzip writer: %v", err)
// #nosec G104
Expand All @@ -142,3 +161,7 @@ func (zs zstdCompressor) MediaTypeSuffix() string {
// WithOpt ignores the supplied option and returns the receiver unchanged so
// that calls can be chained.
//
// The receiver is a pointer because Compress is declared on *zstdCompressor:
// a plain zstdCompressor value does not have Compress in its method set, so
// returning the value here would not satisfy a Compressor interface that
// requires Compress. Returning the pointer also means the caller keeps
// observing the same bytesRead counter that Compress updates.
func (zs *zstdCompressor) WithOpt(CompressorOpt) Compressor {
	return zs
}

// BytesRead returns the value currently stored in bytesRead. Compress assigns
// that field from its background goroutine once the input has been copied
// successfully, so the result is zero until then.
//
// NOTE(review): the field is written from the Compress goroutine with no
// synchronization visible here — confirm callers only call BytesRead after
// the compressed stream has been fully drained.
func (zs zstdCompressor) BytesRead() int64 {
return zs.bytesRead
}
10 changes: 10 additions & 0 deletions mutate/mutate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package mutate

import (
"context"
"fmt"
"io"
"reflect"
"time"
Expand All @@ -36,6 +37,8 @@ import (
"github.com/pkg/errors"
)

// UmociUncompressedBlobSizeAnnotation is the annotation key under which
// Mutator.Add records the compressor's BytesRead value, formatted as a
// decimal string, when that value is non-negative.
const UmociUncompressedBlobSizeAnnotation = "ci.umo.uncompressed_blob_size"

// configPtr copies the given image configuration into a freshly allocated
// value and returns the address of that copy.
func configPtr(c ispec.Image) *ispec.Image {
	copied := new(ispec.Image)
	*copied = c
	return copied
}
// manifestPtr copies the given manifest into a freshly allocated value and
// returns the address of that copy.
func manifestPtr(m ispec.Manifest) *ispec.Manifest {
	copied := new(ispec.Manifest)
	*copied = m
	return copied
}
// timePtr copies the given time into a freshly allocated value and returns
// the address of that copy, so later changes to the caller's variable do not
// affect the result.
func timePtr(t time.Time) *time.Time {
	copied := new(time.Time)
	*copied = t
	return copied
}
Expand Down Expand Up @@ -295,6 +298,13 @@ func (m *Mutator) Add(ctx context.Context, mediaType string, r io.Reader, histor
compressedMediaType = compressedMediaType + "+" + compressor.MediaTypeSuffix()
}

if annotations == nil {
annotations = make(map[string]string)
}
if compressor.BytesRead() >= 0 {
annotations[UmociUncompressedBlobSizeAnnotation] = fmt.Sprintf("%d", compressor.BytesRead())
}

// Append to layers.
desc = ispec.Descriptor{
MediaType: compressedMediaType,
Expand Down
15 changes: 12 additions & 3 deletions mutate/mutate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func TestMutateAdd(t *testing.T) {

// This isn't a valid image, but whatever.
buffer := bytes.NewBufferString("contents")
bufferSize := buffer.Len()

// Add a new layer.
annotations := map[string]string{"hello": "world"}
Expand Down Expand Up @@ -253,10 +254,18 @@ func TestMutateAdd(t *testing.T) {
if mutator.manifest.Layers[1].Digest == expectedLayerDigest {
t.Errorf("manifest.Layers[1].Digest is not the same!")
}
if len(mutator.manifest.Layers[1].Annotations) != 1 || mutator.manifest.Layers[1].Annotations["hello"] != "world" {
t.Errorf("manifest.Layers[1].Annotations was not set correctly!")
if len(mutator.manifest.Layers[1].Annotations) != 2 {
t.Errorf("manifest.Layers[1].Annotations was not set correctly!: %+v", mutator.manifest.Layers[1].Annotations)
}
if mutator.manifest.Layers[1].Annotations["hello"] != "world" {
t.Errorf("manifest.Layers[1].Annotations['hello'] was not set correctly!: %+v", mutator.manifest.Layers[1].Annotations)
}
if mutator.manifest.Layers[1].Annotations[UmociUncompressedBlobSizeAnnotation] != fmt.Sprintf("%d", bufferSize) {
t.Errorf("manifest.Layers[1].Annotations['%s'] was not set correctly!: %q, expected %d",
UmociUncompressedBlobSizeAnnotation,
mutator.manifest.Layers[1].Annotations[UmociUncompressedBlobSizeAnnotation],
bufferSize)
}

if mutator.manifest.Layers[1].Digest != newLayerDesc.Digest {
t.Fatalf("unexpected digest for new layer: %v %v", mutator.manifest.Layers[1].Digest, newLayerDesc.Digest)
}
Expand Down
4 changes: 2 additions & 2 deletions test/create.bats
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ function teardown() {
# Verify that the hashes of the blobs and index match (blobs are
# content-addressable so using hashes is a bit silly, but whatever).
known_hashes=(
"6f3716b8ae2bb4b3fd0a851f5a553946ea351ed5fdda0bd708d325363c0487f7 $IMAGE/index.json"
"73ecb862d0ebee78acdf8553d34d1ae5c13df509030ac262e561d096bb480dfc $IMAGE/blobs/sha256/73ecb862d0ebee78acdf8553d34d1ae5c13df509030ac262e561d096bb480dfc"
"b780d08bfed4850ab807b7c308682fae6868ed9c09c0c842063c418ebe2a19fb $IMAGE/index.json"
"27bcd3b4f31740cc087346382aaba3fe1c01872d75ead1bd2b9f7053d2bb3231 $IMAGE/blobs/sha256/27bcd3b4f31740cc087346382aaba3fe1c01872d75ead1bd2b9f7053d2bb3231"
"e7013826daf8b5d68f82c5b790ca5e9de222a834f2cb3fe3532030161bd72083 $IMAGE/blobs/sha256/e7013826daf8b5d68f82c5b790ca5e9de222a834f2cb3fe3532030161bd72083"
"f4a39a97d97aa834da7ad2d92940f9636a57e3d9b3cc7c53242451b02a6cea89 $IMAGE/blobs/sha256/f4a39a97d97aa834da7ad2d92940f9636a57e3d9b3cc7c53242451b02a6cea89"
)
Expand Down

0 comments on commit db2143f

Please sign in to comment.