diff --git a/gnocchi/incoming/__init__.py b/gnocchi/incoming/__init__.py index bc59f3ad2..0efbb3708 100644 --- a/gnocchi/incoming/__init__.py +++ b/gnocchi/incoming/__init__.py @@ -43,6 +43,9 @@ class IncomingDriver(object): SACK_PREFIX = "incoming" CFG_PREFIX = 'gnocchi-config' CFG_SACKS = 'sacks' + # NOTE(sileht): By default we use threads, but some driver can disable + # threads by setting this to utils.sequencial_map + MAP_METHOD = staticmethod(utils.parallel_map) @property def NUM_SACKS(self): @@ -122,11 +125,10 @@ def add_measures_batch(self, metrics_and_measures): and values are a list of :py:class:`gnocchi.incoming.Measure`. """ - utils.parallel_map( - self._store_new_measures, - ((metric_id, self._encode_measures(measures)) - for metric_id, measures - in six.iteritems(metrics_and_measures))) + self.MAP_METHOD(self._store_new_measures, + ((metric_id, self._encode_measures(measures)) + for metric_id, measures + in six.iteritems(metrics_and_measures))) @staticmethod def _store_new_measures(metric_id, data): diff --git a/gnocchi/incoming/swift.py b/gnocchi/incoming/swift.py index 66681a256..a743868d9 100644 --- a/gnocchi/incoming/swift.py +++ b/gnocchi/incoming/swift.py @@ -21,12 +21,17 @@ from gnocchi.common import swift from gnocchi import incoming +from gnocchi import utils swclient = swift.swclient swift_utils = swift.swift_utils class SwiftStorage(incoming.IncomingDriver): + # NOTE(sileht): Using threads with swiftclient doesn't work + # as expected, so disable it + MAP_METHOD = staticmethod(utils.sequencial_map) + def __init__(self, conf, greedy=True): super(SwiftStorage, self).__init__(conf) self.swift = swift.get_connection(conf) diff --git a/gnocchi/storage/__init__.py b/gnocchi/storage/__init__.py index e12fff34a..6132afc72 100644 --- a/gnocchi/storage/__init__.py +++ b/gnocchi/storage/__init__.py @@ -119,6 +119,10 @@ def get_driver(conf, coord): class StorageDriver(object): + # NOTE(sileht): By default we use threads, but some driver can disable + # threads by setting this to utils.sequencial_map + MAP_METHOD = staticmethod(utils.parallel_map) + def __init__(self, conf, coord): self.coord = coord @@ -127,7 +131,7 @@ def upgrade(): pass def _get_measures(self, metric, keys, aggregation, version=3): - return utils.parallel_map( + return self.MAP_METHOD( self._get_measures_unbatched, ((metric, key, aggregation, version) for key in keys)) @@ -217,7 +221,7 @@ def get_measures(self, metric, granularities, raise AggregationDoesNotExist(metric, aggregation, g) aggregations.append(agg) - agg_timeseries = utils.parallel_map( + agg_timeseries = self.MAP_METHOD( self._get_measures_timeserie, ((metric, ag, from_timestamp, to_timestamp) for ag in aggregations)) @@ -568,7 +572,7 @@ def _map_add_measures(bound_timeserie): d.granularity, carbonara.round_timestamp( tstamp, d.granularity)) - utils.parallel_map( + self.MAP_METHOD( self._add_measures, ((aggregation, d, metric, ts, current_first_block_timestamp, diff --git a/gnocchi/storage/swift.py b/gnocchi/storage/swift.py index e13aa0de0..611963654 100644 --- a/gnocchi/storage/swift.py +++ b/gnocchi/storage/swift.py @@ -82,6 +82,9 @@ class SwiftStorage(storage.StorageDriver): WRITE_FULL = True + # NOTE(sileht): Using threads with swiftclient doesn't work + # as expected, so disable it + MAP_METHOD = staticmethod(utils.sequencial_map) def __init__(self, conf, coord=None): super(SwiftStorage, self).__init__(conf, coord) diff --git a/gnocchi/utils.py b/gnocchi/utils.py index 0510d18cb..573276cc3 100644 --- a/gnocchi/utils.py +++ b/gnocchi/utils.py @@ -299,11 +299,15 @@ def get_driver_class(namespace, conf): conf.driver).driver +def sequencial_map(fn, list_of_args): + return list(itertools.starmap(fn, list_of_args)) + + def parallel_map(fn, list_of_args): """Run a function in parallel.""" if parallel_map.MAX_WORKERS == 1: - return list(itertools.starmap(fn, list_of_args)) + return sequencial_map(fn, list_of_args) with futures.ThreadPoolExecutor( max_workers=parallel_map.MAX_WORKERS) as executor: