-
Notifications
You must be signed in to change notification settings - Fork 6
/
process.rb
executable file
·263 lines (226 loc) · 9.67 KB
/
process.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
#!/usr/bin/env ruby
# This is a simple Ruby data processing library for (specifically)
# use with RSB. There's a similar but different one for RRB.
#
# For simple use, you can either use the default cohorts, which
# group by the Ruby version, the URL, the server command and
# the amount of time benchmarked. In some cases you'll want to
# provide a different division into cohorts.
#
# In my canonical benchmarking, a "normal" run tends to have an
# error rate of zero in 180 seconds. So if you're seeing anything
# that exceeds the threshold here (0.01%, or 1 per 10k requests),
# something's up.
require "json"
require "optparse"

# Defaults for all command-line options; each may be overridden by the
# flags parsed below.
cohorts_by = "rvm current,warmup_seconds,benchmark_seconds,server_cmd,url"
input_glob = "rsb_*.json"
error_proportion = 0.0001 # Default to 0.01% of requests in any single file may have an error
permissive_cohorts = false
include_raw_data = false

# Parse ARGV in place; each handler overwrites the matching default above.
OptionParser.new do |opts|
  opts.banner = "Usage: ruby process.rb [options]"

  # Which settings/environment keys partition the samples into cohorts.
  opts.on("-c", "--cohorts-by COHORTS", "Comma-separated variables to partition data by, incl. RUBY_VERSION,warmup_iterations,etc.") do |c|
    cohorts_by = c
  end

  # Shell-style glob selecting the input JSON files.
  opts.on("-i", "--input-glob GLOB", "File pattern to match on (default #{input_glob})") do |s|
    input_glob = s
  end

  # Maximum tolerated per-file error proportion before processing aborts.
  opts.on("-e PROPORTION", "--error-tolerance PROPORTION", "Error tolerance in analysis as a proportion of requests per data file -- defaults to 0.0001, or 0.01% of requests in a particular file may have an error.") do |p|
    error_proportion = p.to_f
  end

  # When set, a missing cohort key does not abort processing (see the
  # cohort-assignment loop below for the actual effect).
  opts.on("-p", "--permissive-cohorts", "Allow cohort components to be NULL for a particular file or sample") do
    permissive_cohorts = true
  end

  # When set, raw latency and rate samples are embedded in the output JSON.
  opts.on("--include-raw-data", "Include all latencies in final output file") do
    include_raw_data = true
  end
end.parse!
# Where the processed summary is written as pretty-printed JSON.
OUTPUT_FILE = "process_output.json"

# Ordered list of keys that make up each cohort identifier.
cohort_indices = cohorts_by.strip.split(",")

# Per-cohort accumulators, filled in while reading the input files.
req_time_by_cohort = {}    # flat latency samples, concatenated per cohort
req_rates_by_cohort = {}   # flat requests-per-second samples per cohort
throughput_by_cohort = {}  # one overall reqs/sec figure per input file
errors_by_cohort = {}      # one wrk error-count hash per input file

INPUT_FILES = Dir[input_glob]

# Skeleton of the final output document; the :processed section is
# populated by the reporting loop near the end of the file.
process_output = {
  cohort_indices: cohort_indices,
  input_files: INPUT_FILES,
  #req_time_by_cohort: req_time_by_cohort,
  throughput_by_cohort: throughput_by_cohort,
  #startup_by_cohort: startup_by_cohort,
  processed: {
    :cohort => {},
  },
}
# wrk encodes its arrays as (value, count) pairs, which get
# dumped into a long single array by wrk_bench. This method
# reencodes as simple Ruby arrays.
def run_length_array_to_simple_array(input)
  # Walk the flat (value, count, value, count, ...) pairs and expand each
  # one into `count` repetitions of `value`, preserving order.
  input.each_slice(2).flat_map { |value, count| Array.new(count, value) }
end
# Running total of wrk-reported errors across every input file.
# NOTE(review): this variable is reassigned per-cohort by the reporting
# loop later in the file, so this file-level total is never reported.
error_total = 0

