Skip to content

Commit

Permalink
incorporate free storage metrics into autoscaler tuning
Browse files Browse the repository at this point in the history
  • Loading branch information
akumar1214 committed Oct 31, 2024
1 parent 303f029 commit c07e39f
Show file tree
Hide file tree
Showing 17 changed files with 204 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module ElasticGraph

def get_cluster_health: () -> ::Hash[::String, untyped]
def get_node_os_stats: () -> ::Hash[::String, untyped]
def get_node_os_roles: () -> ::Hash[::String, untyped]
def get_flat_cluster_settings: () -> ::Hash[::String, untyped]
def put_persistent_cluster_settings: (::Hash[::Symbol | ::String, untyped]) -> void

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ def get_cluster_health
def get_node_os_stats
transform_errors { |c| c.nodes.stats(metric: "os").body }
end

def get_node_roles
transform_errors { |c| c.nodes.stats(metric: "roles") }
end

def get_flat_cluster_settings
transform_errors { |c| c.cluster.get_settings(flat_settings: true).body }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def define_stubs(stub, requested_stubs)
stub.get("/_cluster/health") { |env| response_for(body, env) }
in :get_node_os_stats
stub.get("/_nodes/stats/os") { |env| response_for(body, env) }
in :get_node_roles
stub.get("/_nodes/stats/roles") { |env| response_for(body, env) }
in :get_flat_cluster_settings
stub.get("/_cluster/settings?flat_settings=true") { |env| response_for(body, env) }
in :put_persistent_cluster_settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ ElasticGraphGemspecHelper.define_elasticgraph_gem(gemspec_file: __FILE__, catego

spec.add_dependency "aws-sdk-lambda", "~> 1.125"
spec.add_dependency "aws-sdk-sqs", "~> 1.80"
spec.add_dependency "aws-sdk-cloudwatch", "~> 1.10"

# aws-sdk-sqs requires an XML library be available. On Ruby < 3 it'll use rexml from the standard library but on Ruby 3.0+
# we have to add an explicit dependency. It supports ox, oga, libxml, nokogiri or rexml, and of those, ox seems to be the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ def self.from_parsed_yaml(parsed_yaml, &datastore_client_customization_block)
def initialize(
datastore_core:,
sqs_client: nil,
lambda_client: nil
lambda_client: nil,
cloudwatch_client: nil
)
@datastore_core = datastore_core
@sqs_client = sqs_client
@lambda_client = lambda_client
@cloudwatch_client = cloudwatch_client
end

def sqs_client
Expand All @@ -53,13 +55,21 @@ def lambda_client
end
end

def cloudwatch_client
@cloudwatch_client ||= begin
require "aws-sdk-cloudwatch"
Aws::CloudWatch::Client.new
end
end

def concurrency_scaler
@concurrency_scaler ||= begin
require "elastic_graph/indexer_autoscaler_lambda/concurrency_scaler"
ConcurrencyScaler.new(
datastore_core: @datastore_core,
sqs_client: sqs_client,
lambda_client: lambda_client
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@ module ElasticGraph
class IndexerAutoscalerLambda
# @private
class ConcurrencyScaler
def initialize(datastore_core:, sqs_client:, lambda_client:)
def initialize(datastore_core:, sqs_client:, lambda_client:, cloudwatch_client:)
@logger = datastore_core.logger
@datastore_core = datastore_core
@sqs_client = sqs_client
@lambda_client = lambda_client
@cloudwatch_client = cloudwatch_client
end

MINIMUM_CONCURRENCY = 2

def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maximum_concurrency:, indexer_function_name:)
def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maximum_concurrency:, minimum_free_storage:, indexer_function_name:)
queue_attributes = get_queue_attributes(queue_urls)
queue_arns = queue_attributes.fetch(:queue_arns)
num_messages = queue_attributes.fetch(:total_messages)
Expand All @@ -37,6 +38,8 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi

new_target_concurrency =
if num_messages.positive?
free_storage = get_min_free_storage

cpu_utilization = get_max_cpu_utilization
cpu_midpoint = (max_cpu_target + min_cpu_target) / 2.0

Expand All @@ -45,27 +48,33 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi
if current_concurrency.nil?
details_logger.log_unset
nil
elsif free_storage < minimum_free_storage
details_logger.log_pause(free_storage)
0
elsif cpu_utilization < min_cpu_target
increase_factor = (cpu_midpoint / cpu_utilization).clamp(0.0, 1.5)
(current_concurrency * increase_factor).round.tap do |new_concurrency|
details_logger.log_increase(
cpu_utilization: cpu_utilization,
min_free_storage: free_storage,
current_concurrency: current_concurrency,
new_concurrency: new_concurrency
new_concurrency: new_concurrency,
)
end
elsif cpu_utilization > max_cpu_target
decrease_factor = cpu_utilization / cpu_midpoint - 1
(current_concurrency - (current_concurrency * decrease_factor)).round.tap do |new_concurrency|
details_logger.log_decrease(
cpu_utilization: cpu_utilization,
min_free_storage: free_storage,
current_concurrency: current_concurrency,
new_concurrency: new_concurrency
)
end
else
details_logger.log_no_change(
cpu_utilization: cpu_utilization,
min_free_storage: free_storage,
current_concurrency: current_concurrency
)
current_concurrency
Expand Down Expand Up @@ -94,6 +103,43 @@ def get_max_cpu_utilization
end.max.to_f
end

