Skip to content

Commit

Permalink
swift: avoid Connection aborted
Browse files Browse the repository at this point in the history
This change removes usage of threads with swift driver.

This avoids to get "Connection aborted" because a thread is stuck
and the server side decide to break the connection.

Related-bug: gnocchixyz#509
(cherry picked from commit bc18ebd)
  • Loading branch information
sileht committed May 22, 2018
1 parent 2632b6e commit fdd7770
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 9 deletions.
12 changes: 7 additions & 5 deletions gnocchi/incoming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class IncomingDriver(object):
SACK_PREFIX = "incoming"
CFG_PREFIX = 'gnocchi-config'
CFG_SACKS = 'sacks'
# NOTE(sileht): By default we use threads, but some driver can disable
# threads by setting this to utils.sequencial_map
MAP_METHOD = staticmethod(utils.parallel_map)

@property
def NUM_SACKS(self):
Expand Down Expand Up @@ -122,11 +125,10 @@ def add_measures_batch(self, metrics_and_measures):
and values are a list of
:py:class:`gnocchi.incoming.Measure`.
"""
utils.parallel_map(
self._store_new_measures,
((metric_id, self._encode_measures(measures))
for metric_id, measures
in six.iteritems(metrics_and_measures)))
self.MAP_METHOD(self._store_new_measures,
((metric_id, self._encode_measures(measures))
for metric_id, measures
in six.iteritems(metrics_and_measures)))

@staticmethod
def _store_new_measures(metric_id, data):
Expand Down
5 changes: 5 additions & 0 deletions gnocchi/incoming/swift.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@

from gnocchi.common import swift
from gnocchi import incoming
from gnocchi import utils

swclient = swift.swclient
swift_utils = swift.swift_utils


class SwiftStorage(incoming.IncomingDriver):
# NOTE(sileht): Using threads with swiftclient doesn't work
# as expected, so disable it
MAP_METHOD = staticmethod(utils.sequencial_map)

def __init__(self, conf, greedy=True):
super(SwiftStorage, self).__init__(conf)
self.swift = swift.get_connection(conf)
Expand Down
10 changes: 7 additions & 3 deletions gnocchi/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ def get_driver(conf, coord):

class StorageDriver(object):

# NOTE(sileht): By default we use threads, but some driver can disable
# threads by setting this to utils.sequencial_map
MAP_METHOD = staticmethod(utils.parallel_map)

def __init__(self, conf, coord):
self.coord = coord

Expand All @@ -127,7 +131,7 @@ def upgrade():
pass

def _get_measures(self, metric, keys, aggregation, version=3):
return utils.parallel_map(
return self.MAP_METHOD(
self._get_measures_unbatched,
((metric, key, aggregation, version)
for key in keys))
Expand Down Expand Up @@ -217,7 +221,7 @@ def get_measures(self, metric, granularities,
raise AggregationDoesNotExist(metric, aggregation, g)
aggregations.append(agg)

agg_timeseries = utils.parallel_map(
agg_timeseries = self.MAP_METHOD(
self._get_measures_timeserie,
((metric, ag, from_timestamp, to_timestamp)
for ag in aggregations))
Expand Down Expand Up @@ -568,7 +572,7 @@ def _map_add_measures(bound_timeserie):
d.granularity, carbonara.round_timestamp(
tstamp, d.granularity))

utils.parallel_map(
self.MAP_METHOD(
self._add_measures,
((aggregation, d, metric, ts,
current_first_block_timestamp,
Expand Down
3 changes: 3 additions & 0 deletions gnocchi/storage/swift.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@
class SwiftStorage(storage.StorageDriver):

WRITE_FULL = True
# NOTE(sileht): Using threads with swiftclient doesn't work
# as expected, so disable it
MAP_METHOD = staticmethod(utils.sequencial_map)

def __init__(self, conf, coord=None):
super(SwiftStorage, self).__init__(conf, coord)
Expand Down
6 changes: 5 additions & 1 deletion gnocchi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,15 @@ def get_driver_class(namespace, conf):
conf.driver).driver


def sequencial_map(fn, list_of_args):
return list(itertools.starmap(fn, list_of_args))


def parallel_map(fn, list_of_args):
"""Run a function in parallel."""

if parallel_map.MAX_WORKERS == 1:
return list(itertools.starmap(fn, list_of_args))
return sequencial_map(fn, list_of_args)

with futures.ThreadPoolExecutor(
max_workers=parallel_map.MAX_WORKERS) as executor:
Expand Down

0 comments on commit fdd7770

Please sign in to comment.