diff --git a/src/pixie_cli/pkg/utils/BUILD.bazel b/src/pixie_cli/pkg/utils/BUILD.bazel index 278d4674847..f8026845e32 100644 --- a/src/pixie_cli/pkg/utils/BUILD.bazel +++ b/src/pixie_cli/pkg/utils/BUILD.bazel @@ -39,7 +39,6 @@ go_library( "@com_github_fatih_color//:color", "@in_gopkg_yaml_v2//:yaml_v2", "@io_k8s_apimachinery//pkg/apis/meta/v1:meta", - "@com_github_sirupsen_logrus//:logrus", "@org_golang_google_grpc//:grpc", "@org_golang_x_sync//errgroup", ], diff --git a/src/pixie_cli/pkg/utils/job_runner.go b/src/pixie_cli/pkg/utils/job_runner.go index 1c184ed43fc..29f7ca42d54 100644 --- a/src/pixie_cli/pkg/utils/job_runner.go +++ b/src/pixie_cli/pkg/utils/job_runner.go @@ -19,10 +19,6 @@ package utils import ( - "fmt" - - log "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" "px.dev/pixie/src/pixie_cli/pkg/components" @@ -53,7 +49,6 @@ func (s *SerialTaskRunner) RunAndMonitor() error { for _, t := range s.tasks { ti := st.AddTask(t.Name()) err := t.Run() - log.Warn(fmt.Sprintf("Task %s completed with error: %v", t.Name(), err)) ti.Complete(err) if err != nil { return err diff --git a/src/pixie_cli/pkg/vizier/logs.go b/src/pixie_cli/pkg/vizier/logs.go index 440b2c094ee..d968848a622 100644 --- a/src/pixie_cli/pkg/vizier/logs.go +++ b/src/pixie_cli/pkg/vizier/logs.go @@ -132,7 +132,12 @@ func (c *LogCollector) CollectPixieLogs(fName string) error { outputCh, err := RunSimpleHealthCheckScript(c.br, c.cloudAddr, clusterID) if err != nil { - log.WithError(err).Warn("failed to run health check script") + entry := log.WithError(err) + if _, ok := err.(*HealthCheckWarning); ok { + entry.Warn("health check script returned warning") + } else { + entry.Warn("failed to run health check script") + } } return c.LogOutputToZipFile(zf, "px_agent_diagnostics.txt", <-outputCh) diff --git a/src/pixie_cli/pkg/vizier/script.go b/src/pixie_cli/pkg/vizier/script.go index 87aa5e1ab3d..92732989379 100644 --- a/src/pixie_cli/pkg/vizier/script.go +++ b/src/pixie_cli/pkg/vizier/script.go @@ -27,7 +27,6 @@ import ( "io" "math" "strings" - "sync" "time" "github.com/gofrs/uuid" @@ -288,8 +287,6 @@ func evaluateHealthCheckResult(output string) error { if err != nil { return err } - log.Warnf("Health check output: %v\n", jsonData) - if v, ok := jsonData[headersInstalledPercColumn]; ok { switch t := v.(type) { case float64: @@ -304,8 +301,15 @@ func evaluateHealthCheckResult(output string) error { return nil } +type healthCheckData struct { + line string + err error +} + +// Runs the health check script on the specified vizier. The script's output is evaluated with +// the evaluateHealthCheckResult function to determine if the cluster is healthy. Only a single +// line of output will be parsed from the script. func runHealthCheckScript(v *Connector, execScript *script.ExecutableScript) (chan string, error) { - output := make(chan string, 1) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -313,10 +317,11 @@ func runHealthCheckScript(v *Connector, execScript *script.ExecutableScript) (ch resp, err := RunScript(ctx, []*Connector{v}, execScript, encOpts) if err != nil { - return output, err + return nil, err } reader, writer := io.Pipe() + defer writer.Close() defer reader.Close() factoryFunc := func(md *vizierpb.ExecuteScriptResponse_MetaData) components.OutputStreamWriter { return components.CreateStreamWriter("json", writer) @@ -324,70 +329,53 @@ func runHealthCheckScript(v *Connector, execScript *script.ExecutableScript) (ch tw := NewStreamOutputAdapterWithFactory(ctx, resp, "json", decOpts, factoryFunc) bufReader := bufio.NewReader(reader) - errCh := make(chan error, 2) - var wg sync.WaitGroup - wg.Add(1) + errCh := make(chan error, 1) + streamCh := make(chan healthCheckData, 1) + outputCh := make(chan string, 1) go func() { - defer wg.Done() - defer writer.Close() - err = tw.WaitForCompletion() - - if err != nil { - log.Warnf("Error on tw.WaitForCompletion: %v", err) - errCh <- err - return + defer close(streamCh) + for { + line, err := bufReader.ReadString('\n') + streamCh <- healthCheckData{line, err} + if err != nil { + return + } } }() - - wg.Add(1) + // Consumes the first line of output from the stream or the error from the context. + // The px/agent_status_diagnostics script only outputs one line, but even in the case + // that the fallback (px/agent_status) is used, a single line informs whether the output + // can be processed properly. go func() { - defer wg.Done() - defer close(output) - var prevLine string + defer close(errCh) + defer close(outputCh) for { select { case <-ctx.Done(): errCh <- ctx.Err() return - default: - if ctx.Err() != nil { - errCh <- ctx.Err() - return - } - line, err := bufReader.ReadString('\n') - if err != nil { - if err == io.EOF { - log.Warn("EOF reached while reading health check script output") - output <- prevLine - errCh <- err - return - } - errCh <- err - return - } - // Capture the last line of output. This ensures - // that the EOF case returns the actual output instead of - // an EOF string. - prevLine = line - err = evaluateHealthCheckResult(line) - if err != nil { - log.Warn("evaluateHealthCheckResult err") - output <- prevLine - errCh <- err - return + case data := <-streamCh: + line := data.line + err := data.err + if err == nil { + err = evaluateHealthCheckResult(line) } + outputCh <- line + errCh <- err + return + } } }() - go func() { - wg.Wait() - close(errCh) - }() + err = tw.WaitForCompletion() + if err != nil { + return outputCh, err + } err = <-errCh - return output, err + return outputCh, err } // RunSimpleHealthCheckScript runs a diagnostic pxl script to verify query serving works. diff --git a/src/pixie_cli/pkg/vizier/stream_adapter.go b/src/pixie_cli/pkg/vizier/stream_adapter.go index b869e302c20..ea2b80b97c5 100644 --- a/src/pixie_cli/pkg/vizier/stream_adapter.go +++ b/src/pixie_cli/pkg/vizier/stream_adapter.go @@ -250,10 +250,8 @@ func (v *StreamOutputAdapter) handleStream(ctx context.Context, stream chan *Exe var err error switch res := msg.Resp.Result.(type) { case *vizierpb.ExecuteScriptResponse_MetaData: - log.Warn("v.handleMetadata") err = v.handleMetadata(ctx, res) case *vizierpb.ExecuteScriptResponse_Data: - log.Warn("v.handleData") err = v.handleData(ctx, res) default: err = fmt.Errorf("unhandled response type" + reflect.TypeOf(msg.Resp.Result).String()) @@ -416,7 +414,6 @@ func (v *StreamOutputAdapter) handleData(ctx context.Context, d *vizierpb.Execut } ti := v.tableNameToInfo[tableName] if err := ti.w.Write(rec); err != nil { - log.WithError(err).Error("Failed to write to table info stream") return err } }