Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
matmerr committed Jul 18, 2024
1 parent e53c80a commit da0410c
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 60 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ helm-install-advanced-local-context: manifests
helm-install-hubble:
helm upgrade --install retina ./deploy/hubble/manifests/controller/helm/retina/ \
--namespace kube-system \
--set os.windows=true \
--set operator.enabled=true \
--set operator.repository=$(IMAGE_REGISTRY)/$(RETINA_OPERATOR_IMAGE) \
--set operator.tag=$(HELM_IMAGE_TAG) \
Expand Down Expand Up @@ -528,4 +529,3 @@ quick-deploy-hubble:
# simplify-dashboards: runs the Go tests in deploy/legacy/graphana/dashboards with the
# `dashboard` and `simplifydashboard` build tags enabled, in verbose mode.
# NOTE(review): presumably the tagged tests regenerate the simplified dashboards — confirm.
.PHONY: simplify-dashboards
simplify-dashboards:
cd deploy/legacy/graphana/dashboards && go test . -tags=dashboard,simplifydashboard -v && cd $(REPO_ROOT)

3 changes: 2 additions & 1 deletion pkg/module/metrics/metrics_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
tcp string = "tcp"
nodeApiserver string = "node_apiserver"
dns string = "dns"
pktmon string = "pktmon"