INPUT_FILES.each do |f|
  begin
    d = JSON.load File.read(f)
  rescue JSON::ParserError
    raise "Error parsing JSON in file: #{f.inspect}"
  end

  # Assign a cohort to these samples: one component per requested key,
  # looked up first in "settings", then in "environment".
  cohort = cohort_indices.map do |cohort_elt|
    raise "Unexpected file format for file #{f.inspect}!" unless d && d["settings"] && d["environment"]
    item = nil
    if d["settings"].has_key?(cohort_elt)
      item = d["settings"][cohort_elt]
    elsif d["environment"].has_key?(cohort_elt)
      item = d["environment"][cohort_elt]
    else
      if permissive_cohorts
        # NOTE(review): this assigns the block parameter, not `item`, so it
        # has no effect -- the cohort component stays nil. Possibly meant
        # `item = ""`; confirm the intended permissive behavior.
        cohort_elt = ""
      else
        raise "Can't find setting or environment object #{cohort_elt} in file #{f.inspect}!"
      end
    end
    item
  end.freeze

  # Reject incorrect versions of data format
  if d["version"] != "wrk:2"
    raise "Unrecognized data version #{d["version"].inspect} in JSON file #{f.inspect}!"
  end

  # Expand wrk's run-length-encoded sample arrays into flat arrays.
  latencies = run_length_array_to_simple_array d["requests"]["benchmark"]["latencies"]
  req_rates = run_length_array_to_simple_array d["requests"]["benchmark"]["req_per_sec"]
  errors = d["requests"]["benchmark"]["errors"]

  # Enforce the per-file error budget (-e): abort loudly rather than
  # silently averaging in a bad run.
  if errors.values.any? { |e| e > 0 }
    errors_in_file = errors.values.inject(0, &:+)
    error_total += errors_in_file
    error_rate = errors_in_file.to_f / latencies.size
    if error_rate > error_proportion
      raise "Error rate of #{error_rate.inspect} exceeds maximum of #{error_proportion}! Raise the maximum with -e or throw away file #{f.inspect}!"
    end
  end

  # Throughput below divides by this, so reject missing/near-zero durations.
  duration = d["settings"]["benchmark_seconds"]
  if duration.nil? || duration < 0.00001
    raise "Problem with duration (#{duration.inspect}), file #{f.inspect}, cohort #{cohort.join(", ")}"
  end

  # Accumulate this file's samples into its cohort.
  req_time_by_cohort[cohort] ||= []
  req_time_by_cohort[cohort].concat latencies
  req_rates_by_cohort[cohort] ||= []
  req_rates_by_cohort[cohort].concat req_rates
  throughput_by_cohort[cohort] ||= []
  throughput_by_cohort[cohort].push (latencies.size.to_f / duration)
  errors_by_cohort[cohort] ||= []
  errors_by_cohort[cohort].push errors
end
# Linearly-interpolated percentile of an already-sorted list.
# `pct` runs 0..100; fractional positions blend the two nearest samples.
# Returns nil for an empty list.
def percentile(list, pct)
  # Fractional index of the requested percentile within the list.
  position = pct * 0.01 * (list.length - 1)
  lower_idx = position.to_i
  # Clamp at the ends: the last element (or nil when empty) and the first.
  return list[lower_idx] if lower_idx >= list.length - 1
  return list[0] if lower_idx < 0
  fraction = position - lower_idx
  lower = list[lower_idx]
  upper = list[lower_idx + 1]
  lower + (upper - lower) * fraction
end
# Arithmetic mean of `arr` as a Float, or nil when there is nothing to
# average. Accumulates sequentially from 0.0 so integer arrays still
# produce a Float result.
def array_mean(arr)
  return nil if arr.empty?
  total = arr.reduce(0.0) { |acc, x| acc + x }
  total / arr.size
end
# Calculate variance based on the Wikipedia article of algorithms for variance.
# https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
# Includes Bessel's correction.
# Calculate variance based on the Wikipedia article of algorithms for variance.
# https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
# Uses the shifted-data form (centered on the first sample) to limit
# floating-point cancellation, and includes Bessel's correction.
# Returns nil when fewer than two samples are available.
def array_variance(arr)
  count = arr.size
  return nil if count < 2
  shift = arr[0].to_f
  sum = sum_sq = 0.0
  arr.each do |sample|
    centered = sample - shift
    sum += centered
    sum_sq += centered * centered
  end
  (sum_sq - (sum * sum) / count) / (count - 1)
