mutate: add uncompressed blob size annotation #510

Merged
40 changes: 32 additions & 8 deletions mutate/compress.go
@@ -22,6 +22,10 @@ type Compressor interface {
// indicate what compression type is used, e.g. "gzip", or "" for no
// compression.
MediaTypeSuffix() string

// BytesRead returns the number of bytes read from the uncompressed input
// stream at the current time, no guarantee of completion.
BytesRead() int64
}

type noopCompressor struct{}
@@ -34,28 +38,36 @@ func (nc noopCompressor) MediaTypeSuffix() string {
return ""
}

func (nc noopCompressor) BytesRead() int64 {
return -1
}

// NoopCompressor provides no compression.
var NoopCompressor Compressor = noopCompressor{}

// GzipCompressor provides gzip compression.
var GzipCompressor Compressor = gzipCompressor{}
var GzipCompressor Compressor = &gzipCompressor{}

type gzipCompressor struct{}
type gzipCompressor struct {
bytesRead int64
}

func (gz gzipCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
func (gz *gzipCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
pipeReader, pipeWriter := io.Pipe()

gzw := gzip.NewWriter(pipeWriter)
if err := gzw.SetConcurrency(256<<10, 2*runtime.NumCPU()); err != nil {
return nil, errors.Wrapf(err, "set concurrency level to %v blocks", 2*runtime.NumCPU())
}
go func() {
if _, err := system.Copy(gzw, reader); err != nil {
bytesRead, err := system.Copy(gzw, reader)
if err != nil {
log.Warnf("gzip compress: could not compress layer: %v", err)
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "compressing layer"))
return
}
gz.bytesRead = bytesRead
if err := gzw.Close(); err != nil {
log.Warnf("gzip compress: could not close gzip writer: %v", err)
// #nosec G104
@@ -76,25 +88,33 @@ func (gz gzipCompressor) MediaTypeSuffix() string {
return "gzip"
}

func (gz gzipCompressor) BytesRead() int64 {
return gz.bytesRead
}

// ZstdCompressor provides zstd compression.
var ZstdCompressor Compressor = zstdCompressor{}
var ZstdCompressor Compressor = &zstdCompressor{}

type zstdCompressor struct{}
type zstdCompressor struct {
bytesRead int64
}

func (zs zstdCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
func (zs *zstdCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {

pipeReader, pipeWriter := io.Pipe()
zw, err := zstd.NewWriter(pipeWriter)
if err != nil {
return nil, err
}
go func() {
if _, err := system.Copy(zw, reader); err != nil {
bytesRead, err := system.Copy(zw, reader)
if err != nil {
log.Warnf("zstd compress: could not compress layer: %v", err)
// #nosec G104
_ = pipeWriter.CloseWithError(errors.Wrap(err, "compressing layer"))
return
}
zs.bytesRead = bytesRead
if err := zw.Close(); err != nil {
log.Warnf("zstd compress: could not close gzip writer: %v", err)
// #nosec G104
@@ -114,3 +134,7 @@ func (zs zstdCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
func (zs zstdCompressor) MediaTypeSuffix() string {
return "zstd"
}

func (zs zstdCompressor) BytesRead() int64 {
return zs.bytesRead
}
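
Note: the new BytesRead accessor only reflects the full input size once the background copy goroutine has finished, which in practice means after the caller has fully drained and closed the compressed stream. Below is a minimal sketch of how a caller might observe the value; the import path github.com/opencontainers/umoci/mutate is assumed for illustration.

package main

import (
	"bytes"
	"fmt"
	"io"
	"log"

	// Import path assumed for illustration; adjust to the actual module path.
	"github.com/opencontainers/umoci/mutate"
)

func main() {
	uncompressed := bytes.NewBufferString("layer contents")

	// Compress the stream; the returned ReadCloser yields gzip-compressed data.
	rc, err := mutate.GzipCompressor.Compress(uncompressed)
	if err != nil {
		log.Fatal(err)
	}

	// Drain the compressed output so the background copy runs to completion.
	if _, err := io.Copy(io.Discard, rc); err != nil {
		log.Fatal(err)
	}
	_ = rc.Close()

	// Only now is the count of uncompressed bytes meaningful.
	// NoopCompressor returns -1 because it never tracks this.
	fmt.Println("uncompressed bytes read:", mutate.GzipCompressor.BytesRead())
}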
10 changes: 10 additions & 0 deletions mutate/mutate.go
@@ -24,6 +24,7 @@ package mutate

import (
"context"
"fmt"
"io"
"reflect"
"time"
@@ -36,6 +37,8 @@ import (
"github.com/pkg/errors"
)

const UmociUncompressedBlobSizeAnnotation = "ci.umo.uncompressed_blob_size"

func configPtr(c ispec.Image) *ispec.Image { return &c }
func manifestPtr(m ispec.Manifest) *ispec.Manifest { return &m }
func timePtr(t time.Time) *time.Time { return &t }
@@ -295,6 +298,13 @@ func (m *Mutator) Add(ctx context.Context, mediaType string, r io.Reader, histor
compressedMediaType = compressedMediaType + "+" + compressor.MediaTypeSuffix()
}

if annotations == nil {
annotations = make(map[string]string)
}
if compressor.BytesRead() >= 0 {
annotations[UmociUncompressedBlobSizeAnnotation] = fmt.Sprintf("%d", compressor.BytesRead())
}

// Append to layers.
desc = ispec.Descriptor{
MediaType: compressedMediaType,
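
Consumers of the image can read the recorded size back off the layer descriptor by parsing the annotation value set in Add. The sketch below uses a hypothetical helper name and assumes only the standard OCI image-spec Go types; the annotation key is the one defined in this change.

package main

import (
	"fmt"
	"strconv"

	ispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// uncompressedSize is a hypothetical helper: it returns the uncompressed blob
// size recorded by Mutator.Add, or ok=false if the annotation is missing or
// malformed.
func uncompressedSize(desc ispec.Descriptor) (size int64, ok bool) {
	v, found := desc.Annotations["ci.umo.uncompressed_blob_size"]
	if !found {
		return 0, false
	}
	n, err := strconv.ParseInt(v, 10, 64)
	if err != nil {
		return 0, false
	}
	return n, true
}

func main() {
	// Example descriptor, as it might appear after adding an 8-byte layer.
	desc := ispec.Descriptor{
		Annotations: map[string]string{"ci.umo.uncompressed_blob_size": "8"},
	}
	if n, ok := uncompressedSize(desc); ok {
		fmt.Println("uncompressed size:", n)
	}
}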
15 changes: 12 additions & 3 deletions mutate/mutate_test.go
@@ -214,6 +214,7 @@ func TestMutateAdd(t *testing.T) {

// This isn't a valid image, but whatever.
buffer := bytes.NewBufferString("contents")
bufferSize := buffer.Len()

// Add a new layer.
annotations := map[string]string{"hello": "world"}
@@ -253,10 +254,18 @@
if mutator.manifest.Layers[1].Digest == expectedLayerDigest {
t.Errorf("manifest.Layers[1].Digest is not the same!")
}
if len(mutator.manifest.Layers[1].Annotations) != 1 || mutator.manifest.Layers[1].Annotations["hello"] != "world" {
t.Errorf("manifest.Layers[1].Annotations was not set correctly!")
if len(mutator.manifest.Layers[1].Annotations) != 2 {
t.Errorf("manifest.Layers[1].Annotations was not set correctly!: %+v", mutator.manifest.Layers[1].Annotations)
}
if mutator.manifest.Layers[1].Annotations["hello"] != "world" {
t.Errorf("manifest.Layers[1].Annotations['hello'] was not set correctly!: %+v", mutator.manifest.Layers[1].Annotations)
}
if mutator.manifest.Layers[1].Annotations[UmociUncompressedBlobSizeAnnotation] != fmt.Sprintf("%d", bufferSize) {
t.Errorf("manifest.Layers[1].Annotations['%s'] was not set correctly!: %q, expected %d",
UmociUncompressedBlobSizeAnnotation,
mutator.manifest.Layers[1].Annotations[UmociUncompressedBlobSizeAnnotation],
bufferSize)
}

if mutator.manifest.Layers[1].Digest != newLayerDesc.Digest {
t.Fatalf("unexpected digest for new layer: %v %v", mutator.manifest.Layers[1].Digest, newLayerDesc.Digest)
}
4 changes: 2 additions & 2 deletions test/create.bats
@@ -277,8 +277,8 @@ function teardown() {
# Verify that the hashes of the blobs and index match (blobs are
# content-addressable so using hashes is a bit silly, but whatever).
known_hashes=(
"6f3716b8ae2bb4b3fd0a851f5a553946ea351ed5fdda0bd708d325363c0487f7 $IMAGE/index.json"
"73ecb862d0ebee78acdf8553d34d1ae5c13df509030ac262e561d096bb480dfc $IMAGE/blobs/sha256/73ecb862d0ebee78acdf8553d34d1ae5c13df509030ac262e561d096bb480dfc"
"b780d08bfed4850ab807b7c308682fae6868ed9c09c0c842063c418ebe2a19fb $IMAGE/index.json"
"27bcd3b4f31740cc087346382aaba3fe1c01872d75ead1bd2b9f7053d2bb3231 $IMAGE/blobs/sha256/27bcd3b4f31740cc087346382aaba3fe1c01872d75ead1bd2b9f7053d2bb3231"
"e7013826daf8b5d68f82c5b790ca5e9de222a834f2cb3fe3532030161bd72083 $IMAGE/blobs/sha256/e7013826daf8b5d68f82c5b790ca5e9de222a834f2cb3fe3532030161bd72083"
"f4a39a97d97aa834da7ad2d92940f9636a57e3d9b3cc7c53242451b02a6cea89 $IMAGE/blobs/sha256/f4a39a97d97aa834da7ad2d92940f9636a57e3d9b3cc7c53242451b02a6cea89"
)