metricModuleReq filtermanager.Requestor = "metricModule"
interval time.Duration = 1 * time.Second
Expand Down Expand Up @@ -237,7 +238,7 @@ func (m *Module) updateMetricsContexts(spec *api.MetricsSpec) {
if lm != nil {
m.registry[nodeApiserver] = lm
}
case strings.Contains(ctxOption.MetricName, dns):
case strings.Contains(ctxOption.MetricName, dns) || strings.Contains(ctxOption.MetricName, pktmon):
dm := NewDNSMetrics(&ctxOption, m.l, ctxType)
if dm != nil {
m.registry[ctxOption.MetricName] = dm
Expand Down
143 changes: 85 additions & 58 deletions pkg/plugin/windows/pktmon/pktmon_plugin_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pktmon

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
Expand All @@ -26,7 +27,7 @@ var (
ErrNilEnricher = errors.New("enricher is nil")
ErrUnexpectedExit = errors.New("unexpected exit")

socket = "/tmp/retina-pktmon.sock"
socket = "/temp/retina-pktmon.sock"
)

const (
Expand Down Expand Up @@ -56,20 +57,29 @@ type Client struct {
}

func NewClient() (*Client, error) {
retryPolicy := `{
"methodConfig": [{
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": 4,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]
}`
retryPolicy := map[string]any{
"methodConfig": []map[string]any{
{
"waitForReady": true,
"retryPolicy": map[string]any{
"MaxAttempts": 4,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": []string{"UNAVAILABLE"},
},
},
},
}

bytes, err := json.Marshal(retryPolicy)
if err != nil {
return nil, fmt.Errorf("failed to marshal retry policy: %w", err)
}

retryPolicyStr := string(bytes)

conn, err := grpc.Dial(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
conn, err := grpc.Dial(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicyStr))
if err != nil {
return nil, fmt.Errorf("failed to dial pktmon server: %w", err)
}
Expand All @@ -79,20 +89,15 @@ func NewClient() (*Client, error) {

func (p *Plugin) RunPktMonServer() error {
p.stdWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.InfoLevel}
p.errWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.ErrorLevel}
defer p.stdWriter.Close()
p.errWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.ErrorLevel}
defer p.errWriter.Close()
p.pktmonCmd = exec.Command("controller-pktmon.exe")
p.pktmonCmd.Args = append(p.pktmonCmd.Args, "--socketpath", socket)
p.pktmonCmd.Env = os.Environ()
p.pktmonCmd.Stdout = p.stdWriter
p.pktmonCmd.Stderr = p.errWriter

p.l.Info("setting up enricher since pod level is enabled \n")
p.enricher = enricher.Instance()
if p.enricher == nil {
return ErrNilEnricher
}

p.l.Info("calling start on pktmon stream server", zap.String("cmd", p.pktmonCmd.String()))

// block this thread, and should it ever return, it's a problem
Expand All @@ -107,6 +112,13 @@ func (p *Plugin) RunPktMonServer() error {

func (p *Plugin) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

p.enricher = enricher.Instance()
if p.enricher == nil {
return ErrNilEnricher
}

go func() {
err := p.RunPktMonServer()
if err != nil {
Expand All @@ -122,7 +134,7 @@ func (p *Plugin) Start(ctx context.Context) error {
p.l.Info("creating pktmon client")
client, err := NewClient()
if err != nil {
return err
return fmt.Errorf("failed to create pktmon client before getting flows: %w", err)
}

str, err = client.GetFlows(ctx, &observerv1.GetFlowsRequest{})
Expand All @@ -141,53 +153,61 @@ func (p *Plugin) Start(ctx context.Context) error {
case <-ctx.Done():
return fmt.Errorf("pktmon context cancelled: %w", ctx.Err())
default:
event, err := str.Recv()
err := p.GetFlow(str)
if err != nil {
return fmt.Errorf("failed to receive pktmon event: %w", err)
return fmt.Errorf("failed to get flow from observer: %w", err)
}
}
}
}

fl := event.GetFlow()
if fl == nil {
p.l.Error("received nil flow, flow proto mismatch from client/server?")
continue
}
func (p *Plugin) GetFlow(str observerv1.Observer_GetFlowsClient) error {
event, err := str.Recv()
if err != nil {
return fmt.Errorf("failed to receive pktmon event: %w", err)
}

ev := &v1.Event{
Event: event.GetFlow(),
Timestamp: event.GetFlow().GetTime(),
}
fl := event.GetFlow()
if fl == nil {
p.l.Error("received nil flow, flow proto mismatch from client/server?")
return nil
}

if fl.GetType() == flow.FlowType_L7 {
dns := fl.GetL7().GetDns()
if dns != nil {
query := dns.GetQuery()
ans := dns.GetIps()
if dns.GetQtypes()[0] == "Q" {
p.l.Sugar().Debugf("query from %s to %s: request %s\n", fl.GetIP().GetSource(), fl.GetIP().GetDestination(), query)
} else {
p.l.Sugar().Debugf("answer from %s to %s: result: %+v\n", fl.GetIP().GetSource(), fl.GetIP().GetDestination(), ans)
}
}
}
ev := &v1.Event{
Event: fl,
Timestamp: fl.GetTime(),
}

if p.enricher != nil {
p.enricher.Write(ev)
if fl.GetType() == flow.FlowType_L7 {
dns := fl.GetL7().GetDns()
if dns != nil {
query := dns.GetQuery()
ans := dns.GetIps()
if dns.GetQtypes()[0] == "Q" {
p.l.Sugar().Debugf("query from %s to %s: request %s\n", fl.GetIP().GetSource(), fl.GetIP().GetDestination(), query)
} else {
p.l.Error("enricher is nil when writing event")
p.l.Sugar().Debugf("answer from %s to %s: result: %+v\n", fl.GetIP().GetSource(), fl.GetIP().GetDestination(), ans)
}
}
}

// Write the event to the external channel.
if p.externalChannel != nil {
select {
case p.externalChannel <- ev:
default:
// Channel is full, drop the event.
// We shouldn't slow down the reader.
metrics.LostEventsCounter.WithLabelValues(utils.ExternalChannel, string(Name)).Inc()
}
}
if p.enricher != nil {
p.enricher.Write(ev)
} else {
p.l.Error("enricher is nil when writing event")
}

// Write the event to the external channel.
if p.externalChannel != nil {
select {
case p.externalChannel <- ev:
default:
// Channel is full, drop the event.
// We shouldn't slow down the reader.
metrics.LostEventsCounter.WithLabelValues(utils.ExternalChannel, string(Name)).Inc()
}
}
return nil
}

func (p *Plugin) SetupChannel(ch chan *v1.Event) error {
Expand All @@ -202,6 +222,13 @@ func New(_ *kcfg.Config) api.Plugin {
}

// Stop kills the pktmon server process referenced by p.pktmonCmd, if there is one.
//
// It is safe to call before the server has been launched: a nil command, or a
// command whose process has not been spawned yet, is treated as nothing to stop.
// A process that has already exited is likewise not reported as an error.
// Any other failure to kill the process is returned, wrapped with context.
func (p *Plugin) Stop() error {
	// exec.Cmd.Process stays nil until the command has actually been started,
	// so both pointers must be checked before dereferencing.
	if p.pktmonCmd == nil || p.pktmonCmd.Process == nil {
		return nil
	}

	err := p.pktmonCmd.Process.Kill()
	// os.ErrProcessDone means the server is already gone, which is the state Stop wants.
	if err != nil && !errors.Is(err, os.ErrProcessDone) {
		return fmt.Errorf("failed to kill pktmon server during stop: %w", err)
	}

	return nil
}

Expand Down

0 comments on commit da0410c

Please sign in to comment.