Skip to content

Commit

Permalink
Merge pull request #8 from mikemccracken/2024.04.15/stacker/cp-uncomp…
Browse files Browse the repository at this point in the history
…ressed-size-annotation

mutate: add uncompressed blob size annotation
  • Loading branch information
rchincha authored Apr 15, 2024
2 parents 3b35cc5 + db2143f commit 1d2c8ab
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 11 deletions.
35 changes: 29 additions & 6 deletions mutate/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type Compressor interface {

// WithOpt applies an option and can be chained.
WithOpt(CompressorOpt) Compressor

// BytesRead returns the number of bytes read from the uncompressed input
// stream at the current time, no guarantee of completion.
BytesRead() int64
}

// CompressorOpt is a compressor option which can be used to configure a
Expand All @@ -41,6 +45,10 @@ func (nc noopCompressor) MediaTypeSuffix() string {
return ""
}

func (nc noopCompressor) BytesRead() int64 {
return -1
}

// NoopCompressor provides no compression.
var NoopCompressor Compressor = noopCompressor{}

Expand All @@ -55,22 +63,25 @@ type GzipBlockSize int

type gzipCompressor struct {
blockSize int
bytesRead int64
}

func (gz gzipCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
func (gz *gzipCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
pipeReader, pipeWriter := io.Pipe()

gzw := gzip.NewWriter(pipeWriter)
if err := gzw.SetConcurrency(gz.blockSize, 2*runtime.NumCPU()); err != nil {
return nil, errors.Wrapf(err, "set concurrency level to %v blocks", 2*runtime.NumCPU())
}
go func() {
if _, err := system.Copy(gzw, reader); err != nil {
bytesRead, err := system.Copy(gzw, reader)
if err != nil {
log.Warnf("gzip compress: could not compress layer: %v", err)
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "compressing layer"))
return
}
gz.bytesRead = bytesRead
if err := gzw.Close(); err != nil {
log.Warnf("gzip compress: could not close gzip writer: %v", err)
// #nosec G104
Expand Down Expand Up @@ -100,25 +111,33 @@ func (gz gzipCompressor) WithOpt(opt CompressorOpt) Compressor {
return gz
}

func (gz gzipCompressor) BytesRead() int64 {
return gz.bytesRead
}

// ZstdCompressor provides zstd compression.
var ZstdCompressor Compressor = zstdCompressor{}
var ZstdCompressor Compressor = &zstdCompressor{}

type zstdCompressor struct{}
type zstdCompressor struct {
bytesRead int64
}

func (zs zstdCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
func (zs *zstdCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {

pipeReader, pipeWriter := io.Pipe()
zw, err := zstd.NewWriter(pipeWriter)
if err != nil {
return nil, err
}
go func() {
if _, err := system.Copy(zw, reader); err != nil {
bytesRead, err := system.Copy(zw, reader)
if err != nil {
log.Warnf("zstd compress: could not compress layer: %v", err)
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "compressing layer"))
return
}
zs.bytesRead = bytesRead
if err := zw.Close(); err != nil {
log.Warnf("zstd compress: could not close gzip writer: %v", err)
// #nosec G104
Expand All @@ -142,3 +161,7 @@ func (zs zstdCompressor) MediaTypeSuffix() string {
func (zs zstdCompressor) WithOpt(CompressorOpt) Compressor {
return zs
}

func (zs zstdCompressor) BytesRead() int64 {
return zs.bytesRead
}
10 changes: 10 additions & 0 deletions mutate/mutate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package mutate

import (
"context"
"fmt"
"io"
"reflect"
"time"
Expand All @@ -36,6 +37,8 @@ import (
"github.com/pkg/errors"
)

const UmociUncompressedBlobSizeAnnotation = "ci.umo.uncompressed_blob_size"

func configPtr(c ispec.Image) *ispec.Image { return &c }
func manifestPtr(m ispec.Manifest) *ispec.Manifest { return &m }
func timePtr(t time.Time) *time.Time { return &t }
Expand Down Expand Up @@ -295,6 +298,13 @@ func (m *Mutator) Add(ctx context.Context, mediaType string, r io.Reader, histor
compressedMediaType = compressedMediaType + "+" + compressor.MediaTypeSuffix()
}

if annotations == nil {
annotations = make(map[string]string)
}
if compressor.BytesRead() >= 0 {
annotations[UmociUncompressedBlobSizeAnnotation] = fmt.Sprintf("%d", compressor.BytesRead())
}

// Append to layers.
desc = ispec.Descriptor{
MediaType: compressedMediaType,
Expand Down
15 changes: 12 additions & 3 deletions mutate/mutate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func TestMutateAdd(t *testing.T) {

// This isn't a valid image, but whatever.
buffer := bytes.NewBufferString("contents")
bufferSize := buffer.Len()

// Add a new layer.
annotations := map[string]string{"hello": "world"}
Expand Down Expand Up @@ -253,10 +254,18 @@ func TestMutateAdd(t *testing.T) {
if mutator.manifest.Layers[1].Digest == expectedLayerDigest {
t.Errorf("manifest.Layers[1].Digest is not the same!")
}
if len(mutator.manifest.Layers[1].Annotations) != 1 || mutator.manifest.Layers[1].Annotations["hello"] != "world" {
t.Errorf("manifest.Layers[1].Annotations was not set correctly!")
if len(mutator.manifest.Layers[1].Annotations) != 2 {
t.Errorf("manifest.Layers[1].Annotations was not set correctly!: %+v", mutator.manifest.Layers[1].Annotations)
}
if mutator.manifest.Layers[1].Annotations["hello"] != "world" {
t.Errorf("manifest.Layers[1].Annotations['hello'] was not set correctly!: %+v", mutator.manifest.Layers[1].Annotations)
}
if mutator.manifest.Layers[1].Annotations[UmociUncompressedBlobSizeAnnotation] != fmt.Sprintf("%d", bufferSize) {
t.Errorf("manifest.Layers[1].Annotations['%s'] was not set correctly!: %q, expected %d",
UmociUncompressedBlobSizeAnnotation,
mutator.manifest.Layers[1].Annotations[UmociUncompressedBlobSizeAnnotation],
bufferSize)
}

if mutator.manifest.Layers[1].Digest != newLayerDesc.Digest {
t.Fatalf("unexpected digest for new layer: %v %v", mutator.manifest.Layers[1].Digest, newLayerDesc.Digest)
}
Expand Down
4 changes: 2 additions & 2 deletions test/create.bats
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ function teardown() {
# Verify that the hashes of the blobs and index match (blobs are
# content-addressable so using hashes is a bit silly, but whatever).
known_hashes=(
"6f3716b8ae2bb4b3fd0a851f5a553946ea351ed5fdda0bd708d325363c0487f7 $IMAGE/index.json"
"73ecb862d0ebee78acdf8553d34d1ae5c13df509030ac262e561d096bb480dfc $IMAGE/blobs/sha256/73ecb862d0ebee78acdf8553d34d1ae5c13df509030ac262e561d096bb480dfc"
"b780d08bfed4850ab807b7c308682fae6868ed9c09c0c842063c418ebe2a19fb $IMAGE/index.json"
"27bcd3b4f31740cc087346382aaba3fe1c01872d75ead1bd2b9f7053d2bb3231 $IMAGE/blobs/sha256/27bcd3b4f31740cc087346382aaba3fe1c01872d75ead1bd2b9f7053d2bb3231"
"e7013826daf8b5d68f82c5b790ca5e9de222a834f2cb3fe3532030161bd72083 $IMAGE/blobs/sha256/e7013826daf8b5d68f82c5b790ca5e9de222a834f2cb3fe3532030161bd72083"
"f4a39a97d97aa834da7ad2d92940f9636a57e3d9b3cc7c53242451b02a6cea89 $IMAGE/blobs/sha256/f4a39a97d97aa834da7ad2d92940f9636a57e3d9b3cc7c53242451b02a6cea89"
)
Expand Down

0 comments on commit 1d2c8ab

Please sign in to comment.