end
# Reporting loop: print a human-readable summary for each cohort and
# populate process_output[:processed][:cohort] with the same numbers.
req_time_by_cohort.keys.sort.each do |cohort|
  # NOTE(review): latencies are scaled by 1/1,000,000 -- presumably
  # microseconds to seconds; confirm against wrk_bench's output units.
  latencies = req_time_by_cohort[cohort].map { |num| num / 1_000_000.0 }.sort
  rates = req_rates_by_cohort[cohort].sort
  throughputs = throughput_by_cohort[cohort].sort
  cohort_printable = cohort_indices.zip(cohort).map { |a, b| "#{a}: #{b}" }.join(", ")
  print "=====\nCohort: #{cohort_printable}, # of requests: #{latencies.size} http requests, #{throughputs.size} batches\n"
  process_output[:processed][:cohort][cohort] = {
    request_percentiles: {},
    rate_percentiles: {},
    throughputs: throughputs,
    cohort_samples: latencies.size,
    cohort_batches: throughputs.size,
  }
  if include_raw_data
    process_output[:processed][:cohort][cohort][:latencies] = latencies
    process_output[:processed][:cohort][cohort][:request_rates] = rates
  end

  # Latency percentiles: all 101 go into the JSON, every 5th is printed.
  print "--\n Request latencies:\n"
  (0..100).each do |p|
    process_output[:processed][:cohort][cohort][:request_percentiles][p.to_s] = percentile(latencies, p)
    print " #{"%2d" % p}%ile: #{percentile(latencies, p)}\n" if p % 5 == 0
  end
  variance = array_variance(latencies)
  # NOTE(review): array_variance returns nil for fewer than two samples,
  # which would make Math.sqrt raise a TypeError here -- confirm that
  # every cohort always has at least two latency samples.
  print " Mean: #{array_mean(latencies).inspect} Median: #{percentile(latencies, 50).inspect} Variance: #{variance.inspect} StdDev: #{Math.sqrt(variance).inspect}\n"
  process_output[:processed][:cohort][cohort][:latency_mean] = array_mean(latencies)
  process_output[:processed][:cohort][cohort][:latency_median] = percentile(latencies, 50)
  process_output[:processed][:cohort][cohort][:latency_variance] = variance

  # Requests-per-second percentiles, recorded and printed in 5-point steps.
  print "--\n Requests/Second Rates:\n"
  (0..20).each do |i|
    p = i * 5
    process_output[:processed][:cohort][cohort][:rate_percentiles][p.to_s] = percentile(rates, p)
    print " #{"%2d" % p}%ile: #{percentile(rates, p)}\n"
  end
  variance = array_variance(rates)
  # Unlike the latency branch above, this guards against a nil variance.
  std_dev = variance.nil? ? nil : Math.sqrt(variance)
  # NOTE(review): this STDERR line looks like leftover debug output.
  STDERR.puts "Variance: #{variance.inspect}"
  print " Mean: #{array_mean(rates).inspect} Median: #{percentile(rates, 50).inspect} Variance: #{variance.inspect} StdDev: #{std_dev.inspect}\n"
  process_output[:processed][:cohort][cohort][:rate_mean] = array_mean(rates)
  process_output[:processed][:cohort][cohort][:rate_median] = percentile(rates, 50)
  # NOTE(review): recomputes array_variance(rates) although `variance`
  # already holds that value.
  process_output[:processed][:cohort][cohort][:rate_variance] = array_variance(rates)

  # Per-run throughput summary; a single run has no meaningful variance.
  print "--\n Throughput in reqs/sec for each full run:\n"
  if throughputs.size == 1
    # Only one run means no variance or standard deviation
    print " Mean: #{array_mean(throughputs).inspect} Median: #{percentile(throughputs, 50).inspect}\n"
    process_output[:processed][:cohort][cohort][:throughput_mean] = array_mean(throughputs)
    process_output[:processed][:cohort][cohort][:throughput_median] = percentile(throughputs, 50)
    process_output[:processed][:cohort][cohort][:throughput_variance] = array_variance(throughputs)
  else
    variance = array_variance(throughputs)
    print " Mean: #{array_mean(throughputs).inspect} Median: #{percentile(throughputs, 50).inspect} Variance: #{variance} StdDev: #{Math.sqrt(variance)}\n"
    process_output[:processed][:cohort][cohort][:throughput_mean] = array_mean(throughputs)
    process_output[:processed][:cohort][cohort][:throughput_median] = percentile(throughputs, 50)
    process_output[:processed][:cohort][cohort][:throughput_variance] = variance
  end
  print " #{throughputs.inspect}\n\n"

  # Error summary: totals per wrk error category across the cohort's files.
  print "--\n Error rates:\n"
  errors_by_type = {
    "connect" => 0,
    "read" => 0,
    "write" => 0,
    "status" => 0,
    "timeout" => 0,
  }
  errors_by_cohort[cohort].each { |e| e.each { |k, v| errors_by_type[k] += v }}
  # NOTE(review): this reuses (and clobbers) the file-level error_total
  # accumulated during input processing.
  error_total = errors_by_cohort[cohort].map { |e| e.values.inject(0, &:+) }.inject(0, &:+)
  process_output[:processed][:cohort][cohort][:error_total] = error_total
  process_output[:processed][:cohort][cohort][:error_rate] = error_total.to_f / latencies.size
  process_output[:processed][:cohort][cohort][:errors_by_type] = errors_by_type
  print " Cohort rate: #{error_total.to_f / latencies.size}, cohort total errors: #{error_total}\n"
  print " By type:\n"
  print " Connect: #{errors_by_type["connect"]}\n"
  print " Read: #{errors_by_type["read"]}\n"
  print " Write: #{errors_by_type["write"]}\n"
  print " HTTP Status: #{errors_by_type["status"]}\n"
  print " Timeout: #{errors_by_type["timeout"]}\n"
  print "\n\n"
end
# Closing banner, then persist the processed results as pretty-printed JSON.
print "******************\n"
File.write(OUTPUT_FILE, JSON.pretty_generate(process_output))