Skip to content

Commit

Permalink
Remove logging and handle issues with goroutines and channels
Browse files Browse the repository at this point in the history
Signed-off-by: Dom Del Nano <[email protected]>
  • Loading branch information
ddelnano committed Dec 22, 2024
1 parent 1044b13 commit da21ad7
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 62 deletions.
1 change: 0 additions & 1 deletion src/pixie_cli/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ go_library(
"@com_github_fatih_color//:color",
"@in_gopkg_yaml_v2//:yaml_v2",
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
"@com_github_sirupsen_logrus//:logrus",
"@org_golang_google_grpc//:grpc",
"@org_golang_x_sync//errgroup",
],
Expand Down
5 changes: 0 additions & 5 deletions src/pixie_cli/pkg/utils/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@
package utils

import (
"fmt"

log "github.com/sirupsen/logrus"

"golang.org/x/sync/errgroup"

"px.dev/pixie/src/pixie_cli/pkg/components"
Expand Down Expand Up @@ -53,7 +49,6 @@ func (s *SerialTaskRunner) RunAndMonitor() error {
for _, t := range s.tasks {
ti := st.AddTask(t.Name())
err := t.Run()
log.Warn(fmt.Sprintf("Task %s completed with error: %v", t.Name(), err))
ti.Complete(err)
if err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion src/pixie_cli/pkg/vizier/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,12 @@ func (c *LogCollector) CollectPixieLogs(fName string) error {
outputCh, err := RunSimpleHealthCheckScript(c.br, c.cloudAddr, clusterID)

if err != nil {
log.WithError(err).Warn("failed to run health check script")
entry := log.WithError(err)
if _, ok := err.(*HealthCheckWarning); ok {
entry.Warn("health check script returned warning")
} else {
entry.Warn("failed to run health check script")
}
}

return c.LogOutputToZipFile(zf, "px_agent_diagnostics.txt", <-outputCh)
Expand Down
92 changes: 40 additions & 52 deletions src/pixie_cli/pkg/vizier/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"io"
"math"
"strings"
"sync"
"time"

"github.com/gofrs/uuid"
Expand Down Expand Up @@ -288,8 +287,6 @@ func evaluateHealthCheckResult(output string) error {
if err != nil {
return err
}
log.Warnf("Health check output: %v\n", jsonData)

if v, ok := jsonData[headersInstalledPercColumn]; ok {
switch t := v.(type) {
case float64:
Expand All @@ -304,90 +301,81 @@ func evaluateHealthCheckResult(output string) error {
return nil
}

type healthCheckData struct {
line string
err error
}

// Runs the health check script on the specified vizier. The script's output is evaluated with
// the evaluateHealthCheckResult function to determine if the cluster is healthy. Only a single
// line of output will be parsed from the script.
func runHealthCheckScript(v *Connector, execScript *script.ExecutableScript) (chan string, error) {
output := make(chan string, 1)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

var encOpts, decOpts *vizierpb.ExecuteScriptRequest_EncryptionOptions

resp, err := RunScript(ctx, []*Connector{v}, execScript, encOpts)
if err != nil {
return output, err
return nil, err
}

reader, writer := io.Pipe()
defer writer.Close()
defer reader.Close()
factoryFunc := func(md *vizierpb.ExecuteScriptResponse_MetaData) components.OutputStreamWriter {
return components.CreateStreamWriter("json", writer)
}
tw := NewStreamOutputAdapterWithFactory(ctx, resp, "json", decOpts, factoryFunc)

bufReader := bufio.NewReader(reader)
errCh := make(chan error, 2)
var wg sync.WaitGroup
wg.Add(1)
errCh := make(chan error, 1)
streamCh := make(chan healthCheckData, 1)
outputCh := make(chan string, 1)
go func() {
defer wg.Done()
defer writer.Close()
err = tw.WaitForCompletion()

if err != nil {
log.Warnf("Error on tw.WaitForCompletion: %v", err)
errCh <- err
return
defer close(streamCh)
for {
line, err := bufReader.ReadString('\n')
streamCh <- healthCheckData{line, err}
if err != nil {
return
}
}
}()

wg.Add(1)
// Consumes the first line of output from the stream or the error from the context.
// The px/agent_status_diagnostics script only outputs one line, but even in the case
// that the fallback (px/agent_status) is used, a single line informs whether the output
// can be processed properly.
go func() {
defer wg.Done()
defer close(output)
var prevLine string
defer close(errCh)
defer close(outputCh)
for {
select {
case <-ctx.Done():
errCh <- ctx.Err()
return
default:
if ctx.Err() != nil {
errCh <- ctx.Err()
return
}
line, err := bufReader.ReadString('\n')
if err != nil {
if err == io.EOF {
log.Warn("EOF reached while reading health check script output")
output <- prevLine
errCh <- err
return
}
errCh <- err
return
}
// Capture the last line of output. This ensures
// that the EOF case returns the actual output instead of
// an EOF string.
prevLine = line
err = evaluateHealthCheckResult(line)
if err != nil {
log.Warn("evaluateHealthCheckResult err")
output <- prevLine
errCh <- err
return
case data := <-streamCh:
line := data.line
err := data.err
if err == nil {
err = evaluateHealthCheckResult(line)
}
outputCh <- line
errCh <- err
return

}
}
}()

go func() {
wg.Wait()
close(errCh)
}()
err = tw.WaitForCompletion()

if err != nil {
return outputCh, err
}
err = <-errCh

return output, err
return outputCh, err
}

// RunSimpleHealthCheckScript runs a diagnostic pxl script to verify query serving works.
Expand Down
3 changes: 0 additions & 3 deletions src/pixie_cli/pkg/vizier/stream_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,8 @@ func (v *StreamOutputAdapter) handleStream(ctx context.Context, stream chan *Exe
var err error
switch res := msg.Resp.Result.(type) {
case *vizierpb.ExecuteScriptResponse_MetaData:
log.Warn("v.handleMetadata")
err = v.handleMetadata(ctx, res)
case *vizierpb.ExecuteScriptResponse_Data:
log.Warn("v.handleData")
err = v.handleData(ctx, res)
default:
err = fmt.Errorf("unhandled response type" + reflect.TypeOf(msg.Resp.Result).String())
Expand Down Expand Up @@ -416,7 +414,6 @@ func (v *StreamOutputAdapter) handleData(ctx context.Context, d *vizierpb.Execut
}
ti := v.tableNameToInfo[tableName]
if err := ti.w.Write(rec); err != nil {
log.WithError(err).Error("Failed to write to table info stream")
return err
}
}
Expand Down

0 comments on commit da21ad7

Please sign in to comment.