Skip to content

Commit

Permalink
infinite stream json
Browse files Browse the repository at this point in the history
  • Loading branch information
dudo committed Nov 29, 2023
1 parent aea7b5d commit 69e853a
Showing 1 changed file with 41 additions and 20 deletions.
61 changes: 41 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package main

import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"

"px.dev/pxapi"
Expand Down Expand Up @@ -40,39 +42,59 @@ func main() {
}
pxl := string(content)

// Execute the PxL script.
// Execute the PxL script and get the resultSet
resultSet, err := vz.ExecuteScript(ctx, pxl, tm)
if err != nil && err != io.EOF {
if err != nil {
panic(err)
}

// Receive the PxL script results.
defer resultSet.Close()
if err := resultSet.Stream(); err != nil {
if errdefs.IsCompilationError(err) {
fmt.Printf("Got compiler error: \n %s\n", err.Error())
} else {
fmt.Printf("Got error : %+v, while streaming\n", err)

// Loop to receive the PxL script results.
for {
err := resultSet.Stream()
if err != nil {
if err == io.EOF {
// End of stream
break
}
if errdefs.IsCompilationError(err) {
log.Printf("Error compiling stream: %s", err.Error())
break
}
// Handle other kinds of runtime errors

log.Printf("Error streaming: %+v", err)
}
}

// Get the execution stats for the script execution.
stats := resultSet.Stats()
fmt.Printf("Execution Time: %v\n", stats.ExecutionTime)
fmt.Printf("Bytes received: %v\n", stats.TotalBytes)
}

// Satisfies the TableRecordHandler interface.
type tablePrinter struct{}
type tablePrinter struct {
columnNames []string // A slice of strings to hold column names
}

func (t *tablePrinter) HandleInit(ctx context.Context, metadata types.TableMetadata) error {
// Store column names in order
for _, col := range metadata.ColInfo {
t.columnNames = append(t.columnNames, col.Name)
}
return nil
}

func (t *tablePrinter) HandleRecord(ctx context.Context, r *types.Record) error {
for _, d := range r.Data {
fmt.Printf("%s ", d.String())
recordMap := make(map[string]interface{})

for i, d := range r.Data {
recordMap[t.columnNames[i]] = d
}
fmt.Printf("\n")

jsonRecord, err := json.Marshal(recordMap)
if err != nil {
log.Printf("Error marshaling record to JSON: %s", err)
return err
}

fmt.Println(string(jsonRecord))
return nil
}

Expand All @@ -81,8 +103,7 @@ func (t *tablePrinter) HandleDone(ctx context.Context) error {
}

// Satisfies the TableMuxer interface.
type tableMux struct {
}
type tableMux struct{}

func (s *tableMux) AcceptTable(ctx context.Context, metadata types.TableMetadata) (pxapi.TableRecordHandler, error) {
return &tablePrinter{}, nil
Expand Down

0 comments on commit 69e853a

Please sign in to comment.