diff --git a/README.md b/README.md index e20b5de..2ccd04a 100644 --- a/README.md +++ b/README.md @@ -38,17 +38,17 @@ Flags: --help Show context-sensitive help (also try --help-long and --help-man). -l, --list Show available speedtest.net servers. -s, --server=SERVER ... Select server id to speedtest. - --custom-url=CUSTOM-URL Specify the url of the server instead of getting a list from speedtest.net. + --custom-url=CUSTOM-URL Specify the url of the server instead of fetching from speedtest.net. --saving-mode Test with few resources, though low accuracy (especially > 30Mbps). --json Output results in json format. - --location=LOCATION Change the location with a precise coordinate. + --location=LOCATION Change the location with a precise coordinate (format: lat,lon). --city=CITY Change the location with a predefined city label. --city-list List all predefined city labels. --proxy=PROXY Set a proxy(http[s] or socks) for the speedtest. eg: --proxy=socks://10.20.0.101:7890 eg: --proxy=http://10.20.0.101:7890 --source=SOURCE Bind a source interface for the speedtest. - --dns-bind-source DNS request binding source.(Experimental) + --dns-bind-source DNS request binding source (experimental). eg: --source=10.20.0.101 -m --multi Enable multi-server mode. -t --thread=THREAD Set the number of concurrent connections. @@ -56,7 +56,9 @@ Flags: --ua Set the user-agent header for the speedtest. --no-download Disable download test. --no-upload Disable upload test. - --ping-mode Select a method for Ping. (support icmp/tcp/http) + --ping-mode Select a method for Ping (support icmp/tcp/http). + -u --unit Set human-readable and auto-scaled rate units for output + (options: decimal-bits/decimal-bytes/binary-bits/binary-bytes). -d --debug Enable debug mode. --version Show application version. ``` @@ -68,15 +70,15 @@ Simply use `speedtest` command. The closest server is selected by default. Use t ```bash $ speedtest - speedtest-go v1.6.5 @showwin + speedtest-go v1.7.0 @showwin ✓ ISP: 124.27.199.165 (Fujitsu) [34.9769, 138.3831] ✓ Found 20 Public Servers ✓ Test Server: [6691] 9.03km Shizuoka (Japan) by sudosan -✓ Latency: 24.15396ms Jitter: 777.465µs Min: 22.8926ms Max: 25.5387ms -✓ Download: 73.30Mbps (used: 101.48MB) -✓ Upload: 35.26Mbps (used: 47.33MB) +✓ Latency: 4.452963ms Jitter: 41.271µs Min: 4.395179ms Max: 4.517576ms +✓ Download: 115.52 Mbps (Used: 135.75MB) (Latency: 4ms Jitter: 0ms Min: 4ms Max: 4ms) +✓ Upload: 4.02 Mbps (Used: 6.85MB) (Latency: 4ms Jitter: 1ms Min: 3ms Max: 8ms) ``` #### Test with Other Servers @@ -98,23 +100,20 @@ and select them by id. ```bash $ speedtest --server 6691 --server 6087 - speedtest-go v1.6.5 @showwin + speedtest-go v1.7.0 @showwin ✓ ISP: 124.27.199.165 (Fujitsu) [34.9769, 138.3831] -✓ Found 20 Public Servers +✓ Found 2 Specified Public Server(s) ✓ Test Server: [6691] 9.03km Shizuoka (Japan) by sudosan ✓ Latency: 21.424ms Jitter: 1.644ms Min: 19.142ms Max: 23.926ms -✓ Download: 65.82Mbps (used: 75.48MB) -✓ Upload: 27.00Mbps (used: 36.33MB) +✓ Download: 65.82Mbps (Used: 75.48MB) (Latency: 22ms Jitter: 2ms Min: 17ms Max: 24ms) +✓ Upload: 27.00Mbps (Used: 36.33MB) (Latency: 23ms Jitter: 2ms Min: 18ms Max: 25ms) ✓ Test Server: [6087] 120.55km Fussa-shi (Japan) by Allied Telesis Capital Corporation ✓ Latency: 38.694699ms Jitter: 2.724ms Min: 36.443ms Max: 39.953ms -✓ Download: 72.24Mbps (used: 83.72MB) -✓ Upload: 29.56Mbps (used: 47.64MB) - -Download Avg: 69.03 Mbit/s -Upload Avg: 28.28 Mbit/s +✓ Download: 72.24Mbps (Used: 83.72MB) (Latency: 37ms Jitter: 3ms Min: 36ms Max: 40ms) +✓ Upload: 29.56Mbps (Used: 47.64MB) (Latency: 38ms Jitter: 3ms Min: 37ms Max: 41ms) ``` #### Test with a virtual location @@ -195,7 +194,8 @@ func main() { s.PingTest(nil) s.DownloadTest() s.UploadTest() - fmt.Printf("Latency: %s, Download: %f, Upload: %f\n", s.Latency, s.DLSpeed, s.ULSpeed) + // Note: The unit of s.DLSpeed, s.ULSpeed is bytes per second, this is a float64. + fmt.Printf("Latency: %s, Download: %s, Upload: %s\n", s.Latency, s.DLSpeed, s.ULSpeed) s.Context.Reset() // reset counter } } diff --git a/example/main.go b/example/main.go index 95cc224..c549f0f 100644 --- a/example/main.go +++ b/example/main.go @@ -31,7 +31,8 @@ func main() { checkError(s.DownloadTest()) checkError(s.UploadTest()) - fmt.Printf("Latency: %s, Download: %f, Upload: %f\n", s.Latency, s.DLSpeed, s.ULSpeed) + // Note: The unit of s.DLSpeed, s.ULSpeed is bytes per second, this is a float64. + fmt.Printf("Latency: %s, Download: %s, Upload: %s\n", s.Latency, s.DLSpeed, s.ULSpeed) s.Context.Reset() } } diff --git a/speedtest.go b/speedtest.go index cf21fc4..d6c8d5b 100644 --- a/speedtest.go +++ b/speedtest.go @@ -7,6 +7,7 @@ import ( "os" "strconv" "strings" + "sync/atomic" "time" "github.com/showwin/speedtest-go/speedtest" @@ -15,22 +16,23 @@ import ( var ( showList = kingpin.Flag("list", "Show available speedtest.net servers.").Short('l').Bool() serverIds = kingpin.Flag("server", "Select server id to run speedtest.").Short('s').Ints() - customURL = kingpin.Flag("custom-url", "Specify the url of the server instead of getting a list from speedtest.net.").String() + customURL = kingpin.Flag("custom-url", "Specify the url of the server instead of fetching from speedtest.net.").String() savingMode = kingpin.Flag("saving-mode", "Test with few resources, though low accuracy (especially > 30Mbps).").Bool() jsonOutput = kingpin.Flag("json", "Output results in json format.").Bool() - location = kingpin.Flag("location", "Change the location with a precise coordinate. Format: lat,lon").String() + location = kingpin.Flag("location", "Change the location with a precise coordinate (format: lat,lon).").String() city = kingpin.Flag("city", "Change the location with a predefined city label.").String() showCityList = kingpin.Flag("city-list", "List all predefined city labels.").Bool() proxy = kingpin.Flag("proxy", "Set a proxy(http[s] or socks) for the speedtest.").String() source = kingpin.Flag("source", "Bind a source interface for the speedtest.").String() - dnsBindSource = kingpin.Flag("dns-bind-source", "DNS request binding source.(Experimental)").Bool() + dnsBindSource = kingpin.Flag("dns-bind-source", "DNS request binding source (experimental).").Bool() multi = kingpin.Flag("multi", "Enable multi-server mode.").Short('m').Bool() thread = kingpin.Flag("thread", "Set the number of concurrent connections.").Short('t').Int() search = kingpin.Flag("search", "Fuzzy search servers by a keyword.").String() userAgent = kingpin.Flag("ua", "Set the user-agent header for the speedtest.").String() noDownload = kingpin.Flag("no-download", "Disable download test.").Bool() noUpload = kingpin.Flag("no-upload", "Disable upload test.").Bool() - pingMode = kingpin.Flag("ping-mode", "Select a method for Ping. (support icmp/tcp/http)").Default("http").String() + pingMode = kingpin.Flag("ping-mode", "Select a method for Ping (support icmp/tcp/http).").Default("http").String() + unit = kingpin.Flag("unit", "Set human-readable and auto-scaled rate units for output (options: decimal-bits/decimal-bytes/binary-bits/binary-bytes).").Short('u').String() debug = kingpin.Flag("debug", "Enable debug mode.").Short('d').Bool() ) @@ -40,23 +42,25 @@ func main() { kingpin.Parse() AppInfo() + speedtest.SetUnit(parseUnit(*unit)) + // 0. speed test setting var speedtestClient = speedtest.New(speedtest.WithUserConfig( &speedtest.UserConfig{ - UserAgent: *userAgent, - Proxy: *proxy, - Source: *source, - DnsBindSource: *dnsBindSource, - Debug: *debug, - PingMode: parseProto(*pingMode), // TCP as default - SavingMode: *savingMode, - CityFlag: *city, - LocationFlag: *location, - Keyword: *search, - NoDownload: *noDownload, - NoUpload: *noUpload, + UserAgent: *userAgent, + Proxy: *proxy, + Source: *source, + DnsBindSource: *dnsBindSource, + Debug: *debug, + PingMode: parseProto(*pingMode), // TCP as default + SavingMode: *savingMode, + MaxConnections: *thread, + CityFlag: *city, + LocationFlag: *location, + Keyword: *search, + NoDownload: *noDownload, + NoUpload: *noUpload, })) - speedtestClient.SetNThread(*thread) if *showCityList { speedtest.PrintCityList() @@ -124,29 +128,15 @@ func main() { task.Printf("Latency: %v Jitter: %v Min: %v Max: %v", server.Latency, server.Jitter, server.MinLatency, server.MaxLatency) task.Complete() }) - + accEcho := newAccompanyEcho(server, time.Millisecond*500) taskManager.Run("Download", func(task *Task) { - var latencies []int64 - var lc int64 - quit := false - go func() { - for { - if quit { - return - } - latency, err1 := server.HTTPPing(context.Background(), 1, time.Millisecond*500, nil) - if err1 != nil { - continue - } - lc = latency[0] - latencies = append(latencies, latency...) - } - }() - ticker := speedtestClient.CallbackDownloadRate(func(downRate float64) { + accEcho.Run() + speedtestClient.SetCallbackDownload(func(downRate speedtest.ByteRate) { + lc := accEcho.CurrentLatency() if lc == 0 { - task.Printf("Download: %.2fMbps (latency: --)", downRate) + task.Printf("Download: %s (Latency: --)", downRate) } else { - task.Printf("Download: %.2fMbps (latency: %dms)", downRate, lc/1000000) + task.Printf("Download: %s (Latency: %dms)", downRate, lc/1000000) } }) if *multi { @@ -154,34 +144,20 @@ func main() { } else { task.CheckError(server.DownloadTest()) } - ticker.Stop() - mean, _, std, minL, maxL := speedtest.StandardDeviation(latencies) - task.Printf("Download: %.2fMbps (used: %.2fMB) (latency: %dms jitter: %dms min: %dms max: %dms)", server.DLSpeed, float64(server.Context.Manager.GetTotalDownload())/1000/1000, mean/1000000, std/1000000, minL/1000000, maxL/1000000) + accEcho.Stop() + mean, _, std, minL, maxL := speedtest.StandardDeviation(accEcho.Latencies()) + task.Printf("Download: %s (Used: %.2fMB) (Latency: %dms Jitter: %dms Min: %dms Max: %dms)", server.DLSpeed, float64(server.Context.Manager.GetTotalDownload())/1000/1000, mean/1000000, std/1000000, minL/1000000, maxL/1000000) task.Complete() }) taskManager.Run("Upload", func(task *Task) { - var latencies []int64 - var lc int64 - quit := false - go func() { - for { - if quit { - return - } - latency, err1 := server.HTTPPing(context.Background(), 1, time.Millisecond*500, nil) - if err1 != nil { - continue - } - lc = latency[0] - latencies = append(latencies, latency...) - } - }() - ticker := speedtestClient.CallbackUploadRate(func(upRate float64) { + accEcho.Run() + speedtestClient.SetCallbackUpload(func(upRate speedtest.ByteRate) { + lc := accEcho.CurrentLatency() if lc == 0 { - task.Printf("Upload: %.2fMbps (latency: --)", upRate) + task.Printf("Upload: %s (Latency: --)", upRate) } else { - task.Printf("Upload: %.2fMbps (latency: %dms)", upRate, lc/1000000) + task.Printf("Upload: %s (Latency: %dms)", upRate, lc/1000000) } }) if *multi { @@ -189,10 +165,9 @@ func main() { } else { task.CheckError(server.UploadTest()) } - ticker.Stop() - quit = true - mean, _, std, minL, maxL := speedtest.StandardDeviation(latencies) - task.Printf("Upload: %.2fMbps (used: %.2fMB) (latency: %dms jitter: %dms min: %dms max: %dms)", server.ULSpeed, float64(server.Context.Manager.GetTotalUpload())/1000/1000, mean/1000000, std/1000000, minL/1000000, maxL/1000000) + accEcho.Stop() + mean, _, std, minL, maxL := speedtest.StandardDeviation(accEcho.Latencies()) + task.Printf("Upload: %s (Used: %.2fMB) (Latency: %dms Jitter: %dms Min: %dms Max: %dms)", server.ULSpeed, float64(server.Context.Manager.GetTotalUpload())/1000/1000, mean/1000000, std/1000000, minL/1000000, maxL/1000000) task.Complete() }) taskManager.Reset() @@ -210,6 +185,54 @@ func main() { } } +type AccompanyEcho struct { + stopEcho chan bool + server *speedtest.Server + currentLatency int64 + interval time.Duration + latencies []int64 +} + +func newAccompanyEcho(server *speedtest.Server, interval time.Duration) *AccompanyEcho { + return &AccompanyEcho{ + server: server, + interval: interval, + stopEcho: make(chan bool), + } +} + +func (ae *AccompanyEcho) Run() { + ae.latencies = make([]int64, 0) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + for { + select { + case <-ae.stopEcho: + cancel() + return + default: + latency, _ := ae.server.HTTPPing(ctx, 1, ae.interval, nil) + if len(latency) > 0 { + atomic.StoreInt64(&ae.currentLatency, latency[0]) + ae.latencies = append(ae.latencies, latency[0]) + } + } + } + }() +} + +func (ae *AccompanyEcho) Stop() { + ae.stopEcho <- false +} + +func (ae *AccompanyEcho) CurrentLatency() int64 { + return atomic.LoadInt64(&ae.currentLatency) +} + +func (ae *AccompanyEcho) Latencies() []int64 { + return ae.latencies +} + func showServerList(servers speedtest.Servers) { for _, s := range servers { fmt.Printf("[%5s] %9.2fkm ", s.ID, s.Distance) @@ -223,6 +246,21 @@ func showServerList(servers speedtest.Servers) { } } +func parseUnit(str string) speedtest.UnitType { + str = strings.ToLower(str) + if str == "decimal-bits" { + return speedtest.UnitTypeDecimalBits + } else if str == "decimal-bytes" { + return speedtest.UnitTypeDecimalBytes + } else if str == "binary-bits" { + return speedtest.UnitTypeBinaryBits + } else if str == "binary-bytes" { + return speedtest.UnitTypeBinaryBytes + } else { + return speedtest.UnitTypeDefaultMbps + } +} + func parseProto(str string) speedtest.Proto { str = strings.ToLower(str) if str == "icmp" { diff --git a/speedtest/data_manager.go b/speedtest/data_manager.go index a56bb66..fac4a34 100644 --- a/speedtest/data_manager.go +++ b/speedtest/data_manager.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "github.com/showwin/speedtest-go/speedtest/internal" "io" "math" "runtime" @@ -26,11 +27,14 @@ type Manager interface { GetAvgDownloadRate() float64 GetAvgUploadRate() float64 - CallbackDownloadRate(callback func(downRate float64)) *time.Ticker - CallbackUploadRate(callback func(upRate float64)) *time.Ticker + GetEWMADownloadRate() float64 + GetEWMAUploadRate() float64 - RegisterDownloadHandler(fn func()) *funcGroup - RegisterUploadHandler(fn func()) *funcGroup + SetCallbackDownload(callback func(downRate ByteRate)) + SetCallbackUpload(callback func(upRate ByteRate)) + + RegisterDownloadHandler(fn func()) *TestDirection + RegisterUploadHandler(fn func()) *TestDirection // Wait for the upload or download task to end to avoid errors caused by core occupation Wait() @@ -55,13 +59,18 @@ const readChunkSize = 1024 * 32 // 32 KBytes type DataType int32 -const typeEmptyChunk = 0 -const typeDownload = 1 -const typeUpload = 2 +const ( + typeEmptyChunk = iota + typeDownload + typeUpload +) + +var ( + ErrorUninitializedManager = errors.New("uninitialized manager") +) type funcGroup struct { - fns []func() - manager *DataManager + fns []func() } func (f *funcGroup) Add(fn func()) { @@ -69,12 +78,6 @@ func (f *funcGroup) Add(fn func()) { } type DataManager struct { - totalDownload int64 - totalUpload int64 - - DownloadRateSequence []int64 - UploadRateSequence []int64 - SnapshotStore *Snapshots Snapshot *Snapshot sync.Mutex @@ -87,43 +90,52 @@ type DataManager struct { running bool - dFn *funcGroup - uFn *funcGroup + download *TestDirection + upload *TestDirection +} + +type TestDirection struct { + TestType int // test type + manager *DataManager // manager + totalDataVolume int64 // total send/receive data volume + RateSequence []int64 // rate history sequence + welford *internal.Welford // std/EWMA/mean + captureCallback func(realTimeRate ByteRate) // user callback + closeFunc func() // close func + *funcGroup // actually exec function +} + +func (dm *DataManager) NewDataDirection(testType int) *TestDirection { + return &TestDirection{ + TestType: testType, + manager: dm, + funcGroup: &funcGroup{}, + } } func NewDataManager() *DataManager { ret := &DataManager{ nThread: runtime.NumCPU(), - captureTime: time.Second * 10, - rateCaptureFrequency: time.Millisecond * 100, + captureTime: time.Second * 15, + rateCaptureFrequency: time.Millisecond * 50, Snapshot: &Snapshot{}, } - ret.dFn = &funcGroup{manager: ret} - ret.uFn = &funcGroup{manager: ret} - ret.SnapshotStore = newRecentSnapshots(maxSnapshotSize) + ret.download = ret.NewDataDirection(typeDownload) + ret.upload = ret.NewDataDirection(typeUpload) + ret.SnapshotStore = newHistorySnapshots(maxSnapshotSize) return ret } -func (dm *DataManager) CallbackDownloadRate(callback func(downRate float64)) *time.Ticker { - ticker := time.NewTicker(dm.rateCaptureFrequency) - go func() { - sTime := time.Now() - for range ticker.C { - callback((float64(dm.GetTotalDownload()) * 8 / 1000000) / float64(time.Since(sTime).Milliseconds()) * 1000) - } - }() - return ticker +func (dm *DataManager) SetCallbackDownload(callback func(downRate ByteRate)) { + if dm.download != nil { + dm.download.captureCallback = callback + } } -func (dm *DataManager) CallbackUploadRate(callback func(upRate float64)) *time.Ticker { - ticker := time.NewTicker(dm.rateCaptureFrequency) - go func() { - sTime := time.Now() - for range ticker.C { - callback((float64(dm.GetTotalUpload()) * 8 / 1000000) / float64(time.Since(sTime).Milliseconds()) * 1000) - } - }() - return ticker +func (dm *DataManager) SetCallbackUpload(callback func(upRate ByteRate)) { + if dm.upload != nil { + dm.upload.captureCallback = callback + } } func (dm *DataManager) Wait() { @@ -143,65 +155,72 @@ func (dm *DataManager) Wait() { } } -func (dm *DataManager) RegisterUploadHandler(fn func()) *funcGroup { - if len(dm.uFn.fns) < dm.nThread { - dm.uFn.Add(fn) +func (dm *DataManager) RegisterUploadHandler(fn func()) *TestDirection { + if len(dm.upload.fns) < dm.nThread { + dm.upload.Add(fn) } - return dm.uFn + return dm.upload } -func (dm *DataManager) RegisterDownloadHandler(fn func()) *funcGroup { - if len(dm.dFn.fns) < dm.nThread { - dm.dFn.Add(fn) +func (dm *DataManager) RegisterDownloadHandler(fn func()) *TestDirection { + if len(dm.download.fns) < dm.nThread { + dm.download.Add(fn) } - return dm.dFn + return dm.download } -func (f *funcGroup) Start(cancel context.CancelFunc, mainRequestHandlerIndex int) { - if len(f.fns) == 0 { +func (td *TestDirection) Start(cancel context.CancelFunc, mainRequestHandlerIndex int) { + if len(td.fns) == 0 { panic("empty task stack") } - if mainRequestHandlerIndex > len(f.fns)-1 { + if mainRequestHandlerIndex > len(td.fns)-1 { mainRequestHandlerIndex = 0 } mainLoadFactor := 0.1 // When the number of processor cores is equivalent to the processing program, // the processing efficiency reaches the highest level (VT is not considered). - mainN := int(mainLoadFactor * float64(len(f.fns))) + mainN := int(mainLoadFactor * float64(len(td.fns))) if mainN == 0 { mainN = 1 } - if len(f.fns) == 1 { - mainN = f.manager.nThread + if len(td.fns) == 1 { + mainN = td.manager.nThread } - auxN := f.manager.nThread - mainN - dbg.Printf("Available fns: %d\n", len(f.fns)) + auxN := td.manager.nThread - mainN + dbg.Printf("Available fns: %d\n", len(td.fns)) dbg.Printf("mainN: %d\n", mainN) dbg.Printf("auxN: %d\n", auxN) wg := sync.WaitGroup{} - f.manager.running = true - stopCapture := f.manager.rateCapture() - time.AfterFunc(f.manager.captureTime, func() { - stopCapture <- true - close(stopCapture) - f.manager.running = false - cancel() - dbg.Println("FuncGroup: Stop") - }) + td.manager.running = true + stopCapture := td.rateCapture() + + // refresh once function + once := sync.Once{} + td.closeFunc = func() { + once.Do(func() { + stopCapture <- true + close(stopCapture) + td.manager.running = false + cancel() + dbg.Println("FuncGroup: Stop") + }) + } + + time.AfterFunc(td.manager.captureTime, td.closeFunc) for i := 0; i < mainN; i++ { wg.Add(1) go func() { defer wg.Done() for { - if !f.manager.running { + if !td.manager.running { return } - f.fns[mainRequestHandlerIndex]() + td.fns[mainRequestHandlerIndex]() } }() } for j := 0; j < auxN; { - for i := range f.fns { + for i := range td.fns { if j == auxN { break } @@ -213,10 +232,10 @@ func (f *funcGroup) Start(cancel context.CancelFunc, mainRequestHandlerIndex int go func() { defer wg.Done() for { - if !f.manager.running { + if !td.manager.running { return } - f.fns[t]() + td.fns[t]() } }() j++ @@ -225,27 +244,31 @@ func (f *funcGroup) Start(cancel context.CancelFunc, mainRequestHandlerIndex int wg.Wait() } -func (dm *DataManager) rateCapture() chan bool { - ticker := time.NewTicker(dm.rateCaptureFrequency) - oldTotalDownload := dm.totalDownload - oldTotalUpload := dm.totalUpload +func (td *TestDirection) rateCapture() chan bool { + ticker := time.NewTicker(td.manager.rateCaptureFrequency) + var prevTotalDataVolume int64 = 0 stopCapture := make(chan bool) + td.welford = internal.NewWelford(int(5 * time.Second / td.manager.rateCaptureFrequency)) + sTime := time.Now() go func(t *time.Ticker) { defer t.Stop() for { select { case <-t.C: - newTotalDownload := dm.totalDownload - newTotalUpload := dm.totalUpload - deltaDownload := newTotalDownload - oldTotalDownload - deltaUpload := newTotalUpload - oldTotalUpload - oldTotalDownload = newTotalDownload - oldTotalUpload = newTotalUpload - if deltaDownload != 0 { - dm.DownloadRateSequence = append(dm.DownloadRateSequence, deltaDownload) + newTotalDataVolume := td.totalDataVolume + deltaDataVolume := newTotalDataVolume - prevTotalDataVolume + prevTotalDataVolume = newTotalDataVolume + if deltaDataVolume != 0 { + td.RateSequence = append(td.RateSequence, deltaDataVolume) + } + // anyway we update the measuring instrument + globalAvg := (float64(td.totalDataVolume)) / float64(time.Since(sTime).Milliseconds()) * 1000 + if td.welford.Update(globalAvg) { + go td.closeFunc() } - if deltaUpload != 0 { - dm.UploadRateSequence = append(dm.UploadRateSequence, deltaUpload) + // reports the current rate at the given rate + if td.captureCallback != nil { + td.captureCallback(ByteRate(td.welford.EWMA())) } case stop := <-stopCapture: if stop { @@ -267,19 +290,19 @@ func (dm *DataManager) NewChunk() Chunk { } func (dm *DataManager) AddTotalDownload(value int64) { - atomic.AddInt64(&dm.totalDownload, value) + atomic.AddInt64(&dm.download.totalDataVolume, value) } func (dm *DataManager) AddTotalUpload(value int64) { - atomic.AddInt64(&dm.totalUpload, value) + atomic.AddInt64(&dm.upload.totalDataVolume, value) } func (dm *DataManager) GetTotalDownload() int64 { - return dm.totalDownload + return dm.download.totalDataVolume } func (dm *DataManager) GetTotalUpload() int64 { - return dm.totalUpload + return dm.upload.totalDataVolume } func (dm *DataManager) SetRateCaptureFrequency(duration time.Duration) Manager { @@ -306,24 +329,34 @@ func (dm *DataManager) Snapshots() *Snapshots { } func (dm *DataManager) Reset() { - dm.totalDownload = 0 - dm.totalUpload = 0 dm.SnapshotStore.push(dm.Snapshot) dm.Snapshot = &Snapshot{} - dm.DownloadRateSequence = []int64{} - dm.UploadRateSequence = []int64{} - dm.dFn.fns = []func(){} - dm.uFn.fns = []func(){} + dm.download = dm.NewDataDirection(typeDownload) + dm.upload = dm.NewDataDirection(typeUpload) } func (dm *DataManager) GetAvgDownloadRate() float64 { unit := float64(dm.captureTime / time.Millisecond) - return float64(dm.totalDownload*8/1000) / unit + return float64(dm.download.totalDataVolume*8/1000) / unit +} + +func (dm *DataManager) GetEWMADownloadRate() float64 { + if dm.download.welford != nil { + return dm.download.welford.EWMA() + } + return 0 } func (dm *DataManager) GetAvgUploadRate() float64 { unit := float64(dm.captureTime / time.Millisecond) - return float64(dm.totalUpload*8/1000) / unit + return float64(dm.upload.totalDataVolume*8/1000) / unit +} + +func (dm *DataManager) GetEWMAUploadRate() float64 { + if dm.upload.welford != nil { + return dm.upload.welford.EWMA() + } + return 0 } type DataChunk struct { @@ -379,7 +412,7 @@ func (dc *DataChunk) DownloadHandler(r io.Reader) error { rs := int64(readSize) dc.remainOrDiscardSize += rs - atomic.AddInt64(&dc.manager.totalDownload, rs) + atomic.AddInt64(&dc.manager.download.totalDataVolume, rs) if dc.err != nil { if dc.err == io.EOF { return nil @@ -430,7 +463,7 @@ func (dc *DataChunk) Read(b []byte) (n int, err error) { } n64 := int64(n) dc.remainOrDiscardSize -= n64 - atomic.AddInt64(&dc.manager.totalUpload, n64) + atomic.AddInt64(&dc.manager.upload.totalDataVolume, n64) return } @@ -513,7 +546,7 @@ type Snapshots struct { maxSize int } -func newRecentSnapshots(size int) *Snapshots { +func newHistorySnapshots(size int) *Snapshots { return &Snapshots{ sp: make([]*Snapshot, 0, size), maxSize: size, diff --git a/speedtest/data_manager_test.go b/speedtest/data_manager_test.go index 14c956e..6f59db6 100644 --- a/speedtest/data_manager_test.go +++ b/speedtest/data_manager_test.go @@ -35,14 +35,14 @@ func TestDataManager_AddTotalDownload(t *testing.T) { }() } wg.Wait() - if dmp.totalDownload != 43521000000 { + if dmp.download.totalDataVolume != 43521000000 { t.Fatal() } } func TestDataManager_GetAvgDownloadRate(t *testing.T) { dm := NewDataManager() - dm.totalDownload = 3000000 + dm.download.totalDataVolume = 3000000 dm.captureTime = time.Second * 10 result := dm.GetAvgDownloadRate() diff --git a/speedtest/internal/README.md b/speedtest/internal/README.md new file mode 100644 index 0000000..a867604 --- /dev/null +++ b/speedtest/internal/README.md @@ -0,0 +1,11 @@ +# Issue #192 + +SpeedTest-Go (1) + +1. Use welford alg to quickly calculate standard deviation and mean. +2. The welford alg integrated moving window feature, This allows us to ignore early data with excessive volatility. +3. Use the coefficient of variation(c.v) to reflect the confidence of the test result datasets. +4. When the data becomes stable(converge), the c.v value will become smaller. When the c.v < 0.05, we terminate this test. We set the tolerance condition as the window buffer being more than half filled and triggering more than five times with c.v < 0.05. +5. Perform EWMA operation on real-time global average, and use c.v as part of the EWMA feedback parameter. +6. The ewma value calculated is the result value of our test. +7. When the test data converge quickly, we can stop early and speed up the testing process. Of course this depends on network/device conditions. diff --git a/speedtest/internal/welford.go b/speedtest/internal/welford.go new file mode 100644 index 0000000..ffeaf49 --- /dev/null +++ b/speedtest/internal/welford.go @@ -0,0 +1,102 @@ +package internal + +import ( + "fmt" + "math" +) + +// Welford Fast standard deviation calculation with moving window +// ref Welford, B. P. (1962). Note on a Method for Calculating Corrected Sums of Squares and Products. Technometrics, 4(3), 419–420. https://doi.org/10.1080/00401706.1962.10490022 +type Welford struct { + n int // data size + mean float64 // mean + sum float64 // sum + vector []float64 // data set + eraseIndex int // the value will be erased next time + cap int + currentStdDev float64 + consecutiveStableIterations int + consecutiveStableIterationsThreshold int + cv float64 + ewmaMean float64 +} + +// NewWelford recommended windowSize = moving time window / sampling frequency +func NewWelford(windowSize int) *Welford { + return &Welford{ + vector: make([]float64, windowSize), + cap: windowSize, + consecutiveStableIterationsThreshold: 10, + } +} + +// Update Enter the given value into the measuring system. +// return bool stability evaluation +func (w *Welford) Update(value float64) bool { + if w.n == w.cap { + delta := w.vector[w.eraseIndex] - w.mean + w.mean -= delta / float64(w.n-1) + w.sum -= delta * (w.vector[w.eraseIndex] - w.mean) + // the calc error is approximated to zero + if w.sum < 0 { + w.sum = 0 + } + w.vector[w.eraseIndex] = value + w.eraseIndex++ + if w.eraseIndex == w.cap { + w.eraseIndex = 0 + } + } else { + w.vector[w.n] = value + w.n++ + } + delta := value - w.mean + w.mean += delta / float64(w.n) + w.sum += delta * (value - w.mean) + w.currentStdDev = math.Sqrt(w.Variance()) + // update C.V + if w.mean == 0 { + w.cv = 1 + } else { + w.cv = w.currentStdDev / w.mean + if w.cv > 1 { + w.cv = 1 + } + } + // ewma beta ratio + // TODO: w.cv needs normalization + beta := w.cv*0.381 + 0.618 + w.ewmaMean = w.mean*beta + w.ewmaMean*(1-beta) + // acc consecutiveStableIterations + if w.cap/2 < w.n && w.cv < 0.03 { + w.consecutiveStableIterations++ + } + return w.consecutiveStableIterations >= w.consecutiveStableIterationsThreshold +} + +func (w *Welford) Mean() float64 { + return w.mean +} + +func (w *Welford) CV() float64 { + return w.cv +} + +func (w *Welford) Variance() float64 { + if w.n < 2 { + return 0 + } + return w.sum / float64(w.n-1) +} + +func (w *Welford) StandardDeviation() float64 { + return w.currentStdDev +} + +func (w *Welford) EWMA() float64 { + return w.ewmaMean +} + +func (w *Welford) String() string { + return fmt.Sprintf("Mean: %.2f, Standard Deviation: %.2f, C.V: %.2f, EWMA: %.2f", w.Mean(), w.StandardDeviation(), w.CV(), w.EWMA()) +} diff --git a/speedtest/internal/welford_test.go b/speedtest/internal/welford_test.go new file mode 100644 index 0000000..415fd30 --- /dev/null +++ b/speedtest/internal/welford_test.go @@ -0,0 +1,38 @@ +package internal + +import ( + "fmt" + "math/rand" + "testing" + "time" +) + +func BenchmarkWOM(b *testing.B) { + w := NewWelford(10) + rd := rand.New(rand.NewSource(0)) + var arr []float64 + for i := 0; i < 100; i++ { + arr = append(arr, rd.Float64()) + } + for i := 0; i < b.N; i++ { + w.Update(arr[i%100]) + } +} + +func TestWOM(t *testing.T) { + //data := []float64{0, 6.91552, 18.721692307692308, 23.04116556291391, 28.059485148514852, 31.118470119521916, 34.21727152317881, 35.15127065527066, 36.74378054862843, 38.05981374722838, 39.122272, 38.76271506352087, 39.60149084858569, 40.23165538461539, 40.72287589158345, 41.3182940397351, 41.36879800498754, 41.848093896713614, 42.29560044395117, 42.45441684210526, 42.27466666666667, 42.67902476190476, 42.68457142857143, 42.80731190269331, 42.75244, 42.6367104, 42.65657801691007, 42.68948038490007, 42.81048394004282, 42.64527084769124, 42.802427145708585, 42.92410838709678, 43.15598501872659, 43.32482909090909, 43.46838800705467, 43.5166464877213, 43.523846751804555, 43.73983171521036, 43.870409258285115, 43.84913230769231, 43.72752023988006, 43.85322926829268, 43.964127436994765, 44.027410506741056, 43.99742169768498, 44.093007552199026, 44.241321164710996, 44.287108464483204, 44.079746772178254, 44.180878367346935, 44.1128768, 44.17403607843137, 44.0556123076923, 44.20335873255375, 44.19607703703704, 44.2603155216285, 44.202986438258385, 44.2371857042747, 44.17902344827586, 44.31277559322034, 44.30876912840985, 44.378359108781126, 44.3999845410628, 44.37440913415794, 44.4760624609619, 44.471889264841586, 44.454861296184134, 44.37803098927294, 44.37345251396648, 44.47449043478261, 44.43488914285714, 44.50349070422535, 44.45223882254929, 44.433060821917806, 44.46777081081081, 44.4554752, 44.43440842105263, 44.55939356178609, 44.59433376057421, 44.633178734177214, 44.5508462884279, 44.58072493827161, 44.67904608632041, 44.662959537572256, 44.66403237324446, 44.59772449459332, 44.53588837209303, 44.586143382352944, 44.66949738933031, 44.66949618320611, 44.69251965356429, 44.628363956043955, 44.63443946968051, 44.607556129032254, 44.66488851063829, 44.66553062513155, 44.72077, 44.75405813234384, 44.821391020408164, 44.88321874368814, 44.9364192, 44.94112373787369, 44.958842352941176, 44.97548797517455, 44.94797846153846, 44.9455939047619, 44.910117647058826, 44.88617040358744, 44.86545696835092, 44.86558503026968, 44.90363345454546, 44.87501062871554, 44.93104802713802, 44.93590804597701, 44.90068409051044, 44.87467130434782, 44.891225650749874, 44.9156841025641, 44.92164610169491, 44.89092118971601, 44.9027482086319, 44.86017701453105, 44.87692428711898, 44.8651103526735, 44.8483070967742, 44.79754023356263, 44.787605776860815, 44.7891175822446, 44.778797499999996, 44.778773283743995, 44.79586406274028, 44.77517923664122, 44.62143350499849, 44.48119933904161, 44.38141850746269, 44.425831753554505, 44.390321423320096, 44.31049810218978, 44.124642318840586, 44.01107798561151, 44.02091885714285, 44.02881950942861, 44.019682253521125, 44.071147412587415, 44.08087753401833, 44.09411310344827, 44.12980821917809, 44.07751870493811, 44.09083445854713, 44.09876506104924, 44.08840634582056, 44.13032264900663, 44.09793658729115, 44.106114509803916, 44.06832567199065, 44.096165139981935, 44.116, 44.10581480066234, 44.09257851448817, 44.075851106639846, 44.074916, 44.091910073282826, 44.09682567901235, 44.07930895705522, 44.053896892138944, 44.06240019391589, 44.06116150788872, 44.1180531736527, 44.15990285714285, 44.134685126020585, 44.18049829431832, 44.158170252572496, 44.21358883720931, 44.16577849710982, 44.1291108045977, 44.16022671694664, 44.1493200045439, 44.1491597740113, 44.15741842696629, 44.177169162011175, 44.21226797022553, 44.166602585349686, 44.17985497692815, 44.168419672131144, 44.19028366481904, 44.173513455095645, 44.162090322580646, 44.123481608212145, 44.155531914893615, 44.18765714285714, 44.224294736842104, 44.21350366492147, 44.168821324448146, 44.168, 44.17472356215214, 44.187647999999996, 44.18084571428571, 44.18716962744899, 44.19440137359862, 44.1429404079992, 44.15704} + data := []float64{0, 1550.14016, 1102.92032, 759.1852799999999, 601.88672, 502.06617600000004, 435.71381333333335, 385.54944, 348.24736, 316.40547555555554, 289.261344, 267.38202181818184, 252.83786666666668, 245.42818461538462, 245.35842285714287, 235.63543466666667, 227.58704000000003, 220.20446117647057, 213.48643555555554, 207.48786526315791, 201.919424, 197.19868952380952, 192.2912581818182, 188.60752695652172, 185.09968, 181.7553664, 178.6282953846154, 175.99228444444446, 173.34433142857142, 170.91121655172415, 168.496128, 166.37638193548386, 163.99491999999998, 161.9206012121212, 160.24624, 158.76464457142856, 157.2420711111111, 155.45751351351353, 154.00374736842107, 152.63651282051282, 151.112704, 149.88837463414634, 149.02825142857142, 147.3117618604651, 144.63512, 145.58946844444446, 144.72223304347824, 143.76531744680852, 143.01374, 142.06400653061226, 141.0796992, 140.27383843137252, 139.51694153846157, 138.54825660377358, 138.22770370370372, 137.21800727272725, 136.98238857142854, 135.9439045614035, 135.45676137931034, 135.40977898305084, 134.70067622540847, 134.12748048540504, 133.63318709677418, 133.16014222222222, 132.703765, 132.20392861538463, 131.7192387878788, 131.27342328358208, 130.71865882352944, 130.23932753623188, 129.99216914285716, 129.28652169014086, 129.16500888888888, 128.67712438356165, 128.25776864864866, 127.85582506666665, 127.45831157894739, 127.03613922077922, 126.90191179487178, 126.59213772151901, 126.425936, 126.0965688888889, 125.77238634146343, 125.49601735357918, 125.1707276190476, 124.89910964705884, 124.60520186046512, 124.29531218390805, 124.02556363636364, 123.47192808988764, 123.56910222222223, 123.33042637362638, 123.09693913043478, 122.809328172043, 122.58635234042553, 122.41840168421052, 122.15547666666666, 121.92927999999999, 121.73630367346938, 121.48462222222223, 121.29761599999999, 121.06910415841584, 120.87151686274511, 120.68683805825242, 120.51469538461538, 120.2012220952381, 120.15477433962265, 119.2894474766355, 119.77253037037036, 119.57481394495413, 119.31698327272728, 119.10490234234233, 118.99255142857143, 118.76022088495574, 118.8147452631579, 118.6306587826087, 118.51604413793103, 118.38956854700855, 118.1484366101695, 117.98398924369748, 117.95854933333334, 117.80418115702479, 117.68285901639344, 117.32433170731707, 117.36720000000001, 117.21502720000001, 117.10605714285714, 117.01992566929134, 116.88315749999998, 116.77652093023256, 116.65271384615386, 116.51296488549619, 116.39656969696969, 116.29855518796992, 116.18375402985075, 116.00662992592594, 115.96002117647059, 115.85702540145985, 115.77244985507247, 115.67459145200748, 115.48589942857143, 115.44551716312057, 115.35635154929577, 115.21038993006992, 115.11760666666666, 115.01394537931034, 114.97817863013698, 114.80941278911564, 114.72184648648648, 114.64405261744967, 114.55408426666666, 114.41580291390729, 114.3308, 114.27059869281045, 114.18505260423433, 114.14871122580645, 113.99949128205128, 113.94442191082803, 113.83386734177216, 113.59835371069182, 113.515388, 113.6068849689441, 113.52565728395062, 113.47399852760736, 113.35184, 113.3104446060606, 113.19637204819277, 113.12842730538924, 113.09605333333333, 113.08127621301774, 112.81183811764706, 112.9294203508772, 112.81810418604651, 112.69997317919075, 112.63358712643677, 112.5452672, 112.50175090909092, 112.52005423728812, 112.43650516853933, 112.34864983240223, 112.20527644444445, 112.20484950276244, 112.19415912087912, 112.13252546448088, 112.04997913043476, 112.02885535135137, 111.93645075268819, 111.89874994652408, 111.80677787234042, 111.73926264550266, 111.75360336842105, 111.62827225130889, 111.64060833333332, 111.58019481865286, 111.51077113402062, 111.32106666666667, 111.43125714285713, 111.37313757741902, 111.2588913131313, 111.27499738693469, 111.22497689768976} + size := 5 * time.Second / (time.Millisecond * 50) + w := NewWelford(int(size)) + ok := false + for i, x := range data { + if w.Update(x) { + ok = true + break + } + fmt.Printf("[%d] %s\n", i, w) + } + if !ok { + t.Fatal("TestWOM failed") + } +} diff --git a/speedtest/request.go b/speedtest/request.go index 2257d39..436e8b4 100644 --- a/speedtest/request.go +++ b/speedtest/request.go @@ -10,6 +10,7 @@ import ( "net/url" "path" "strings" + "sync/atomic" "time" "github.com/showwin/speedtest-go/speedtest/tcp" @@ -39,20 +40,32 @@ func (s *Server) MultiDownloadTestContext(ctx context.Context, servers Servers) return errors.New("not found available servers") } mainIDIndex := 0 - var fp *funcGroup + var td *TestDirection _context, cancel := context.WithCancel(ctx) + defer cancel() + var errorTimes int64 = 0 + var requestTimes int64 = 0 for i, server := range *ss { if server.ID == s.ID { mainIDIndex = i } sp := server dbg.Printf("Register Download Handler: %s\n", sp.URL) - fp = server.Context.RegisterDownloadHandler(func() { - _ = downloadRequest(_context, sp, 3) + td = server.Context.RegisterDownloadHandler(func() { + atomic.AddInt64(&requestTimes, 1) + if err := downloadRequest(_context, sp, 3); err != nil { + atomic.AddInt64(&errorTimes, 1) + } }) } - fp.Start(cancel, mainIDIndex) // block here - s.DLSpeed = fp.manager.GetAvgDownloadRate() + if td == nil { + return ErrorUninitializedManager + } + td.Start(cancel, mainIDIndex) // block here + s.DLSpeed = ByteRate(td.manager.GetEWMADownloadRate()) + if s.DLSpeed == 0 && float64(errorTimes)/float64(requestTimes) > 0.1 { + s.DLSpeed = -1 // N/A + } return nil } @@ -66,20 +79,32 @@ func (s *Server) MultiUploadTestContext(ctx context.Context, servers Servers) er return errors.New("not found available servers") } mainIDIndex := 0 - var fp *funcGroup + var td *TestDirection _context, cancel := context.WithCancel(ctx) + defer cancel() + var errorTimes int64 = 0 + var requestTimes int64 = 0 for i, server := range *ss { if server.ID == s.ID { mainIDIndex = i } sp := server dbg.Printf("Register Upload Handler: %s\n", sp.URL) - fp = server.Context.RegisterUploadHandler(func() { - _ = uploadRequest(_context, sp, 3) + td = server.Context.RegisterUploadHandler(func() { + atomic.AddInt64(&requestTimes, 1) + if err := uploadRequest(_context, sp, 3); err != nil { + atomic.AddInt64(&errorTimes, 1) + } }) } - fp.Start(cancel, mainIDIndex) // block here - s.ULSpeed = fp.manager.GetAvgUploadRate() + if td == nil { + return ErrorUninitializedManager + } + td.Start(cancel, mainIDIndex) // block here + s.ULSpeed = ByteRate(td.manager.GetEWMAUploadRate()) + if s.ULSpeed == 0 && float64(errorTimes)/float64(requestTimes) > 0.1 { + s.ULSpeed = -1 // N/A + } return nil } @@ -98,13 +123,21 @@ func (s *Server) downloadTestContext(ctx context.Context, downloadRequest downlo dbg.Println("Download test disabled") return nil } + var errorTimes int64 = 0 + var requestTimes int64 = 0 start := time.Now() _context, cancel := context.WithCancel(ctx) s.Context.RegisterDownloadHandler(func() { - _ = downloadRequest(_context, s, 3) + atomic.AddInt64(&requestTimes, 1) + if err := downloadRequest(_context, s, 3); err != nil { + atomic.AddInt64(&errorTimes, 1) + } }).Start(cancel, 0) duration := time.Since(start) - s.DLSpeed = s.Context.GetAvgDownloadRate() + s.DLSpeed = ByteRate(s.Context.GetEWMADownloadRate()) + if s.DLSpeed == 0 && float64(errorTimes)/float64(requestTimes) > 0.1 { + s.DLSpeed = -1 // N/A + } s.TestDuration.Download = &duration s.testDurationTotalCount() return nil @@ -125,13 +158,21 @@ func (s *Server) uploadTestContext(ctx context.Context, uploadRequest uploadFunc dbg.Println("Upload test disabled") return nil } + var errorTimes int64 = 0 + var requestTimes int64 = 0 start := time.Now() _context, cancel := context.WithCancel(ctx) s.Context.RegisterUploadHandler(func() { - _ = uploadRequest(_context, s, 4) + atomic.AddInt64(&requestTimes, 1) + if err := uploadRequest(_context, s, 4); err != nil { + atomic.AddInt64(&errorTimes, 1) + } }).Start(cancel, 0) duration := time.Since(start) - s.ULSpeed = s.Context.GetAvgUploadRate() + s.ULSpeed = ByteRate(s.Context.GetEWMAUploadRate()) + if s.ULSpeed == 0 && float64(errorTimes)/float64(requestTimes) > 0.1 { + s.ULSpeed = -1 // N/A + } s.TestDuration.Upload = &duration s.testDurationTotalCount() return nil @@ -163,11 +204,11 @@ func uploadRequest(ctx context.Context, s *Server, w int) error { size := ulSizes[w] dc := s.Context.NewChunk().UploadHandler(int64(size*100-51) * 10) req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.URL, dc) - req.ContentLength = dc.(*DataChunk).ContentLength - dbg.Printf("Len=%d, XulURL: %s\n", req.ContentLength, s.URL) if err != nil { return err } + req.ContentLength = dc.(*DataChunk).ContentLength + dbg.Printf("Len=%d, XulURL: %s\n", req.ContentLength, s.URL) req.Header.Set("Content-Type", "application/octet-stream") resp, err := s.Context.doer.Do(req) @@ -198,12 +239,12 @@ func (s *Server) PingTestContext(ctx context.Context, callback func(latency time return err } dbg.Printf("Before StandardDeviation: %v\n", vectorPingResult) - mean, _, std, min, max := StandardDeviation(vectorPingResult) + mean, _, std, minLatency, maxLatency := StandardDeviation(vectorPingResult) duration := time.Since(start) s.Latency = time.Duration(mean) * time.Nanosecond s.Jitter = time.Duration(std) * time.Nanosecond - s.MinLatency = time.Duration(min) * time.Nanosecond - s.MaxLatency = time.Duration(max) * time.Nanosecond + s.MinLatency = time.Duration(minLatency) * time.Nanosecond + s.MaxLatency = time.Duration(maxLatency) * time.Nanosecond s.TestDuration.Ping = &duration s.testDurationTotalCount() return nil diff --git a/speedtest/request_test.go b/speedtest/request_test.go index 08740a1..54b7190 100644 --- a/speedtest/request_test.go +++ b/speedtest/request_test.go @@ -29,8 +29,9 @@ func TestDownloadTestContext(t *testing.T) { if err != nil { t.Errorf(err.Error()) } - if server.DLSpeed < idealSpeed*(1-delta) || idealSpeed*(1+delta) < server.DLSpeed { - t.Errorf("got unexpected server.DLSpeed '%v', expected between %v and %v", server.DLSpeed, idealSpeed*(1-delta), idealSpeed*(1+delta)) + value := server.Context.Manager.GetAvgDownloadRate() + if value < idealSpeed*(1-delta) || idealSpeed*(1+delta) < value { + t.Errorf("got unexpected server.DLSpeed '%v', expected between %v and %v", value, idealSpeed*(1-delta), idealSpeed*(1+delta)) } if server.TestDuration.Download == nil || *server.TestDuration.Download != *server.TestDuration.Total { t.Errorf("can't count test duration, server.TestDuration.Download=%v, server.TestDuration.Total=%v", server.TestDuration.Download, server.TestDuration.Total) @@ -59,8 +60,9 @@ func TestUploadTestContext(t *testing.T) { if err != nil { t.Errorf(err.Error()) } - if server.ULSpeed < idealSpeed*(1-delta) || idealSpeed*(1+delta) < server.ULSpeed { - t.Errorf("got unexpected server.ULSpeed '%v', expected between %v and %v", server.ULSpeed, idealSpeed*(1-delta), idealSpeed*(1+delta)) + value := server.Context.Manager.GetAvgUploadRate() + if value < idealSpeed*(1-delta) || idealSpeed*(1+delta) < value { + t.Errorf("got unexpected server.ULSpeed '%v', expected between %v and %v", value, idealSpeed*(1-delta), idealSpeed*(1+delta)) } if server.TestDuration.Upload == nil || *server.TestDuration.Upload != *server.TestDuration.Total { t.Errorf("can't count test duration, server.TestDuration.Upload=%v, server.TestDuration.Total=%v", server.TestDuration.Upload, server.TestDuration.Total) diff --git a/speedtest/server.go b/speedtest/server.go index 03f47d3..e3be322 100644 --- a/speedtest/server.go +++ b/speedtest/server.go @@ -48,8 +48,8 @@ type Server struct { MaxLatency time.Duration `json:"max_latency"` MinLatency time.Duration `json:"min_latency"` Jitter time.Duration `json:"jitter"` - DLSpeed float64 `json:"dl_speed"` - ULSpeed float64 `json:"ul_speed"` + DLSpeed ByteRate `json:"dl_speed"` + ULSpeed ByteRate `json:"ul_speed"` TestDuration TestDuration `json:"test_duration"` Context *Speedtest `json:"-"` @@ -222,7 +222,7 @@ func (s *Speedtest) FetchServerListContext(ctx context.Context) (Servers, error) return Servers{}, err } - payloadType := typeJSONPayload + _payloadType := typeJSONPayload if resp.ContentLength == 0 { _ = resp.Body.Close() @@ -237,14 +237,14 @@ func (s *Speedtest) FetchServerListContext(ctx context.Context) (Servers, error) return Servers{}, err } - payloadType = typeXMLPayload + _payloadType = typeXMLPayload } defer resp.Body.Close() var servers Servers - switch payloadType { + switch _payloadType { case typeJSONPayload: // Decode xml decoder := json.NewDecoder(resp.Body) @@ -360,14 +360,14 @@ func (servers Servers) FindServer(serverID []int) (Servers, error) { if len(retServer) == 0 { // choose the lowest latency server - var min int64 = math.MaxInt64 + var minLatency int64 = math.MaxInt64 var minServerIndex int for index, server := range servers { if server.Latency <= 0 { continue } - if min > server.Latency.Milliseconds() { - min = server.Latency.Milliseconds() + if minLatency > server.Latency.Milliseconds() { + minLatency = server.Latency.Milliseconds() minServerIndex = index } } diff --git a/speedtest/server_test.go b/speedtest/server_test.go index 932d934..f857a2b 100644 --- a/speedtest/server_test.go +++ b/speedtest/server_test.go @@ -140,6 +140,10 @@ func TestCustomServer(t *testing.T) { if err != nil { t.Errorf(err.Error()) } + if got == nil { + t.Error("empty server") + return + } if got.Host != "example.com" { t.Error("did not properly set the Host field on a custom server") } diff --git a/speedtest/speedtest.go b/speedtest/speedtest.go index 2eac251..80354b1 100644 --- a/speedtest/speedtest.go +++ b/speedtest/speedtest.go @@ -13,7 +13,7 @@ import ( ) var ( - version = "1.6.12" + version = "1.7.0" DefaultUserAgent = fmt.Sprintf("showwin/speedtest-go %s", version) ) @@ -46,7 +46,8 @@ type UserConfig struct { Debug bool PingMode Proto - SavingMode bool + SavingMode bool + MaxConnections int CityFlag string LocationFlag string @@ -72,8 +73,9 @@ func (s *Speedtest) NewUserConfig(uc *UserConfig) { } if uc.SavingMode { - s.SetNThread(1) // Set the number of concurrent connections to 1 + uc.MaxConnections = 1 // Set the number of concurrent connections to 1 } + s.SetNThread(uc.MaxConnections) if len(uc.CityFlag) > 0 { var err error diff --git a/speedtest/unit.go b/speedtest/unit.go new file mode 100644 index 0000000..bec6d6d --- /dev/null +++ b/speedtest/unit.go @@ -0,0 +1,102 @@ +package speedtest + +import ( + "strconv" +) + +type UnitType int + +// IEC and SI +const ( + UnitTypeDecimalBits = UnitType(iota) // auto scaled + UnitTypeDecimalBytes // auto scaled + UnitTypeBinaryBits // auto scaled + UnitTypeBinaryBytes // auto scaled + UnitTypeDefaultMbps // fixed +) + +var ( + DecimalBitsUnits = []string{"bps", "Kbps", "Mbps", "Gbps"} + DecimalBytesUnits = []string{"B/s", "KB/s", "MB/s", "GB/s"} + BinaryBitsUnits = []string{"Kibps", "KiMbps", "KiGbps"} + BinaryBytesUnits = []string{"KiB/s", "MiB/s", "GiB/s"} +) + +var unitMaps = map[UnitType][]string{ + UnitTypeDecimalBits: DecimalBitsUnits, + UnitTypeDecimalBytes: DecimalBytesUnits, + UnitTypeBinaryBits: BinaryBitsUnits, + UnitTypeBinaryBytes: BinaryBytesUnits, +} + +const ( + B = 1.0 + KB = 1000 * B + MB = 1000 * KB + GB = 1000 * MB + + IB = 1 + KiB = 1024 * IB + MiB = 1024 * KiB + GiB = 1024 * MiB +) + +type ByteRate float64 + +var globalByteRateUnit UnitType + +func (r ByteRate) String() string { + if r == 0 { + return "0.00 Mbps" + } + if r == -1 { + return "N/A" + } + if globalByteRateUnit != UnitTypeDefaultMbps { + return r.Byte(globalByteRateUnit) + } + return strconv.FormatFloat(float64(r/125000.0), 'f', 2, 64) + " Mbps" +} + +// SetUnit Set global output units +func SetUnit(unit UnitType) { + globalByteRateUnit = unit +} + +// Byte Specifies the format output byte rate +func (r ByteRate) Byte(formatType UnitType) string { + if r == 0 { + return "0.00 Mbps" + } + if r == -1 { + return "N/A" + } + return format(float64(r), formatType) +} + +func format(byteRate float64, i UnitType) string { + val := byteRate + if i%2 == 0 { + val *= 8 + } + if i < UnitTypeBinaryBits { + switch { + case byteRate >= GB: + return strconv.FormatFloat(val/GB, 'f', 2, 64) + " " + unitMaps[i][3] + case byteRate >= MB: + return strconv.FormatFloat(val/MB, 'f', 2, 64) + " " + unitMaps[i][2] + case byteRate >= KB: + return strconv.FormatFloat(val/KB, 'f', 2, 64) + " " + unitMaps[i][1] + default: + return strconv.FormatFloat(val/B, 'f', 2, 64) + " " + unitMaps[i][0] + } + } + switch { + case byteRate >= GiB: + return strconv.FormatFloat(val/GiB, 'f', 2, 64) + " " + unitMaps[i][2] + case byteRate >= MiB: + return strconv.FormatFloat(val/MiB, 'f', 2, 64) + " " + unitMaps[i][1] + default: + return strconv.FormatFloat(val/KiB, 'f', 2, 64) + " " + unitMaps[i][0] + } +} diff --git a/speedtest/unit_test.go b/speedtest/unit_test.go new file mode 100644 index 0000000..7c9c777 --- /dev/null +++ b/speedtest/unit_test.go @@ -0,0 +1,60 @@ +package speedtest + +import ( + "testing" +) + +func BenchmarkFmt(b *testing.B) { + bt := ByteRate(1002031.0) + for i := 0; i < b.N; i++ { + _ = bt.Byte(UnitTypeDecimalBits) + } +} + +func BenchmarkDefaultFmt(b *testing.B) { + bt := ByteRate(1002031.0) + for i := 0; i < b.N; i++ { + _ = bt.String() + } +} + +func TestFmt(t *testing.T) { + testData := []struct { + rate ByteRate + format string + t UnitType + }{ + {123123123.123, "984.98 Mbps", UnitTypeDecimalBits}, + {1231231231.123, "9.85 Gbps", UnitTypeDecimalBits}, + {123123.123, "984.98 Kbps", UnitTypeDecimalBits}, + {123.1, "984.80 bps", UnitTypeDecimalBits}, + + {123123123.123, "123.12 MB/s", UnitTypeDecimalBytes}, + {1231231231.123, "1.23 GB/s", UnitTypeDecimalBytes}, + {123123.123, "123.12 KB/s", UnitTypeDecimalBytes}, + {123.1, "123.10 B/s", UnitTypeDecimalBytes}, + + {123123123.123, "939.35 KiMbps", UnitTypeBinaryBits}, + {1231231231.123, "9.17 KiGbps", UnitTypeBinaryBits}, + {123123.123, "961.90 Kibps", UnitTypeBinaryBits}, + {123.1, "0.96 Kibps", UnitTypeBinaryBits}, + + {123123123.123, "117.42 MiB/s", UnitTypeBinaryBytes}, + {1231231231.123, "1.15 GiB/s", UnitTypeBinaryBytes}, + {123123.123, "120.24 KiB/s", UnitTypeBinaryBytes}, + {123.1, "0.12 KiB/s", UnitTypeBinaryBytes}, + + {-1, "N/A", UnitTypeBinaryBytes}, + {0, "0.00 Mbps", UnitTypeDecimalBits}, + } + + if testData[0].rate.String() != testData[0].format { + t.Errorf("got: %s, want: %s", testData[0].rate.String(), testData[0].format) + } + + for _, v := range testData { + if got := v.rate.Byte(v.t); got != v.format { + t.Errorf("got: %s, want: %s", got, v.format) + } + } +} diff --git a/speedtest/user_test.go b/speedtest/user_test.go index 6ed5c80..28e56ef 100644 --- a/speedtest/user_test.go +++ b/speedtest/user_test.go @@ -13,6 +13,10 @@ func TestFetchUserInfo(t *testing.T) { if err != nil { t.Errorf(err.Error()) } + if user == nil { + t.Error("empty user info") + return + } // IP if len(user.IP) < 7 || len(user.IP) > 15 { t.Errorf("invalid IP length. got: %v;", user.IP)