Skip to content

Commit

Permalink
wip volume snapshotter support
Browse files Browse the repository at this point in the history
Signed-off-by: Erik Sipsma <[email protected]>
  • Loading branch information
sipsma committed Jul 31, 2024
1 parent ffadc9f commit 058da20
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 26 deletions.
16 changes: 14 additions & 2 deletions cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/bklog"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/progress"
Expand Down Expand Up @@ -49,6 +50,9 @@ type ManagerOpt struct {
Differ diff.Comparer
MetadataStore *metadata.Store
MountPoolRoot string

// dagger-specific, see manager_dagger.go
VolumeSnapshotter CtdVolumeSnapshotter
}

type Accessor interface {
Expand All @@ -62,6 +66,9 @@ type Accessor interface {
IdentityMapping() *idtools.IdentityMapping
Merge(ctx context.Context, parents []ImmutableRef, pg progress.Controller, opts ...RefOption) (ImmutableRef, error)
Diff(ctx context.Context, lower, upper ImmutableRef, pg progress.Controller, opts ...RefOption) (ImmutableRef, error)

// Dagger-specific, see manager_dagger.go
GetOrInitVolume(ctx context.Context, id string, sharingMode pb.CacheSharingOpt, parent ImmutableRef) (MutableRef, error)
}

type Controller interface {
Expand Down Expand Up @@ -97,6 +104,9 @@ type cacheManager struct {

muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
unlazyG flightcontrol.Group[struct{}]

// dagger-specific, see manager_dagger.go
volumeSnapshotter VolumeSnapshotter
}

func NewManager(opt ManagerOpt) (Manager, error) {
Expand All @@ -110,6 +120,8 @@ func NewManager(opt ManagerOpt) (Manager, error) {
Differ: opt.Differ,
MetadataStore: opt.MetadataStore,
records: make(map[string]*cacheRecord),

volumeSnapshotter: newVolumeSnapshotter(context.TODO(), opt.VolumeSnapshotter, opt.LeaseManager),
}

if err := cm.init(context.TODO()); err != nil {
Expand Down Expand Up @@ -444,7 +456,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt
return rec, nil
} else if IsNotFound(err) {
// The equal mutable for this ref is not found, check to see if our snapshot exists
if _, statErr := cm.Snapshotter.Stat(ctx, md.getSnapshotID()); statErr != nil {
if _, statErr := cm.snapshotterFor(md).Stat(ctx, md.getSnapshotID()); statErr != nil {
// this ref's snapshot also doesn't exist, just remove this record
cm.MetadataStore.Clear(id)
return nil, errors.Wrap(errNotFound, id)
Expand Down Expand Up @@ -484,7 +496,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt

if rec.mutable {
// If the record is mutable, then the snapshot must exist
if _, err := cm.Snapshotter.Stat(ctx, rec.ID()); err != nil {
if _, err := cm.snapshotterFor(md).Stat(ctx, rec.ID()); err != nil {
if !cerrdefs.IsNotFound(err) {
return nil, errors.Wrap(err, "failed to check mutable ref snapshot")
}
Expand Down
167 changes: 167 additions & 0 deletions cache/manager_dagger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package cache

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/containerd/containerd/leases"
ctdsnapshots "github.com/containerd/containerd/snapshots"
cerrdefs "github.com/containerd/errdefs"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/snapshot/containerd"
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/bklog"
)

type AcquireSnapshotter interface {
Acquire(ctx context.Context, key string, sharingMode pb.CacheSharingOpt) (func() error, error)
}

type CtdVolumeSnapshotter interface {
ctdsnapshots.Snapshotter
Name() string
AcquireSnapshotter
}

type VolumeSnapshotter interface {
snapshot.MergeSnapshotter
AcquireSnapshotter
}

func newVolumeSnapshotter(ctx context.Context, ctdSnapshoter CtdVolumeSnapshotter, leaseManager leases.Manager) VolumeSnapshotter {
return volumeSnapshotterAdapter{
MergeSnapshotter: snapshot.NewMergeSnapshotter(ctx, containerd.NewSnapshotter(
ctdSnapshoter.Name(),
ctdSnapshoter,
"buildkit",
nil, // no idmapping
), leaseManager),
base: ctdSnapshoter,
}

}

type volumeSnapshotterAdapter struct {
snapshot.MergeSnapshotter
base CtdVolumeSnapshotter
}

var _ VolumeSnapshotter = (*volumeSnapshotterAdapter)(nil)

func (sn volumeSnapshotterAdapter) Acquire(ctx context.Context, key string, sharingMode pb.CacheSharingOpt) (func() error, error) {
return sn.base.Acquire(ctx, key, sharingMode)
}

func (cm *cacheManager) GetOrInitVolume(
ctx context.Context,
id string,
sharingMode pb.CacheSharingOpt,
parent ImmutableRef,
) (_ MutableRef, rerr error) {
// TODO: support parent ref
if parent != nil {
return nil, fmt.Errorf("parent ref is not supported")
}
parentID := ""

rec, err := func() (*cacheRecord, error) {
cm.mu.Lock()
defer cm.mu.Unlock()

rec, err := cm.getRecord(ctx, id)
switch {
case err == nil:
return rec, nil

case errors.Is(err, errNotFound):
// TODO: more optimal to Lease+Prepare outside of manager lock, could put in rec lock?

l, err := cm.LeaseManager.Create(ctx, func(l *leases.Lease) error {
l.ID = id
l.Labels = map[string]string{
"containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano),
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to create lease: %w", err)
}
defer func() {
if err != nil {
ctx := context.WithoutCancel(ctx)
if err := cm.LeaseManager.Delete(ctx, leases.Lease{
ID: l.ID,
}); err != nil {
bklog.G(ctx).Errorf("failed to remove lease: %+v", err)
}
}
}()

if err := cm.LeaseManager.AddResource(ctx, l, leases.Resource{
ID: id,
Type: "snapshots/" + cm.volumeSnapshotter.Name(),
}); err != nil && !cerrdefs.IsAlreadyExists(err) {
return nil, fmt.Errorf("failed to add snapshot %s resource to lease: %w", id, err)
}

if err := cm.volumeSnapshotter.Prepare(ctx, id, parentID); err != nil {
return nil, fmt.Errorf("failed to prepare volume: %w", err)
}

md, _ := cm.getMetadata(id)

rec = &cacheRecord{
mu: &sync.Mutex{},
mutable: true,
cm: cm,
refs: make(map[ref]struct{}),
cacheMetadata: md,
}

opts := []RefOption{
WithRecordType(client.UsageRecordTypeCacheMount),
WithDescription(fmt.Sprintf("cache mount %s", id)), // TODO: support human readable name?
CachePolicyRetain,
withSnapshotID(id),
}
if err := initializeMetadata(rec.cacheMetadata, rec.parentRefs, opts...); err != nil {
return nil, err
}

cm.records[id] = rec
return rec, nil

default:
return nil, fmt.Errorf("failed to get volume cache record: %w", err)
}
}()

releaseFunc, err := cm.volumeSnapshotter.Acquire(ctx, id, sharingMode)
if err != nil {
return nil, fmt.Errorf("failed to acquire volume: %w", err)
}
defer func() {
if rerr != nil {
rerr = errors.Join(rerr, releaseFunc())
}
}()

rec.mu.Lock()
defer rec.mu.Unlock()

// TODO: note about how we are creating multiple mutable refs on a cacheRecord but it is safe to do so it turns out
ref := rec.mref(true, DescHandlers{})
ref.releaseFunc = releaseFunc
return ref, nil
}

func (cm *cacheManager) snapshotterFor(md *cacheMetadata) snapshot.MergeSnapshotter {
if md.GetRecordType() == client.UsageRecordTypeCacheMount {
return cm.volumeSnapshotter
}
return cm.Snapshotter
}
Loading

0 comments on commit 058da20

Please sign in to comment.