Optimize Gnocchi aggregates API (#1307)
* Optimize Gnocchi aggregates API

The Gnocchi aggregates API can become slow over time, due to the growing number of resources and revisions in large-scale OpenStack cloud environments. This happens because Gnocchi does not apply filters in the MySQL queries against the resource tables: the start/stop filters (and others) are not pushed down to SQL, so Gnocchi loads the entire dataset from MySQL and then performs the filtering in Python.

To cope with that situation, we propose an optimization that improves the query Gnocchi executes in MySQL and applies the filtering right away; thus, the dataset manipulated by MySQL is drastically reduced. After applying the patch, we observed a 5-6x reduction in the response time of the Gnocchi aggregates API.
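
For illustration, a minimal sketch of the idea behind the patch (not the patch itself): instead of fetching every row and filtering in Python, the time-frame predicate is pushed into the SQL statement so the database only returns matching rows. The table layout and column names below are illustrative assumptions.

# Minimal sketch of the optimization idea; not the actual patch.
import datetime

import sqlalchemy

engine = sqlalchemy.create_engine("sqlite://")  # stand-in for the MySQL backend
metadata = sqlalchemy.MetaData()
resource = sqlalchemy.Table(
    "resource", metadata,
    sqlalchemy.Column("id", sqlalchemy.String(36)),
    sqlalchemy.Column("started_at", sqlalchemy.DateTime),
    sqlalchemy.Column("ended_at", sqlalchemy.DateTime),
)
metadata.create_all(engine)


def filter_in_python(conn, start, stop):
    # Before: the database returns the whole table; Python discards most rows.
    rows = conn.execute(sqlalchemy.select(resource)).fetchall()
    return [r for r in rows if start <= r.started_at < stop]


def filter_in_sql(conn, start, stop):
    # After: the predicate is part of the query, so only matching rows
    # ever leave the database.
    stmt = sqlalchemy.select(resource).where(
        resource.c.started_at >= start,
        resource.c.started_at < stop,
    )
    return conn.execute(stmt).fetchall()


with engine.connect() as conn:
    start = datetime.datetime(2024, 3, 1)
    stop = datetime.datetime(2024, 3, 2)
    assert filter_in_python(conn, start, stop) == filter_in_sql(conn, start, stop)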

* Use proper start timeframe when handling different resources together

* Address Daniel's review

* Add some extra information regarding re-aggregation with the use of use_history (see the API sketch after this list)

* Address Chungg's review

* Revert a change that does not work

* Remove documentation change
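
As a hedged illustration of the re-aggregation mentioned above: the endpoint and payload shapes follow Gnocchi's aggregates API, but the exact use_history semantics are an assumption based on this patch series, and the host, token, and metric names are invented.

# Hedged sketch: calling the aggregates API with use_history enabled.
# The use_history semantics are assumed from this patch series; host,
# token, and metric names are placeholders.
import requests

response = requests.post(
    "http://gnocchi.example.com:8041/v1/aggregates",
    params={
        "start": "2024-03-01T00:00:00",
        "stop": "2024-03-02T00:00:00",
        "use_history": "true",  # re-aggregate using resource revisions
    },
    headers={"X-Auth-Token": "<token>"},
    json={
        "operations": "(aggregate mean (metric cpu.util mean))",
        "search": "server_group='my_autoscaling_group'",
        "resource_type": "instance",
    },
)
response.raise_for_status()
print(response.json())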
rafaelweingartner authored Mar 22, 2024
1 parent 57cbfa8 commit 7a289c9
Showing 7 changed files with 361 additions and 40 deletions.
2 changes: 2 additions & 0 deletions doc/source/rest.yaml
@@ -591,6 +591,7 @@
"image_ref": "http://image",
"host": "compute1",
"display_name": "myvm2",
"started_at": "2014-10-06T14:00:02.000000",
"server_group": "my_autoscaling_group",
"metrics": {"cpu.util": "{{ scenarios['create-metric']['response'].json['id'] }}"}
}
@@ -608,6 +609,7 @@
"image_ref": "http://image",
"host": "compute2",
"display_name": "myvm3",
"started_at": "2014-10-06T14:00:02.000000",
"server_group": "my_autoscaling_group",
"metrics": {"cpu.util": {"archive_policy_name": "{{ scenarios['create-archive-policy']['response'].json['name'] }}"}}
}
154 changes: 150 additions & 4 deletions gnocchi/indexer/sqlalchemy.py
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.

import copy
import datetime
import itertools
import operator
@@ -1071,7 +1072,100 @@ def get_resource(self, resource_type, resource_id, with_metrics=False):
q = q.options(sqlalchemy.orm.joinedload(Resource.metrics))
return session.scalars(q).first()

def _get_history_result_mapper(self, session, resource_type):
def extracts_filters_for_table(self, attribute_filter,
allowed_keys_for_table=[
'creator', 'started_at', 'ended_at',
'user_id', 'project_id',
'original_resource_id', 'id', 'type']):
"""Extracts the filters for resource history table.
Extracts the filters that can be used in the resource history table to
apply in the aggregates query that we execute in the database.
"""

attribute_filters_to_use = copy.deepcopy(attribute_filter)

LOG.debug("Executing the processing of attributes filters [%s] for "
"resource history table.", attribute_filters_to_use)

is_value_list = isinstance(attribute_filters_to_use, list)
is_value_dict = isinstance(attribute_filters_to_use, dict)
is_value_dict_or_list = (is_value_dict or is_value_list)

if not is_value_dict_or_list:
LOG.debug("Attribute filter [%s] is not of expected types [list "
"or dict]. Therefore, we do not do anything with it.",
attribute_filters_to_use)
return attribute_filters_to_use

if is_value_list:
for attribute in attribute_filter:
LOG.debug("Sending attribute filter [%s] to be processed, "
"as it is part of a list of attribute filters.",
attribute)

value_sanitized = self.extracts_filters_for_table(
attribute, allowed_keys_for_table=allowed_keys_for_table)

if not value_sanitized:
LOG.debug("Value [%s] was totally cleaned after being "
"sanitized. Therefore, we remove it from our "
"attribute filter list.", attribute)
attribute_filters_to_use.remove(attribute)
else:
LOG.debug("Replacing value [%s] in list with the sanitized"
"value [%s] in its current position.",
attribute, value_sanitized)
value_index = attribute_filters_to_use.index(attribute)
attribute_filters_to_use[value_index] = value_sanitized

elif is_value_dict:
all_keys = list(attribute_filter.keys())
for key in all_keys:
value = attribute_filter.get(key)

# The value is a leaf when it is not of type dict or list.
is_value_leaf = not (isinstance(
value, dict) or isinstance(value, list))

if key not in allowed_keys_for_table and is_value_leaf:
attribute_being_removed = attribute_filters_to_use.pop(key)
LOG.debug('Removing attribute [%s] with value [%s] from '
'attributes [%s] as it is not one of the expected '
'keys [%s].', key, attribute_being_removed,
attribute_filter,
allowed_keys_for_table)
else:
LOG.debug("Sending attribute [key=%s, value=%s] from "
"dictionary to be processed.", key, value)
value_sanitized = self.extracts_filters_for_table(
value, allowed_keys_for_table=allowed_keys_for_table)

is_value_changed = value != value_sanitized
if not is_value_changed:
LOG.debug("Value [%s] for key [%s] did not changed. "
"Therefore, we go for the next iteration.",
value, key)
continue
if not value_sanitized:
LOG.debug("Value from dict [%s] was totally cleaned after"
" being sanitized. Therefore, we remove it from "
"our attribute filter dictionary.", value)
attribute_filters_to_use.pop(key)
else:
LOG.debug("Replacing attribute [%s] in dict, with "
"sanitized data [%s]. Old value was [%s].",
key, value_sanitized, value)
attribute_filters_to_use[key] = value_sanitized
else:
LOG.debug("This condition should never happen. Attribute filter [%s] "
"is not of expected types [list or dict].",
attribute_filters_to_use)
return attribute_filters_to_use

def _get_history_result_mapper(self, session, resource_type,
attribute_filter=None):

mappers = self._resource_type_to_mappers(session, resource_type)
resource_cls = mappers['resource']
history_cls = mappers['history']
@@ -1089,8 +1183,53 @@ def _get_history_result_mapper(self, session, resource_type):
s1 = select(*history_cols.values())
s2 = select(*resource_cols.values())
if resource_type != "generic":
engine = session.connection()

# Get the available columns for ResourceHistory table
resource_history_filters_names = list(map(
lambda column: column.name, sqlalchemy.inspect(
ResourceHistory).columns))

history_filters = self.extracts_filters_for_table(
attribute_filter,
allowed_keys_for_table=resource_history_filters_names)

LOG.debug("Filters to be used [%s] in query according to resource "
"history columns [%s] extracted from [%s].",
history_filters, resource_history_filters_names,
attribute_filter)

# Get the available columns for Resource table
resource_filter_names = list(map(
lambda column: column.name, sqlalchemy.inspect(
Resource).columns))

resource_filters = self.extracts_filters_for_table(
attribute_filter, allowed_keys_for_table=resource_filter_names)
LOG.debug("Filters to be used [%s] in query according to resource "
"columns [%s] extracted from [%s].", resource_filters,
resource_filter_names, attribute_filter)

s1 = s1.where(history_cls.revision == ResourceHistory.revision)
if history_filters:
f1 = QueryTransformer.build_filter(
engine.dialect.name, ResourceHistory, history_filters)
s1 = s1.filter(f1)
else:
LOG.debug("No filters supplied to be applied for the resource "
"history table. Attribute filters: [%s].",
attribute_filter)

s2 = s2.where(resource_cls.id == Resource.id)

if resource_filters:
f2 = QueryTransformer.build_filter(
engine.dialect.name, Resource, resource_filters)
s2 = s2.filter(f2)
else:
LOG.debug("No filters supplied to be applied for the resource "
"table. Attribute filters: [%s].", attribute_filter)

union_stmt = sqlalchemy.union(s1, s2)
stmt = union_stmt.alias("result")

@@ -1124,7 +1263,7 @@ def list_resources(self, resource_type='generic',
with self.facade.independent_reader() as session:
if history:
target_cls = self._get_history_result_mapper(
session, resource_type)
session, resource_type, attribute_filter)
unique_keys = ["id", "revision"]
else:
target_cls = self._resource_type_to_mappers(
@@ -1185,8 +1324,13 @@ def list_resources(self, resource_type='generic',

# Always include metrics
q = q.options(sqlalchemy.orm.joinedload(target_cls.metrics))

LOG.debug("Executing query [%s] to search for resources.", q)
all_resources = session.scalars(q).unique().all()

LOG.debug("Resources [quantity=%s] [%s] found with query: [%s].",
len(all_resources), all_resources, q)

if details:
grouped_by_type = itertools.groupby(
all_resources, lambda r: (r.revision != -1, r.type))
@@ -1213,9 +1357,11 @@

q = select(target_cls).filter(f)
# Always include metrics
q = q.options(sqlalchemy.orm.joinedload(target_cls.metrics))
q = q.options(sqlalchemy.orm.joinedload(
target_cls.metrics))
try:
all_resources.extend(session.scalars(q).unique().all())
all_resources.extend(
session.scalars(q).unique().all())
except sqlalchemy.exc.ProgrammingError as e:
# NOTE(jd) This exception can happen when the
# resources and their resource type have been
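
To make the new filter-extraction step concrete, a hedged walk-through of extracts_filters_for_table (the filter values are invented for this example; they are not taken from the patch or its tests). Keys that are not columns of the target table are pruned, so the remaining expression can be handed to QueryTransformer.build_filter and pushed into SQL:

# Illustrative input/output for extracts_filters_for_table; values invented.
attribute_filter = {"and": [
    {"=": {"project_id": "abc123"}},       # allowed key: kept
    {">=": {"started_at": "2024-03-01"}},  # allowed key: kept
    {"=": {"flavor_id": "42"}},            # resource-type-specific column: pruned
]}

# With the default allowed keys (creator, started_at, ended_at, user_id,
# project_id, original_resource_id, id, type), the sanitized filter that can
# be applied to the resource history table would be:
sanitized = {"and": [
    {"=": {"project_id": "abc123"}},
    {">=": {"started_at": "2024-03-01"}},
]}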
