From 9af203e7a6582670e2a243524e9eae39a7413278 Mon Sep 17 00:00:00 2001 From: Alexandre Desjardins Date: Thu, 2 Jun 2022 11:54:04 -0400 Subject: [PATCH 1/4] Introduce finite test runs, repeatability, reporting, and fine-tuned controls --- cmd/basic/main.go | 382 +++++++++++++++++++++---------------- cmd/basic/report.go | 43 +++++ cmd/server/loader_basic.go | 46 +++-- cmd/server/main.go | 6 +- protocol.go | 118 +++++++----- protocol_test.go | 6 +- support.go | 5 +- 7 files changed, 374 insertions(+), 232 deletions(-) create mode 100644 cmd/basic/report.go diff --git a/cmd/basic/main.go b/cmd/basic/main.go index 2f25d5b..0fcaa74 100644 --- a/cmd/basic/main.go +++ b/cmd/basic/main.go @@ -1,14 +1,23 @@ package main import ( + "bytes" + "context" "flag" "fmt" - pcgr "github.com/dgryski/go-pcgr" - mct "github.com/dormando/mctester" + "math/big" "math/rand" "os" "runtime/pprof" "time" + + "github.com/dgryski/go-pcgr" + "github.com/jamiealquiza/tachymeter" + mct "github.com/memcached/mctester" + + "golang.org/x/sync/errgroup" + + "go.uber.org/ratelimit" ) var cpuprofile = flag.String("cpuprofile", "", "dump cpu profile to file") @@ -17,57 +26,52 @@ var memprofile = flag.String("memprofile", "", "dump memory profile") func main() { fmt.Println("starting") + clientFlags := flag.Uint("clientflags", 0, "(32bit unsigned) client flag bits to set on miss") connCount := flag.Int("conncount", 1, "number of client connections to establish") - reqPerSleep := flag.Int("reqpersleep", 1, "number of requests to issue when client wakes up") - reqBundlePerConn := flag.Int("reqbundles", 1, "number of times to wake up and send requests before disconnecting (-1 for unlimited)") - sleepPerBundle := flag.Duration("sleepperbundle", time.Millisecond*1, "time to sleep between request bundles (accepts Ns, Nms, etc)") - deletePercent := flag.Int("deletepercent", 0, "percentage of queries to issue as deletes instead of gets (0-1000)") + duration := flag.Duration("duration", 0, "length of time that the test will run (0 for unlimited)") + keyLength := flag.Int("keylength", 10, "number of random characters to append to key") keyPrefix := flag.String("keyprefix", "mctester:", "prefix to append to all generated keys") keySpace := flag.Int("keyspace", 1000, "number of unique keys to generate") - keyLength := flag.Int("keylength", 10, "number of random characters to append to key") + pipelines := flag.Uint("pipelines", 1, "(32bit unsigned) number of GET requests to stack within the same syscall") + delRatio := flag.Int("ratiodel", 0, "proportion of requests that should be sent as `deletes`") + getRatio := flag.Int("ratioget", 90, "proportion of requests that should be sent as `gets`") + setRatio := flag.Int("ratioset", 10, "proportion of requests that should be sent as `sets`") + rngSeed := flag.Int64("rngseed", time.Now().UnixNano(), "seed value used when initializing RNG") + rps := flag.Int("rps", 0, "target number of requests per second (0 for unlimited)") + server := flag.String("server", "127.0.0.1:11211", "`ip:port` for Memcached instance under test") + socket := flag.String("socket", "", "domain socket used for connections") + stripKeyPrefix := flag.Bool("stripkeyprefix", false, "remove key prefix before comparing with response") keyTTL := flag.Uint("ttl", 180, "TTL to set with new items") + valueSize := flag.Uint("valuesize", 1000, "size of value (in bytes) to store on miss") + warmPercent := flag.Int("warm", 90, "percent of keys to `set` in Memcached before testing begins") useZipf := flag.Bool("zipf", false, "use Zipf instead of uniform randomness (slow)") zipfS := flag.Float64("zipfS", 1.01, "zipf S value (general pull toward zero) must be > 1.0") zipfV := flag.Float64("zipfV", float64(*keySpace/2), "zipf V value (pull below this number") - valueSize := flag.Uint("valuesize", 1000, "size of value (in bytes) to store on miss") - clientFlags := flag.Uint("clientflags", 0, "(32bit unsigned) client flag bits to set on miss") flag.Parse() - /* - // example for testing zipf/random string code. - prand := pcgr.New(time.Now().UnixNano(), 0) - // s (> 1, generally 1.01-2) pulls the power curve toward 0 - // v (anything) puts the main part of the curve before this number, - // biasing loads below it more. - // imax is the highest number that will be seen. - var src = rand.NewZipf(rand.New(&prand), 2, 5, 100) - subRS := rand.NewSource(1) - for i := 1; i < 100000; i++ { - seed := src.Uint64() - subRS.Seed(int64(seed)) - //fmt.Printf("%d: %s\n", seed, mct.RandString(subRS, 30)) - //fmt.Printf("%d\n", seed) - } - os.Exit(0) - */ - - bl := &BasicLoader{ - servers: []string{"127.0.0.1:11211"}, - desiredConnCount: *connCount, - requestsPerSleep: *reqPerSleep, - requestBundlesPerConn: *reqBundlePerConn, - sleepPerBundle: *sleepPerBundle, - deletePercent: *deletePercent, - keyLength: *keyLength, - keyPrefix: *keyPrefix, - keySpace: *keySpace, - keyTTL: *keyTTL, - useZipf: *useZipf, - zipfS: *zipfS, - zipfV: *zipfV, - valueSize: *valueSize, - clientFlags: *clientFlags, + testConfig := &Config{ + ClientFlags: *clientFlags, + ConnCount: *connCount, + DelRatio: *delRatio, + Duration: *duration, + GetRatio: *getRatio, + KeyLength: *keyLength, + KeyPrefix: *keyPrefix, + KeySpace: *keySpace, + KeyTTL: *keyTTL, + Pipelines: *pipelines, + RngSeed: *rngSeed, + RPS: *rps, + Servers: []string{*server}, + SetRatio: *setRatio, + Socket: *socket, + StripKeyPrefix: *stripKeyPrefix, + UseZipf: *useZipf, + ValueSize: *valueSize, + WarmPercent: *warmPercent, + ZipfS: *zipfS, + ZipfV: *zipfV, } if *cpuprofile != "" { @@ -78,152 +82,204 @@ func main() { } pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() - // TODO: Use a real timer with channel. - now := time.Now() - bl.stopAfter = now.Add(time.Second * 10) - fmt.Printf("time: %v\n", bl.stopAfter) } - bl.Run() + testConfig.Run() } -// Basic persistent load test, using text protocol: -// - list of servers to connect to, pct of each. -// - zipf or uniform random -// - requests per connect (-1 for unlim) -// - gets per etc -// - multiget or not -// - set or add to replace -// - delete frequency -// - set size range -// - variances: peak/antipeak load -// - variances: how often to change item sizes -type BasicLoader struct { - servers []string - stopAfter time.Time - desiredConnCount int - requestsPerSleep int - requestBundlesPerConn int - sleepPerBundle time.Duration - setValueSizes []int - deletePercent int - keyLength int - keyPrefix string - keySpace int - keyTTL uint - useZipf bool - zipfS float64 // (> 1, generally 1.01-2) pulls the power curve toward 0) - zipfV float64 // v (< keySpace) puts the main part of the curve before this number - valueSize uint - clientFlags uint +type Config struct { + ClientFlags uint + ConnCount int + DelRatio int + Duration time.Duration + GetRatio int + KeyLength int + KeyPrefix string + KeySpace int + KeyTTL uint + Pipelines uint + RngSeed int64 + RPS int + Servers []string + SetRatio int + Socket string + StripKeyPrefix bool + UseZipf bool + ValueSize uint + WarmPercent int + ZipfS float64 // (> 1, generally 1.01-2) pulls the power curve toward 0) + ZipfV float64 // v (< keySpace) puts the main part of the curve before this number + tachymeter *tachymeter.Tachymeter } -func (l *BasicLoader) Run() { - var runners int - // TODO: should be method of surfacing errors. - doneChan := make(chan int, 50) +func (conf *Config) Run() (err error) { + g, _ := errgroup.WithContext(context.Background()) - for { - for runners < l.desiredConnCount { - go l.Worker(doneChan) - runners++ - } - res := <-doneChan - if res == 0 { - //fmt.Println("That's a bingo!") - } - runners-- - if *cpuprofile != "" && time.Now().After(l.stopAfter) { + samples := conf.RPS * conf.ConnCount + if samples < 1000 { + samples = 1000 + } + + if conf.WarmPercent > 0 { + err = conf.WarmCache() + if err != nil { return } } + + threadStats := make(chan Stats, conf.ConnCount) + conf.tachymeter = tachymeter.New(&tachymeter.Config{Size: samples}) + startTime := time.Now() + + for worker := 0; worker < conf.ConnCount; worker++ { + index := worker + g.Go(func() error { + return conf.Worker(index, threadStats) + }) + } + + err = g.Wait() + endTime := time.Now() + if err != nil { + return + } + + conf.tachymeter.SetWallTime(time.Since(startTime)) + close(threadStats) + testStats := &Stats{} + for stats := range threadStats { + testStats.Add(&stats) + } + + report := &Report{ + StartTime: startTime, + EndTime: endTime, + Config: conf, + Metrics: conf.tachymeter.Calc(), + Stats: testStats, + } + err = report.PrettyPrint() + + return } -func (l *BasicLoader) Timer(tag string, start time.Time) { - duration := time.Since(start) - if duration > time.Millisecond * 10 { - fmt.Printf("%s [%d]\n", tag, int64(time.Since(start) / time.Microsecond)) +func (conf *Config) WarmCache() error { + mc := mct.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) + + rs := pcgr.New(conf.RngSeed, 0) + randR := rand.New(&rs) + subRS := pcgr.New(1, 0) + + for keyIndex := 0; keyIndex < conf.KeySpace; keyIndex++ { + if randR.Intn(100) < conf.WarmPercent { + subRS.Seed(conf.RngSeed + int64(keyIndex)) + key := mct.RandString(&subRS, conf.KeyLength, conf.KeyPrefix) + + valSeed := new(big.Int).SetBytes([]byte(key)).Int64() + subRS.Seed(valSeed) + value := mct.RandBytes(&subRS, int(conf.ValueSize)) + + _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) + if err != nil { + fmt.Println(err) + return err + } + } } + + return nil } -// TODO: use sync.Pool for Item/etc? -// pool.Put() items back before sleep. -// may also be able to cache mc's bufio's this way. -func (l *BasicLoader) Worker(doneChan chan<- int) { - // FIXME: selector. - host := l.servers[0] - mc := mct.NewClient(host) - bundles := l.requestBundlesPerConn +func (conf *Config) Worker(index int, results chan Stats) error { + workerSeed := conf.RngSeed + int64(index*conf.KeySpace) + stats := Stats{} + mc := mct.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) - rs := pcgr.New(time.Now().UnixNano(), 0) + rs := pcgr.New(workerSeed, 0) + randR := rand.New(&rs) var zipRS *rand.Zipf - randR := rand.New(&rs) // main randomizer, so we can use the random interface. - if l.useZipf { - zipRS = rand.NewZipf(randR, l.zipfS, l.zipfV, uint64(l.keySpace)) + if conf.UseZipf { + zipRS = rand.NewZipf(randR, conf.ZipfS, conf.ZipfV, uint64(conf.KeySpace)) if zipRS == nil { - fmt.Printf("bad arguments to zipf: S: %f V: %f\n", l.zipfS, l.zipfV) - return + fmt.Printf("bad arguments to zipf: S: %f V: %f\n", conf.ZipfS, conf.ZipfV) + return nil } } + subRS := pcgr.New(1, 0) + + var rl ratelimit.Limiter + if conf.RPS > 0 { + rl = ratelimit.New(conf.RPS) + } else { + rl = ratelimit.NewUnlimited() + } - subRS := pcgr.New(1, 0) // randomizer is re-seeded for random strings. - var res int - defer func() { doneChan <- res }() + for start := time.Now(); ; { + iterStart := time.Now() + if iterStart.Sub(start) > conf.Duration { + break + } - for bundles == -1 || bundles > 0 { - if bundles != -1 { - bundles-- + if conf.UseZipf { + subRS.Seed(int64(zipRS.Uint64())) + } else { + subRS.Seed(workerSeed + int64(randR.Intn(conf.KeySpace))) } - for i := l.requestsPerSleep; i > 0; i-- { - // generate keys - // TODO: Allow min/max length for keys. - // The random key needs to stick with the random length, or we end - // up with keySpace * (max-min) number of unique keys. - // Need to pull the randomizer exactly once (then just modulus for - // a poor-but-probably-fine random value), then build the random - // string from the rest. - // Could also re-seed it twice, pull once Intn for length, - // re-seed, then again for key space. - - keyLen := l.keyLength - if l.useZipf { - subRS.Seed(int64(zipRS.Uint64())) - } else { - subRS.Seed(int64(randR.Intn(l.keySpace))) + + key := mct.RandString(&subRS, conf.KeyLength, conf.KeyPrefix) + valSeed := new(big.Int).SetBytes([]byte(key)).Int64() + subRS.Seed(valSeed) + + switch rng := randR.Intn(conf.DelRatio + conf.SetRatio + conf.GetRatio); { + case rng < conf.DelRatio: + rl.Take() + code, err := mc.Delete(key) + if err != nil { + fmt.Println(err) + return err } - // TODO: might be nice to pass (by ref?) prefix in here to make - // use of string.Builder. - key := l.keyPrefix + mct.RandString(&subRS, keyLen) - // chance we issue a delete instead. - delChance := randR.Intn(1000) - if l.deletePercent != 0 && delChance < l.deletePercent { - _, err := mc.Delete(key) - if err != nil { - fmt.Println(err) - res = -1 - return - } - } else { - // issue gets - start := time.Now() - _, _, code, err := mc.Get(key) - l.Timer("get", start) - // validate responses - if err != nil { - fmt.Println(err) - res = -1 - return - } - // set missing values - if code == mct.McMISS { - // TODO: random sizing - value := mct.RandBytes(&rs, int(l.valueSize)) - start := time.Now() - mc.Set(key, uint32(l.clientFlags), uint32(l.keyTTL), value) - l.Timer("set", start) + + switch code { + case mct.McDELETED: + stats.DeleteHits++ + case mct.McNOT_FOUND: + stats.DeleteMisses++ + } + case rng < (conf.DelRatio + conf.SetRatio): + value := mct.RandBytes(&subRS, int(conf.ValueSize)) + rl.Take() + _, err := mc.Set(key, uint32(conf.ClientFlags), uint32(conf.KeyTTL), value) + if err != nil { + fmt.Println(err) + return err + } + + stats.SetsTotal++ + default: + expectedValue := mct.RandBytes(&subRS, int(conf.ValueSize)) + rl.Take() + _, value, code, err := mc.Get(key) + if err != nil { + fmt.Println(err, value) + return err + } + + switch code { + case mct.McHIT: + stats.GetHits++ + + if !bytes.Equal(value, expectedValue) { + stats.KeyCollisions++ + fmt.Printf("Unexpected value found for key `%s`\n\tExpected Value: %s\n\tActual Value: %s\n", key, expectedValue, value) } + case mct.McMISS: + stats.GetMisses++ } } - time.Sleep(l.sleepPerBundle) + + conf.tachymeter.AddTime(time.Since(iterStart)) } + + results <- stats + return nil } diff --git a/cmd/basic/report.go b/cmd/basic/report.go new file mode 100644 index 0000000..0669956 --- /dev/null +++ b/cmd/basic/report.go @@ -0,0 +1,43 @@ +package main + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/jamiealquiza/tachymeter" +) + +type Report struct { + StartTime time.Time + EndTime time.Time + Config *Config + Metrics *tachymeter.Metrics + Stats *Stats +} + +func (report *Report) PrettyPrint() (err error) { + jsonReport, err := json.MarshalIndent(report, "", "\t") + if err == nil { + fmt.Println(string(jsonReport)) + } + return +} + +type Stats struct { + DeleteHits int + DeleteMisses int + GetHits int + GetMisses int + KeyCollisions int + SetsTotal int +} + +func (stats *Stats) Add(other *Stats) { + stats.DeleteHits += other.DeleteHits + stats.DeleteMisses += other.DeleteMisses + stats.GetHits += other.GetHits + stats.GetMisses += other.GetMisses + stats.KeyCollisions += other.KeyCollisions + stats.SetsTotal += other.SetsTotal +} diff --git a/cmd/server/loader_basic.go b/cmd/server/loader_basic.go index 07703d1..2a1ab09 100644 --- a/cmd/server/loader_basic.go +++ b/cmd/server/loader_basic.go @@ -2,35 +2,42 @@ package main import ( "fmt" - pcgr "github.com/dgryski/go-pcgr" - mct "github.com/dormando/mctester" "math/rand" "time" + + "github.com/dgryski/go-pcgr" + mct "github.com/memcached/mctester" ) // Basic persistent load test, using text protocol: type BasicLoader struct { - Servers []string `json:"servers"` - stopAfter time.Time - DesiredConnCount int `json:"conncount"` - RequestsPerSleep int `json:"reqpersleep"` - RequestBundlesPerConn int `json:"reqbundlesperconn"` + Servers []string `json:"servers"` + Socket string `json:"socket"` + Pipelines uint `json:"pipelines"` + StripKeyPrefix bool `json:"stripkeyprefix"` + StopAfter time.Time `json:"stopafter"` + DesiredConnCount int `json:"conncount"` + RequestsPerSleep int `json:"reqpersleep"` + RequestBundlesPerConn int `json:"reqbundlesperconn"` SleepPerBundle time.Duration `json:"sleepperbundle"` - DeletePercent int `json:"deletepercent"` - KeyLength int `json:"keylength"` - KeyPrefix string `json:"keyprefix"` - KeySpace int `json:"keyspace"` - KeyTTL uint `json:"keyttl"` - UseZipf bool `json:"zipf"` - ZipfS float64 `json:"zipfS"` // (> 1, generally 1.01-2) pulls the power curve toward 0) - ZipfV float64 `json:"zipfV"` // v (< KeySpace) puts the main part of the curve before this number - ValueSize uint `json:"valuesize"` - ClientFlags uint `json:"clientflags"` + DeletePercent int `json:"deletepercent"` + KeyLength int `json:"keylength"` + KeyPrefix string `json:"keyprefix"` + KeySpace int `json:"keyspace"` + KeyTTL uint `json:"keyttl"` + UseZipf bool `json:"zipf"` + ZipfS float64 `json:"zipfS"` // (> 1, generally 1.01-2) pulls the power curve toward 0) + ZipfV float64 `json:"zipfV"` // v (< KeySpace) puts the main part of the curve before this number + ValueSize uint `json:"valuesize"` + ClientFlags uint `json:"clientflags"` } func newBasicLoader() *BasicLoader { return &BasicLoader{ Servers: []string{"127.0.0.1:11211"}, + Socket: "", + Pipelines: 1, + StripKeyPrefix: false, DesiredConnCount: 1, RequestsPerSleep: 1, RequestBundlesPerConn: 1, @@ -116,7 +123,7 @@ func runBasicLoader(Update <-chan interface{}, worker interface{}) { func basicWorker(id int, doneChan chan<- int, updateChan <-chan *BasicLoader, l *BasicLoader) { // TODO: server selector. host := l.Servers[0] - mc := mct.NewClient(host) + mc := mct.NewClient(host, l.Socket, l.Pipelines, l.KeyPrefix, l.StripKeyPrefix) bundles := l.RequestBundlesPerConn rs := pcgr.New(time.Now().UnixNano(), 0) @@ -149,7 +156,6 @@ func basicWorker(id int, doneChan chan<- int, updateChan <-chan *BasicLoader, l // Could also re-seed it twice, pull once Intn for length, // re-seed, then again for key space. - keyLen := l.KeyLength if l.UseZipf { subRS.Seed(int64(zipRS.Uint64())) } else { @@ -157,7 +163,7 @@ func basicWorker(id int, doneChan chan<- int, updateChan <-chan *BasicLoader, l } // TODO: might be nice to pass (by ref?) prefix in here to make // use of string.Builder. - key := l.KeyPrefix + mct.RandString(&subRS, keyLen) + key := mct.RandString(&subRS, l.KeyLength, l.KeyPrefix) // chance we issue a delete instead. if l.DeletePercent != 0 && randR.Intn(1000) < l.DeletePercent { _, err := mc.Delete(key) diff --git a/cmd/server/main.go b/cmd/server/main.go index 6a0edb8..5130ba4 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "encoding/json" "flag" "fmt" @@ -10,7 +11,6 @@ import ( "os" "runtime/pprof" "time" - "bytes" ) // TODO: think we can pass this to loaderManager() from main()? @@ -111,7 +111,7 @@ func main() { os.Exit(1) } } - resp, err := http.Post(*setAddr + "/set", "Content-Type: application/json", bytes.NewReader(data)) + resp, err := http.Post(*setAddr+"/set", "Content-Type: application/json", bytes.NewReader(data)) if err != nil { fmt.Println("Error sending loader config to server:", err) os.Exit(1) @@ -135,7 +135,7 @@ func main() { log.Fatal(err) } - resp, err := http.Post(*delAddr + "/delete", "Content-Type: application/json", bytes.NewReader(data)) + resp, err := http.Post(*delAddr+"/delete", "Content-Type: application/json", bytes.NewReader(data)) if resp.StatusCode == http.StatusOK { fmt.Printf("successfully send loader update\n") } else { diff --git a/protocol.go b/protocol.go index 4a48820..d19b1d7 100644 --- a/protocol.go +++ b/protocol.go @@ -17,6 +17,7 @@ import ( "io" "net" "strconv" + "strings" "time" ) @@ -36,7 +37,13 @@ type mcConn struct { } func (c *Client) connectToMc() (*mcConn, error) { - conn, err := net.DialTimeout("tcp", c.Host, c.ConnectTimeout) + var conn net.Conn + var err error + if c.socket != "" { + conn, err = net.DialTimeout("unix", c.socket, c.ConnectTimeout) + } else { + conn, err = net.DialTimeout("tcp", c.Host, c.ConnectTimeout) + } if err != nil { return nil, err } @@ -50,6 +57,7 @@ type Client struct { // read or write timeout NetTimeout time.Duration Host string + socket string cn *mcConn WBufSize int RBufSize int @@ -57,12 +65,20 @@ type Client struct { // binprot structure cache. binpkt *packet opaque uint32 // just for binprot? + + pipelines int + keyPrefix string + stripKeyPrefix bool } -func NewClient(host string) (client *Client) { +func NewClient(host string, socket string, pipelines uint, keyPrefix string, stripKeyPrefix bool) (client *Client) { client = &Client{ - Host: host, - binpkt: &packet{}, + Host: host, + socket: socket, + pipelines: int(pipelines), + keyPrefix: keyPrefix, + stripKeyPrefix: stripKeyPrefix, + binpkt: &packet{}, } //client.rs = rand.NewSource(time.Now().UnixNano()) return client @@ -319,56 +335,72 @@ func (c *Client) MetaDebug(key string) (err error) { ////////////////////////////////////////////// func (c *Client) Get(key string) (flags uint64, value []byte, code McCode, err error) { + pipelines := c.pipelines + // Expected key from response + respKey := key + if c.stripKeyPrefix { + respKey = strings.TrimPrefix(key, c.keyPrefix) + } + err = c.runNow(key, len(key)+6, func() error { b := c.cn.b - b.WriteString("get ") - b.WriteString(key) - b.WriteString("\r\n") - err = b.Flush() - - if err != nil { - return err + for i := 0; i < pipelines; i++ { + b.WriteString("get ") + b.WriteString(key) + b.WriteString("\r\n") } - - line, err := b.ReadBytes('\n') + err = b.Flush() if err != nil { return err } - if bytes.Equal(line, []byte("END\r\n")) { - code = McMISS - } else { - parts := bytes.Split(line[:len(line)-2], []byte(" ")) - if !bytes.Equal(parts[0], []byte("VALUE")) { - // TODO: This should look for ERROR/SERVER_ERROR/etc - return ErrUnexpectedResponse - } - if len(parts) != 4 { - return ErrUnexpectedResponse - } - if !bytes.Equal(parts[1], []byte(key)) { - // FIXME: how do we embed the received vs expected in here? - // use the brand-new golang error wrapping thing? - return ErrKeyDoesNotMatch - } - flags, _ = ParseUint(parts[2]) - size, _ := ParseUint(parts[3]) - - value = make([]byte, size+2) - _, err := io.ReadFull(b, value) + for i := 0; i < pipelines; i++ { + line, err := b.ReadBytes('\n') if err != nil { return err } - if !bytes.Equal(value[len(value)-2:], []byte("\r\n")) { - return ErrCorruptValue - } - code = McHIT - value = value[:size] - - line, err = b.ReadBytes('\n') - if !bytes.Equal(line, []byte("END\r\n")) { - return ErrUnexpectedResponse + if bytes.Equal(line, []byte("END\r\n")) { + code = McMISS + } else { + parts := bytes.Split(line[:len(line)-2], []byte(" ")) + if !bytes.Equal(parts[0], []byte("VALUE")) { + // TODO: This should look for ERROR/SERVER_ERROR/etc + fmt.Print("Unexpected Response: ", string(line), "\n") + continue + } + if len(parts) != 4 { + fmt.Print("Unexpected Response: ", "parts not 4", "\n") + continue + } + if !bytes.Equal(parts[1], []byte(respKey)) { + fmt.Print("Unmatched Key: ", string(parts[1]), " and ", respKey, "\n") + // FIXME: how do we embed the received vs expected in here? + // use the brand-new golang error wrapping thing? + continue + } + flags, _ = ParseUint(parts[2]) + size, _ := ParseUint(parts[3]) + + value = make([]byte, size+2) + _, err := io.ReadFull(b, value) + if err != nil { + fmt.Print("io ReadFull error, return", "\n") + return err + } + + if !bytes.Equal(value[len(value)-2:], []byte("\r\n")) { + fmt.Print("Unmatched Value", "\n") + continue + } + code = McHIT + value = value[:size] + + line, err = b.ReadBytes('\n') + if !bytes.Equal(line, []byte("END\r\n")) { + fmt.Print("Unmatched Reponse: ", string(line), " is not END\r\n") + continue + } } } diff --git a/protocol_test.go b/protocol_test.go index 67542e7..357558c 100644 --- a/protocol_test.go +++ b/protocol_test.go @@ -11,9 +11,13 @@ import ( // tests expect a recent memcached to be running at this address. const hostname = "127.0.0.1:11211" +const socket = "" +const pipelines = 1 +const keyPrefix = "mctester:" +const stripKeyPrefix = false func newcli() *Client { - mc := NewClient(hostname) + mc := NewClient(hostname, socket, pipelines, keyPrefix, stripKeyPrefix) mc.ConnectTimeout = 3 * time.Second mc.NetTimeout = time.Second mc.WBufSize = 64 * 1024 diff --git a/support.go b/support.go index 31a3225..2c9ae65 100644 --- a/support.go +++ b/support.go @@ -25,9 +25,10 @@ const ( // randomized keys! // TODO: is sb reusable? -func RandString(src rand.Source, n int) string { +func RandString(src rand.Source, n int, prefix string) string { sb := strings.Builder{} - sb.Grow(n) + sb.Grow(len(prefix) + n) + sb.WriteString(prefix) // A src.Int63() generates 63 random bits, enough for letterIdxMax characters! for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; { if remain == 0 { From 0f3347ffd6d9b76522078a313f1cc4e68167299c Mon Sep 17 00:00:00 2001 From: Alexandre Desjardins Date: Thu, 2 Jun 2022 16:05:45 -0400 Subject: [PATCH 2/4] Generate keys using the global seed instead of the worker seed --- cmd/basic/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/basic/main.go b/cmd/basic/main.go index 0fcaa74..52ac63f 100644 --- a/cmd/basic/main.go +++ b/cmd/basic/main.go @@ -191,7 +191,7 @@ func (conf *Config) WarmCache() error { } func (conf *Config) Worker(index int, results chan Stats) error { - workerSeed := conf.RngSeed + int64(index*conf.KeySpace) + workerSeed := conf.RngSeed + int64(index) + int64(conf.KeySpace) stats := Stats{} mc := mct.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) @@ -223,7 +223,7 @@ func (conf *Config) Worker(index int, results chan Stats) error { if conf.UseZipf { subRS.Seed(int64(zipRS.Uint64())) } else { - subRS.Seed(workerSeed + int64(randR.Intn(conf.KeySpace))) + subRS.Seed(conf.RngSeed + int64(randR.Intn(conf.KeySpace))) } key := mct.RandString(&subRS, conf.KeyLength, conf.KeyPrefix) From 9dd544f52674ca8df69dbdc2f68367a117ef5ffe Mon Sep 17 00:00:00 2001 From: Alexandre Desjardins Date: Tue, 7 Jun 2022 17:06:04 -0400 Subject: [PATCH 3/4] Add flag for enabling value comparisons after `gets`. --- cmd/basic/main.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/cmd/basic/main.go b/cmd/basic/main.go index 52ac63f..1e5c473 100644 --- a/cmd/basic/main.go +++ b/cmd/basic/main.go @@ -42,6 +42,7 @@ func main() { socket := flag.String("socket", "", "domain socket used for connections") stripKeyPrefix := flag.Bool("stripkeyprefix", false, "remove key prefix before comparing with response") keyTTL := flag.Uint("ttl", 180, "TTL to set with new items") + validateGets := flag.Bool("validate", false, "compare the value returned from a `get` to what was initially `set`") valueSize := flag.Uint("valuesize", 1000, "size of value (in bytes) to store on miss") warmPercent := flag.Int("warm", 90, "percent of keys to `set` in Memcached before testing begins") useZipf := flag.Bool("zipf", false, "use Zipf instead of uniform randomness (slow)") @@ -68,6 +69,7 @@ func main() { Socket: *socket, StripKeyPrefix: *stripKeyPrefix, UseZipf: *useZipf, + ValidateGets: *validateGets, ValueSize: *valueSize, WarmPercent: *warmPercent, ZipfS: *zipfS, @@ -105,11 +107,13 @@ type Config struct { Socket string StripKeyPrefix bool UseZipf bool + ValidateGets bool ValueSize uint WarmPercent int ZipfS float64 // (> 1, generally 1.01-2) pulls the power curve toward 0) ZipfV float64 // v (< keySpace) puts the main part of the curve before this number - tachymeter *tachymeter.Tachymeter + + tachymeter *tachymeter.Tachymeter } func (conf *Config) Run() (err error) { @@ -150,6 +154,9 @@ func (conf *Config) Run() (err error) { for stats := range threadStats { testStats.Add(&stats) } + if !conf.ValidateGets { + testStats.KeyCollisions = -1 + } report := &Report{ StartTime: startTime, @@ -256,7 +263,6 @@ func (conf *Config) Worker(index int, results chan Stats) error { stats.SetsTotal++ default: - expectedValue := mct.RandBytes(&subRS, int(conf.ValueSize)) rl.Take() _, value, code, err := mc.Get(key) if err != nil { @@ -268,9 +274,12 @@ func (conf *Config) Worker(index int, results chan Stats) error { case mct.McHIT: stats.GetHits++ - if !bytes.Equal(value, expectedValue) { - stats.KeyCollisions++ - fmt.Printf("Unexpected value found for key `%s`\n\tExpected Value: %s\n\tActual Value: %s\n", key, expectedValue, value) + if conf.ValidateGets { + expectedValue := mct.RandBytes(&subRS, int(conf.ValueSize)) + if !bytes.Equal(value, expectedValue) { + stats.KeyCollisions++ + fmt.Printf("Unexpected value found for key `%s`\n\tExpected Value: %s\n\tActual Value: %s\n", key, expectedValue, value) + } } case mct.McMISS: stats.GetMisses++ From 433f036a9ddda24a27112c923c7a7349d231f5fe Mon Sep 17 00:00:00 2001 From: Alexandre Desjardins Date: Wed, 8 Jun 2022 11:19:22 -0400 Subject: [PATCH 4/4] Add global RNG seed when using Zipf --- cmd/basic/main.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/basic/main.go b/cmd/basic/main.go index 1e5c473..35bdfff 100644 --- a/cmd/basic/main.go +++ b/cmd/basic/main.go @@ -198,12 +198,13 @@ func (conf *Config) WarmCache() error { } func (conf *Config) Worker(index int, results chan Stats) error { - workerSeed := conf.RngSeed + int64(index) + int64(conf.KeySpace) stats := Stats{} mc := mct.NewClient(conf.Servers[0], conf.Socket, conf.Pipelines, conf.KeyPrefix, conf.StripKeyPrefix) + workerSeed := conf.RngSeed + int64(index) + int64(conf.KeySpace) rs := pcgr.New(workerSeed, 0) randR := rand.New(&rs) + var zipRS *rand.Zipf if conf.UseZipf { zipRS = rand.NewZipf(randR, conf.ZipfS, conf.ZipfV, uint64(conf.KeySpace)) @@ -228,7 +229,7 @@ func (conf *Config) Worker(index int, results chan Stats) error { } if conf.UseZipf { - subRS.Seed(int64(zipRS.Uint64())) + subRS.Seed(conf.RngSeed + int64(zipRS.Uint64())) } else { subRS.Seed(conf.RngSeed + int64(randR.Intn(conf.KeySpace))) }