diff --git a/gnocchi/storage/ceph.py b/gnocchi/storage/ceph.py index 9b9f93c44..69a0f223a 100644 --- a/gnocchi/storage/ceph.py +++ b/gnocchi/storage/ceph.py @@ -15,6 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. import collections +import daiquiri from oslo_config import cfg @@ -42,6 +43,11 @@ rados = ceph.rados +LOG = daiquiri.getLogger(__name__) + +DEFAULT_RADOS_BUFFER_SIZE = 8192 +MAP_UNAGGREGATED_METRIC_NAME_BY_SIZE = {} + class CephStorage(storage.StorageDriver): WRITE_FULL = False @@ -88,6 +94,13 @@ def _store_metric_splits(self, metrics_keys_aggregations_data_offset, for key, agg, data, offset in keys_aggregations_data_offset: name = self._get_object_name( metric, key, agg.method, version) + metric_size = len(data) + + if metric_size > DEFAULT_RADOS_BUFFER_SIZE: + MAP_UNAGGREGATED_METRIC_NAME_BY_SIZE[name] = metric_size + LOG.debug( + "Storing time series size [%s] for metric [%s].", + metric_size, name) if offset is None: self.ioctx.write_full(name, data) else: @@ -153,7 +166,14 @@ def _get_splits_unbatched(self, metric, key, aggregation, version=3): try: name = self._get_object_name( metric, key, aggregation.method, version) - return self._get_object_content(name) + + metric_size = MAP_UNAGGREGATED_METRIC_NAME_BY_SIZE.get( + name, DEFAULT_RADOS_BUFFER_SIZE) + + LOG.debug("Reading metric [%s] with buffer size of [%s].", + name, metric_size) + + return self._get_object_content(name, buffer_size=metric_size) except rados.ObjectNotFound: return @@ -206,9 +226,16 @@ def _build_unaggregated_timeserie_path(metric, version): def _get_or_create_unaggregated_timeseries_unbatched( self, metric, version=3): + metric_name = self._build_unaggregated_timeserie_path(metric, version) + metric_size = MAP_UNAGGREGATED_METRIC_NAME_BY_SIZE.get( + metric_name, DEFAULT_RADOS_BUFFER_SIZE) + + LOG.debug("Reading unaggregated metric [%s] with buffer size of [%s].", + metric_name, metric_size) + try: contents = self._get_object_content( - self._build_unaggregated_timeserie_path(metric, version)) + metric_name, buffer_size=metric_size) except rados.ObjectNotFound: self._create_metric(metric) else: @@ -218,14 +245,23 @@ def _get_or_create_unaggregated_timeseries_unbatched( def _store_unaggregated_timeseries_unbatched( self, metric, data, version=3): - self.ioctx.write_full( - self._build_unaggregated_timeserie_path(metric, version), data) - def _get_object_content(self, name): + metric_name = self._build_unaggregated_timeserie_path(metric, version) + metric_size = len(data) + + if metric_size > DEFAULT_RADOS_BUFFER_SIZE: + MAP_UNAGGREGATED_METRIC_NAME_BY_SIZE[metric_name] = metric_size + LOG.debug( + "Storing unaggregated time series size [%s] for metric [%s]", + metric_size, metric_name) + self.ioctx.write_full(metric_name, data) + + def _get_object_content(self, name, buffer_size=DEFAULT_RADOS_BUFFER_SIZE): offset = 0 content = b'' + while True: - data = self.ioctx.read(name, offset=offset) + data = self.ioctx.read(name, length=buffer_size, offset=offset) if not data: break content += data