Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel context for mirror push command #53

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/mirror/cmd/modules/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package pull

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -160,6 +161,7 @@ func pullExternalModulesToLocalFS(

pullCtx := &contexts.PullContext{
BaseContext: contexts.BaseContext{
Ctx: context.TODO(),
Logger: logger,
Insecure: insecure,
SkipTLSVerification: skipVerifyTLS,
Expand Down
3 changes: 3 additions & 0 deletions internal/mirror/cmd/modules/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package push

import (
"context"
"fmt"
"log/slog"
"os"
Expand Down Expand Up @@ -139,6 +140,7 @@ func pushModulesToRegistry(
}

if err = layouts.PushLayoutToRepo(
context.TODO(),
moduleLayout,
moduleRegistryPath,
authProvider,
Expand All @@ -152,6 +154,7 @@ func pushModulesToRegistry(

logger.InfoF("Pushing releases for module %s", moduleName)
if err = layouts.PushLayoutToRepo(
context.TODO(),
moduleReleasesLayout,
moduleReleasesRegistryPath,
authProvider,
Expand Down
2 changes: 2 additions & 0 deletions internal/mirror/cmd/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package pull

import (
"bufio"
"context"
"crypto/md5"
"fmt"
"io"
Expand Down Expand Up @@ -122,6 +123,7 @@ func buildPullContext() *contexts.PullContext {

mirrorCtx := &contexts.PullContext{
BaseContext: contexts.BaseContext{
Ctx: context.TODO(),
Logger: logger,
Insecure: Insecure,
SkipTLSVerification: TLSSkipVerify,
Expand Down
2 changes: 2 additions & 0 deletions internal/mirror/cmd/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package push

import (
"context"
"fmt"
"log/slog"
"os"
Expand Down Expand Up @@ -146,6 +147,7 @@ func buildPushContext() *contexts.PushContext {

mirrorCtx := &contexts.PushContext{
BaseContext: contexts.BaseContext{
Ctx: context.TODO(),
Logger: logger,
Insecure: Insecure,
SkipTLSVerification: TLSSkipVerify,
Expand Down
2 changes: 2 additions & 0 deletions internal/mirror/cmd/vulndb/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package pull

import (
"context"
"fmt"
"log/slog"
"path/filepath"
Expand Down Expand Up @@ -85,6 +86,7 @@ func pull(_ *cobra.Command, _ []string) error {

pullContext := &contexts.PullContext{
BaseContext: contexts.BaseContext{
Ctx: context.TODO(),
Logger: logger,
RegistryAuth: getSourceRegistryAuthProvider(),
DeckhouseRegistryRepo: SourceRegistryRepo,
Expand Down
3 changes: 3 additions & 0 deletions internal/mirror/cmd/vulndb/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package push

import (
"context"
"fmt"
"log/slog"
"path"
Expand Down Expand Up @@ -81,6 +82,7 @@ func push(_ *cobra.Command, _ []string) error {

pushContext := &contexts.PushContext{
BaseContext: contexts.BaseContext{
Ctx: context.TODO(),
Logger: logger,
RegistryAuth: getRegistryAuthProvider(),
RegistryHost: RegistryHost,
Expand All @@ -107,6 +109,7 @@ func push(_ *cobra.Command, _ []string) error {
}

err = layouts.PushLayoutToRepo(
pushContext.Ctx,
ociLayout,
repo,
pushContext.RegistryAuth,
Expand Down
5 changes: 5 additions & 0 deletions pkg/libmirror/bundle/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package bundle

import (
"context"
"crypto/rand"
"io"
"io/fs"
Expand Down Expand Up @@ -50,6 +51,7 @@ func TestBundlePackingAndUnpacking(t *testing.T) {

err = Pack(&contexts.PullContext{
BaseContext: contexts.BaseContext{
Ctx: context.TODO(),
BundlePath: tarBundlePath,
UnpackedImagesPath: packFromDir,
},
Expand All @@ -58,6 +60,7 @@ func TestBundlePackingAndUnpacking(t *testing.T) {
require.FileExists(t, tarBundlePath)

err = Unpack(&contexts.BaseContext{
Ctx: context.TODO(),
BundlePath: tarBundlePath,
UnpackedImagesPath: unpackToDir,
})
Expand Down Expand Up @@ -87,6 +90,7 @@ func TestChunkedBundlePackingAndUnpacking(t *testing.T) {

err = Pack(&contexts.PullContext{
BaseContext: contexts.BaseContext{
Ctx: context.TODO(),
BundlePath: bundlePath,
UnpackedImagesPath: packFromDir,
},
Expand All @@ -105,6 +109,7 @@ func TestChunkedBundlePackingAndUnpacking(t *testing.T) {
}

err = Unpack(&contexts.BaseContext{
Ctx: context.TODO(),
BundlePath: bundlePath,
UnpackedImagesPath: unpackToDir,
})
Expand Down
4 changes: 4 additions & 0 deletions pkg/libmirror/contexts/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package contexts

import (
"context"

"github.com/google/go-containerregistry/pkg/authn"
)

Expand All @@ -35,6 +37,8 @@ type Logger interface {

// BaseContext hold data related to pending registry mirroring operation.
type BaseContext struct {
Ctx context.Context

// --registry-login + --registry-password (can be nil in this case) or --license depending on the operation requested
RegistryAuth authn.Authenticator
RegistryHost string // --registry (FQDN with port, if one is provided)
Expand Down
3 changes: 2 additions & 1 deletion pkg/libmirror/images/digests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package images

import (
"context"
"encoding/json"
"os"
"path/filepath"
Expand All @@ -43,7 +44,7 @@ func TestExtractImageDigestsFromDeckhouseInstaller(t *testing.T) {

installersLayout := createOCILayoutWithInstallerImage(t, "localhost:5001/deckhouse", installerTag, expectedImages)
images, err := ExtractImageDigestsFromDeckhouseInstaller(
&contexts.PullContext{BaseContext: contexts.BaseContext{DeckhouseRegistryRepo: "localhost:5001/deckhouse"}},
&contexts.PullContext{BaseContext: contexts.BaseContext{Ctx: context.TODO(), DeckhouseRegistryRepo: "localhost:5001/deckhouse"}},
installerTag,
installersLayout,
)
Expand Down
4 changes: 3 additions & 1 deletion pkg/libmirror/layouts/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package layouts

import (
"context"
"fmt"
"path"
"strings"
Expand Down Expand Up @@ -182,9 +183,10 @@ func PullImageSet(
}

err = retry.RunTask(
pullCtx.Ctx,
pullCtx.Logger,
fmt.Sprintf("[%d / %d] Pulling %s ", pullCount, totalCount, imageReferenceString),
task.WithConstantRetries(5, 10*time.Second, func() error {
task.WithConstantRetries(5, 10*time.Second, func(ctx context.Context) error {
img, err := remote.Image(ref, remoteOpts...)
if err != nil {
if errorutil.IsImageNotFoundError(err) && pullOpts.allowMissingTags {
Expand Down
3 changes: 3 additions & 0 deletions pkg/libmirror/layouts/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package layouts

import (
"context"
"log/slog"
"net/http/httptest"
"strings"
Expand Down Expand Up @@ -71,6 +72,7 @@ func TestPullTrivyVulnerabilityDatabaseImageSuccessSkipTLS(t *testing.T) {

err := PullTrivyVulnerabilityDatabasesImages(
&contexts.PullContext{BaseContext: contexts.BaseContext{
Ctx: context.TODO(),
Logger: testLogger,
RegistryAuth: authn.Anonymous,
DeckhouseRegistryRepo: deckhouseRepo,
Expand Down Expand Up @@ -126,6 +128,7 @@ func TestPullTrivyVulnerabilityDatabaseImageSuccessInsecure(t *testing.T) {

err := PullTrivyVulnerabilityDatabasesImages(
&contexts.PullContext{BaseContext: contexts.BaseContext{
Ctx: context.TODO(),
Logger: testLogger,
RegistryAuth: authn.Anonymous,
DeckhouseRegistryRepo: deckhouseRepo,
Expand Down
39 changes: 26 additions & 13 deletions pkg/libmirror/layouts/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ limitations under the License.
package layouts

import (
"context"
"errors"
"fmt"
"os"
"time"

"github.com/google/go-containerregistry/pkg/authn"
Expand All @@ -28,18 +28,19 @@ import (
"github.com/google/go-containerregistry/pkg/v1/layout"
"github.com/google/go-containerregistry/pkg/v1/remote"
"github.com/samber/lo"
"github.com/samber/lo/parallel"

"github.com/deckhouse/deckhouse-cli/pkg/libmirror/contexts"
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/auth"
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/errorutil"
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/parallel"
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/retry"
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/retry/task"
)

var ErrEmptyLayout = errors.New("No images in layout")

func PushLayoutToRepo(
ctx context.Context,
imagesLayout layout.Path,
registryRepo string,
authProvider authn.Authenticator,
Expand Down Expand Up @@ -73,7 +74,11 @@ func PushLayoutToRepo(
tag := manifestSet[0].Annotations["io.deckhouse.image.short_tag"]
imageRef := registryRepo + ":" + tag
logger.InfoF("[%d / %d] Pushing image %s", imagesCount, len(indexManifest.Manifests), imageRef)
pushImage(logger, registryRepo, index, imagesCount, refOpts, remoteOpts)(manifestSet[0], 0)

ctx, ctxCancel := context.WithCancel(ctx)
if err := pushImage(ctx, ctxCancel, logger, registryRepo, index, imagesCount, refOpts, remoteOpts)(manifestSet[0], 0); err != nil {
return err
}
imagesCount += 1
continue
}
Expand All @@ -84,9 +89,8 @@ func PushLayoutToRepo(
logger.InfoF("- %s", registryRepo+":"+manifest.Annotations["io.deckhouse.image.short_tag"])
}

parallel.ForEach(manifestSet, pushImage(logger, registryRepo, index, imagesCount, refOpts, remoteOpts))

return nil
ctx, ctxCancel := context.WithCancel(ctx)
return parallel.ForEachWithErrors(manifestSet, pushImage(ctx, ctxCancel, logger, registryRepo, index, imagesCount, refOpts, remoteOpts))
})
if err != nil {
return fmt.Errorf("Push batch of images: %w", err)
Expand All @@ -99,43 +103,52 @@ func PushLayoutToRepo(
}

func pushImage(
ctx context.Context,
ctxCancel context.CancelFunc,
logger contexts.Logger,
registryRepo string,
index v1.ImageIndex,
imagesCount int,
refOpts []name.Option,
remoteOpts []remote.Option,
) func(v1.Descriptor, int) {
return func(manifest v1.Descriptor, _ int) {
) func(v1.Descriptor, int) error {
return func(manifest v1.Descriptor, _ int) error {
tag := manifest.Annotations["io.deckhouse.image.short_tag"]
imageRef := registryRepo + ":" + tag
img, err := index.Image(manifest.Digest)
if err != nil {
logger.WarnF("Read image: %v", err)
os.Exit(1)
ctxCancel()
return err
}
ref, err := name.ParseReference(imageRef, refOpts...)
if err != nil {
logger.WarnF("Parse image reference: %v", err)
os.Exit(1)
ctxCancel()
return err
}

err = retry.RunTask(silentLogger{}, "", task.WithConstantRetries(19, 3*time.Second, func() error {
err = retry.RunTask(ctx, silentLogger{}, "", task.WithConstantRetries(19, 3*time.Second, func(ctx context.Context) error {
remoteOpts := remoteOpts
remoteOpts = append(remoteOpts, remote.WithContext(ctx))
if err = remote.Write(ref, img, remoteOpts...); err != nil {
if errorutil.IsTrivyMediaTypeNotAllowedError(err) {
logger.WarnLn(errorutil.CustomTrivyMediaTypesWarning)
os.Exit(1)
ctxCancel()
return err
}
return fmt.Errorf("Write %s to registry: %w", ref.String(), err)
}
return nil
}))
if err != nil {
logger.WarnF("Push image: %v", err)
os.Exit(1)
ctxCancel()
return err
}

imagesCount += 1
return nil
}
}

Expand Down
Loading