From 4912eb3f62424f298012d5f07b28c877d9137721 Mon Sep 17 00:00:00 2001 From: Moji Date: Tue, 3 Sep 2024 15:40:29 +0200 Subject: [PATCH 1/3] chore: refactor logger to be structured (#555) * chore: refactor logger to be structured * fix: linter complains --- pkg/instance/build.go | 52 ++++++++++++++++++++++++++++---------- pkg/instance/execution.go | 27 ++++++++++---------- pkg/instance/monitoring.go | 20 ++++++++++++--- pkg/instance/network.go | 42 ++++++++++++++++++++++++------ pkg/instance/resources.go | 12 +++++++-- pkg/instance/security.go | 23 ++++++++++++----- pkg/instance/sidecars.go | 7 ++++- pkg/instance/state.go | 7 ++++- pkg/instance/storage.go | 47 ++++++++++++++++++++++++++-------- pkg/knuu/knuu.go | 8 +++--- pkg/log/logger.go | 11 ++++++-- pkg/minio/minio.go | 24 +++++++++++------- pkg/traefik/traefik.go | 6 ++--- 13 files changed, 207 insertions(+), 79 deletions(-) diff --git a/pkg/instance/build.go b/pkg/instance/build.go index 82d04f7..a15b1ba 100644 --- a/pkg/instance/build.go +++ b/pkg/instance/build.go @@ -8,6 +8,7 @@ import ( "time" "github.com/google/uuid" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "github.com/celestiaorg/knuu/pkg/builder" @@ -135,7 +136,10 @@ func (b *build) SetUser(user string) error { if err := b.builderFactory.SetUser(user); err != nil { return ErrSettingUser.WithParams(user, b.instance.name).Wrap(err) } - b.instance.Logger.Debugf("Set user '%s' for instance '%s'", user, b.instance.name) + b.instance.Logger.WithFields(logrus.Fields{ + "instance": b.instance.name, + "user": user, + }).Debugf("Set user for instance") return nil } @@ -148,7 +152,10 @@ func (b *build) Commit(ctx context.Context) error { if !b.builderFactory.Changed() { b.imageName = b.builderFactory.ImageNameFrom() - b.instance.Logger.Debugf("No need to build and push image for instance '%s'", b.instance.name) + b.instance.Logger.WithFields(logrus.Fields{ + "instance": b.instance.name, + "image": b.imageName, + }).Debugf("no need to build and push image for instance") b.instance.SetState(StateCommitted) return nil @@ -169,18 +176,31 @@ func (b *build) Commit(ctx context.Context) error { cachedImageName, exists := b.checkImageHashInCache(imageHash) if exists { b.imageName = cachedImageName - b.instance.Logger.Debugf("Using cached image for instance '%s'", b.instance.name) - } else { - b.instance.Logger.Debugf("Cannot use any cached image for instance '%s'", b.instance.name) - err = b.builderFactory.PushBuilderImage(ctx, imageName) - if err != nil { - return ErrPushingImage.WithParams(b.instance.name).Wrap(err) - } - b.updateImageCacheWithHash(imageHash, imageName) - b.imageName = imageName - b.instance.Logger.Debugf("Pushed new image for instance '%s'", b.instance.name) + + b.instance.Logger.WithFields(logrus.Fields{ + "instance": b.instance.name, + "image": b.imageName, + }).Debugf("using cached image for instance") + + b.instance.SetState(StateCommitted) + return nil } + b.instance.Logger.WithFields(logrus.Fields{ + "instance": b.instance.name, + }).Debugf("cannot use any cached image for instance") + err = b.builderFactory.PushBuilderImage(ctx, imageName) + if err != nil { + return ErrPushingImage.WithParams(b.instance.name).Wrap(err) + } + b.updateImageCacheWithHash(imageHash, imageName) + b.imageName = imageName + + b.instance.Logger.WithFields(logrus.Fields{ + "instance": b.instance.name, + "image": b.imageName, + }).Debugf("pushed new image for instance") + b.instance.SetState(StateCommitted) return nil } @@ -220,11 +240,15 @@ func (b *build) SetEnvironmentVariable(key, value string) error { if !b.instance.IsInState(StatePreparing, StateCommitted) { return ErrSettingEnvNotAllowed.WithParams(b.instance.state.String()) } - b.instance.Logger.Debugf("Setting environment variable '%s' in instance '%s'", key, b.instance.name) + b.instance.Logger.WithFields(logrus.Fields{ + "instance": b.instance.name, + "key": key, + // value is not logged to avoid leaking sensitive information + }).Debugf("Setting environment variable") + if b.instance.state == StatePreparing { return b.builderFactory.SetEnvVar(key, value) } - b.env[key] = value return nil } diff --git a/pkg/instance/execution.go b/pkg/instance/execution.go index 948c505..bd390ac 100644 --- a/pkg/instance/execution.go +++ b/pkg/instance/execution.go @@ -73,7 +73,10 @@ func (e *execution) StartWithCallback(ctx context.Context, callback func()) erro go func() { err := e.WaitInstanceIsRunning(ctx) if err != nil { - e.instance.Logger.Errorf("Error waiting for instance '%s' to be running: %s", e.instance.k8sName, err) + e.instance.Logger.WithFields(logrus.Fields{ + "instance": e.instance.k8sName, + "error": err, + }).Errorf("waiting for instance to be running") return } callback() @@ -117,10 +120,8 @@ func (e *execution) StartAsync(ctx context.Context) error { return ErrDeployingPodForInstance.WithParams(e.instance.k8sName).Wrap(err) } - e.instance.state = StateStarted + e.instance.SetState(StateStarted) e.instance.sidecars.setStateForSidecars(StateStarted) - e.instance.Logger.Debugf("Set state of instance '%s' to '%s'", e.instance.k8sName, e.instance.state.String()) - return nil } @@ -212,10 +213,9 @@ func (e *execution) Stop(ctx context.Context) error { if err := e.destroyPod(ctx); err != nil { return ErrDestroyingPod.WithParams(e.instance.k8sName).Wrap(err) } - e.instance.state = StateStopped - e.instance.sidecars.setStateForSidecars(StateStopped) - e.instance.Logger.Debugf("Set state of instance '%s' to '%s'", e.instance.k8sName, e.instance.state.String()) + e.instance.SetState(StateStopped) + e.instance.sidecars.setStateForSidecars(StateStopped) return nil } @@ -252,17 +252,18 @@ func (e *execution) Destroy(ctx context.Context) error { err := e.instance.sidecars.applyFunctionToSidecars( func(sidecar SidecarManager) error { - e.instance.Logger.Debugf("Destroying sidecar resources from '%s'", sidecar.Instance().k8sName) + e.instance.Logger.WithFields(logrus.Fields{ + "instance": e.instance.k8sName, + "sidecar": sidecar.Instance().k8sName, + }).Debugf("destroying sidecar resources") return sidecar.Instance().resources.destroyResources(ctx) }) if err != nil { return ErrDestroyingResourcesForSidecars.WithParams(e.instance.k8sName).Wrap(err) } - e.instance.state = StateDestroyed + e.instance.SetState(StateDestroyed) e.instance.sidecars.setStateForSidecars(StateDestroyed) - e.instance.Logger.Debugf("Set state of instance '%s' to '%s'", e.instance.k8sName, e.instance.state.String()) - return nil } @@ -344,9 +345,7 @@ func (e *execution) deployPod(ctx context.Context) error { e.instance.kubernetesReplicaSet = replicaSet // Log the deployment of the pod - e.instance.Logger.Debugf("Started statefulSet '%s'", e.instance.k8sName) - e.instance.Logger.Debugf("Set state of instance '%s' to '%s'", e.instance.k8sName, e.instance.state.String()) - + e.instance.Logger.WithField("instance", e.instance.k8sName).Debugf("started statefulSet") return nil } diff --git a/pkg/instance/monitoring.go b/pkg/instance/monitoring.go index 14eb68c..08dc9be 100644 --- a/pkg/instance/monitoring.go +++ b/pkg/instance/monitoring.go @@ -1,6 +1,9 @@ package instance -import v1 "k8s.io/api/core/v1" +import ( + "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" +) type monitoring struct { instance *Instance @@ -22,7 +25,10 @@ func (m *monitoring) SetLivenessProbe(livenessProbe *v1.Probe) error { return err } m.livenessProbe = livenessProbe - m.instance.Logger.Debugf("Set liveness probe to '%s' in instance '%s'", livenessProbe, m.instance.name) + m.instance.Logger.WithFields(logrus.Fields{ + "instance": m.instance.name, + "liveness_probe": livenessProbe, + }).Debug("set liveness probe") return nil } @@ -35,7 +41,10 @@ func (m *monitoring) SetReadinessProbe(readinessProbe *v1.Probe) error { return err } m.readinessProbe = readinessProbe - m.instance.Logger.Debugf("Set readiness probe to '%s' in instance '%s'", readinessProbe, m.instance.name) + m.instance.Logger.WithFields(logrus.Fields{ + "instance": m.instance.name, + "readiness_probe": readinessProbe, + }).Debug("set readiness probe") return nil } @@ -48,7 +57,10 @@ func (m *monitoring) SetStartupProbe(startupProbe *v1.Probe) error { return err } m.startupProbe = startupProbe - m.instance.Logger.Debugf("Set startup probe to '%s' in instance '%s'", startupProbe, m.instance.name) + m.instance.Logger.WithFields(logrus.Fields{ + "instance": m.instance.name, + "startup_probe": startupProbe, + }).Debug("set startup probe") return nil } diff --git a/pkg/instance/network.go b/pkg/instance/network.go index 91caee1..ddbca44 100644 --- a/pkg/instance/network.go +++ b/pkg/instance/network.go @@ -5,6 +5,7 @@ import ( "net" "time" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" ) @@ -34,7 +35,10 @@ func (n *network) AddPortTCP(port int) error { } n.portsTCP = append(n.portsTCP, port) - n.instance.Logger.Debugf("Added TCP port '%d' to instance '%s'", port, n.instance.name) + n.instance.Logger.WithFields(logrus.Fields{ + "instance": n.instance.name, + "port": port, + }).Debug("added tcp port to instance") return nil } @@ -79,7 +83,14 @@ func (n *network) PortForwardTCP(ctx context.Context, port int) (int, error) { if attempt == maxRetries { return -1, ErrForwardingPort.WithParams(maxRetries) } - n.instance.Logger.Debugf("Forwarding port %d failed, cause: %v, retrying after %v (retry %d/%d)", port, err, retryInterval, attempt, maxRetries) + n.instance.Logger.WithFields(logrus.Fields{ + "instance": n.instance.name, + "port": port, + "error": err, + "attempt": attempt, + "max": maxRetries, + "retry_interval": retryInterval.String(), + }).Debug("forwarding port failed, retrying") } return localPort, nil } @@ -99,7 +110,10 @@ func (n *network) AddPortUDP(port int) error { } n.portsUDP = append(n.portsUDP, port) - n.instance.Logger.Debugf("Added UDP port '%d' to instance '%s'", port, n.instance.k8sName) + n.instance.Logger.WithFields(logrus.Fields{ + "instance": n.instance.name, + "port": port, + }).Debug("added udp port to instance") return nil } @@ -152,7 +166,10 @@ func (n *network) deployService(ctx context.Context, portsTCP, portsUDP []int) e return ErrDeployingService.WithParams(n.instance.k8sName).Wrap(err) } n.kubernetesService = srv - n.instance.Logger.Debugf("Started service '%s'", n.instance.k8sName) + n.instance.Logger.WithFields(logrus.Fields{ + "instance": n.instance.name, + "service": serviceName, + }).Debug("started service") return nil } @@ -174,7 +191,10 @@ func (n *network) patchService(ctx context.Context, portsTCP, portsUDP []int) er return ErrPatchingService.WithParams(serviceName).Wrap(err) } n.kubernetesService = srv - n.instance.Logger.Debugf("Patched service '%s'", serviceName) + n.instance.Logger.WithFields(logrus.Fields{ + "instance": n.instance.name, + "service": serviceName, + }).Debug("patched service") return nil } @@ -257,7 +277,7 @@ func (n *network) deployOrPatchService(ctx context.Context, portsTCP, portsUDP [ return nil } - n.instance.Logger.Debugf("Ports not empty, deploying service for instance '%s'", n.instance.k8sName) + n.instance.Logger.WithField("instance", n.instance.name).Debug("ports not empty, deploying service") svc, _ := n.instance.K8sClient.GetService(ctx, n.instance.k8sName) if svc == nil { if err := n.deployService(ctx, portsTCP, portsUDP); err != nil { @@ -275,7 +295,10 @@ func (n *network) deployOrPatchService(ctx context.Context, portsTCP, portsUDP [ func (n *network) enableIfDisabled(ctx context.Context) error { disableNetwork, err := n.IsDisabled(ctx) if err != nil { - n.instance.Logger.Errorf("error checking network status for instance") + n.instance.Logger.WithFields(logrus.Fields{ + "instance": n.instance.name, + "error": err, + }).Error("error checking network status for instance") return ErrCheckingNetworkStatusForInstance.WithParams(n.instance.k8sName).Wrap(err) } @@ -283,7 +306,10 @@ func (n *network) enableIfDisabled(ctx context.Context) error { return nil } if err := n.Enable(ctx); err != nil { - n.instance.Logger.Errorf("error enabling network for instance") + n.instance.Logger.WithFields(logrus.Fields{ + "instance": n.instance.name, + "error": err, + }).Error("error enabling network for instance") return ErrEnablingNetworkForInstance.WithParams(n.instance.k8sName).Wrap(err) } return nil diff --git a/pkg/instance/resources.go b/pkg/instance/resources.go index 1c54aa4..4b633f7 100644 --- a/pkg/instance/resources.go +++ b/pkg/instance/resources.go @@ -3,6 +3,7 @@ package instance import ( "context" + "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -26,7 +27,11 @@ func (r *resources) SetMemory(request, limit resource.Quantity) error { } r.memoryRequest = request r.memoryLimit = limit - r.instance.Logger.Debugf("Set memory to '%s' and limit to '%s' in instance '%s'", request.String(), limit.String(), r.instance.name) + r.instance.Logger.WithFields(logrus.Fields{ + "instance": r.instance.name, + "memory_request": request.String(), + "memory_limit": limit.String(), + }).Debug("set memory for instance") return nil } @@ -37,7 +42,10 @@ func (r *resources) SetCPU(request resource.Quantity) error { return ErrSettingCPUNotAllowed.WithParams(r.instance.state.String()) } r.cpuRequest = request - r.instance.Logger.Debugf("Set cpu to '%s' in instance '%s'", request.String(), r.instance.name) + r.instance.Logger.WithFields(logrus.Fields{ + "instance": r.instance.name, + "cpu_request": request.String(), + }).Debug("set cpu for instance") return nil } diff --git a/pkg/instance/security.go b/pkg/instance/security.go index 8ff9857..f2d954e 100644 --- a/pkg/instance/security.go +++ b/pkg/instance/security.go @@ -1,6 +1,9 @@ package instance import ( + "strings" + + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" ) @@ -40,7 +43,10 @@ func (s *security) SetPrivileged(privileged bool) error { return ErrSettingPrivilegedNotAllowed.WithParams(s.instance.state.String()) } s.privileged = privileged - s.instance.Logger.Debugf("Set privileged to '%t' for instance '%s'", privileged, s.instance.name) + s.instance.Logger.WithFields(logrus.Fields{ + "instance": s.instance.name, + "privileged": privileged, + }).Debug("set privileged for instance") return nil } @@ -51,7 +57,10 @@ func (s *security) AddKubernetesCapability(capability string) error { return ErrAddingCapabilityNotAllowed.WithParams(s.instance.state.String()) } s.capabilitiesAdd = append(s.capabilitiesAdd, capability) - s.instance.Logger.Debugf("Added capability '%s' to instance '%s'", capability, s.instance.name) + s.instance.Logger.WithFields(logrus.Fields{ + "instance": s.instance.name, + "capability": capability, + }).Debug("added capability to instance") return nil } @@ -61,10 +70,12 @@ func (s *security) AddKubernetesCapabilities(capabilities []string) error { if !s.instance.IsInState(StatePreparing, StateCommitted) { return ErrAddingCapabilitiesNotAllowed.WithParams(s.instance.state.String()) } - for _, capability := range capabilities { - s.capabilitiesAdd = append(s.capabilitiesAdd, capability) - s.instance.Logger.Debugf("Added capability '%s' to instance '%s'", capability, s.instance.name) - } + s.capabilitiesAdd = append(s.capabilitiesAdd, capabilities...) + + s.instance.Logger.WithFields(logrus.Fields{ + "instance": s.instance.name, + "capabilities": strings.Join(capabilities, ", "), + }).Debug("added capabilities to instance") return nil } diff --git a/pkg/instance/sidecars.go b/pkg/instance/sidecars.go index bdd235a..bd0d3db 100644 --- a/pkg/instance/sidecars.go +++ b/pkg/instance/sidecars.go @@ -3,6 +3,8 @@ package instance import ( "context" + "github.com/sirupsen/logrus" + "github.com/celestiaorg/knuu/pkg/system" ) @@ -58,7 +60,10 @@ func (s *sidecars) Add(ctx context.Context, sc SidecarManager) error { s.sidecars = append(s.sidecars, sc) sc.Instance().parentInstance = s.instance - s.instance.Logger.Debugf("Added sidecar '%s' to instance '%s'", sc.Instance().Name(), s.instance.name) + s.instance.Logger.WithFields(logrus.Fields{ + "sidecar": sc.Instance().Name(), + "instance": s.instance.name, + }).Debug("added sidecar to instance") return nil } diff --git a/pkg/instance/state.go b/pkg/instance/state.go index 30aa0fd..2aebcc4 100644 --- a/pkg/instance/state.go +++ b/pkg/instance/state.go @@ -1,5 +1,7 @@ package instance +import "github.com/sirupsen/logrus" + // InstanceState represents the state of the instance type InstanceState int @@ -33,7 +35,10 @@ func (i *Instance) IsInState(states ...InstanceState) bool { func (i *Instance) SetState(state InstanceState) { i.state = state - i.Logger.Debugf("Set state of instance '%s' to '%s'", i.name, i.state.String()) + i.Logger.WithFields(logrus.Fields{ + "instance": i.name, + "state": i.state.String(), + }).Debug("set state of instance") } func (i *Instance) IsState(state InstanceState) bool { diff --git a/pkg/instance/storage.go b/pkg/instance/storage.go index 89b7006..4cee01b 100644 --- a/pkg/instance/storage.go +++ b/pkg/instance/storage.go @@ -9,9 +9,10 @@ import ( "strconv" "strings" - "github.com/celestiaorg/knuu/pkg/k8s" - + "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/api/resource" + + "github.com/celestiaorg/knuu/pkg/k8s" ) type storage struct { @@ -51,7 +52,12 @@ func (s *storage) AddFile(src string, dest string, chown string) error { return s.addFileToInstance(dstPath, dest, chown) } - s.instance.Logger.Debugf("Added file '%s' to instance '%s'", dest, s.instance.name) + s.instance.Logger.WithFields(logrus.Fields{ + "file": dest, + "instance": s.instance.name, + "state": s.instance.state, + "build_dir": s.instance.build.getBuildDir(), + }).Debug("added file") return nil } @@ -98,7 +104,12 @@ func (s *storage) AddFolder(src string, dest string, chown string) error { return ErrCopyingFolderToInstance.WithParams(src, s.instance.name).Wrap(err) } - s.instance.Logger.Debugf("Added folder '%s' to instance '%s'", dest, s.instance.name) + s.instance.Logger.WithFields(logrus.Fields{ + "folder": dest, + "instance": s.instance.name, + "state": s.instance.state, + "build_dir": s.instance.build.getBuildDir(), + }).Debug("added folder") return nil } @@ -132,7 +143,10 @@ func (s *storage) AddFileBytes(bytes []byte, dest string, chown string) error { func (s *storage) AddVolume(path string, size resource.Quantity) error { // temporary feat, we will remove it once we can add multiple volumes if len(s.volumes) > 0 { - s.instance.Logger.Debugf("Maximum volumes exceeded for instance '%s', volumes: %d", s.instance.name, len(s.volumes)) + s.instance.Logger.WithFields(logrus.Fields{ + "instance": s.instance.name, + "volumes": len(s.volumes), + }).Debug("maximum volumes exceeded") return ErrMaximumVolumesExceeded.WithParams(s.instance.name) } return s.AddVolumeWithOwner(path, size, 0) @@ -146,12 +160,20 @@ func (s *storage) AddVolumeWithOwner(path string, size resource.Quantity, owner } // temporary feat, we will remove it once we can add multiple volumes if len(s.volumes) > 0 { - s.instance.Logger.Debugf("Maximum volumes exceeded for instance '%s', volumes: %d", s.instance.name, len(s.volumes)) + s.instance.Logger.WithFields(logrus.Fields{ + "instance": s.instance.name, + "volumes": len(s.volumes), + }).Debug("maximum volumes exceeded") return ErrMaximumVolumesExceeded.WithParams(s.instance.name) } volume := s.instance.K8sClient.NewVolume(path, size, owner) s.volumes = append(s.volumes, volume) - s.instance.Logger.Debugf("Added volume '%s' with size '%s' and owner '%d' to instance '%s'", path, size.String(), owner, s.instance.name) + s.instance.Logger.WithFields(logrus.Fields{ + "volume": path, + "size": size.String(), + "owner": owner, + "instance": s.instance.name, + }).Debug("added volume") return nil } @@ -284,7 +306,10 @@ func (s *storage) deployVolume(ctx context.Context) error { totalSize.Add(volume.Size) } s.instance.K8sClient.CreatePersistentVolumeClaim(ctx, s.instance.k8sName, s.instance.execution.Labels(), totalSize) - s.instance.Logger.Debugf("Deployed persistent volume '%s'", s.instance.k8sName) + s.instance.Logger.WithFields(logrus.Fields{ + "total_size": totalSize.String(), + "instance": s.instance.name, + }).Debug("deployed persistent volume") return nil } @@ -295,7 +320,7 @@ func (s *storage) destroyVolume(ctx context.Context) error { if err != nil { return ErrFailedToDeletePersistentVolumeClaim.Wrap(err) } - s.instance.Logger.Debugf("Destroyed persistent volume '%s'", s.instance.k8sName) + s.instance.Logger.WithField("instance", s.instance.name).Debug("destroyed persistent volume") return nil } @@ -330,7 +355,7 @@ func (s *storage) deployFiles(ctx context.Context) error { return ErrFailedToCreateConfigMap.Wrap(err) } - s.instance.Logger.Debugf("Deployed configmap '%s'", s.instance.k8sName) + s.instance.Logger.WithField("configmap", s.instance.k8sName).Debug("deployed configmap") return nil } @@ -341,7 +366,7 @@ func (s *storage) destroyFiles(ctx context.Context) error { return ErrFailedToDeleteConfigMap.Wrap(err) } - s.instance.Logger.Debugf("Destroyed configmap '%s'", s.instance.k8sName) + s.instance.Logger.WithField("configmap", s.instance.k8sName).Debug("destroyed configmap") return nil } diff --git a/pkg/knuu/knuu.go b/pkg/knuu/knuu.go index 06ce179..898b512 100644 --- a/pkg/knuu/knuu.go +++ b/pkg/knuu/knuu.go @@ -88,7 +88,7 @@ func (k *Knuu) HandleStopSignal(ctx context.Context) { <-stop k.Logger.Info("Received signal to stop, cleaning up resources...") if err := k.CleanUp(ctx); err != nil { - k.Logger.Errorf("Error deleting namespace: %v", err) + k.Logger.WithField("error", err).Error("deleting namespace") } }() } @@ -120,7 +120,7 @@ func (k *Knuu) handleTimeout(ctx context.Context) error { k.Scope, k.K8sClient.Namespace(), instance.TimeoutHandlerInstance.String(), k.K8sClient.Namespace())) // Delete the namespace as it was created by knuu. - k.Logger.Debugf("The namespace generated [%s] will be deleted", k.K8sClient.Namespace()) + k.Logger.WithField("namespace", k.K8sClient.Namespace()).Debug("the namespace will be deleted") commands = append(commands, fmt.Sprintf("kubectl delete namespace %s", k.K8sClient.Namespace())) // Delete all labeled resources within the namespace. @@ -131,7 +131,7 @@ func (k *Knuu) handleTimeout(ctx context.Context) error { // Run the command if err := inst.Build().SetStartCommand("sh", "-c", finalCmd); err != nil { - k.Logger.Debugf("The full command generated is [%s]", finalCmd) + k.Logger.WithField("command", finalCmd).Error("cannot set start command") return ErrCannotSetStartCommand.Wrap(err) } @@ -225,6 +225,6 @@ func setupProxy(ctx context.Context, k *Knuu) error { if err != nil { return ErrCannotGetTraefikEndpoint.Wrap(err) } - k.Logger.Debugf("Proxy endpoint: %s", endpoint) + k.Logger.WithField("endpoint", endpoint).Debug("proxy endpoint") return nil } diff --git a/pkg/log/logger.go b/pkg/log/logger.go index 94dbcc0..7fba226 100644 --- a/pkg/log/logger.go +++ b/pkg/log/logger.go @@ -31,10 +31,17 @@ func DefaultLogger() *logrus.Logger { if customLevel := os.Getenv(envLogLevel); customLevel != "" { err := logger.Level.UnmarshalText([]byte(customLevel)) if err != nil { - logger.Warnf("Failed to parse %s: %v, defaulting to INFO", envLogLevel, err) + logger.WithFields(logrus.Fields{ + "env_log_level": envLogLevel, + "error": err, + }). + Warn("Failed to parse env var LOG_LEVEL, defaulting to INFO") } } - logger.Infof("%s: %s", envLogLevel, logger.GetLevel()) + logger.WithFields(logrus.Fields{ + "env_log_level": envLogLevel, + "log_level": logger.GetLevel(), + }).Info("Current log level") return logger } diff --git a/pkg/minio/minio.go b/pkg/minio/minio.go index c5c3600..ad06303 100644 --- a/pkg/minio/minio.go +++ b/pkg/minio/minio.go @@ -104,7 +104,10 @@ func (m *Minio) Push(ctx context.Context, localReader io.Reader, minioFilePath, return ErrMinioFailedToUploadData.Wrap(err) } - m.Logger.Debugf("Data uploaded successfully to %s in bucket %s", uploadInfo.Key, bucketName) + m.Logger.WithFields(logrus.Fields{ + "key": uploadInfo.Key, + "bucket": bucketName, + }).Debug("Data uploaded successfully") return nil } @@ -125,7 +128,10 @@ func (m *Minio) Delete(ctx context.Context, minioFilePath, bucketName string) er return ErrMinioFailedToDeleteFile.Wrap(err) } - m.Logger.Debugf("File %s deleted successfully from bucket %s", minioFilePath, bucketName) + m.Logger.WithFields(logrus.Fields{ + "key": minioFilePath, + "bucket": bucketName, + }).Debug("File deleted successfully") return nil } @@ -301,12 +307,12 @@ func (m *Minio) createOrUpdateService(ctx context.Context) error { // Check if Minio service already exists existingService, err := serviceClient.Get(ctx, ServiceName, metav1.GetOptions{}) if err == nil { - m.Logger.Debugf("Service `%s` already exists, updating.", ServiceName) + m.Logger.WithField("service", ServiceName).Debug("Service already exists, updating.") minioService.ResourceVersion = existingService.ResourceVersion // Retain the existing resource version if _, err := serviceClient.Update(ctx, minioService, metav1.UpdateOptions{}); err != nil { return ErrMinioFailedToUpdateService.Wrap(err) } - m.Logger.Debugf("Service %s updated successfully.", ServiceName) + m.Logger.WithField("service", ServiceName).Debug("Service updated successfully.") return nil } @@ -315,7 +321,7 @@ func (m *Minio) createOrUpdateService(ctx context.Context) error { return ErrMinioFailedToCreateService.Wrap(err) } - m.Logger.Debugf("Service %s created successfully.", ServiceName) + m.Logger.WithField("service", ServiceName).Debug("Service created successfully.") return nil } @@ -347,7 +353,7 @@ func (m *Minio) createBucketIfNotExists(ctx context.Context, bucketName string) if err := m.client.MakeBucket(ctx, bucketName, miniogo.MakeBucketOptions{}); err != nil { return err } - m.Logger.Debugf("Bucket `%s` created successfully.", bucketName) + m.Logger.WithField("bucket", bucketName).Debug("Bucket created successfully.") return nil } @@ -414,7 +420,7 @@ func (m *Minio) createPVC(ctx context.Context, pvcName string, storageSize resou // Check if PVC already exists _, err := pvcClient.Get(ctx, pvcName, metav1.GetOptions{}) if err == nil { - m.Logger.Debugf("PersistentVolumeClaim `%s` already exists.", pvcName) + m.Logger.WithField("pvc", pvcName).Debug("PersistentVolumeClaim already exists.") return nil } @@ -455,7 +461,7 @@ func (m *Minio) createPVC(ctx context.Context, pvcName string, storageSize resou return ErrMinioFailedToCreatePersistentVolume.Wrap(err) } } - m.Logger.Debugf("PersistentVolume `%s` created successfully.", existingPV.Name) + m.Logger.WithField("pv", existingPV.Name).Debug("PersistentVolume created successfully.") // Create PVC with the existing or newly created PV pvc := &v1.PersistentVolumeClaim{ @@ -478,6 +484,6 @@ func (m *Minio) createPVC(ctx context.Context, pvcName string, storageSize resou return ErrMinioFailedToCreatePersistentVolumeClaim.Wrap(err) } - m.Logger.Debugf("PersistentVolumeClaim `%s` created successfully.", pvcName) + m.Logger.WithField("pvc", pvcName).Debug("PersistentVolumeClaim created successfully.") return nil } diff --git a/pkg/traefik/traefik.go b/pkg/traefik/traefik.go index 3f4b19a..9606879 100644 --- a/pkg/traefik/traefik.go +++ b/pkg/traefik/traefik.go @@ -256,7 +256,7 @@ func (t *Traefik) createService(ctx context.Context) error { return ErrTraefikFailedToCreateService.Wrap(err) } - t.Logger.Debugf("Service %s created successfully.", traefikServiceName) + t.Logger.WithField("service", traefikServiceName).Debug("Service created successfully.") return nil } @@ -352,7 +352,7 @@ func (t *Traefik) createIngressRoute( func (t *Traefik) IsTraefikAPIAvailable(ctx context.Context) bool { apiResourceList, err := t.K8sClient.Clientset().Discovery().ServerResourcesForGroupVersion(traefikAPIGroupVersion) if err != nil { - t.Logger.Errorf("Failed to discover Traefik API resources: %v", err) + t.Logger.WithField("error", err).Error("Failed to discover Traefik API resources") return false } @@ -371,6 +371,6 @@ func (t *Traefik) IsTraefikAPIAvailable(ctx context.Context) bool { return true } - t.Logger.Warnf("Missing Traefik API resources: %v", requiredResources) + t.Logger.WithField("missing_resources", requiredResources).Warn("Missing Traefik API resources") return false } From 224383d20643532529d95e133444e7249c407e30 Mon Sep 17 00:00:00 2001 From: Moji Date: Wed, 4 Sep 2024 10:31:22 +0200 Subject: [PATCH 2/3] feat: show a message when the deployment is pending (#553) * feat: Show a message when the deployment is pending * chore: updated the max pending time * chore: update loggers to be consistent with the rest from main --- pkg/instance/execution.go | 5 +- pkg/instance/network.go | 27 +++--- pkg/k8s/custom_resource.go | 2 +- pkg/k8s/daemonset.go | 16 +++- pkg/k8s/errors.go | 2 + pkg/k8s/k8s.go | 21 +++-- pkg/k8s/namespace.go | 9 +- pkg/k8s/networkpolicy.go | 2 +- pkg/k8s/pod.go | 26 ++++-- pkg/k8s/pod_status.go | 135 ++++++++++++++++++++++++++++ pkg/k8s/pvc.go | 4 +- pkg/k8s/replicaset.go | 8 +- pkg/k8s/service.go | 16 +++- pkg/k8s/types.go | 3 + pkg/knuu/knuu.go | 2 +- pkg/log/logger.go | 5 +- pkg/sidecars/netshaper/helpers.go | 2 +- pkg/sidecars/netshaper/netshaper.go | 1 - pkg/traefik/traefik.go | 2 +- 19 files changed, 231 insertions(+), 57 deletions(-) create mode 100644 pkg/k8s/pod_status.go diff --git a/pkg/instance/execution.go b/pkg/instance/execution.go index bd390ac..620ccf7 100644 --- a/pkg/instance/execution.go +++ b/pkg/instance/execution.go @@ -73,10 +73,7 @@ func (e *execution) StartWithCallback(ctx context.Context, callback func()) erro go func() { err := e.WaitInstanceIsRunning(ctx) if err != nil { - e.instance.Logger.WithFields(logrus.Fields{ - "instance": e.instance.k8sName, - "error": err, - }).Errorf("waiting for instance to be running") + e.instance.Logger.WithError(err).WithField("instance", e.instance.k8sName).Error("waiting for instance to be running") return } callback() diff --git a/pkg/instance/network.go b/pkg/instance/network.go index ddbca44..10dd74f 100644 --- a/pkg/instance/network.go +++ b/pkg/instance/network.go @@ -83,14 +83,15 @@ func (n *network) PortForwardTCP(ctx context.Context, port int) (int, error) { if attempt == maxRetries { return -1, ErrForwardingPort.WithParams(maxRetries) } - n.instance.Logger.WithFields(logrus.Fields{ - "instance": n.instance.name, - "port": port, - "error": err, - "attempt": attempt, - "max": maxRetries, - "retry_interval": retryInterval.String(), - }).Debug("forwarding port failed, retrying") + n.instance.Logger. + WithError(err). + WithFields(logrus.Fields{ + "instance": n.instance.name, + "port": port, + "attempt": attempt, + "max": maxRetries, + "retry_interval": retryInterval.String(), + }).Debug("forwarding port failed, retrying") } return localPort, nil } @@ -295,10 +296,7 @@ func (n *network) deployOrPatchService(ctx context.Context, portsTCP, portsUDP [ func (n *network) enableIfDisabled(ctx context.Context) error { disableNetwork, err := n.IsDisabled(ctx) if err != nil { - n.instance.Logger.WithFields(logrus.Fields{ - "instance": n.instance.name, - "error": err, - }).Error("error checking network status for instance") + n.instance.Logger.WithError(err).WithField("instance", n.instance.k8sName).Error("error checking network status for instance") return ErrCheckingNetworkStatusForInstance.WithParams(n.instance.k8sName).Wrap(err) } @@ -306,10 +304,7 @@ func (n *network) enableIfDisabled(ctx context.Context) error { return nil } if err := n.Enable(ctx); err != nil { - n.instance.Logger.WithFields(logrus.Fields{ - "instance": n.instance.name, - "error": err, - }).Error("error enabling network for instance") + n.instance.Logger.WithError(err).WithField("instance", n.instance.k8sName).Error("error enabling network for instance") return ErrEnablingNetworkForInstance.WithParams(n.instance.k8sName).Wrap(err) } return nil diff --git a/pkg/k8s/custom_resource.go b/pkg/k8s/custom_resource.go index 09047cd..f174411 100644 --- a/pkg/k8s/custom_resource.go +++ b/pkg/k8s/custom_resource.go @@ -43,7 +43,7 @@ func (c *Client) CreateCustomResource( return ErrCreatingCustomResource.WithParams(gvr.Resource).Wrap(err) } - c.logger.Debugf("CustomResource %s created", name) + c.logger.WithField("name", name).Debug("customResource created") return nil } diff --git a/pkg/k8s/daemonset.go b/pkg/k8s/daemonset.go index f7ed8af..8f8c6c3 100644 --- a/pkg/k8s/daemonset.go +++ b/pkg/k8s/daemonset.go @@ -3,6 +3,7 @@ package k8s import ( "context" + "github.com/sirupsen/logrus" appv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" @@ -49,7 +50,10 @@ func (c *Client) CreateDaemonSet( if err != nil { return nil, ErrCreatingDaemonset.WithParams(name).Wrap(err) } - c.logger.Debugf("DaemonSet %s created in namespace %s", name, c.namespace) + c.logger.WithFields(logrus.Fields{ + "name": name, + "namespace": c.namespace, + }).Debug("daemonSet created") return created, nil } @@ -73,7 +77,10 @@ func (c *Client) UpdateDaemonSet(ctx context.Context, if err != nil { return nil, ErrUpdatingDaemonset.WithParams(name).Wrap(err) } - c.logger.Debugf("DaemonSet %s updated in namespace %s", name, c.namespace) + c.logger.WithFields(logrus.Fields{ + "name": name, + "namespace": c.namespace, + }).Debug("daemonSet updated") return updated, nil } @@ -82,7 +89,10 @@ func (c *Client) DeleteDaemonSet(ctx context.Context, name string) error { if err != nil { return ErrDeletingDaemonset.WithParams(name).Wrap(err) } - c.logger.Debugf("DaemonSet %s deleted in namespace %s", name, c.namespace) + c.logger.WithFields(logrus.Fields{ + "name": name, + "namespace": c.namespace, + }).Debug("daemonSet deleted") return nil } diff --git a/pkg/k8s/errors.go b/pkg/k8s/errors.go index 3a5b302..6892514 100644 --- a/pkg/k8s/errors.go +++ b/pkg/k8s/errors.go @@ -133,4 +133,6 @@ var ( ErrInvalidServiceAccountName = errors.New("InvalidServiceAccountName", "invalid service account name %s: %v") ErrInvalidClusterRoleBindingName = errors.New("InvalidClusterRoleBindingName", "invalid cluster role binding name %s: %v") ErrInvalidServiceName = errors.New("InvalidServiceName", "invalid service name %s: %v") + ErrListingPods = errors.New("ListingPods", "failed to list pods") + ErrGetPodStatus = errors.New("GetPodStatus", "failed to get pod status for pod %s") ) diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go index 912f1b1..4c7450c 100644 --- a/pkg/k8s/k8s.go +++ b/pkg/k8s/k8s.go @@ -29,6 +29,9 @@ const ( // retryInterval is the interval to wait between retries retryInterval = 100 * time.Millisecond + + // if any pod is pending for more than this duration, a warning is logged + defaultMaxPendingDuration = 60 * time.Second ) type Client struct { @@ -37,6 +40,8 @@ type Client struct { dynamicClient dynamic.Interface namespace string logger *logrus.Logger + // max duration for any pod to be in pending state, otherwise it triggers a notice to be shown + maxPendingDuration time.Duration } var _ KubeManager = &Client{} @@ -77,16 +82,18 @@ func NewClientCustom( logger *logrus.Logger, ) (*Client, error) { kc := &Client{ - clientset: cs, - discoveryClient: dc, - dynamicClient: dC, - namespace: namespace, - logger: logger, + clientset: cs, + discoveryClient: dc, + dynamicClient: dC, + namespace: namespace, + logger: logger, + maxPendingDuration: defaultMaxPendingDuration, } kc.namespace = SanitizeName(namespace) if err := kc.CreateNamespace(ctx, kc.namespace); err != nil { return nil, ErrCreatingNamespace.WithParams(kc.namespace).Wrap(err) } + kc.startPendingPodsWarningMonitor(ctx) return kc, nil } @@ -105,3 +112,7 @@ func (c *Client) Namespace() string { func (c *Client) DiscoveryClient() discovery.DiscoveryInterface { return c.discoveryClient } + +func (c *Client) SetMaxPendingDuration(duration time.Duration) { + c.maxPendingDuration = duration +} diff --git a/pkg/k8s/namespace.go b/pkg/k8s/namespace.go index 90c68e8..92caadd 100644 --- a/pkg/k8s/namespace.go +++ b/pkg/k8s/namespace.go @@ -24,10 +24,11 @@ func (c *Client) CreateNamespace(ctx context.Context, name string) error { if !apierrs.IsAlreadyExists(err) { return ErrCreatingNamespace.WithParams(name).Wrap(err) } - c.logger.Debugf("Namespace %s already exists, continuing.\n", name) + c.logger.WithField("name", name).Debug("namespace already exists, continuing") + return nil } - c.logger.Debugf("Namespace %s created.\n", name) + c.logger.WithField("name", name).Debug("namespace created") return nil } @@ -50,10 +51,10 @@ func (c *Client) NamespaceExists(ctx context.Context, name string) (bool, error) } if apierrs.IsNotFound(err) { - c.logger.Debugf("Namespace %s does not exist, err: %v", name, err) + c.logger.WithField("name", name).WithError(err).Debug("namespace does not exist") return false, nil } - c.logger.Errorf("Error getting namespace %s, err: %v", name, err) + c.logger.WithField("name", name).WithError(err).Error("getting namespace") return false, err } diff --git a/pkg/k8s/networkpolicy.go b/pkg/k8s/networkpolicy.go index d852f66..77321aa 100644 --- a/pkg/k8s/networkpolicy.go +++ b/pkg/k8s/networkpolicy.go @@ -97,7 +97,7 @@ func (c *Client) GetNetworkPolicy(ctx context.Context, name string) (*v1.Network func (c *Client) NetworkPolicyExists(ctx context.Context, name string) bool { _, err := c.GetNetworkPolicy(ctx, name) if err != nil { - c.logger.Debug("NetworkPolicy does not exist, err: ", err) + c.logger.WithField("name", name).WithError(err).Debug("getting networkPolicy") return false } diff --git a/pkg/k8s/pod.go b/pkg/k8s/pod.go index 832f9e0..10ae7d3 100644 --- a/pkg/k8s/pod.go +++ b/pkg/k8s/pod.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -105,7 +106,7 @@ func (c *Client) NewFile(source, dest string) *File { } func (c *Client) ReplacePodWithGracePeriod(ctx context.Context, podConfig PodConfig, gracePeriod *int64) (*v1.Pod, error) { - c.logger.Debugf("Replacing pod %s", podConfig.Name) + c.logger.WithField("name", podConfig.Name).Debug("replacing pod") if err := c.DeletePodWithGracePeriod(ctx, podConfig.Name, gracePeriod); err != nil { return nil, ErrDeletingPod.Wrap(err) @@ -127,13 +128,13 @@ func (c *Client) waitForPodDeletion(ctx context.Context, name string) error { for { select { case <-ctx.Done(): - c.logger.Errorf("Context cancelled while waiting for pod %s to delete", name) + c.logger.WithField("name", name).Error("context cancelled while waiting for pod to delete") return ctx.Err() case <-time.After(retryInterval): _, err := c.getPod(ctx, name) if err != nil { if apierrs.IsNotFound(err) { - c.logger.Debugf("Pod %s successfully deleted", name) + c.logger.WithField("name", name).Debug("pod successfully deleted") return nil } return ErrWaitingForPodDeletion.WithParams(name).Wrap(err) @@ -309,8 +310,11 @@ func (c *Client) PortForwardPod( if stderr.Len() > 0 { return ErrPortForwarding.WithParams(stderr.String()) } - c.logger.Debugf("Port forwarding from %d to %d", localPort, remotePort) - c.logger.Debugf("Port forwarding stdout: %v", stdout) + c.logger.WithFields(logrus.Fields{ + "local_port": localPort, + "remote_port": remotePort, + "stdout": stdout.String(), + }).Debug("port forwarding") // Start the port forwarding go func() { @@ -325,7 +329,10 @@ func (c *Client) PortForwardPod( select { case <-readyChan: // Ready to forward - c.logger.Debugf("Port forwarding ready from %d to %d", localPort, remotePort) + c.logger.WithFields(logrus.Fields{ + "local_port": localPort, + "remote_port": remotePort, + }).Debug("port forwarding ready") case err := <-errChan: // if there's an error, return it return ErrForwardingPorts.Wrap(err) @@ -491,7 +498,7 @@ func (c *Client) buildInitContainerCommand(volumes []*Volume, files []*File) []s fullCommand := strings.Join(cmds, "") commands = append(commands, fullCommand) - c.logger.Debugf("Init container command: %s", fullCommand) + c.logger.WithField("command", fullCommand).Debug("init container command") return commands } @@ -582,6 +589,9 @@ func (c *Client) preparePod(spec PodConfig, init bool) *v1.Pod { Spec: c.preparePodSpec(spec, init), } - c.logger.Debugf("Prepared pod %s in namespace %s", spec.Name, spec.Namespace) + c.logger.WithFields(logrus.Fields{ + "name": spec.Name, + "namespace": spec.Namespace, + }).Debug("prepared pod") return pod } diff --git a/pkg/k8s/pod_status.go b/pkg/k8s/pod_status.go new file mode 100644 index 0000000..f326503 --- /dev/null +++ b/pkg/k8s/pod_status.go @@ -0,0 +1,135 @@ +package k8s + +import ( + "context" + "fmt" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type PodStatus struct { + Name string + Status corev1.PodPhase + PendingDuration time.Duration +} + +// AllPodsStatuses reports the status of pods in the current namespace. +func (c *Client) AllPodsStatuses(ctx context.Context) ([]PodStatus, error) { + pods, err := c.clientset.CoreV1(). + Pods(c.namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, ErrListingPods.Wrap(err) + } + + if len(pods.Items) == 0 { + return nil, nil + } + + output := make([]PodStatus, 0, len(pods.Items)) + for _, pod := range pods.Items { + pendingDuration := time.Duration(0) + if pod.Status.Phase == corev1.PodPending { + pendingDuration = time.Since(pod.CreationTimestamp.Time) + } + + output = append(output, PodStatus{ + Name: pod.Name, + Status: pod.Status.Phase, + PendingDuration: pendingDuration, + }) + } + return output, nil +} + +func (c *Client) PodStatus(ctx context.Context, name string) (PodStatus, error) { + pod, err := c.clientset.CoreV1(). + Pods(c.namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return PodStatus{}, ErrGetPodStatus.WithParams(name).Wrap(err) + } + + pendingDuration := time.Duration(0) + if pod.Status.Phase == corev1.PodPending { + pendingDuration = time.Since(pod.CreationTimestamp.Time) + } + + return PodStatus{ + Name: pod.Name, + Status: pod.Status.Phase, + PendingDuration: pendingDuration, + }, nil +} + +func (c *Client) PrintAllPodsStatuses(ctx context.Context) error { + statuses, err := c.AllPodsStatuses(ctx) + if err != nil { + return err + } + + for _, s := range statuses { + fmt.Printf("%-60s | %s\n", s.Name, s.Status) + } + return nil +} + +// reportLongPendingPods checks for pods that have been pending longer than the specified maxPendingDuration +// and logs a warning message with the pods that are pending for too long. +func (c *Client) reportLongPendingPods(ctx context.Context) error { + statuses, err := c.AllPodsStatuses(ctx) + if err != nil { + return err + } + + // Collect pods that have been pending longer than the allowed duration + longPendingPods := make([]string, 0) + for _, s := range statuses { + if s.Status == corev1.PodPending && s.PendingDuration > c.maxPendingDuration { + longPendingPods = append(longPendingPods, s.Name) + } + } + + // If there are no pods pending too long, return nil + if len(longPendingPods) == 0 { + return nil + } + + c.logger.WithField("pending_pods", strings.Join(longPendingPods, ", ")).Warn("Pods pending for too long") + c.logger.WithField("pod_statuses", generatePodsStatusSummary(statuses)).Info("Pod statuses") + return nil +} + +// startPendingPodsWarningMonitor starts a background process to periodically check for pods that have been pending longer than the maxPendingDuration. +func (c *Client) startPendingPodsWarningMonitor(ctx context.Context) { + go func() { + ticker := time.NewTicker(c.maxPendingDuration) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := c.reportLongPendingPods(ctx); err != nil { + c.logger.WithError(err).Error("failed to report long pending pods") + } + case <-ctx.Done(): + c.logger.Infof("Shutting down long pending pods monitor.") + return + } + } + }() +} + +func generatePodsStatusSummary(statuses []PodStatus) string { + summary := make(map[corev1.PodPhase]int) + for _, s := range statuses { + summary[s.Status]++ + } + + output := "" + for status, count := range summary { + output += fmt.Sprintf("%s: %d , ", status, count) + } + return strings.TrimSuffix(output, ", ") +} diff --git a/pkg/k8s/pvc.go b/pkg/k8s/pvc.go index 7af2ad2..a39b2ed 100644 --- a/pkg/k8s/pvc.go +++ b/pkg/k8s/pvc.go @@ -48,7 +48,7 @@ func (c *Client) CreatePersistentVolumeClaim( return ErrCreatingPersistentVolumeClaim.WithParams(name).Wrap(err) } - c.logger.Debugf("PersistentVolumeClaim %s created", name) + c.logger.WithField("name", name).Debug("PersistentVolumeClaim created") return nil } @@ -66,7 +66,7 @@ func (c *Client) DeletePersistentVolumeClaim(ctx context.Context, name string) e return ErrDeletingPersistentVolumeClaim.WithParams(name).Wrap(err) } - c.logger.Debugf("PersistentVolumeClaim %s deleted", name) + c.logger.WithField("name", name).Debug("PersistentVolumeClaim deleted") return nil } diff --git a/pkg/k8s/replicaset.go b/pkg/k8s/replicaset.go index bf3adf9..3749daf 100644 --- a/pkg/k8s/replicaset.go +++ b/pkg/k8s/replicaset.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/sirupsen/logrus" appv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -36,7 +37,7 @@ func (c *Client) CreateReplicaSet(ctx context.Context, rsConfig ReplicaSetConfig } func (c *Client) ReplaceReplicaSetWithGracePeriod(ctx context.Context, ReplicaSetConfig ReplicaSetConfig, gracePeriod *int64) (*appv1.ReplicaSet, error) { - c.logger.Debugf("Replacing ReplicaSet %s", ReplicaSetConfig.Name) + c.logger.WithField("name", ReplicaSetConfig.Name).Debug("replacing replicaSet") if err := c.DeleteReplicaSetWithGracePeriod(ctx, ReplicaSetConfig.Name, gracePeriod); err != nil { return nil, ErrDeletingReplicaSet.Wrap(err) @@ -174,6 +175,9 @@ func (c *Client) prepareReplicaSet(rsConf ReplicaSetConfig, init bool) *appv1.Re }, } - c.logger.Debugf("Prepared ReplicaSet %s in namespace %s", rsConf.Name, rsConf.Namespace) + c.logger.WithFields(logrus.Fields{ + "name": rsConf.Name, + "namespace": rsConf.Namespace, + }).Debug("prepared replicaSet") return rs } diff --git a/pkg/k8s/service.go b/pkg/k8s/service.go index 40dbc5e..7650ea6 100644 --- a/pkg/k8s/service.go +++ b/pkg/k8s/service.go @@ -6,6 +6,7 @@ import ( "net" "time" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -45,7 +46,10 @@ func (c *Client) CreateService( if err != nil { return nil, ErrCreatingService.WithParams(name).Wrap(err) } - c.logger.Debugf("Service %s created in namespace %s", name, c.namespace) + c.logger.WithFields(logrus.Fields{ + "name": name, + "namespace": c.namespace, + }).Debug("service created") return serv, nil } @@ -79,7 +83,10 @@ func (c *Client) PatchService( return nil, ErrPatchingService.WithParams(name).Wrap(err) } - c.logger.Debugf("Service %s patched in namespace %s", name, c.namespace) + c.logger.WithFields(logrus.Fields{ + "name": name, + "namespace": c.namespace, + }).Debug("service patched") return serv, nil } @@ -97,7 +104,10 @@ func (c *Client) DeleteService(ctx context.Context, name string) error { return ErrDeletingService.WithParams(name).Wrap(err) } - c.logger.Debugf("Service %s deleted in namespace %s", name, c.namespace) + c.logger.WithFields(logrus.Fields{ + "name": name, + "namespace": c.namespace, + }).Debug("service deleted") return nil } diff --git a/pkg/k8s/types.go b/pkg/k8s/types.go index 6e1381b..a6db04f 100644 --- a/pkg/k8s/types.go +++ b/pkg/k8s/types.go @@ -74,4 +74,7 @@ type KubeManager interface { UpdateDaemonSet(ctx context.Context, name string, labels map[string]string, initContainers []corev1.Container, containers []corev1.Container) (*appv1.DaemonSet, error) WaitForDeployment(ctx context.Context, name string) error WaitForService(ctx context.Context, name string) error + AllPodsStatuses(ctx context.Context) ([]PodStatus, error) + PodStatus(ctx context.Context, name string) (PodStatus, error) + PrintAllPodsStatuses(ctx context.Context) error } diff --git a/pkg/knuu/knuu.go b/pkg/knuu/knuu.go index 898b512..edebef9 100644 --- a/pkg/knuu/knuu.go +++ b/pkg/knuu/knuu.go @@ -88,7 +88,7 @@ func (k *Knuu) HandleStopSignal(ctx context.Context) { <-stop k.Logger.Info("Received signal to stop, cleaning up resources...") if err := k.CleanUp(ctx); err != nil { - k.Logger.WithField("error", err).Error("deleting namespace") + k.Logger.WithError(err).Error("deleting namespace") } }() } diff --git a/pkg/log/logger.go b/pkg/log/logger.go index 7fba226..41fd220 100644 --- a/pkg/log/logger.go +++ b/pkg/log/logger.go @@ -31,10 +31,7 @@ func DefaultLogger() *logrus.Logger { if customLevel := os.Getenv(envLogLevel); customLevel != "" { err := logger.Level.UnmarshalText([]byte(customLevel)) if err != nil { - logger.WithFields(logrus.Fields{ - "env_log_level": envLogLevel, - "error": err, - }). + logger.WithError(err).WithField("env_log_level", envLogLevel). Warn("Failed to parse env var LOG_LEVEL, defaulting to INFO") } } diff --git a/pkg/sidecars/netshaper/helpers.go b/pkg/sidecars/netshaper/helpers.go index 53c770e..c6fd108 100644 --- a/pkg/sidecars/netshaper/helpers.go +++ b/pkg/sidecars/netshaper/helpers.go @@ -10,7 +10,7 @@ import ( func (bt *NetShaper) setNewClientByURL(url string) { bt.client = sdk.NewClient(url) - bt.instance.Logger.Debugf("NetShaper (BitTwister) address '%s'", url) + bt.instance.Logger.WithField("address", url).Debug("NetShaper (BitTwister) address") } func (bt *NetShaper) SetPort(port int) { diff --git a/pkg/sidecars/netshaper/netshaper.go b/pkg/sidecars/netshaper/netshaper.go index 3cb0623..b2b5012 100644 --- a/pkg/sidecars/netshaper/netshaper.go +++ b/pkg/sidecars/netshaper/netshaper.go @@ -90,7 +90,6 @@ func (bt *NetShaper) PreStart(ctx context.Context) error { if err != nil { return err } - bt.instance.Logger.Debugf("BitTwister URL: %s", btURL) bt.setNewClientByURL(btURL) return nil diff --git a/pkg/traefik/traefik.go b/pkg/traefik/traefik.go index 9606879..e31456a 100644 --- a/pkg/traefik/traefik.go +++ b/pkg/traefik/traefik.go @@ -352,7 +352,7 @@ func (t *Traefik) createIngressRoute( func (t *Traefik) IsTraefikAPIAvailable(ctx context.Context) bool { apiResourceList, err := t.K8sClient.Clientset().Discovery().ServerResourcesForGroupVersion(traefikAPIGroupVersion) if err != nil { - t.Logger.WithField("error", err).Error("Failed to discover Traefik API resources") + t.Logger.WithError(err).Error("Failed to discover Traefik API resources") return false } From 60ca33ce3f2097a8be8a7a877707ccc3e98d7a7a Mon Sep 17 00:00:00 2001 From: Moji Date: Thu, 5 Sep 2024 07:40:51 +0200 Subject: [PATCH 3/3] make k8s the default builder (#541) * merge main into it * chore: refactored read file from image to use k8s instead of docker * chore: cleanup * filx: linter complain * Update pkg/container/docker.go Co-authored-by: Matthew Sevey <15232757+MSevey@users.noreply.github.com> * chore: merge fix conflicts * fix: remove duplicate test parallel * fix: set state after building image * fix: put the set git repo out of retry loop --------- Co-authored-by: Matthew Sevey <15232757+MSevey@users.noreply.github.com> --- e2e/system/build_from_git_test.go | 14 ++-- e2e/system/file_test.go | 38 +++++++++- e2e/system/suite_setup_test.go | 1 + pkg/container/docker.go | 119 ++---------------------------- pkg/instance/build.go | 22 ++---- pkg/instance/storage.go | 48 +++++++++++- 6 files changed, 102 insertions(+), 140 deletions(-) diff --git a/e2e/system/build_from_git_test.go b/e2e/system/build_from_git_test.go index a2fcb4d..add4297 100644 --- a/e2e/system/build_from_git_test.go +++ b/e2e/system/build_from_git_test.go @@ -65,14 +65,12 @@ func (s *Suite) TestBuildFromGitWithModifications() { s.Require().NoError(err) s.T().Log("Setting git repo") - err = s.RetryOperation(func() error { - return target.Build().SetGitRepo(ctx, builder.GitContext{ - Repo: gitRepo, - Branch: gitBranch, - Username: "", - Password: "", - }) - }, maxRetries) + err = target.Build().SetGitRepo(ctx, builder.GitContext{ + Repo: gitRepo, + Branch: gitBranch, + Username: "", + Password: "", + }) s.Require().NoError(err) s.Require().NoError(target.Build().SetStartCommand("sleep", "infinity")) diff --git a/e2e/system/file_test.go b/e2e/system/file_test.go index 7ea5f37..dcb20b5 100644 --- a/e2e/system/file_test.go +++ b/e2e/system/file_test.go @@ -74,13 +74,11 @@ func (s *Suite) TestDownloadFileFromRunningInstance() { namePrefix = "download-file-running" ) - // Setup - target, err := s.Knuu.NewInstance(namePrefix + "-target") s.Require().NoError(err) ctx := context.Background() - s.Require().NoError(target.Build().SetImage(ctx, "alpine:latest")) + s.Require().NoError(target.Build().SetImage(ctx, alpineImage)) s.Require().NoError(target.Build().SetArgs("tail", "-f", "/dev/null")) // Keep the container running s.Require().NoError(target.Build().Commit(ctx)) s.Require().NoError(target.Execution().Start(ctx)) @@ -100,6 +98,38 @@ func (s *Suite) TestDownloadFileFromRunningInstance() { s.Assert().Equal(fileContent, string(gotContent)) } +func (s *Suite) TestDownloadFileFromBuilder() { + const namePrefix = "download-file-builder" + + target, err := s.Knuu.NewInstance(namePrefix + "-target") + s.Require().NoError(err) + + ctx := context.Background() + s.Require().NoError(target.Build().SetImage(ctx, alpineImage)) + + s.T().Cleanup(func() { + if err := target.Execution().Destroy(ctx); err != nil { + s.T().Logf("error destroying instance: %v", err) + } + }) + + // Test logic + const ( + fileContent = "Hello World!" + filePath = "/hello.txt" + ) + + s.Require().NoError(target.Storage().AddFileBytes([]byte(fileContent), filePath, "0:0")) + + // The commit is required to make the changes persistent to the image + s.Require().NoError(target.Build().Commit(ctx)) + + // Now test if the file can be downloaded correctly from the built image + gotContent, err := target.Storage().GetFileBytes(ctx, filePath) + s.Require().NoError(err, "Error getting file bytes") + + s.Assert().Equal(fileContent, string(gotContent)) +} func (s *Suite) TestMinio() { const ( @@ -112,7 +142,7 @@ func (s *Suite) TestMinio() { s.Require().NoError(err) ctx := context.Background() - s.Require().NoError(target.Build().SetImage(ctx, "alpine:latest")) + s.Require().NoError(target.Build().SetImage(ctx, alpineImage)) s.Require().NoError(target.Build().SetArgs("tail", "-f", "/dev/null")) // Keep the container running s.Require().NoError(target.Build().Commit(ctx)) s.Require().NoError(target.Execution().Start(ctx)) diff --git a/e2e/system/suite_setup_test.go b/e2e/system/suite_setup_test.go index 19f423c..d851992 100644 --- a/e2e/system/suite_setup_test.go +++ b/e2e/system/suite_setup_test.go @@ -17,6 +17,7 @@ import ( const ( testTimeout = time.Minute * 15 // the same time that is used in the ci/cd pipeline + alpineImage = "alpine:latest" resourcesHTML = "resources/html" resourcesFileCMToFolder = "resources/file_cm_to_folder" ) diff --git a/pkg/container/docker.go b/pkg/container/docker.go index 43274e5..6457785 100644 --- a/pkg/container/docker.go +++ b/pkg/container/docker.go @@ -2,20 +2,13 @@ package container import ( - "archive/tar" - "bytes" "context" "crypto/sha256" "fmt" - "io" "os" - "os/exec" "path/filepath" "strings" - "time" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/client" "github.com/sirupsen/logrus" "github.com/celestiaorg/knuu/pkg/builder" @@ -26,24 +19,18 @@ type BuilderFactory struct { imageNameFrom string imageNameTo string imageBuilder builder.Builder - cli *client.Client dockerFileInstructions []string buildContext string } // NewBuilderFactory creates a new instance of BuilderFactory. func NewBuilderFactory(imageName, buildContext string, imageBuilder builder.Builder) (*BuilderFactory, error) { - cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) - if err != nil { - return nil, ErrCreatingDockerClient.Wrap(err) - } - err = os.MkdirAll(buildContext, 0755) - if err != nil { + if err := os.MkdirAll(buildContext, 0755); err != nil { return nil, ErrFailedToCreateContextDir.Wrap(err) } + return &BuilderFactory{ imageNameFrom: imageName, - cli: cli, dockerFileInstructions: []string{"FROM " + imageName}, buildContext: buildContext, imageBuilder: imageBuilder, @@ -55,104 +42,24 @@ func (f *BuilderFactory) ImageNameFrom() string { return f.imageNameFrom } -// ExecuteCmdInBuilder runs the provided command in the context of the given builder. -// It returns the command's output or any error encountered. -func (f *BuilderFactory) ExecuteCmdInBuilder(command []string) (string, error) { +// AddCmdToBuilder adds the provided command to be run in the context of the builder. +func (f *BuilderFactory) AddCmdToBuilder(command []string) { f.dockerFileInstructions = append(f.dockerFileInstructions, "RUN "+strings.Join(command, " ")) - // FIXME: does not return expected output - return "", nil } // AddToBuilder adds a file from the source path to the destination path in the image, with the specified ownership. -func (f *BuilderFactory) AddToBuilder(srcPath, destPath, chown string) error { +func (f *BuilderFactory) AddToBuilder(srcPath, destPath, chown string) { f.dockerFileInstructions = append(f.dockerFileInstructions, "ADD --chown="+chown+" "+srcPath+" "+destPath) - return nil -} - -// ReadFileFromBuilder reads a file from the given builder's mount point. -// It returns the file's content or any error encountered. -func (f *BuilderFactory) ReadFileFromBuilder(filePath string) ([]byte, error) { - if f.imageNameTo == "" { - return nil, ErrNoImageNameProvided - } - containerConfig := &container.Config{ - Image: f.imageNameTo, - Cmd: []string{"tail", "-f", "/dev/null"}, // This keeps the container running - } - resp, err := f.cli.ContainerCreate( - context.Background(), - containerConfig, - nil, - nil, - nil, - "", - ) - if err != nil { - return nil, ErrFailedToCreateContainer.Wrap(err) - } - - defer func() { - // Stop the container - timeout := int(time.Duration(10) * time.Second) - stopOptions := container.StopOptions{ - Timeout: &timeout, - } - - if err := f.cli.ContainerStop(context.Background(), resp.ID, stopOptions); err != nil { - logrus.Warn(ErrFailedToStopContainer.Wrap(err)) - } - - // Remove the container - if err := f.cli.ContainerRemove(context.Background(), resp.ID, container.RemoveOptions{}); err != nil { - logrus.Warn(ErrFailedToRemoveContainer.Wrap(err)) - } - }() - - if err := f.cli.ContainerStart(context.Background(), resp.ID, container.StartOptions{}); err != nil { - return nil, ErrFailedToStartContainer.Wrap(err) - } - - // Now you can copy the file - reader, _, err := f.cli.CopyFromContainer(context.Background(), resp.ID, filePath) - if err != nil { - return nil, ErrFailedToCopyFileFromContainer.Wrap(err) - } - defer reader.Close() - - tarReader := tar.NewReader(reader) - - for { - header, err := tarReader.Next() - - if err == io.EOF { - break // End of archive - } - if err != nil { - return nil, ErrFailedToReadFromTar.Wrap(err) - } - - if header.Typeflag == tar.TypeReg { // if it's a file then extract it - data, err := io.ReadAll(tarReader) - if err != nil { - return nil, ErrFailedToReadFileFromTar.Wrap(err) - } - return data, nil - } - } - - return nil, ErrFileNotFoundInTar } // SetEnvVar sets the value of an environment variable in the builder. -func (f *BuilderFactory) SetEnvVar(name, value string) error { +func (f *BuilderFactory) SetEnvVar(name, value string) { f.dockerFileInstructions = append(f.dockerFileInstructions, "ENV "+name+"="+value) - return nil } // SetUser sets the user in the builder. -func (f *BuilderFactory) SetUser(user string) error { +func (f *BuilderFactory) SetUser(user string) { f.dockerFileInstructions = append(f.dockerFileInstructions, "USER "+user) - return nil } // Changed returns true if the builder has been modified, false otherwise. @@ -178,6 +85,7 @@ func (f *BuilderFactory) PushBuilderImage(ctx context.Context, imageName string) return ErrFailedToCreateContextDir.Wrap(err) } } + dockerFile := strings.Join(f.dockerFileInstructions, "\n") err := os.WriteFile(dockerFilePath, []byte(dockerFile), 0644) if err != nil { @@ -240,17 +148,6 @@ func (f *BuilderFactory) BuildImageFromGitRepo(ctx context.Context, gitCtx build return err } -func runCommand(cmd *exec.Cmd) error { // nolint: unused - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - err := cmd.Run() - if err != nil { - return fmt.Errorf("command failed: %w\nstdout: %s\nstderr: %s", err, stdout.String(), stderr.String()) - } - return nil -} - // GenerateImageHash creates a hash value based on the contents of the Dockerfile instructions and all files in the build context. func (f *BuilderFactory) GenerateImageHash() (string, error) { hasher := sha256.New() diff --git a/pkg/instance/build.go b/pkg/instance/build.go index a15b1ba..0d2f085 100644 --- a/pkg/instance/build.go +++ b/pkg/instance/build.go @@ -119,10 +119,7 @@ func (b *build) ExecuteCommand(command ...string) error { return ErrAddingCommandNotAllowed.WithParams(b.instance.state.String()) } - _, err := b.builderFactory.ExecuteCmdInBuilder(command) - if err != nil { - return ErrExecutingCommandInInstance.WithParams(command, b.instance.name).Wrap(err) - } + b.builderFactory.AddCmdToBuilder(command) return nil } @@ -133,9 +130,7 @@ func (b *build) SetUser(user string) error { return ErrSettingUserNotAllowed.WithParams(b.instance.state.String()) } - if err := b.builderFactory.SetUser(user); err != nil { - return ErrSettingUser.WithParams(user, b.instance.name).Wrap(err) - } + b.builderFactory.SetUser(user) b.instance.Logger.WithFields(logrus.Fields{ "instance": b.instance.name, "user": user, @@ -224,14 +219,10 @@ func (b *build) getBuildDir() string { } // addFileToBuilder adds a file to the builder -func (b *build) addFileToBuilder(src, dest, chown string) error { - _ = src +func (b *build) addFileToBuilder(src, dest, chown string) { // dest is the same as src here, as we copy the file to the build dir with the subfolder structure of dest - err := b.builderFactory.AddToBuilder(dest, dest, chown) - if err != nil { - return ErrAddingFileToInstance.WithParams(dest, b.instance.name).Wrap(err) - } - return nil + _ = src + b.builderFactory.AddToBuilder(dest, dest, chown) } // SetEnvironmentVariable sets the given environment variable in the instance @@ -247,7 +238,8 @@ func (b *build) SetEnvironmentVariable(key, value string) error { }).Debugf("Setting environment variable") if b.instance.state == StatePreparing { - return b.builderFactory.SetEnvVar(key, value) + b.builderFactory.SetEnvVar(key, value) + return nil } b.env[key] = value return nil diff --git a/pkg/instance/storage.go b/pkg/instance/storage.go index 4cee01b..42d3d3f 100644 --- a/pkg/instance/storage.go +++ b/pkg/instance/storage.go @@ -13,6 +13,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "github.com/celestiaorg/knuu/pkg/k8s" + "github.com/celestiaorg/knuu/pkg/names" ) type storage struct { @@ -47,7 +48,8 @@ func (s *storage) AddFile(src string, dest string, chown string) error { switch s.instance.state { case StatePreparing: - return s.instance.build.addFileToBuilder(src, dest, chown) + s.instance.build.addFileToBuilder(src, dest, chown) + return nil case StateCommitted: return s.addFileToInstance(dstPath, dest, chown) } @@ -185,7 +187,7 @@ func (s *storage) GetFileBytes(ctx context.Context, file string) ([]byte, error) } if s.instance.state != StateStarted { - bytes, err := s.instance.build.builderFactory.ReadFileFromBuilder(file) + bytes, err := s.readFileFromImage(ctx, file) if err != nil { return nil, ErrGettingFile.WithParams(file, s.instance.name).Wrap(err) } @@ -370,6 +372,48 @@ func (s *storage) destroyFiles(ctx context.Context) error { return nil } +func (s *storage) readFileFromImage(ctx context.Context, filePath string) ([]byte, error) { + // Another way to implement this is to download all the layers of the image and then + // extract the file from them, but it seems hacky and will run on the user's machine. + // Therefore, we will use the tmp instance to get the file from the image + + tmpName, err := names.NewRandomK8("tmp-dl") + if err != nil { + return nil, err + } + + ti, err := New(tmpName, s.instance.SystemDependencies) + if err != nil { + return nil, err + } + if err := ti.build.SetImage(ctx, s.instance.build.ImageName()); err != nil { + return nil, err + } + + if err := ti.build.SetStartCommand("sleep", "infinity"); err != nil { + return nil, err + } + + if err := ti.build.Commit(ctx); err != nil { + return nil, err + } + + if err := ti.execution.Start(ctx); err != nil { + return nil, err + } + defer func() { + if err := ti.execution.Destroy(ctx); err != nil { + ti.Logger.Errorf("failed to destroy tmp instance %s: %v", ti.k8sName, err) + } + }() + + output, err := ti.execution.ExecuteCommand(ctx, "cat", filePath) + if err != nil { + return nil, err + } + return []byte(output), nil +} + func (s *storage) clone() *storage { if s == nil { return nil