diff --git a/canary/looper.go b/canary/looper.go index dfc15ecf9..81d9ad63a 100644 --- a/canary/looper.go +++ b/canary/looper.go @@ -28,20 +28,20 @@ type LooperResults struct { } func ReportLooperResults(results []*LooperResults) { - totalXfer := uint64(0) + totalBytes := uint64(0) + totalXferRate := uint64(0) totalErrors := uint(0) totalMismatches := uint(0) totalLoops := uint(0) for i, result := range results { + totalBytes += result.Bytes deltaSeconds := result.StopTime.Sub(result.StartTime).Seconds() - xfer := uint64(float64(result.Bytes) / deltaSeconds) - totalXfer += xfer + xferRate := uint64(float64(result.Bytes) / deltaSeconds) + totalXferRate += xferRate totalErrors += result.Errors totalMismatches += result.Mismatches - xferSec := util.BytesToSize(int64(xfer)) totalLoops += result.Loops - logrus.Infof("looper #%d: %d loops, %d errors, %d mismatches, %s/sec", i, result.Loops, result.Errors, result.Mismatches, xferSec) + logrus.Infof("looper #%d: %d loops, %v, %d errors, %d mismatches, %s/sec", i, result.Loops, util.BytesToSize(int64(result.Bytes)), result.Errors, result.Mismatches, util.BytesToSize(int64(xferRate))) } - totalXferSec := util.BytesToSize(int64(totalXfer)) - logrus.Infof("total: %d loops, %d errors, %d mismatches, %s/sec", totalLoops, totalErrors, totalMismatches, totalXferSec) + logrus.Infof("total: %d loops, %v, %d errors, %d mismatches, %s/sec", totalLoops, util.BytesToSize(int64(totalBytes)), totalErrors, totalMismatches, util.BytesToSize(int64(totalXferRate))) } diff --git a/canary/publicHttpLooper.go b/canary/publicHttpLooper.go index 9cc929d1a..1fa520e5f 100644 --- a/canary/publicHttpLooper.go +++ b/canary/publicHttpLooper.go @@ -42,12 +42,6 @@ func NewPublicHttpLooper(id uint, frontend string, opt *LooperOptions, root env_ func (l *PublicHttpLooper) Run() { defer close(l.done) defer logrus.Infof("#%d stopping", l.id) - defer func() { - if r := recover(); r != nil { - logrus.Errorf("#%d: %v", l.id, r) - panic(r) - } - }() logrus.Infof("#%d starting", l.id) if err := l.startup(); err != nil { @@ -122,7 +116,7 @@ func (l *PublicHttpLooper) bindListener() error { go func() { if err := http.Serve(l.listener, l); err != nil { - logrus.Errorf("#%d error starting http listener: %v", l.id, err) + logrus.Errorf("#%d error in http listener: %v", l.id, err) } }() diff --git a/cmd/zrok/main.go b/cmd/zrok/main.go index 5b2c85522..edf8a9db8 100644 --- a/cmd/zrok/main.go +++ b/cmd/zrok/main.go @@ -25,7 +25,6 @@ func init() { adminCmd.AddCommand(adminListCmd) adminCmd.AddCommand(adminUpdateCmd) testCmd.AddCommand(testCanaryCmd) - testCmd.AddCommand(testLoopCmd) rootCmd.AddCommand(adminCmd) rootCmd.AddCommand(configCmd) rootCmd.AddCommand(modifyCmd) @@ -104,12 +103,6 @@ var testCanaryCmd = &cobra.Command{ Short: "Utilities for performance management", } -var testLoopCmd = &cobra.Command{ - Use: "loopback", - Aliases: []string{"loop"}, - Short: "Loopback testing utilities", -} - func main() { if err := rootCmd.Execute(); err != nil { if panicInstead { diff --git a/cmd/zrok/testCanaryPeriodic.go b/cmd/zrok/testCanaryPeriodic.go index 3e8354de7..0e2fcc035 100644 --- a/cmd/zrok/testCanaryPeriodic.go +++ b/cmd/zrok/testCanaryPeriodic.go @@ -88,6 +88,7 @@ func (cmd *testCanaryPeriodicCommand) run(_ *cobra.Command, _ []string) { looper.Abort() } }() + for _, l := range loopers { <-l.Done() } diff --git a/cmd/zrok/testLoopPublic.go b/cmd/zrok/testLoopPublic.go deleted file mode 100644 index 0248c064a..000000000 --- a/cmd/zrok/testLoopPublic.go +++ /dev/null @@ -1,292 +0,0 @@ -package main - -import ( - "bytes" - "encoding/base64" - "fmt" - "github.com/go-openapi/runtime" - httptransport "github.com/go-openapi/runtime/client" - "github.com/openziti/sdk-golang/ziti" - "github.com/openziti/sdk-golang/ziti/edge" - "github.com/openziti/zrok/environment" - "github.com/openziti/zrok/environment/env_core" - "github.com/openziti/zrok/rest_client_zrok" - "github.com/openziti/zrok/rest_client_zrok/share" - "github.com/openziti/zrok/rest_model_zrok" - "github.com/openziti/zrok/sdk/golang/sdk" - "github.com/openziti/zrok/tui" - "github.com/openziti/zrok/util" - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "io" - "math/rand" - "net/http" - "os" - "os/signal" - "syscall" - "time" -) - -func init() { - testLoopCmd.AddCommand(newTestLoopPublicCommand().cmd) -} - -type testLoopPublicCommand struct { - cmd *cobra.Command - loopers int - iterations int - statusEvery int - timeoutSeconds int - minPayload int - maxPayload int - minDwellMs int - maxDwellMs int - minPacingMs int - maxPacingMs int - frontendSelection []string -} - -func newTestLoopPublicCommand() *testLoopPublicCommand { - cmd := &cobra.Command{ - Use: "public", - Short: "Start a loop agent testing public proxy shares", - Args: cobra.NoArgs, - } - command := &testLoopPublicCommand{cmd: cmd} - cmd.Run = command.run - cmd.Flags().IntVarP(&command.loopers, "loopers", "l", 1, "Number of current loopers to start") - cmd.Flags().IntVarP(&command.iterations, "iterations", "i", 1, "Number of iterations per looper") - cmd.Flags().IntVarP(&command.statusEvery, "status-every", "E", 100, "Show status every # iterations") - cmd.Flags().IntVarP(&command.timeoutSeconds, "timeout-seconds", "T", 30, "Time out after # seconds when sending http requests") - cmd.Flags().IntVar(&command.minPayload, "min-payload", 64, "Minimum payload size in bytes") - cmd.Flags().IntVar(&command.maxPayload, "max-payload", 10240, "Maximum payload size in bytes") - cmd.Flags().IntVar(&command.minDwellMs, "min-dwell-ms", 1000, "Minimum dwell time in milliseconds") - cmd.Flags().IntVar(&command.maxDwellMs, "max-dwell-ms", 1000, "Maximum dwell time in milliseconds") - cmd.Flags().IntVar(&command.minPacingMs, "min-pacing-ms", 0, "Minimum pacing in milliseconds") - cmd.Flags().IntVar(&command.maxPacingMs, "max-pacing-ms", 0, "Maximum pacing in milliseconds") - cmd.Flags().StringArrayVar(&command.frontendSelection, "frontends", []string{"public"}, "Selected frontends to use for the share") - return command -} - -func (cmd *testLoopPublicCommand) run(_ *cobra.Command, _ []string) { - var loopers []*looper - for i := 0; i < cmd.loopers; i++ { - l := newLooper(i, cmd) - loopers = append(loopers, l) - go l.run() - } - c := make(chan os.Signal) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - <-c - for _, looper := range loopers { - looper.stop = true - } - }() - for _, l := range loopers { - <-l.done - } - totalMismatches := 0 - totalXfer := int64(0) - totalLoops := int64(0) - for _, l := range loopers { - deltaSeconds := l.stopTime.Sub(l.startTime).Seconds() - xfer := int64(float64(l.bytes) / deltaSeconds) - totalXfer += xfer - totalMismatches += l.mismatches - xferSec := util.BytesToSize(xfer) - totalLoops += l.loops - logrus.Infof("looper #%d: %d loops, %d mismatches, %s/sec", l.id, l.loops, l.mismatches, xferSec) - } - totalXferSec := util.BytesToSize(totalXfer) - logrus.Infof("total: %d loops, %d mismatches, %s/sec", totalLoops, totalMismatches, totalXferSec) - os.Exit(0) -} - -type looper struct { - id int - cmd *testLoopPublicCommand - env *env_core.Environment - done chan struct{} - listener edge.Listener - zif string - zrok *rest_client_zrok.Zrok - shrToken string - proxyEndpoint string - auth runtime.ClientAuthInfoWriter - mismatches int - bytes int64 - loops int64 - startTime time.Time - stopTime time.Time - stop bool -} - -func newLooper(id int, cmd *testLoopPublicCommand) *looper { - return &looper{ - id: id, - cmd: cmd, - done: make(chan struct{}), - } -} - -func (l *looper) run() { - defer close(l.done) - defer logrus.Infof("stopping #%d", l.id) - - l.startup() - logrus.Infof("looper #%d, shrToken: %v, frontend: %v", l.id, l.shrToken, l.proxyEndpoint) - if l.serviceListener() { - l.dwell() - l.iterate() - } - logrus.Infof("looper #%d: complete", l.id) - l.shutdown() -} - -func (l *looper) serviceListener() bool { - zcfg, err := ziti.NewConfigFromFile(l.zif) - if err != nil { - logrus.Errorf("error opening ziti config '%v': %v", l.zif, err) - return false - } - options := ziti.ListenOptions{ - ConnectTimeout: 5 * time.Minute, - WaitForNEstablishedListeners: 1, - } - zctx, err := ziti.NewContext(zcfg) - if err != nil { - logrus.Errorf("error loading ziti context: %v", err) - return false - } - - if l.listener, err = zctx.ListenWithOptions(l.shrToken, &options); err != nil { - logrus.Errorf("looper #%d, error listening: %v", l.id, err) - return false - } - - go func() { - if err := http.Serve(l.listener, l); err != nil { - logrus.Errorf("looper #%d, error serving: %v", l.id, err) - } - }() - - return true -} - -func (l *looper) ServeHTTP(w http.ResponseWriter, r *http.Request) { - buf := new(bytes.Buffer) - io.Copy(buf, r.Body) - w.Write(buf.Bytes()) -} - -func (l *looper) startup() { - logrus.Infof("starting #%d", l.id) - - env, err := environment.LoadRoot() - if err != nil { - panic(err) - } - - if !env.IsEnabled() { - tui.Error("unable to load environment; did you 'zrok enable'?", nil) - } - l.env = env.Environment() - - l.zif, err = env.ZitiIdentityNamed(env.EnvironmentIdentityName()) - if err != nil { - panic(err) - } - l.zrok, err = env.Client() - if err != nil { - panic(err) - } - l.auth = httptransport.APIKeyAuth("x-token", "header", l.env.Token) - tunnelReq := share.NewShareParams() - tunnelReq.Body = &rest_model_zrok.ShareRequest{ - EnvZID: l.env.ZitiIdentity, - ShareMode: string(sdk.PublicShareMode), - FrontendSelection: l.cmd.frontendSelection, - BackendMode: string(sdk.ProxyBackendMode), - BackendProxyEndpoint: fmt.Sprintf("looper#%d", l.id), - AuthScheme: string(sdk.None), - } - tunnelReq.SetTimeout(60 * time.Second) - tunnelResp, err := l.zrok.Share.Share(tunnelReq, l.auth) - if err != nil { - panic(err) - } - l.shrToken = tunnelResp.Payload.ShrToken - l.proxyEndpoint = tunnelResp.Payload.FrontendProxyEndpoints[0] -} - -func (l *looper) dwell() { - dwell := l.cmd.minDwellMs - if l.cmd.maxDwellMs-l.cmd.minDwellMs > 0 { - dwell = rand.Intn(l.cmd.maxDwellMs-l.cmd.minDwellMs) + l.cmd.minDwellMs - } - time.Sleep(time.Duration(dwell) * time.Millisecond) -} - -func (l *looper) iterate() { - l.startTime = time.Now() - defer func() { l.stopTime = time.Now() }() - - for i := 0; i < l.cmd.iterations && !l.stop; i++ { - if i > 0 && i%l.cmd.statusEvery == 0 { - logrus.Infof("looper #%d: iteration #%d", l.id, i) - } - sz := l.cmd.maxPayload - if l.cmd.maxPayload-l.cmd.minPayload > 0 { - sz = rand.Intn(l.cmd.maxPayload-l.cmd.minPayload) + l.cmd.minPayload - } - outpayload := make([]byte, sz) - outbase64 := base64.StdEncoding.EncodeToString(outpayload) - rand.Read(outpayload) - if req, err := http.NewRequest("POST", l.proxyEndpoint, bytes.NewBufferString(outbase64)); err == nil { - client := &http.Client{Timeout: time.Second * time.Duration(l.cmd.timeoutSeconds)} - if resp, err := client.Do(req); err == nil { - if resp.StatusCode != 200 { - logrus.Errorf("looper #%d unexpected response status code %v!", l.id, resp.StatusCode) - } - inpayload := new(bytes.Buffer) - io.Copy(inpayload, resp.Body) - inbase64 := inpayload.String() - if inbase64 != outbase64 { - logrus.Errorf("looper #%d payload mismatch!", l.id) - l.mismatches++ - } else { - l.bytes += int64(len(outbase64)) - logrus.Debugf("looper #%d payload match", l.id) - } - } else { - logrus.Errorf("looper #%d error: %v", l.id, err) - } - } else { - logrus.Errorf("looper #%d error creating request: %v", l.id, err) - } - pacingMs := l.cmd.maxPayload - if l.cmd.maxPacingMs-l.cmd.minPacingMs > 0 { - pacingMs = rand.Intn(l.cmd.maxPacingMs-l.cmd.minPacingMs) + l.cmd.minPacingMs - time.Sleep(time.Duration(pacingMs) * time.Millisecond) - } - l.loops++ - } -} - -func (l *looper) shutdown() { - if l.listener != nil { - if err := l.listener.Close(); err != nil { - logrus.Errorf("looper #%d error closing listener: %v", l.id, err) - } - } - - untunnelReq := share.NewUnshareParams() - untunnelReq.Body = &rest_model_zrok.UnshareRequest{ - EnvZID: l.env.ZitiIdentity, - ShrToken: l.shrToken, - } - if _, err := l.zrok.Share.Unshare(untunnelReq, l.auth); err != nil { - logrus.Errorf("error shutting down looper #%d: %v", l.id, err) - } -} diff --git a/cmd/zrok/testWebsocket.go b/cmd/zrok/testWebsocket.go deleted file mode 100644 index 0dd3e6b53..000000000 --- a/cmd/zrok/testWebsocket.go +++ /dev/null @@ -1,121 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "flag" - "fmt" - "net" - "net/http" - "os" - "strings" - "time" - - "github.com/openziti/sdk-golang/ziti" - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "nhooyr.io/websocket" - "nhooyr.io/websocket/wsjson" -) - -func init() { - testCmd.AddCommand(newTestWebsocketCommand().cmd) -} - -type testWebsocketCommand struct { - cmd *cobra.Command - - identityJsonFile string - serviceName string - enableZiti bool -} - -func newTestWebsocketCommand() *testWebsocketCommand { - cmd := &cobra.Command{ - Use: "websocket", - Args: cobra.RangeArgs(0, 1), - } - - command := &testWebsocketCommand{cmd: cmd} - - cmd.Flags().BoolVar(&command.enableZiti, "ziti", false, "Enable the usage of a ziti network") - cmd.Flags().StringVar(&command.identityJsonFile, "ziti-identity", "", "Path to Ziti Identity json file") - cmd.Flags().StringVar(&command.serviceName, "ziti-name", "", "Name of the Ziti Service") - - cmd.Run = command.run - return command -} - -func (cmd *testWebsocketCommand) run(_ *cobra.Command, args []string) { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*6) - defer cancel() - opts := &websocket.DialOptions{} - var addr string - if cmd.enableZiti { - identityJsonBytes, err := os.ReadFile(cmd.identityJsonFile) - if err != nil { - fmt.Fprintf(os.Stderr, "Error: failed to read identity config JSON from file %s: %s\n", cmd.identityJsonFile, err) - os.Exit(1) - } - if len(identityJsonBytes) == 0 { - fmt.Fprintf(os.Stderr, "Error: When running a ziti enabled service must have ziti identity provided\n\n") - flag.Usage() - os.Exit(1) - } - - cfg := &ziti.Config{} - err = json.Unmarshal(identityJsonBytes, cfg) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to load ziti configuration JSON: %v", err) - os.Exit(1) - } - zitiContext, err := ziti.NewContext(cfg) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to load ziti context: %v", err) - os.Exit(1) - } - dial := func(_ context.Context, _, addr string) (net.Conn, error) { - service := strings.Split(addr, ":")[0] - return zitiContext.DialWithOptions(service, &ziti.DialOptions{ConnectTimeout: 30 * time.Second}) - } - - zitiTransport := http.DefaultTransport.(*http.Transport).Clone() - zitiTransport.DialContext = dial - - opts.HTTPClient = &http.Client{Transport: zitiTransport} - - addr = cmd.serviceName - } else { - if len(args) == 0 { - logrus.Error("address required if not using ziti") - flag.Usage() - os.Exit(1) - } - addr = args[0] - } - - logrus.Info(fmt.Sprintf("http://%s/echo", addr)) - c, _, err := websocket.Dial(ctx, fmt.Sprintf("http://%s/echo", addr), opts) - if err != nil { - logrus.Error(err) - return - } - defer c.Close(websocket.StatusInternalError, "the sky is falling") - - logrus.Info("writing to server...") - err = wsjson.Write(ctx, c, "hi") - if err != nil { - logrus.Error(err) - return - } - logrus.Info("reading response...") - typ, dat, err := c.Read(ctx) - if err != nil { - logrus.Error(err) - return - } - logrus.Info(typ) - logrus.Info(string(dat)) - - c.Close(websocket.StatusNormalClosure, "") -}