def get_min_free_storage
metric_data_queries = get_data_node_ids_by_cluster_name.map(&:first).map do |cluster_name, node_id|
{
id: node_id,
metric_stat: {
metric: {
namespace: 'AWS/ES',
metric_name: 'FreeStorageSpace',
dimensions: [
{ name: 'DomainName', value: cluster_name },
{ name: 'NodeId', value: node_id }
]
},
period: 60,
stat: 'Minimum'
},
return_data: true
}
end

metric_response = @cloudwatch_client.get_metric_data({
start_time: ::Time.now - 3600,
end_time: ::Time.now,
metric_data_queries: metric_data_queries
})

metric_response.metric_data_results.map { |result| result.values.first }.min / (1024 * 1024) # result is in bytes
end

def get_data_node_ids_by_cluster_name
@datastore_core.clients_by_name.flat_map do |name, client|
client.get_node_roles.map do |id, roles|
roles["roles"].include?("data") ? { name => id } : nil
end
end.compact
end

def get_queue_attributes(queue_urls)
attributes_per_queue = queue_urls.map do |queue_url|
@sqs_client.get_queue_attributes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,39 @@ def initialize(
}
end

def log_increase(cpu_utilization:, current_concurrency:, new_concurrency:)
def log_increase(cpu_utilization:, min_free_storage:, current_concurrency:, new_concurrency:)
log_result({
"action" => "increase",
"cpu_utilization" => cpu_utilization,
"min_free_storage" => min_free_storage,
"current_concurrency" => current_concurrency,
"new_concurrency" => new_concurrency
})
end

def log_decrease(cpu_utilization:, current_concurrency:, new_concurrency:)
def log_decrease(cpu_utilization:, min_free_storage:, current_concurrency:, new_concurrency:)
log_result({
"action" => "decrease",
"cpu_utilization" => cpu_utilization,
"min_free_storage" => min_free_storage,
"current_concurrency" => current_concurrency,
"new_concurrency" => new_concurrency
})
end

def log_no_change(cpu_utilization:, current_concurrency:)
def log_no_change(cpu_utilization:, min_free_storage:, current_concurrency:)
log_result({
"action" => "no_change",
"cpu_utilization" => cpu_utilization,
"current_concurrency" => current_concurrency
"min_free_storage" => min_free_storage,
"current_concurrency" => current_concurrency,
})
end

def log_pause(min_free_storage)
log_result({
"action" => "pause",
"min_free_storage" => min_free_storage
})
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def handle_request(event:, context:)
min_cpu_target: event.fetch("min_cpu_target"),
max_cpu_target: event.fetch("max_cpu_target"),
maximum_concurrency: event.fetch("maximum_concurrency"),
minimum_free_storage: event.fetch("minimum_free_storage"),
indexer_function_name: event.fetch("indexer_function_name")
)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ module ElasticGraph
def initialize: (
datastore_core: DatastoreCore,
sqs_client: Aws::SQS::Client,
lambda_client: Aws::Lambda::Client
lambda_client: Aws::Lambda::Client,
cloudwatch_client: Aws::CloudWatch::Client
) -> void

MINIMUM_CONCURRENCY: ::Integer
Expand All @@ -14,6 +15,7 @@ module ElasticGraph
min_cpu_target: ::Integer,
max_cpu_target: ::Integer,
maximum_concurrency: ::Integer,
minimum_free_storage: ::Integer,
indexer_function_name: ::String
) -> void

Expand All @@ -23,8 +25,11 @@ module ElasticGraph
@datastore_core: DatastoreCore
@sqs_client: Aws::SQS::Client
@lambda_client: Aws::Lambda::Client
@cloudwatch_client: Aws::CloudWatch::Client

def get_max_cpu_utilization: () -> ::Float
def get_min_free_storage: () -> ::Float
def get_data_node_ids_by_cluster_name: () -> ::Hash[::String, ::String]
def get_queue_attributes: (::Array[::String]) -> { total_messages: ::Integer, queue_arns: ::Array[::String] }
def get_concurrency: (::String) -> ::Integer?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,26 @@ module ElasticGraph

def log_increase: (
cpu_utilization: ::Float,
min_free_storage: ::Float,
current_concurrency: ::Integer,
new_concurrency: ::Integer
) -> void

def log_decrease: (
cpu_utilization: ::Float,
min_free_storage: ::Float,
current_concurrency: ::Integer,
new_concurrency: ::Integer
) -> void

def log_no_change: (
cpu_utilization: ::Float,
min_free_storage: ::Float,
current_concurrency: ::Integer
) -> void

def log_pause: (::String) -> void

def log_reset: () -> void

def log_unset: () -> void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module ElasticGraph
datastore_core: DatastoreCore,
?sqs_client: Aws::SQS::Client?,
?lambda_client: Aws::Lambda::Client?,
?cloudwatch_client: Aws::CloudWatch::Client?,
) -> void

@sqs_client: Aws::SQS::Client?
Expand All @@ -19,6 +20,9 @@ module ElasticGraph
@lambda_client: Aws::Lambda::Client?
def lambda_client: () -> Aws::Lambda::Client

@cloudwatch_client: Aws::CloudWatch::Client?
def cloudwatch_client: () -> Aws::CloudWatch::Client

@concurrency_scaler: ConcurrencyScaler?
def concurrency_scaler: () -> ConcurrencyScaler
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module BuildsIndexerAutoscalerLambda
def build_indexer_autoscaler(
sqs_client: nil,
lambda_client: nil,
cloudwatch_client: nil,
**datastore_core_options,
&customize_datastore_config
)
Expand All @@ -28,6 +29,7 @@ def build_indexer_autoscaler(
IndexerAutoscalerLambda.new(
sqs_client: sqs_client,
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client,
datastore_core: datastore_core
)
end
Expand Down
Loading

0 comments on commit c07e39f

Please sign in to comment.