From 058da208b3e215dbcf390f3598fc391c5f201c7f Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Wed, 31 Jul 2024 14:44:17 -0700 Subject: [PATCH] wip volume snapshotter support Signed-off-by: Erik Sipsma --- cache/manager.go | 16 ++- cache/manager_dagger.go | 167 +++++++++++++++++++++++++++++++ cache/refs.go | 53 +++++----- solver/llbsolver/mounts/mount.go | 2 +- worker/base/worker.go | 6 ++ 5 files changed, 218 insertions(+), 26 deletions(-) create mode 100644 cache/manager_dagger.go diff --git a/cache/manager.go b/cache/manager.go index 1c35de8a04d4..d2823d2bc040 100644 --- a/cache/manager.go +++ b/cache/manager.go @@ -21,6 +21,7 @@ import ( "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" + "github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/util/bklog" "github.com/moby/buildkit/util/flightcontrol" "github.com/moby/buildkit/util/progress" @@ -49,6 +50,9 @@ type ManagerOpt struct { Differ diff.Comparer MetadataStore *metadata.Store MountPoolRoot string + + // dagger-specific, see manager_dagger.go + VolumeSnapshotter CtdVolumeSnapshotter } type Accessor interface { @@ -62,6 +66,9 @@ type Accessor interface { IdentityMapping() *idtools.IdentityMapping Merge(ctx context.Context, parents []ImmutableRef, pg progress.Controller, opts ...RefOption) (ImmutableRef, error) Diff(ctx context.Context, lower, upper ImmutableRef, pg progress.Controller, opts ...RefOption) (ImmutableRef, error) + + // Dagger-specific, see manager_dagger.go + GetOrInitVolume(ctx context.Context, id string, sharingMode pb.CacheSharingOpt, parent ImmutableRef) (MutableRef, error) } type Controller interface { @@ -97,6 +104,9 @@ type cacheManager struct { muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results unlazyG flightcontrol.Group[struct{}] + + // dagger-specific, see manager_dagger.go + volumeSnapshotter VolumeSnapshotter } func NewManager(opt ManagerOpt) (Manager, error) { @@ -110,6 +120,8 @@ func NewManager(opt ManagerOpt) (Manager, error) { Differ: opt.Differ, MetadataStore: opt.MetadataStore, records: make(map[string]*cacheRecord), + + volumeSnapshotter: newVolumeSnapshotter(context.TODO(), opt.VolumeSnapshotter, opt.LeaseManager), } if err := cm.init(context.TODO()); err != nil { @@ -444,7 +456,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt return rec, nil } else if IsNotFound(err) { // The equal mutable for this ref is not found, check to see if our snapshot exists - if _, statErr := cm.Snapshotter.Stat(ctx, md.getSnapshotID()); statErr != nil { + if _, statErr := cm.snapshotterFor(md).Stat(ctx, md.getSnapshotID()); statErr != nil { // this ref's snapshot also doesn't exist, just remove this record cm.MetadataStore.Clear(id) return nil, errors.Wrap(errNotFound, id) @@ -484,7 +496,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt if rec.mutable { // If the record is mutable, then the snapshot must exist - if _, err := cm.Snapshotter.Stat(ctx, rec.ID()); err != nil { + if _, err := cm.snapshotterFor(md).Stat(ctx, rec.ID()); err != nil { if !cerrdefs.IsNotFound(err) { return nil, errors.Wrap(err, "failed to check mutable ref snapshot") } diff --git a/cache/manager_dagger.go b/cache/manager_dagger.go new file mode 100644 index 000000000000..48236c160a06 --- /dev/null +++ b/cache/manager_dagger.go @@ -0,0 +1,167 @@ +package cache + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/containerd/containerd/leases" + ctdsnapshots "github.com/containerd/containerd/snapshots" + cerrdefs "github.com/containerd/errdefs" + "github.com/moby/buildkit/client" + "github.com/moby/buildkit/snapshot" + "github.com/moby/buildkit/snapshot/containerd" + "github.com/moby/buildkit/solver/pb" + "github.com/moby/buildkit/util/bklog" +) + +type AcquireSnapshotter interface { + Acquire(ctx context.Context, key string, sharingMode pb.CacheSharingOpt) (func() error, error) +} + +type CtdVolumeSnapshotter interface { + ctdsnapshots.Snapshotter + Name() string + AcquireSnapshotter +} + +type VolumeSnapshotter interface { + snapshot.MergeSnapshotter + AcquireSnapshotter +} + +func newVolumeSnapshotter(ctx context.Context, ctdSnapshoter CtdVolumeSnapshotter, leaseManager leases.Manager) VolumeSnapshotter { + return volumeSnapshotterAdapter{ + MergeSnapshotter: snapshot.NewMergeSnapshotter(ctx, containerd.NewSnapshotter( + ctdSnapshoter.Name(), + ctdSnapshoter, + "buildkit", + nil, // no idmapping + ), leaseManager), + base: ctdSnapshoter, + } + +} + +type volumeSnapshotterAdapter struct { + snapshot.MergeSnapshotter + base CtdVolumeSnapshotter +} + +var _ VolumeSnapshotter = (*volumeSnapshotterAdapter)(nil) + +func (sn volumeSnapshotterAdapter) Acquire(ctx context.Context, key string, sharingMode pb.CacheSharingOpt) (func() error, error) { + return sn.base.Acquire(ctx, key, sharingMode) +} + +func (cm *cacheManager) GetOrInitVolume( + ctx context.Context, + id string, + sharingMode pb.CacheSharingOpt, + parent ImmutableRef, +) (_ MutableRef, rerr error) { + // TODO: support parent ref + if parent != nil { + return nil, fmt.Errorf("parent ref is not supported") + } + parentID := "" + + rec, err := func() (*cacheRecord, error) { + cm.mu.Lock() + defer cm.mu.Unlock() + + rec, err := cm.getRecord(ctx, id) + switch { + case err == nil: + return rec, nil + + case errors.Is(err, errNotFound): + // TODO: more optimal to Lease+Prepare outside of manager lock, could put in rec lock? + + l, err := cm.LeaseManager.Create(ctx, func(l *leases.Lease) error { + l.ID = id + l.Labels = map[string]string{ + "containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano), + } + return nil + }) + if err != nil { + return nil, fmt.Errorf("failed to create lease: %w", err) + } + defer func() { + if err != nil { + ctx := context.WithoutCancel(ctx) + if err := cm.LeaseManager.Delete(ctx, leases.Lease{ + ID: l.ID, + }); err != nil { + bklog.G(ctx).Errorf("failed to remove lease: %+v", err) + } + } + }() + + if err := cm.LeaseManager.AddResource(ctx, l, leases.Resource{ + ID: id, + Type: "snapshots/" + cm.volumeSnapshotter.Name(), + }); err != nil && !cerrdefs.IsAlreadyExists(err) { + return nil, fmt.Errorf("failed to add snapshot %s resource to lease: %w", id, err) + } + + if err := cm.volumeSnapshotter.Prepare(ctx, id, parentID); err != nil { + return nil, fmt.Errorf("failed to prepare volume: %w", err) + } + + md, _ := cm.getMetadata(id) + + rec = &cacheRecord{ + mu: &sync.Mutex{}, + mutable: true, + cm: cm, + refs: make(map[ref]struct{}), + cacheMetadata: md, + } + + opts := []RefOption{ + WithRecordType(client.UsageRecordTypeCacheMount), + WithDescription(fmt.Sprintf("cache mount %s", id)), // TODO: support human readable name? + CachePolicyRetain, + withSnapshotID(id), + } + if err := initializeMetadata(rec.cacheMetadata, rec.parentRefs, opts...); err != nil { + return nil, err + } + + cm.records[id] = rec + return rec, nil + + default: + return nil, fmt.Errorf("failed to get volume cache record: %w", err) + } + }() + + releaseFunc, err := cm.volumeSnapshotter.Acquire(ctx, id, sharingMode) + if err != nil { + return nil, fmt.Errorf("failed to acquire volume: %w", err) + } + defer func() { + if rerr != nil { + rerr = errors.Join(rerr, releaseFunc()) + } + }() + + rec.mu.Lock() + defer rec.mu.Unlock() + + // TODO: note about how we are creating multiple mutable refs on a cacheRecord but it is safe to do so it turns out + ref := rec.mref(true, DescHandlers{}) + ref.releaseFunc = releaseFunc + return ref, nil +} + +func (cm *cacheManager) snapshotterFor(md *cacheMetadata) snapshot.MergeSnapshotter { + if md.GetRecordType() == client.UsageRecordTypeCacheMount { + return cm.volumeSnapshotter + } + return cm.Snapshotter +} diff --git a/cache/refs.go b/cache/refs.go index 08a38818a795..a174170c0adf 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -300,7 +300,7 @@ func (cr *cacheRecord) isLazy(ctx context.Context) (bool, error) { } // If the snapshot is a remote snapshot, this layer is lazy. - if info, err := cr.cm.Snapshotter.Stat(ctx, cr.getSnapshotID()); err == nil { + if info, err := cr.cm.snapshotterFor(cr.cacheMetadata).Stat(ctx, cr.getSnapshotID()); err == nil { if _, ok := info.Labels["containerd.io/snapshot/remote"]; ok { return true, nil } @@ -342,7 +342,7 @@ func (cr *cacheRecord) size(ctx context.Context) (int64, error) { var usage snapshots.Usage if !cr.getBlobOnly() { var err error - usage, err = cr.cm.Snapshotter.Usage(ctx, driverID) + usage, err = cr.cm.snapshotterFor(cr.cacheMetadata).Usage(ctx, driverID) if err != nil { cr.mu.Lock() isDead := cr.isDead() @@ -412,14 +412,14 @@ func (cr *cacheRecord) mount(ctx context.Context) (_ snapshot.Mountable, rerr er }() if err := cr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: cr.viewLeaseID()}, leases.Resource{ ID: mountSnapshotID, - Type: "snapshots/" + cr.cm.Snapshotter.Name(), + Type: "snapshots/" + cr.cm.snapshotterFor(cr.cacheMetadata).Name(), }); err != nil && !cerrdefs.IsAlreadyExists(err) { return nil, err } // Return the mount direct from View rather than setting it using the Mounts call below. // The two are equivalent for containerd snapshotters but the moby snapshotter requires // the use of the mountable returned by View in this case. - mnts, err := cr.cm.Snapshotter.View(ctx, mountSnapshotID, cr.getSnapshotID()) + mnts, err := cr.cm.snapshotterFor(cr.cacheMetadata).View(ctx, mountSnapshotID, cr.getSnapshotID()) if err != nil && !cerrdefs.IsAlreadyExists(err) { return nil, err } @@ -429,8 +429,7 @@ func (cr *cacheRecord) mount(ctx context.Context) (_ snapshot.Mountable, rerr er if cr.mountCache != nil { return cr.mountCache, nil } - - mnts, err := cr.cm.Snapshotter.Mounts(ctx, mountSnapshotID) + mnts, err := cr.cm.snapshotterFor(cr.cacheMetadata).Mounts(ctx, mountSnapshotID) if err != nil { return nil, err } @@ -620,6 +619,9 @@ type mutableRef struct { *cacheRecord triggerLastUsed bool descHandlers DescHandlers + + // dagger-specific + releaseFunc func() error } // hold ref lock before calling @@ -972,7 +974,7 @@ func (sr *immutableRef) Mount(ctx context.Context, readonly bool, s session.Grou } var mnt snapshot.Mountable - if sr.cm.Snapshotter.Name() == "stargz" { + if sr.cm.snapshotterFor(sr.cacheMetadata).Name() == "stargz" { if err := sr.withRemoteSnapshotLabelsStargzMode(ctx, s, func() { mnt, rerr = sr.mount(ctx) }); err != nil { @@ -1004,7 +1006,7 @@ func (sr *immutableRef) Extract(ctx context.Context, s session.Group) (rerr erro return nil } - if sr.cm.Snapshotter.Name() == "stargz" { + if sr.cm.snapshotterFor(sr.cacheMetadata).Name() == "stargz" { if err := sr.withRemoteSnapshotLabelsStargzMode(ctx, s, func() { if rerr = sr.prepareRemoteSnapshotsStargzMode(ctx, s); rerr != nil { return @@ -1023,7 +1025,7 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context, dhs := sr.descHandlers for _, r := range sr.layerChain() { r := r - info, err := r.cm.Snapshotter.Stat(ctx, r.getSnapshotID()) + info, err := r.cm.snapshotterFor(sr.cacheMetadata).Stat(ctx, r.getSnapshotID()) if err != nil && !cerrdefs.IsNotFound(err) { return err } else if cerrdefs.IsNotFound(err) { @@ -1040,7 +1042,7 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context, // For avoiding collosion among calls, keys of these tmp labels contain an unique ID. flds, labels := makeTmpLabelsStargzMode(snapshots.FilterInheritedLabels(dh.SnapshotLabels), s) info.Labels = labels - if _, err := r.cm.Snapshotter.Update(ctx, info, flds...); err != nil { + if _, err := r.cm.snapshotterFor(sr.cacheMetadata).Update(ctx, info, flds...); err != nil { return errors.Wrapf(err, "failed to add tmp remote labels for remote snapshot") } defer func() { @@ -1048,7 +1050,7 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context, for k := range info.Labels { info.Labels[k] = "" // Remove labels appended in this call } - if _, err := r.cm.Snapshotter.Update(ctx, info, flds...); err != nil { + if _, err := r.cm.snapshotterFor(sr.cacheMetadata).Update(ctx, info, flds...); err != nil { bklog.G(ctx).Warn(errors.Wrapf(err, "failed to remove tmp remote labels")) } }() @@ -1067,7 +1069,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s for _, r := range sr.layerChain() { r := r snapshotID := r.getSnapshotID() - if _, err := r.cm.Snapshotter.Stat(ctx, snapshotID); err == nil { + if _, err := r.cm.snapshotterFor(sr.cacheMetadata).Stat(ctx, snapshotID); err == nil { continue } @@ -1099,11 +1101,11 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s if r.layerParent != nil { parentID = r.layerParent.getSnapshotID() } - if err := r.cm.Snapshotter.Prepare(ctx, key, parentID, opts...); err != nil { + if err := r.cm.snapshotterFor(sr.cacheMetadata).Prepare(ctx, key, parentID, opts...); err != nil { if cerrdefs.IsAlreadyExists(err) { // Check if the targeting snapshot ID has been prepared as // a remote snapshot in the snapshotter. - info, err := r.cm.Snapshotter.Stat(ctx, snapshotID) + info, err := r.cm.snapshotterFor(sr.cacheMetadata).Stat(ctx, snapshotID) if err == nil { // usable as remote snapshot without unlazying. defer func() { ctx := context.WithoutCancel(ctx) @@ -1120,7 +1122,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s WithField("name", info.Name). Debug("snapshots exist but labels are nil") } - if _, err := r.cm.Snapshotter.Update(ctx, info, tmpFields...); err != nil { + if _, err := r.cm.snapshotterFor(sr.cacheMetadata).Update(ctx, info, tmpFields...); err != nil { bklog.G(ctx).Warn(errors.Wrapf(err, "failed to remove tmp remote labels after prepare")) } @@ -1160,7 +1162,7 @@ func makeTmpLabelsStargzMode(labels map[string]string, s session.Group) (fields func (sr *immutableRef) unlazy(ctx context.Context, dhs DescHandlers, pg progress.Controller, s session.Group, topLevel bool, ensureContentStore bool) error { _, err := g.Do(ctx, sr.ID()+"-unlazy", func(ctx context.Context) (_ *leaseutil.LeaseRef, rerr error) { - if _, err := sr.cm.Snapshotter.Stat(ctx, sr.getSnapshotID()); err == nil { + if _, err := sr.cm.snapshotterFor(sr.cacheMetadata).Stat(ctx, sr.getSnapshotID()); err == nil { if !ensureContentStore { return nil, nil } @@ -1239,7 +1241,7 @@ func (sr *immutableRef) unlazyDiffMerge(ctx context.Context, dhs DescHandlers, p defer statusDone() } - return sr.cm.Snapshotter.Merge(ctx, sr.getSnapshotID(), diffs) + return sr.cm.snapshotterFor(sr.cacheMetadata).Merge(ctx, sr.getSnapshotID(), diffs) } // should be called within sizeG.Do call for this ref's ID @@ -1310,12 +1312,12 @@ func (sr *immutableRef) unlazyLayer(ctx context.Context, dhs DescHandlers, pg pr key := fmt.Sprintf("extract-%s %s", identity.NewID(), sr.getChainID()) - err = sr.cm.Snapshotter.Prepare(ctx, key, parentID) + err = sr.cm.snapshotterFor(sr.cacheMetadata).Prepare(ctx, key, parentID) if err != nil { return err } - mountable, err := sr.cm.Snapshotter.Mounts(ctx, key) + mountable, err := sr.cm.snapshotterFor(sr.cacheMetadata).Mounts(ctx, key) if err != nil { return err } @@ -1332,7 +1334,7 @@ func (sr *immutableRef) unlazyLayer(ctx context.Context, dhs DescHandlers, pg pr if err := unmount(); err != nil { return err } - if err := sr.cm.Snapshotter.Commit(ctx, sr.getSnapshotID(), key); err != nil { + if err := sr.cm.snapshotterFor(sr.cacheMetadata).Commit(ctx, sr.getSnapshotID(), key); err != nil { if !errors.Is(err, cerrdefs.ErrAlreadyExists) { return err } @@ -1430,13 +1432,13 @@ func (cr *cacheRecord) finalize(ctx context.Context) error { if err := cr.cm.LeaseManager.AddResource(ctx, leases.Lease{ID: cr.ID()}, leases.Resource{ ID: cr.getSnapshotID(), - Type: "snapshots/" + cr.cm.Snapshotter.Name(), + Type: "snapshots/" + cr.cm.snapshotterFor(cr.cacheMetadata).Name(), }); err != nil { cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: cr.ID()}) return errors.Wrapf(err, "failed to add snapshot %s to lease", cr.getSnapshotID()) } - if err := cr.cm.Snapshotter.Commit(ctx, cr.getSnapshotID(), mutable.getSnapshotID()); err != nil { + if err := cr.cm.snapshotterFor(cr.cacheMetadata).Commit(ctx, cr.getSnapshotID(), mutable.getSnapshotID()); err != nil { cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: cr.ID()}) return errors.Wrapf(err, "failed to commit %s to %s during finalize", mutable.getSnapshotID(), cr.getSnapshotID()) } @@ -1517,7 +1519,7 @@ func (sr *mutableRef) Mount(ctx context.Context, readonly bool, s session.Group) } var mnt snapshot.Mountable - if sr.cm.Snapshotter.Name() == "stargz" && sr.layerParent != nil { + if sr.cm.snapshotterFor(sr.cacheMetadata).Name() == "stargz" && sr.layerParent != nil { if err := sr.layerParent.withRemoteSnapshotLabelsStargzMode(ctx, s, func() { mnt, rerr = sr.mount(ctx) }); err != nil { @@ -1589,6 +1591,11 @@ func (sr *mutableRef) release(ctx context.Context) (rerr error) { sr.updateLastUsed() sr.triggerLastUsed = false } + + if sr.releaseFunc != nil { + return sr.releaseFunc() + } + return nil } diff --git a/solver/llbsolver/mounts/mount.go b/solver/llbsolver/mounts/mount.go index 90acbd44bc12..f7ca460f95e7 100644 --- a/solver/llbsolver/mounts/mount.go +++ b/solver/llbsolver/mounts/mount.go @@ -381,7 +381,7 @@ func (mm *MountManager) MountableCache(ctx context.Context, m *pb.Mount, ref cac if m.CacheOpt == nil { return nil, errors.Errorf("missing cache mount options") } - return mm.getRefCacheDir(ctx, ref, m.CacheOpt.ID, m, m.CacheOpt.Sharing, g) + return mm.cm.GetOrInitVolume(ctx, m.CacheOpt.ID, m.CacheOpt.Sharing, ref) } func (mm *MountManager) MountableTmpFS(m *pb.Mount) cache.Mountable { diff --git a/worker/base/worker.go b/worker/base/worker.go index 8121c74c6530..95edb06fc021 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -81,6 +81,9 @@ type WorkerOpt struct { MetadataStore *metadata.Store MountPoolRoot string ResourceMonitor *resources.Monitor + + // dagger-specific + VolumeSnapshotter cache.CtdVolumeSnapshotter } // Worker is a local worker instance with dedicated snapshotter, cache, and so on. @@ -111,6 +114,9 @@ func NewWorker(ctx context.Context, opt WorkerOpt) (*Worker, error) { Differ: opt.Differ, MetadataStore: opt.MetadataStore, MountPoolRoot: opt.MountPoolRoot, + + // dagger-specific + VolumeSnapshotter: opt.VolumeSnapshotter, }) if err != nil { return nil, err