Skip to content

Commit

Permalink
plugin server management
Browse files Browse the repository at this point in the history
  • Loading branch information
matmerr committed May 29, 2024
1 parent bb0f414 commit c9af76b
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 28 deletions.
1 change: 0 additions & 1 deletion controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func init() {
}

func main() {
fmt.Printf("starting Retina %v\n", version)
var metricsAddr string
var enableLeaderElection bool
var probeAddr string
Expand Down
2 changes: 1 addition & 1 deletion pkg/module/metrics/drops.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (d *DropCountMetrics) processLocalCtxFlow(flow *v1.Flow) {
if labelValuesMap == nil {
return
}
dropReason := metrics.GetDropTypeFlowDropReason(flow.GetDropReasonDesc())
dropReason := utils.DropReasonDescription(flow)

// Ingress values
if l := len(labelValuesMap[ingress]); l > 0 {
Expand Down
70 changes: 45 additions & 25 deletions pkg/plugin/windows/pktmon/pktmon_plugin_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,27 @@ import (
)

var (
ErrNilEnricher error = errors.New("enricher is nil")
client *pktMonClient
socket = "/tmp/retina-pktmon.sock"
ErrNilEnricher error = errors.New("enricher is nil")
ErrUnexpectedExit error = errors.New("unexpected exit")

socket = "/tmp/retina-pktmon.sock"
)

const (
Name = "pktmon"
Name = "pktmon"
connectionRetryAttempts = 3
)

// PktMonPlugin holds the state of the pktmon plugin: the external pktmon
// server process it launches, the writers that forward that process's output
// to the logger, and the enricher handle used while processing events.
type PktMonPlugin struct {
// enricher is assigned from enricher.Instance() before the server is started
// and is nil-checked before use in the receive loop.
enricher enricher.EnricherInterface
// externalChannel carries *v1.Event values.
// NOTE(review): its producer/consumer are not visible in this diff; confirm usage.
externalChannel chan *v1.Event
// pkt is not referenced in the visible code.
// NOTE(review): confirm whether it is still needed.
pkt PktMon
// l is the plugin logger; its underlying Logger also backs stdWriter and errWriter.
l *log.ZapLogger
// pktmonCmd is the "controller-pktmon.exe" command, run with "--socketpath".
pktmonCmd *exec.Cmd
// stdWriter receives the command's stdout and logs it at info level.
stdWriter *zapio.Writer
// errWriter receives the command's stderr and logs it at error level.
errWriter *zapio.Writer
}

// Init is a no-op: it performs no setup and always returns nil.
func (p *PktMonPlugin) Init() error {

return nil
}

Expand Down Expand Up @@ -67,19 +68,20 @@ func NewClient() (*pktMonClient, error) {

conn, err := grpc.Dial(fmt.Sprintf("%s:%s", "unix", socket), grpc.WithInsecure(), grpc.WithDefaultServiceConfig(retryPolicy))
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to dial pktmon server: %w", err)
}

return &pktMonClient{observerv1.NewObserverClient(conn)}, nil
}

func (p *PktMonPlugin) Start(ctx context.Context) error {
func (p *PktMonPlugin) RunPktMonServer() error {
p.stdWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.InfoLevel}
p.errWriter = &zapio.Writer{Log: p.l.Logger, Level: zap.ErrorLevel}
defer p.stdWriter.Close()
p.pktmonCmd = exec.Command("controller-pktmon.exe")
p.pktmonCmd.Args = append(p.pktmonCmd.Args, "--socketpath", socket)
p.pktmonCmd.Stdout = p.stdWriter
p.pktmonCmd.Stderr = p.stdWriter
p.pktmonCmd.Stderr = p.errWriter

p.l.Info("setting up enricher since pod level is enabled \n")
p.enricher = enricher.Instance()
Expand All @@ -88,37 +90,56 @@ func (p *PktMonPlugin) Start(ctx context.Context) error {
}

p.l.Info("calling start on pktmon stream server", zap.String("cmd", p.pktmonCmd.String()))
err := p.pktmonCmd.Start()

// block this thread, and should it ever return, it's a problem
err := p.pktmonCmd.Run()
if err != nil {
return fmt.Errorf("failed to start pktmon stream server: %w", err)
return fmt.Errorf("pktmon server exited when it should not have: %w", err)
}

p.l.Info("creating pktmon client")
// we never want to return happy from this
return fmt.Errorf("pktmon server exited unexpectedly: %w", ErrUnexpectedExit)
}

func (p *PktMonPlugin) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
go func() {
err := p.RunPktMonServer()
if err != nil {
p.l.Error("failed to run pktmon server", zap.Error(err))
}

// if the pktmon server process exits, cancel the context, we need to crash
cancel()
}()

var str observerv1.Observer_GetFlowsClient
fn := func() error {
client, err = NewClient()
p.l.Info("creating pktmon client")
client, err := NewClient()
if err != nil {
return err
}

str, err = client.GetFlows(ctx, &observerv1.GetFlowsRequest{})
if err != nil {
return fmt.Errorf("failed to open pktmon stream: %w", err)
}
return nil
}
err = utils.Retry(fn, 10)
err := utils.Retry(fn, connectionRetryAttempts)
if err != nil {
return fmt.Errorf("failed to create pktmon client: %w", err)
}

str, err := client.GetFlows(ctx, &observerv1.GetFlowsRequest{})
if err != nil {
return fmt.Errorf("failed to open pktmon stream: %w", err)
}

for {
select {
case <-ctx.Done():
return fmt.Errorf("pktmon context cancelled: %v", ctx.Err())
return fmt.Errorf("pktmon context cancelled: %w", ctx.Err())
default:
event, err := str.Recv()
if err != nil {
p.l.Error("failed to receive pktmon event", zap.Error(err))
return fmt.Errorf("failed to receive pktmon event: %w", err)
}

fl := event.GetFlow()
Expand All @@ -129,7 +150,7 @@ func (p *PktMonPlugin) Start(ctx context.Context) error {

ev := &v1.Event{
Event: event.GetFlow(),
Timestamp: event.GetFlow().Time,
Timestamp: event.GetFlow().GetTime(),
}

if p.enricher != nil {
Expand Down Expand Up @@ -164,9 +185,8 @@ func New(cfg *kcfg.Config) api.Plugin {
}

func (p *PktMonPlugin) Stop() error {
//p.pktmonCmd.Process.Kill()
//p.pktmonCmd.Wait()
//p.stdWriter.Close()
// p.pktmonCmd.Wait()

Check failure on line 188 in pkg/plugin/windows/pktmon/pktmon_plugin_windows.go

View workflow job for this annotation

GitHub Actions / Lint (windows, arm64)

commentedOutCode: may want to remove commented-out code (gocritic)
// p.stdWriter.Close()
return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func Retry(f func() error, retry int) (err error) {
return nil
}
t := int64(math.Pow(2, float64(i)))
fmt.Printf("failed with %v, retrying in %d seconds...\n", err, t)
time.Sleep(time.Duration(t) * time.Second)
}
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/utils_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func GetDropReasonDesc(dr DropReason) flow.DropReason {
fmt.Printf("getting drop reason description for %v\n", dr)
switch dr {
switch dr { //nolint:exhaustive // We are handling all the cases.
case DropReason_Drop_INET_FinWait2:
fmt.Printf("setting drop as %v\n", flow.DropReason_UNKNOWN_CONNECTION_TRACKING_STATE)
return flow.DropReason_UNKNOWN_CONNECTION_TRACKING_STATE
Expand Down

0 comments on commit c9af76b

Please sign in to comment.