From 69e853a5770da2f1f902c36c5295545928a07fcf Mon Sep 17 00:00:00 2001 From: "Brett C. Dudo" Date: Tue, 28 Nov 2023 20:02:47 -0800 Subject: [PATCH] infinite stream json --- main.go | 61 ++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/main.go b/main.go index ff7b0f4..deaea8d 100644 --- a/main.go +++ b/main.go @@ -2,8 +2,10 @@ package main import ( "context" + "encoding/json" "fmt" "io" + "log" "os" "px.dev/pxapi" @@ -40,39 +42,59 @@ func main() { } pxl := string(content) - // Execute the PxL script. + // Execute the PxL script and get the resultSet resultSet, err := vz.ExecuteScript(ctx, pxl, tm) - if err != nil && err != io.EOF { + if err != nil { panic(err) } - - // Receive the PxL script results. defer resultSet.Close() - if err := resultSet.Stream(); err != nil { - if errdefs.IsCompilationError(err) { - fmt.Printf("Got compiler error: \n %s\n", err.Error()) - } else { - fmt.Printf("Got error : %+v, while streaming\n", err) + + // Loop to receive the PxL script results. + for { + err := resultSet.Stream() + if err != nil { + if err == io.EOF { + // End of stream + break + } + if errdefs.IsCompilationError(err) { + log.Printf("Error compiling stream: %s", err.Error()) + break + } + // Handle other kinds of runtime errors + + log.Printf("Error streaming: %+v", err) } } - - // Get the execution stats for the script execution. - stats := resultSet.Stats() - fmt.Printf("Execution Time: %v\n", stats.ExecutionTime) - fmt.Printf("Bytes received: %v\n", stats.TotalBytes) } // Satisfies the TableRecordHandler interface. -type tablePrinter struct{} +type tablePrinter struct { + columnNames []string // A slice of strings to hold column names +} func (t *tablePrinter) HandleInit(ctx context.Context, metadata types.TableMetadata) error { + // Store column names in order + for _, col := range metadata.ColInfo { + t.columnNames = append(t.columnNames, col.Name) + } return nil } + func (t *tablePrinter) HandleRecord(ctx context.Context, r *types.Record) error { - for _, d := range r.Data { - fmt.Printf("%s ", d.String()) + recordMap := make(map[string]interface{}) + + for i, d := range r.Data { + recordMap[t.columnNames[i]] = d } - fmt.Printf("\n") + + jsonRecord, err := json.Marshal(recordMap) + if err != nil { + log.Printf("Error marshaling record to JSON: %s", err) + return err + } + + fmt.Println(string(jsonRecord)) return nil } @@ -81,8 +103,7 @@ func (t *tablePrinter) HandleDone(ctx context.Context) error { } // Satisfies the TableMuxer interface. -type tableMux struct { -} +type tableMux struct{} func (s *tableMux) AcceptTable(ctx context.Context, metadata types.TableMetadata) (pxapi.TableRecordHandler, error) { return &tablePrinter{}, nil