From cdcae4cc32f8ae86ff5644fb90731d06f8ef4650 Mon Sep 17 00:00:00 2001 From: Mathew Merrick Date: Thu, 18 Jul 2024 10:30:40 -0700 Subject: [PATCH] feedback --- Makefile | 2 +- pkg/module/metrics/metrics_module.go | 3 +- .../windows/pktmon/pktmon_plugin_windows.go | 137 +++++++++++------- 3 files changed, 84 insertions(+), 58 deletions(-) diff --git a/Makefile b/Makefile index a35a0dfcf9b..fb3c46e423a 100644 --- a/Makefile +++ b/Makefile @@ -454,6 +454,7 @@ helm-install-advanced-local-context: manifests helm-install-hubble: helm upgrade --install retina ./deploy/hubble/manifests/controller/helm/retina/ \ --namespace kube-system \ + --set os.windows=true \ --set operator.enabled=true \ --set operator.repository=$(IMAGE_REGISTRY)/$(RETINA_OPERATOR_IMAGE) \ --set operator.tag=$(HELM_IMAGE_TAG) \ @@ -528,4 +529,3 @@ quick-deploy-hubble: .PHONY: simplify-dashboards simplify-dashboards: cd deploy/legacy/graphana/dashboards && go test . -tags=dashboard,simplifydashboard -v && cd $(REPO_ROOT) - diff --git a/pkg/module/metrics/metrics_module.go b/pkg/module/metrics/metrics_module.go index 9fbb7af4238..2e193ad8f1f 100644 --- a/pkg/module/metrics/metrics_module.go +++ b/pkg/module/metrics/metrics_module.go @@ -31,6 +31,7 @@ const ( tcp string = "tcp" nodeApiserver string = "node_apiserver" dns string = "dns" + pktmon string = "pktmon" metricModuleReq filtermanager.Requestor = "metricModule" interval time.Duration = 1 * time.Second @@ -237,7 +238,7 @@ func (m *Module) updateMetricsContexts(spec *api.MetricsSpec) { if lm != nil { m.registry[nodeApiserver] = lm } - case strings.Contains(ctxOption.MetricName, dns): + case strings.Contains(ctxOption.MetricName, dns) || strings.Contains(ctxOption.MetricName, pktmon): dm := NewDNSMetrics(&ctxOption, m.l, ctxType) if dm != nil { m.registry[ctxOption.MetricName] = dm diff --git a/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go b/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go index 91da5df4479..6a3099e8fb5 100644 --- a/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go +++ b/pkg/plugin/windows/pktmon/pktmon_plugin_windows.go @@ -2,6 +2,7 @@ package pktmon import ( "context" + "encoding/json" "errors" "fmt" "os" @@ -56,20 +57,29 @@ type Client struct { } func NewClient() (*Client, error) { - retryPolicy := `{ - "methodConfig": [{ - "waitForReady": true, - "retryPolicy": { - "MaxAttempts": 4, - "InitialBackoff": ".01s", - "MaxBackoff": ".01s", - "BackoffMultiplier": 1.0, - "RetryableStatusCodes": [ "UNAVAILABLE" ] - } - }] - }` + retryPolicy := map[string]any{ + "methodConfig": []map[string]any{ + { + "waitForReady": true, + "retryPolicy": map[string]any{ + "MaxAttempts": 4, + "InitialBackoff": ".01s", + "MaxBackoff": ".01s", + "BackoffMultiplier": 1.0, + "RetryableStatusCodes": []string{"UNAVAILABLE"}, + }, + }, + }, + } - conn, err := grpc.Dial(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy)) + bytes, err := json.Marshal(retryPolicy) + if err != nil { + return nil, fmt.Errorf("failed to marshal retry policy: %w", err) + } + + retryPolicyStr := string(bytes) + + conn, err := grpc.Dial(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicyStr)) if err != nil { return nil, fmt.Errorf("failed to dial pktmon server: %w", err) } @@ -79,19 +89,15 @@ func NewClient() (*Client, error) { func (p *Plugin) RunPktMonServer() error { p.stdWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.InfoLevel} - p.errWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.ErrorLevel} defer p.stdWriter.Close() + p.errWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.ErrorLevel} + defer p.errWriter.Close() p.pktmonCmd = exec.Command("controller-pktmon.exe") p.pktmonCmd.Args = append(p.pktmonCmd.Args, "--socketpath", socket) p.pktmonCmd.Env = os.Environ() p.pktmonCmd.Stdout = p.stdWriter p.pktmonCmd.Stderr = p.errWriter - p.l.Info("setting up enricher since pod level is enabled \n") - p.enricher = enricher.Instance() - if p.enricher == nil { - return ErrNilEnricher - } p.l.Info("calling start on pktmon stream server", zap.String("cmd", p.pktmonCmd.String())) @@ -107,6 +113,10 @@ func (p *Plugin) RunPktMonServer() error { func (p *Plugin) Start(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) + defer cancel() + + + go func() { err := p.RunPktMonServer() if err != nil { @@ -122,7 +132,7 @@ func (p *Plugin) Start(ctx context.Context) error { p.l.Info("creating pktmon client") client, err := NewClient() if err != nil { - return err + return fmt.Errorf("failed to create pktmon client before getting flows: %w", err) } str, err = client.GetFlows(ctx, &observerv1.GetFlowsRequest{}) @@ -141,53 +151,61 @@ func (p *Plugin) Start(ctx context.Context) error { case <-ctx.Done(): return fmt.Errorf("pktmon context cancelled: %w", ctx.Err()) default: - event, err := str.Recv() + err := p.GetFlow(str) if err != nil { - return fmt.Errorf("failed to receive pktmon event: %w", err) + return fmt.Errorf("failed to get flow from observer: %w", err) } + } + } +} - fl := event.GetFlow() - if fl == nil { - p.l.Error("received nil flow, flow proto mismatch from client/server?") - continue - } +func (p *Plugin) GetFlow(str observerv1.Observer_GetFlowsClient) error { + event, err := str.Recv() + if err != nil { + return fmt.Errorf("failed to receive pktmon event: %w", err) + } - ev := &v1.Event{ - Event: event.GetFlow(), - Timestamp: event.GetFlow().GetTime(), - } + fl := event.GetFlow() + if fl == nil { + p.l.Error("received nil flow, flow proto mismatch from client/server?") + return nil + } - if fl.GetType() == flow.FlowType_L7 { - dns := fl.GetL7().GetDns() - if dns != nil { - query := dns.GetQuery() - ans := dns.GetIps() - if dns.GetQtypes()[0] == "Q" { - p.l.Sugar().Debugf("query from %s to %s: request %s\n", fl.GetIP().GetSource(), fl.GetIP().GetDestination(), query) - } else { - p.l.Sugar().Debugf("answer from %s to %s: result: %+v\n", fl.GetIP().GetSource(), fl.GetIP().GetDestination(), ans) - } - } - } + ev := &v1.Event{ + Event: event.GetFlow(), + Timestamp: event.GetFlow().GetTime(), + } - if p.enricher != nil { - p.enricher.Write(ev) + if fl.GetType() == flow.FlowType_L7 { + dns := fl.GetL7().GetDns() + if dns != nil { + query := dns.GetQuery() + ans := dns.GetIps() + if dns.GetQtypes()[0] == "Q" { + p.l.Sugar().Debugf("query from %s to %s: request %s\n", fl.GetIP().GetSource(), fl.GetIP().GetDestination(), query) } else { - p.l.Error("enricher is nil when writing event") + p.l.Sugar().Debugf("answer from %s to %s: result: %+v\n", fl.GetIP().GetSource(), fl.GetIP().GetDestination(), ans) } + } + } - // Write the event to the external channel. - if p.externalChannel != nil { - select { - case p.externalChannel <- ev: - default: - // Channel is full, drop the event. - // We shouldn't slow down the reader. - metrics.LostEventsCounter.WithLabelValues(utils.ExternalChannel, string(Name)).Inc() - } - } + if p.enricher != nil { + p.enricher.Write(ev) + } else { + p.l.Error("enricher is nil when writing event") + } + + // Write the event to the external channel. + if p.externalChannel != nil { + select { + case p.externalChannel <- ev: + default: + // Channel is full, drop the event. + // We shouldn't slow down the reader. + metrics.LostEventsCounter.WithLabelValues(utils.ExternalChannel, string(Name)).Inc() } } + return nil } func (p *Plugin) SetupChannel(ch chan *v1.Event) error { @@ -202,6 +220,13 @@ func New(_ *kcfg.Config) api.Plugin { } func (p *Plugin) Stop() error { + if p.pktmonCmd != nil { + err := p.pktmonCmd.Process.Kill() + if err != nil { + return fmt.Errorf("failed to kill pktmon server during stop: %w", err) + } + } + return nil }