From 3c00c223ad4c9b39c8abad4786ad807039f2b46a Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Fri, 1 Nov 2024 11:14:17 -0400 Subject: [PATCH] initial public http looper implementation (#771) --- canary/looper.go | 163 +++++++------------------- canary/looperOptions.go | 14 --- canary/publicHttpLooper.go | 202 +++++++++++++++++++++++++++++++++ cmd/zrok/testCanaryPeriodic.go | 79 ++++++++++++- 4 files changed, 317 insertions(+), 141 deletions(-) delete mode 100644 canary/looperOptions.go create mode 100644 canary/publicHttpLooper.go diff --git a/canary/looper.go b/canary/looper.go index 6c59b766b..dfc15ecf9 100644 --- a/canary/looper.go +++ b/canary/looper.go @@ -1,132 +1,47 @@ package canary import ( - "bytes" - "github.com/openziti/sdk-golang/ziti" - "github.com/openziti/sdk-golang/ziti/edge" - "github.com/openziti/zrok/environment/env_core" - "github.com/openziti/zrok/sdk/golang/sdk" - "github.com/pkg/errors" + "github.com/openziti/zrok/util" "github.com/sirupsen/logrus" - "io" - "math/rand" - "net/http" "time" ) -type PublicHttpLooper struct { - id int - frontend string - opt *LooperOptions - root env_core.Root - shr *sdk.Share - listener edge.Listener - done chan struct{} -} - -func NewPublicHttpLooper(id int, frontend string, opt *LooperOptions, root env_core.Root) *PublicHttpLooper { - return &PublicHttpLooper{ - id: id, - frontend: frontend, - root: root, - done: make(chan struct{}), - } -} - -func (l *PublicHttpLooper) Run() { - defer close(l.done) - defer logrus.Infof("stopping #%d", l.id) - logrus.Infof("starting #%d", l.id) - - if err := l.startup(); err != nil { - logrus.Fatalf("#%d error starting: %v", l.id, err) - } - - if err := l.bindListener(); err != nil { - logrus.Fatalf("#%d error binding listener: %v", l.id, err) - } - - l.dwell() - - logrus.Infof("completed #%d", l.id) - if err := l.shutdown(); err != nil { - logrus.Fatalf("error shutting down #%d: %v", l.id, err) - } -} - -func (l *PublicHttpLooper) startup() error { - shr, err := sdk.CreateShare(l.root, &sdk.ShareRequest{ - ShareMode: sdk.PublicShareMode, - BackendMode: sdk.ProxyBackendMode, - Target: "canary.PublicHttpLooper", - Frontends: []string{l.frontend}, - PermissionMode: sdk.ClosedPermissionMode, - }) - if err != nil { - return err - } - l.shr = shr - logrus.Infof("#%d allocated share '%v'", l.id, l.shr) - - return nil -} - -func (l *PublicHttpLooper) bindListener() error { - zif, err := l.root.ZitiIdentityNamed(l.root.EnvironmentIdentityName()) - if err != nil { - return errors.Wrapf(err, "#%d error getting identity", l.id) - } - zcfg, err := ziti.NewConfigFromFile(zif) - if err != nil { - return errors.Wrapf(err, "#%d error loading ziti config", l.id) - } - options := ziti.ListenOptions{ - ConnectTimeout: 5 * time.Minute, - WaitForNEstablishedListeners: 1, - } - zctx, err := ziti.NewContext(zcfg) - if err != nil { - return errors.Wrapf(err, "#%d error creating ziti context", l.id) - } - - if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil { - return errors.Wrapf(err, "#%d error binding listener", l.id) - } - - go func() { - if err := http.Serve(l.listener, l); err != nil { - logrus.Errorf("#%d error starting http listener: %v", l.id, err) - } - }() - - return nil -} - -func (l *PublicHttpLooper) ServeHTTP(w http.ResponseWriter, r *http.Request) { - buf := new(bytes.Buffer) - io.Copy(buf, r.Body) - w.Write(buf.Bytes()) -} - -func (l *PublicHttpLooper) dwell() { - dwell := l.opt.MinDwell.Milliseconds() - dwelta := l.opt.MaxDwell.Milliseconds() - l.opt.MinDwell.Milliseconds() - if dwelta > 0 { - dwell = int64(rand.Intn(int(dwelta)) + int(l.opt.MinDwell.Milliseconds())) - } - time.Sleep(time.Duration(dwell) * time.Millisecond) -} - -func (l *PublicHttpLooper) shutdown() error { - if l.listener != nil { - if err := l.listener.Close(); err != nil { - logrus.Errorf("#%d error closing listener: %v", l.id, err) - } - } - - if err := sdk.DeleteShare(l.root, l.shr); err != nil { - return errors.Wrapf(err, "#%d error deleting share", l.id) - } - - return nil +type LooperOptions struct { + Iterations uint + StatusInterval uint + Timeout time.Duration + MinPayload uint64 + MaxPayload uint64 + MinDwell time.Duration + MaxDwell time.Duration + MinPacing time.Duration + MaxPacing time.Duration +} + +type LooperResults struct { + StartTime time.Time + StopTime time.Time + Loops uint + Errors uint + Mismatches uint + Bytes uint64 +} + +func ReportLooperResults(results []*LooperResults) { + totalXfer := uint64(0) + totalErrors := uint(0) + totalMismatches := uint(0) + totalLoops := uint(0) + for i, result := range results { + deltaSeconds := result.StopTime.Sub(result.StartTime).Seconds() + xfer := uint64(float64(result.Bytes) / deltaSeconds) + totalXfer += xfer + totalErrors += result.Errors + totalMismatches += result.Mismatches + xferSec := util.BytesToSize(int64(xfer)) + totalLoops += result.Loops + logrus.Infof("looper #%d: %d loops, %d errors, %d mismatches, %s/sec", i, result.Loops, result.Errors, result.Mismatches, xferSec) + } + totalXferSec := util.BytesToSize(int64(totalXfer)) + logrus.Infof("total: %d loops, %d errors, %d mismatches, %s/sec", totalLoops, totalErrors, totalMismatches, totalXferSec) } diff --git a/canary/looperOptions.go b/canary/looperOptions.go deleted file mode 100644 index 7d47aecc0..000000000 --- a/canary/looperOptions.go +++ /dev/null @@ -1,14 +0,0 @@ -package canary - -import "time" - -type LooperOptions struct { - Iterations uint - Timeout time.Duration - MinPayload uint64 - MaxPayload uint64 - MinDwell time.Duration - MaxDwell time.Duration - MinPacing time.Duration - MaxPacing time.Duration -} diff --git a/canary/publicHttpLooper.go b/canary/publicHttpLooper.go new file mode 100644 index 000000000..e7ef4e8f2 --- /dev/null +++ b/canary/publicHttpLooper.go @@ -0,0 +1,202 @@ +package canary + +import ( + "bytes" + cryptorand "crypto/rand" + "encoding/base64" + "github.com/openziti/sdk-golang/ziti" + "github.com/openziti/sdk-golang/ziti/edge" + "github.com/openziti/zrok/environment/env_core" + "github.com/openziti/zrok/sdk/golang/sdk" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "io" + "math/rand" + "net/http" + "time" +) + +type PublicHttpLooper struct { + id uint + frontend string + opt *LooperOptions + root env_core.Root + shr *sdk.Share + listener edge.Listener + abort bool + done chan struct{} + results *LooperResults +} + +func NewPublicHttpLooper(id uint, frontend string, opt *LooperOptions, root env_core.Root) *PublicHttpLooper { + return &PublicHttpLooper{ + id: id, + frontend: frontend, + root: root, + done: make(chan struct{}), + results: &LooperResults{}, + } +} + +func (l *PublicHttpLooper) Run() { + defer close(l.done) + defer logrus.Infof("stopping #%d", l.id) + logrus.Infof("starting #%d", l.id) + + if err := l.startup(); err != nil { + logrus.Fatalf("#%d error starting: %v", l.id, err) + } + + if err := l.bindListener(); err != nil { + logrus.Fatalf("#%d error binding listener: %v", l.id, err) + } + + l.dwell() + + logrus.Infof("completed #%d", l.id) + if err := l.shutdown(); err != nil { + logrus.Fatalf("error shutting down #%d: %v", l.id, err) + } +} + +func (l *PublicHttpLooper) Abort() { + l.abort = true +} + +func (l *PublicHttpLooper) Done() <-chan struct{} { + return l.done +} + +func (l *PublicHttpLooper) Results() *LooperResults { + return l.results +} + +func (l *PublicHttpLooper) startup() error { + shr, err := sdk.CreateShare(l.root, &sdk.ShareRequest{ + ShareMode: sdk.PublicShareMode, + BackendMode: sdk.ProxyBackendMode, + Target: "canary.PublicHttpLooper", + Frontends: []string{l.frontend}, + PermissionMode: sdk.ClosedPermissionMode, + }) + if err != nil { + return err + } + l.shr = shr + logrus.Infof("#%d allocated share '%v'", l.id, l.shr) + + return nil +} + +func (l *PublicHttpLooper) bindListener() error { + zif, err := l.root.ZitiIdentityNamed(l.root.EnvironmentIdentityName()) + if err != nil { + return errors.Wrapf(err, "#%d error getting identity", l.id) + } + zcfg, err := ziti.NewConfigFromFile(zif) + if err != nil { + return errors.Wrapf(err, "#%d error loading ziti config", l.id) + } + options := ziti.ListenOptions{ + ConnectTimeout: 5 * time.Minute, + WaitForNEstablishedListeners: 1, + } + zctx, err := ziti.NewContext(zcfg) + if err != nil { + return errors.Wrapf(err, "#%d error creating ziti context", l.id) + } + + if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil { + return errors.Wrapf(err, "#%d error binding listener", l.id) + } + + go func() { + if err := http.Serve(l.listener, l); err != nil { + logrus.Errorf("#%d error starting http listener: %v", l.id, err) + } + }() + + return nil +} + +func (l *PublicHttpLooper) ServeHTTP(w http.ResponseWriter, r *http.Request) { + buf := new(bytes.Buffer) + io.Copy(buf, r.Body) + w.Write(buf.Bytes()) +} + +func (l *PublicHttpLooper) dwell() { + dwell := l.opt.MinDwell.Milliseconds() + dwelta := l.opt.MaxDwell.Milliseconds() - l.opt.MinDwell.Milliseconds() + if dwelta > 0 { + dwell = int64(rand.Intn(int(dwelta)) + int(l.opt.MinDwell.Milliseconds())) + } + time.Sleep(time.Duration(dwell) * time.Millisecond) +} + +func (l *PublicHttpLooper) iterate() { + l.results.StartTime = time.Now() + defer func() { l.results.StopTime = time.Now() }() + + for i := uint(0); i < l.opt.Iterations && !l.abort; i++ { + if i > 0 && i%l.opt.StatusInterval == 0 { + logrus.Infof("#%d: iteration %d", l.id, i) + } + + payloadSize := l.opt.MaxPayload + if l.opt.MaxPayload-l.opt.MinPayload > 0 { + payloadSize = rand.Uint64() % (l.opt.MaxPayload - l.opt.MinPayload) + } + outPayload := make([]byte, payloadSize) + cryptorand.Read(outPayload) + outBase64 := base64.StdEncoding.EncodeToString(outPayload) + + if req, err := http.NewRequest("POST", l.shr.FrontendEndpoints[0], bytes.NewBufferString(outBase64)); err == nil { + client := &http.Client{Timeout: l.opt.Timeout} + if resp, err := client.Do(req); err == nil { + if resp.StatusCode != 200 { + logrus.Errorf("#%d: unexpected status code: %v", l.id, resp.StatusCode) + l.results.Errors++ + } + inPayload := new(bytes.Buffer) + io.Copy(inPayload, resp.Body) + inBase64 := inPayload.String() + if inBase64 != outBase64 { + logrus.Errorf("#%d: payload mismatch", l.id) + l.results.Mismatches++ + } else { + l.results.Bytes += uint64(len(outBase64)) + logrus.Debugf("#%d: payload match") + } + } else { + logrus.Errorf("#%d: error: %v", l.id, err) + l.results.Errors++ + } + } else { + logrus.Errorf("#%d: error creating request: %v", l.id, err) + l.results.Errors++ + } + + pacingMs := l.opt.MaxPacing.Milliseconds() + pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds() + if pacingDelta > 0 { + pacingMs = rand.Int63() % pacingDelta + time.Sleep(time.Duration(pacingMs) * time.Millisecond) + } + l.results.Loops++ + } +} + +func (l *PublicHttpLooper) shutdown() error { + if l.listener != nil { + if err := l.listener.Close(); err != nil { + logrus.Errorf("#%d error closing listener: %v", l.id, err) + } + } + + if err := sdk.DeleteShare(l.root, l.shr); err != nil { + return errors.Wrapf(err, "#%d error deleting share", l.id) + } + + return nil +} diff --git a/cmd/zrok/testCanaryPeriodic.go b/cmd/zrok/testCanaryPeriodic.go index 428df8477..3e8354de7 100644 --- a/cmd/zrok/testCanaryPeriodic.go +++ b/cmd/zrok/testCanaryPeriodic.go @@ -1,8 +1,14 @@ package main import ( + "github.com/openziti/zrok/canary" + "github.com/openziti/zrok/environment" "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "os" + "os/signal" + "syscall" + "time" ) func init() { @@ -10,7 +16,18 @@ func init() { } type testCanaryPeriodicCommand struct { - cmd *cobra.Command + cmd *cobra.Command + loopers uint + iterations uint + statusInterval uint + timeout time.Duration + minPayload uint64 + maxPayload uint64 + minDwell time.Duration + maxDwell time.Duration + minPacing time.Duration + maxPacing time.Duration + frontendSelection string } func newTestCanaryPeriodicCommand() *testCanaryPeriodicCommand { @@ -21,9 +38,65 @@ func newTestCanaryPeriodicCommand() *testCanaryPeriodicCommand { } command := &testCanaryPeriodicCommand{cmd: cmd} cmd.Run = command.run + cmd.Flags().UintVarP(&command.loopers, "loopers", "l", 1, "Number of concurrent loopers to start") + cmd.Flags().UintVarP(&command.iterations, "iterations", "i", 1, "Number of iterations") + cmd.Flags().UintVarP(&command.statusInterval, "status-interval", "S", 100, "Show status every # iterations") + cmd.Flags().DurationVarP(&command.timeout, "timeout", "T", 30*time.Second, "Timeout when sending HTTP requests") + cmd.Flags().Uint64Var(&command.minPayload, "min-payload", 64, "Minimum payload size in bytes") + cmd.Flags().Uint64Var(&command.maxPayload, "max-payload", 10240, "Maximum payload size in bytes") + cmd.Flags().DurationVar(&command.minDwell, "min-dwell", 1*time.Second, "Minimum dwell time") + cmd.Flags().DurationVar(&command.maxDwell, "max-dwell", 1*time.Second, "Maximum dwell time") + cmd.Flags().DurationVar(&command.minPacing, "min-pacing", 0, "Minimum pacing time") + cmd.Flags().DurationVar(&command.maxPacing, "max-pacing", 0, "Maximum pacing time") + cmd.Flags().StringVar(&command.frontendSelection, "frontend-selection", "public", "Select frontend selection") return command } -func (c *testCanaryPeriodicCommand) run(_ *cobra.Command, _ []string) { - logrus.Info("periodic") +func (cmd *testCanaryPeriodicCommand) run(_ *cobra.Command, _ []string) { + root, err := environment.LoadRoot() + if err != nil { + panic(err) + } + + if !root.IsEnabled() { + logrus.Fatal("unable to load environment; did you 'zrok enable'?") + } + + var loopers []*canary.PublicHttpLooper + for i := uint(0); i < cmd.loopers; i++ { + looperOpts := &canary.LooperOptions{ + Iterations: cmd.iterations, + StatusInterval: cmd.statusInterval, + Timeout: cmd.timeout, + MinPayload: cmd.minPayload, + MaxPayload: cmd.maxPayload, + MinDwell: cmd.minDwell, + MaxDwell: cmd.maxDwell, + MinPacing: cmd.minPacing, + MaxPacing: cmd.maxPacing, + } + looper := canary.NewPublicHttpLooper(i, cmd.frontendSelection, looperOpts, root) + loopers = append(loopers, looper) + go looper.Run() + } + + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + for _, looper := range loopers { + looper.Abort() + } + }() + for _, l := range loopers { + <-l.Done() + } + + results := make([]*canary.LooperResults, 0) + for i := uint(0); i < cmd.loopers; i++ { + results = append(results, loopers[i].Results()) + } + canary.ReportLooperResults(results) + + os.Exit(0) }