mutate: add uncompressed blob size annotation
Sometimes it is useful to have a reliable estimate of the space that
a layer will take up when it is unpacked. Since the uncompressed size
recorded in gzip's headers is only accurate up to 4 GiB, larger layers
cannot easily be measured without decompressing them.

This commit adds a field to the gzip and zstd compressors that stores
the number of bytes read from the input stream during compression, and
uses that count to set an annotation on newly generated layers.

If the noop compressor is used, the annotation is not added, as the
existing "compressed" layer size would be the same.

Signed-off-by: Michael McCracken <[email protected]>
mikemccracken committed Oct 23, 2023
1 parent 312b2db commit 8f65e8f
Showing 4 changed files with 56 additions and 13 deletions.
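Before the file-by-file diff, a rough sketch of how a consumer of an image produced with this change might read the annotation back to estimate a layer's unpacked size without decompressing the blob. The uncompressedSize helper and the sample descriptor are illustrative only and not part of this commit; the annotation key mirrors the constant added in mutate/mutate.go.

package main

import (
	"fmt"
	"strconv"

	ispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// Mirrors the constant added in mutate/mutate.go by this commit.
const uncompressedBlobSizeAnnotation = "ci.umo.uncompressed_blob_size"

// uncompressedSize is a hypothetical helper: it returns the unpacked size
// recorded on a layer descriptor, or false if the annotation is missing
// (for example, when the layer was produced with the noop compressor).
func uncompressedSize(desc ispec.Descriptor) (int64, bool) {
	val, ok := desc.Annotations[uncompressedBlobSizeAnnotation]
	if !ok {
		return 0, false
	}
	size, err := strconv.ParseInt(val, 10, 64)
	if err != nil {
		return 0, false
	}
	return size, true
}

func main() {
	// A made-up descriptor shaped like one this commit would emit.
	desc := ispec.Descriptor{
		MediaType: ispec.MediaTypeImageLayerGzip,
		Annotations: map[string]string{
			uncompressedBlobSizeAnnotation: "8",
		},
	}
	if size, ok := uncompressedSize(desc); ok {
		fmt.Printf("estimated unpacked size: %d bytes\n", size)
	}
}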
40 changes: 32 additions & 8 deletions mutate/compress.go
@@ -22,6 +22,10 @@ type Compressor interface {
 	// indicate what compression type is used, e.g. "gzip", or "" for no
 	// compression.
 	MediaTypeSuffix() string
+
+	// BytesRead returns the number of bytes read from the uncompressed input
+	// stream at the current time, no guarantee of completion.
+	BytesRead() int64
 }
 
 type noopCompressor struct{}
@@ -34,28 +38,36 @@ func (nc noopCompressor) MediaTypeSuffix() string {
 	return ""
 }
 
+func (nc noopCompressor) BytesRead() int64 {
+	return -1
+}
+
 // NoopCompressor provides no compression.
 var NoopCompressor Compressor = noopCompressor{}
 
 // GzipCompressor provides gzip compression.
-var GzipCompressor Compressor = gzipCompressor{}
+var GzipCompressor Compressor = &gzipCompressor{}
 
-type gzipCompressor struct{}
+type gzipCompressor struct {
+	bytesRead int64
+}
 
-func (gz gzipCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
+func (gz *gzipCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
 	pipeReader, pipeWriter := io.Pipe()
 
 	gzw := gzip.NewWriter(pipeWriter)
 	if err := gzw.SetConcurrency(256<<10, 2*runtime.NumCPU()); err != nil {
 		return nil, errors.Wrapf(err, "set concurrency level to %v blocks", 2*runtime.NumCPU())
 	}
 	go func() {
-		if _, err := system.Copy(gzw, reader); err != nil {
+		bytesRead, err := system.Copy(gzw, reader)
+		if err != nil {
 			log.Warnf("gzip compress: could not compress layer: %v", err)
 			// #nosec G104
 			_ = pipeWriter.CloseWithError(errors.Wrap(err, "compressing layer"))
 			return
 		}
+		gz.bytesRead = bytesRead
 		if err := gzw.Close(); err != nil {
 			log.Warnf("gzip compress: could not close gzip writer: %v", err)
 			// #nosec G104
@@ -76,25 +88,33 @@ func (gz gzipCompressor) MediaTypeSuffix() string {
 	return "gzip"
 }
 
+func (gz gzipCompressor) BytesRead() int64 {
+	return gz.bytesRead
+}
+
 // ZstdCompressor provides zstd compression.
-var ZstdCompressor Compressor = zstdCompressor{}
+var ZstdCompressor Compressor = &zstdCompressor{}
 
-type zstdCompressor struct{}
+type zstdCompressor struct {
+	bytesRead int64
+}
 
-func (zs zstdCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
+func (zs *zstdCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
 
 	pipeReader, pipeWriter := io.Pipe()
 	zw, err := zstd.NewWriter(pipeWriter)
 	if err != nil {
 		return nil, err
 	}
 	go func() {
-		if _, err := system.Copy(zw, reader); err != nil {
+		bytesRead, err := system.Copy(zw, reader)
+		if err != nil {
 			log.Warnf("zstd compress: could not compress layer: %v", err)
 			// #nosec G104
 			_ = pipeWriter.CloseWithError(errors.Wrap(err, "compressing layer"))
 			return
 		}
+		zs.bytesRead = bytesRead
 		if err := zw.Close(); err != nil {
 			log.Warnf("zstd compress: could not close gzip writer: %v", err)
 			// #nosec G104
@@ -114,3 +134,7 @@ func (zs zstdCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
 func (zs zstdCompressor) MediaTypeSuffix() string {
 	return "zstd"
 }
+
+func (zs zstdCompressor) BytesRead() int64 {
+	return zs.bytesRead
+}
10 changes: 10 additions & 0 deletions mutate/mutate.go
@@ -24,6 +24,7 @@ package mutate
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"reflect"
 	"time"
@@ -36,6 +37,8 @@ import (
 	"github.com/pkg/errors"
 )
 
+const UmociUncompressedBlobSizeAnnotation = "ci.umo.uncompressed_blob_size"
+
 func configPtr(c ispec.Image) *ispec.Image { return &c }
 func manifestPtr(m ispec.Manifest) *ispec.Manifest { return &m }
 func timePtr(t time.Time) *time.Time { return &t }
@@ -295,6 +298,13 @@ func (m *Mutator) Add(ctx context.Context, mediaType string, r io.Reader, histor
 		compressedMediaType = compressedMediaType + "+" + compressor.MediaTypeSuffix()
 	}
 
+	if annotations == nil {
+		annotations = make(map[string]string)
+	}
+	if compressor.BytesRead() >= 0 {
+		annotations[UmociUncompressedBlobSizeAnnotation] = fmt.Sprintf("%d", compressor.BytesRead())
+	}
+
 	// Append to layers.
 	desc = ispec.Descriptor{
 		MediaType: compressedMediaType,
15 changes: 12 additions & 3 deletions mutate/mutate_test.go
@@ -214,6 +214,7 @@ func TestMutateAdd(t *testing.T) {
 
 	// This isn't a valid image, but whatever.
 	buffer := bytes.NewBufferString("contents")
+	bufferSize := buffer.Len()
 
 	// Add a new layer.
 	annotations := map[string]string{"hello": "world"}
@@ -253,10 +254,18 @@
 	if mutator.manifest.Layers[1].Digest == expectedLayerDigest {
 		t.Errorf("manifest.Layers[1].Digest is not the same!")
 	}
-	if len(mutator.manifest.Layers[1].Annotations) != 1 || mutator.manifest.Layers[1].Annotations["hello"] != "world" {
-		t.Errorf("manifest.Layers[1].Annotations was not set correctly!")
+	if len(mutator.manifest.Layers[1].Annotations) != 2 {
+		t.Errorf("manifest.Layers[1].Annotations was not set correctly!: %+v", mutator.manifest.Layers[1].Annotations)
 	}
+	if mutator.manifest.Layers[1].Annotations["hello"] != "world" {
+		t.Errorf("manifest.Layers[1].Annotations['hello'] was not set correctly!: %+v", mutator.manifest.Layers[1].Annotations)
+	}
+	if mutator.manifest.Layers[1].Annotations[UmociUncompressedBlobSizeAnnotation] != fmt.Sprintf("%d", bufferSize) {
+		t.Errorf("manifest.Layers[1].Annotations['%s'] was not set correctly!: %q, expected %d",
+			UmociUncompressedBlobSizeAnnotation,
+			mutator.manifest.Layers[1].Annotations[UmociUncompressedBlobSizeAnnotation],
+			bufferSize)
+	}
 
 	if mutator.manifest.Layers[1].Digest != newLayerDesc.Digest {
 		t.Fatalf("unexpected digest for new layer: %v %v", mutator.manifest.Layers[1].Digest, newLayerDesc.Digest)
 	}
4 changes: 2 additions & 2 deletions test/create.bats
@@ -277,8 +277,8 @@ function teardown() {
 	# Verify that the hashes of the blobs and index match (blobs are
 	# content-addressable so using hashes is a bit silly, but whatever).
 	known_hashes=(
-		"6f3716b8ae2bb4b3fd0a851f5a553946ea351ed5fdda0bd708d325363c0487f7 $IMAGE/index.json"
-		"73ecb862d0ebee78acdf8553d34d1ae5c13df509030ac262e561d096bb480dfc $IMAGE/blobs/sha256/73ecb862d0ebee78acdf8553d34d1ae5c13df509030ac262e561d096bb480dfc"
+		"b780d08bfed4850ab807b7c308682fae6868ed9c09c0c842063c418ebe2a19fb $IMAGE/index.json"
+		"27bcd3b4f31740cc087346382aaba3fe1c01872d75ead1bd2b9f7053d2bb3231 $IMAGE/blobs/sha256/27bcd3b4f31740cc087346382aaba3fe1c01872d75ead1bd2b9f7053d2bb3231"
 		"e7013826daf8b5d68f82c5b790ca5e9de222a834f2cb3fe3532030161bd72083 $IMAGE/blobs/sha256/e7013826daf8b5d68f82c5b790ca5e9de222a834f2cb3fe3532030161bd72083"
 		"f4a39a97d97aa834da7ad2d92940f9636a57e3d9b3cc7c53242451b02a6cea89 $IMAGE/blobs/sha256/f4a39a97d97aa834da7ad2d92940f9636a57e3d9b3cc7c53242451b02a6cea89"
 	)
