Skip to content

Commit

Permalink
Some comments about the code
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Oct 5, 2023
1 parent 3cf5825 commit 6a6890e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 10 deletions.
17 changes: 15 additions & 2 deletions cmd/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@ func app(ctx context.Context) error {

doActionsHandler := playground.NewDoActionsHandler(fd.DefaultPluginRegistry, lg)

mux := http.NewServeMux()
mux.Handle("/api/v1/do-actions", doActionsHandler)
mux := router(doActionsHandler)

registry := prometheus.NewRegistry()
srvr := defaultServer(ctx, *flagAddr, promhttp.InstrumentMetricHandler(registry, mux))

lg.Warn("starting to listen", zap.String("addr", *flagAddr), zap.String("debug_addr", *flagDebugAddr))

go func() {
<-ctx.Done()
_ = srvr.Close()
Expand Down Expand Up @@ -100,3 +101,15 @@ func defaultServer(ctx context.Context, addr string, mux http.Handler) *http.Ser
},
}
}

func router(doActions *playground.DoActionsHandler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v1/do-actions":
doActions.ServeHTTP(w, r)
return
default:
http.Error(w, "", http.StatusNotFound)
}
})
}
16 changes: 10 additions & 6 deletions playground/handler.go → playground/doactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"runtime"
"strings"
"time"

Expand All @@ -27,8 +28,7 @@ import (
var jsoniterConfig = jsoniter.ConfigCompatibleWithStandardLibrary

const (
pipelineCapacity = 1
pipelineProcessors = 1
pipelineCapacity = 1
)

type DoActionsRequest struct {
Expand Down Expand Up @@ -93,6 +93,7 @@ func (h *DoActionsHandler) doActions(ctx context.Context, req DoActionsRequest)
newActions := make([]json.RawMessage, 0, len(req.Actions)*2)

for _, action := range req.Actions {
// trying to get the name of the next action
var actionWithType = struct {
Type string `json:"type"`
}{}
Expand All @@ -101,6 +102,7 @@ func (h *DoActionsHandler) doActions(ctx context.Context, req DoActionsRequest)
}
actionType := jsonEscape(actionWithType.Type)

// wrap plugin with the debug actions
debugActionBefore := json.RawMessage(fmt.Sprintf(`{"type": "debug", "message": "before %s"}`, actionType))
debugActionAfter := json.RawMessage(fmt.Sprintf(`{"type": "debug", "message": "after %s"}`, actionType))
newActions = append(newActions, debugActionBefore, action, debugActionAfter)
Expand All @@ -109,7 +111,7 @@ func (h *DoActionsHandler) doActions(ctx context.Context, req DoActionsRequest)
req.Actions = newActions
}

// stdout buffer
// pipeline stdout buffer
stdoutBuf := new(bytes.Buffer)
stdoutBuf.Grow(1 << 10)
stdout := preparePipelineLogger(stdoutBuf)
Expand All @@ -125,6 +127,7 @@ func (h *DoActionsHandler) doActions(ctx context.Context, req DoActionsRequest)
IsStrict: false,
}, metricsRegistry, stdout)

// callback to collect output events
events := make(chan json.RawMessage, len(req.Events))
outputCb := func(event *pipeline.Event) {
events <- event.Root.EncodeToByte()
Expand All @@ -135,9 +138,10 @@ func (h *DoActionsHandler) doActions(ctx context.Context, req DoActionsRequest)
}

p.Start()
// push events

// push events to the pipeline
for i, event := range req.Events {
p.In(pipeline.SourceID(h.requests.Inc()), "fake", int64(i), event, true)
p.In(pipeline.SourceID(h.requests.Inc()), "fake", int64(i+1), event, true)
}

// collect result
Expand Down Expand Up @@ -186,7 +190,7 @@ func (h *DoActionsHandler) setupPipeline(p *pipeline.Pipeline, req DoActionsRequ
}
values := map[string]int{
"capacity": pipelineCapacity,
"gomaxprocs": pipelineProcessors,
"gomaxprocs": runtime.GOMAXPROCS(0),
}
if err := fd.SetupActions(p, h.plugins, actionsRaw, values); err != nil {
return err
Expand Down
6 changes: 4 additions & 2 deletions playground/fakeclock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ import (
"go.uber.org/zap/zapcore"
)

// ZeroClock reports time since "start".
// Used to print relative time rather than absolute.
type ZeroClock struct {
start time.Time
}

var _ zapcore.Clock = ZeroClock{}

func NewZeroClock(now time.Time) *ZeroClock {
return &ZeroClock{start: now}
}

var _ zapcore.Clock = ZeroClock{}

func (z ZeroClock) Now() time.Time {
diff := time.Since(z.start)
return time.Time{}.Add(diff)
Expand Down

0 comments on commit 6a6890e

Please sign in to comment.