Commit

Implemented rate-limiting against mqtt
Updated documentation
Updated version
albertogeniola committed Oct 4, 2020
1 parent ea2d5ae commit 8c39a1d
Showing 9 changed files with 191 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .version
@@ -1 +1 @@
0.4.0.3
0.4.0.4
30 changes: 30 additions & 0 deletions docs/advanced-topics.rst
@@ -1,6 +1,36 @@
Advanced topics
===============

Managing rate limits
--------------------

Starting from version 0.4.0.4, the `MerossManager` object limits the rate of MQTT messages sent to the Meross cloud,
in order to avoid being banned by the Meross security team for flooding the remote MQTT broker.

The current implementation of rate limits is based on a *global* rate limiter and on a *per device* rate limiter.
Each command issued by the manager is first checked against the device limits.
If that limit has not been reached, a second check is performed against the global limit.
If both checks pass, the command is issued.

If either limit is reached, the manager proceeds in one of two ways.
If the over-limit percentage (*limit_hits/burst_rate*, expressed as a percentage) is **at or below** the `over_limit_threshold_percentage`, the message is simply delayed by `over_limit_delay_seconds`.
If it is **above** the `over_limit_threshold_percentage`, the message is dropped and `RateLimitExceeded` is raised.
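
The delay-or-drop decision can be sketched in a few lines. This is only an illustration, not the library's code: the function names `over_limit_percentage` and `decide` and the string results are hypothetical. The percentage formula mirrors the `over_limit_percentace` property introduced in this commit, and the strict `>` comparison mirrors the check in `async_execute_cmd`.

```python
def over_limit_percentage(limit_hits: int, burst_rate: int) -> float:
    """Limit hits in the current window, as a percentage of the burst size."""
    return (limit_hits / burst_rate) * 100


def decide(limit_hits: int, burst_rate: int, threshold_percentage: float) -> str:
    """Return "drop" when the over-limit percentage exceeds the threshold, else "delay"."""
    if over_limit_percentage(limit_hits, burst_rate) > threshold_percentage:
        return "drop"
    return "delay"


if __name__ == "__main__":
    # With burst_rate=4, every limit hit adds 25 percentage points.
    for hits in (1, 40, 41):
        print(hits, over_limit_percentage(hits, 4), decide(hits, 4, 1000))
```

With the default burst of 4 and threshold of 1000, each limit hit adds 25 percentage points, so in this sketch the 41st hit within one window is the first to be dropped.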

Both limit checks are based on the `Token bucket policy <https://it.wikipedia.org/wiki/Token_bucket>`_ and the developer can set them up when building the `MerossManager` object.
In fact, the `MerossManager` supports the following parameters:

=============================== ============= =========================================================================
Parameter                       Default value Description
------------------------------- ------------- -------------------------------------------------------------------------
burst_requests_per_second_limit 4             Maximum number of requests that can be served in a one-second burst
------------------------------- ------------- -------------------------------------------------------------------------
requests_per_second_limit       1             Number of new tokens per second
------------------------------- ------------- -------------------------------------------------------------------------
over_limit_delay_seconds        1             Seconds to delay when limit is reached
------------------------------- ------------- -------------------------------------------------------------------------
over_limit_threshold_percentage 1000          Percentage threshold above which messages are dropped rather than delayed
=============================== ============= =========================================================================
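
As a rough sketch, the four parameters could be collected and sanity-checked before the manager is built. The helper `rate_limit_kwargs` is hypothetical and not part of the library; only the keyword names and defaults come from the table above. The positivity checks are an assumption about sensible values, not rules the library is known to enforce.

```python
def rate_limit_kwargs(burst_requests_per_second_limit: int = 4,
                      requests_per_second_limit: int = 1,
                      over_limit_delay_seconds: int = 1,
                      over_limit_threshold_percentage: float = 1000) -> dict:
    """Build the rate-limit keyword arguments (hypothetical helper)."""
    values = {
        "burst_requests_per_second_limit": burst_requests_per_second_limit,
        "requests_per_second_limit": requests_per_second_limit,
        "over_limit_delay_seconds": over_limit_delay_seconds,
        "over_limit_threshold_percentage": over_limit_threshold_percentage,
    }
    # Assumed sanity check: zero or negative values make no sense for any of these.
    for name, value in values.items():
        if value <= 0:
            raise ValueError(f"{name} must be positive, got {value}")
    return values


if __name__ == "__main__":
    # A more conservative setup than the defaults: smaller bursts, longer delay.
    print(rate_limit_kwargs(burst_requests_per_second_limit=2, over_limit_delay_seconds=2))
```

The resulting dictionary would be unpacked into the `MerossManager` constructor alongside its other arguments, which are not shown here.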

Sniff device data
-----------------

12 changes: 12 additions & 0 deletions docs/common-gotchas.rst
@@ -48,3 +48,15 @@ Inconsistent device state
lost and then restored.


Ban from Meross cloud
The Meross security team may suspend user accounts that perform too many requests.
In some cases, an automated email is delivered to the email address of the user account,
warning the user of an imminent suspension. In other cases, the account might be suspended without any notice.
If that happens, the user needs to contact the Meross team and ask for the ban to be lifted.
In any case, to prevent that from happening, be sure to adopt appropriate rate limits,
as introduced in version 0.4.0.4.

At the time of writing, Meross does not document such rate limits. The MerossManager automatically applies conservative
limits in order to prevent banning from the Meross cloud; however, it is up to the developer to properly configure
the rate limits as explained in :ref:`advanced-topics:Managing rate limits`.

3 changes: 2 additions & 1 deletion docs/conf.py
@@ -28,7 +28,8 @@
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.coverage', 'sphinx.ext.napoleon']
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.coverage', 'sphinx.ext.napoleon', 'sphinx.ext.autosectionlabel']
autosectionlabel_prefix_document = True

# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
164 changes: 122 additions & 42 deletions meross_iot/manager.py
@@ -5,19 +5,17 @@
import ssl
import string
import sys
import time
from time import time
from asyncio import Future, AbstractEventLoop
from asyncio import TimeoutError
from hashlib import md5
from typing import Optional, List, TypeVar, Iterable, Callable, Awaitable

from typing import Optional, List, TypeVar, Iterable, Callable, Awaitable, Tuple
import paho.mqtt.client as mqtt

from meross_iot.controller.device import BaseDevice, HubDevice, GenericSubDevice
from meross_iot.device_factory import build_meross_device, build_meross_subdevice
from meross_iot.http_api import MerossHttpClient
from meross_iot.model.enums import Namespace, OnlineStatus
from meross_iot.model.exception import CommandTimeoutError, CommandError
from meross_iot.model.exception import CommandTimeoutError, CommandError, RateLimitExceeded
from meross_iot.model.exception import UnconnectedError
from meross_iot.model.http.device import HttpDeviceInfo
from meross_iot.model.http.subdevice import HttpSubdeviceInfo
@@ -27,44 +25,108 @@
from meross_iot.model.push.unbind import UnbindPushNotification
from meross_iot.utilities.mqtt import generate_mqtt_password, generate_client_and_app_id, build_client_response_topic, \
    build_client_user_topic, verify_message_signature, device_uuid_from_push_notification, build_device_request_topic
from datetime import datetime, timedelta
from datetime import timedelta
from enum import Enum

logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO, stream=sys.stdout)
_LOGGER = logging.getLogger(__name__)

T = TypeVar('T', bound=BaseDevice) # Declare type variable


class RateLimiter(object):
    def __init__(self, window_interval: timedelta, window_max_requests: int):
        self._window_max_requests = window_max_requests
        self._window_interval = window_interval
        self._setup_window()

    def _setup_window(self):
        self._window_start = datetime.now()
        self._window_end = self._window_start + self._window_interval
        self._tokens = self._window_max_requests

    def _check_window(self):
        now = datetime.now()
        if now > self._window_end:
            self._window_start = datetime.now()
            self._window_end = self._window_start + self._window_interval
            self._tokens = self._window_max_requests

    def try_get_token(self, message=None):
        self._check_window()
        if self._tokens > 0:
            self._tokens -= 1
            return True
class TokenBucketRateLimiter(object):
    """
    Simple implementation of token bucket rate limiter algorithm
    Careful: This class is not thread-safe.
    """

        if message is None:
            _LOGGER.warning("Rate limiter kicked in.")
        else:
            _LOGGER.warning(message)
    def __init__(self,
                 window_interval: timedelta,
                 tokens_per_interval: int,
                 max_burst_size: int):
        self._window_interval_seconds = window_interval.total_seconds()
        self._tokens_per_interval = tokens_per_interval
        self._max_burst = max_burst_size

        # Let's keep track of limit hits in the ongoing time-window
        self._limit_hits_in_window = 0

        # Set the initial interval end in the past, so that the first iteration is consistent with the following ones
        self._current_window_end = time() - self._window_interval_seconds
        self._remaining_tokens = 0

    def _add_tokens(self):
        # Calculate the number of tokens that we should add.
        # This is calculated as number of intervals we skipped * tokens_per_interval
        # However, we can only add up to max_burst tokens
        now = time()
        if now < self._current_window_end:
            # Do not add tokens for intervals that have been already
            # considered
            return

        return False
        # Calculate how many intervals have passed since the end of the previous one
        n_intervals = (now - self._current_window_end) // self._window_interval_seconds + 1
        n_tokens = n_intervals * self._tokens_per_interval
        self._remaining_tokens = min(self._remaining_tokens + n_tokens, self._max_burst)
        self._current_window_end = now + self._window_interval_seconds
        self._limit_hits_in_window = 0

    @property
    def over_limit_percentace(self):
        return (self._limit_hits_in_window / self._max_burst) * 100

    def check_limit_reached(self) -> bool:
        # Add tokens if needed
        self._add_tokens()

        if self._remaining_tokens > 0:
            self._remaining_tokens -= 1
            return False

        self._limit_hits_in_window += 1
        return True
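
To make the refill behaviour easier to follow, here is a simplified re-statement of the same interval-based refill logic with an injectable clock, so the trace is deterministic. It is a sketch, not the class from this commit: the name `SketchTokenBucket`, the `clock` parameter and the public `limit_hits` attribute are assumptions made for illustration.

```python
from typing import Callable


class SketchTokenBucket:
    """Interval-based token bucket; `clock` is injected so a trace is reproducible."""

    def __init__(self, interval_seconds: float, tokens_per_interval: int,
                 max_burst: int, clock: Callable[[], float]):
        self._interval = interval_seconds
        self._tokens_per_interval = tokens_per_interval
        self._max_burst = max_burst
        self._clock = clock
        # Start with a window that ended one interval ago and an empty bucket.
        self._window_end = clock() - interval_seconds
        self._tokens = 0
        self.limit_hits = 0

    def _refill(self) -> None:
        now = self._clock()
        if now < self._window_end:
            return  # still inside the current window: nothing to add
        # Whole intervals elapsed since the previous window end, plus one.
        n_intervals = int((now - self._window_end) // self._interval) + 1
        self._tokens = min(self._tokens + n_intervals * self._tokens_per_interval,
                           self._max_burst)
        self._window_end = now + self._interval
        self.limit_hits = 0

    def limit_reached(self) -> bool:
        self._refill()
        if self._tokens > 0:
            self._tokens -= 1
            return False
        self.limit_hits += 1
        return True


if __name__ == "__main__":
    fake_now = [100.0]
    bucket = SketchTokenBucket(1.0, 1, 4, clock=lambda: fake_now[0])
    print([bucket.limit_reached() for _ in range(4)])
    fake_now[0] = 110.0  # a long pause refills the bucket up to max_burst
    print([bucket.limit_reached() for _ in range(5)])
```

Note that in this sketch the very first refill grants only two tokens (two intervals times one token), so a full burst of `max_burst` requests only becomes available after a sufficiently long idle period.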


class RateLimitResult(Enum):
    # Plain integer values; trailing commas would turn the first two into 1-tuples
    NotLimited = 0
    GlobalLimitReached = 1
    PerDeviceLimitReached = 2


class RateLimitChecker(object):
    def __init__(self,
                 global_burst_rate=4,
                 global_time_window=timedelta(seconds=1),
                 global_tokens_per_interval=2,
                 device_burst_rate=2,
                 device_time_window=timedelta(seconds=1),
                 device_tokens_per_interval=1):
        # Global limiter configuration
        self._global_limiter = TokenBucketRateLimiter(window_interval=global_time_window,
                                                      tokens_per_interval=global_tokens_per_interval,
                                                      max_burst_size=global_burst_rate)
        # Device limiters
        self._devices_limiters = {}
        self._device_burst_rate = device_burst_rate
        self._device_time_window = device_time_window
        self._device_tokens_per_interval = device_tokens_per_interval

    def check_limits(self, device_uuid) -> Tuple[RateLimitResult, float]:
        # Check the device limit first
        if device_uuid not in self._devices_limiters:
            self._devices_limiters[device_uuid] = TokenBucketRateLimiter(window_interval=self._device_time_window,
                                                                         tokens_per_interval=self._device_tokens_per_interval,
                                                                         max_burst_size=self._device_burst_rate)
        device_limiter = self._devices_limiters[device_uuid]
        if device_limiter.check_limit_reached():
            return RateLimitResult.PerDeviceLimitReached, device_limiter.over_limit_percentace

        # Check the global rate limiter
        if self._global_limiter.check_limit_reached():
            return RateLimitResult.GlobalLimitReached, self._global_limiter.over_limit_percentace

        return RateLimitResult.NotLimited, 0


class MerossManager(object):
Expand All @@ -81,7 +143,10 @@ def __init__(self,
port: Optional[int] = 2001,
ca_cert: Optional[str] = None,
loop: Optional[AbstractEventLoop] = None,
max_requests_per_second: Optional[int] = 4,
over_limit_delay_seconds: int = 1,
over_limit_threshold_percentage: float = 1000,
burst_requests_per_second_limit: int = 4,
requests_per_second_limit: int = 1,
*args,
**kwords) -> None:

@@ -121,7 +186,14 @@ def __init__(self,
        self._user_topic = build_client_user_topic(user_id=self._cloud_creds.user_id)

        # Setup a rate limiter
        self._limiter = RateLimiter(window_interval=timedelta(seconds=1), window_max_requests=max_requests_per_second)
        self._over_limit_delay = over_limit_delay_seconds
        self._over_limit_threshold = over_limit_threshold_percentage
        self._limiter = RateLimitChecker(
            global_burst_rate=burst_requests_per_second_limit,
            device_burst_rate=burst_requests_per_second_limit,
            global_tokens_per_interval=requests_per_second_limit,
            device_tokens_per_interval=requests_per_second_limit
        )

    def register_push_notification_handler_coroutine(self, coro: Callable[
        [GenericPushNotification, List[BaseDevice]], Awaitable]) -> None:
@@ -555,11 +627,19 @@ async def async_execute_cmd(self,
            raise UnconnectedError()

        # Check API rate limits.
        if not self._limiter.try_get_token(message=f"Rate limiter: re-scheduling. Command {method} {namespace} "
                                                   f"message to {destination_device_uuid}"):
            # TODO: avoid starvation. We should prevent chatty device from putting too heavy load.
            # This can be achieved by holding per-uuid stats.
            await asyncio.sleep(delay=1, loop=self._loop)
        limit_result, overlimit_percentage = self._limiter.check_limits(device_uuid=destination_device_uuid)
        if limit_result != RateLimitResult.NotLimited:
            _LOGGER.debug(f"Current over-limit: {overlimit_percentage} %")
            # If the over-limit rate is too high, just drop the call.
            if overlimit_percentage > self._over_limit_threshold:
                _LOGGER.error(f"Rate limit reached: over-limit percentage is {overlimit_percentage}% which exceeds "
                              f"the current {self._over_limit_threshold} limit. The call will be dropped.")
                raise RateLimitExceeded()

            # In case the limit is hit but the over-limit is sustainable, do not raise an exception, just
            # buy some time
            _LOGGER.debug(f"Rate limit reached: api call will be delayed by {self._over_limit_delay} seconds")
            await asyncio.sleep(delay=self._over_limit_delay, loop=self._loop)
            return await self.async_execute_cmd(destination_device_uuid=destination_device_uuid,
                                                method=method, namespace=namespace, payload=payload,
                                                timeout=timeout)
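
The hunk above delays a rate-limited command and then re-enters `async_execute_cmd`, which repeats the limit check. A loop-based variant of the same delay-or-drop idea is sketched below. It is an alternative illustration rather than the commit's implementation: `send_with_rate_limit`, `check_over_limit`, `send` and `SketchRateLimitExceeded` are hypothetical names standing in for the manager's internals.

```python
import asyncio
from typing import Awaitable, Callable, Tuple


class SketchRateLimitExceeded(Exception):
    """Stand-in for the library's RateLimitExceeded exception."""


async def send_with_rate_limit(check_over_limit: Callable[[], Tuple[bool, float]],
                               send: Callable[[], Awaitable[str]],
                               delay_seconds: float,
                               threshold_percentage: float) -> str:
    """Delay while the limit is hit; raise once the over-limit percentage is too high."""
    while True:
        limited, percentage = check_over_limit()
        if not limited:
            return await send()
        if percentage > threshold_percentage:
            # Over-limit is not sustainable: drop the call.
            raise SketchRateLimitExceeded()
        # Over-limit is sustainable: wait, then check the limits again.
        await asyncio.sleep(delay_seconds)


if __name__ == "__main__":
    # Scripted limiter: limited twice, then free.
    scripted = iter([(True, 25.0), (True, 50.0), (False, 0.0)])

    async def fake_send() -> str:
        return "sent"

    print(asyncio.run(send_with_rate_limit(lambda: next(scripted), fake_send, 0.01, 1000)))
```

A loop avoids growing the call stack under sustained throttling, at the cost of diverging from the structure used in the commit.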
@@ -613,7 +693,7 @@ def _build_mqtt_message(self, method: str, namespace: Namespace, payload: dict):
        md5_hash = md5()
        md5_hash.update(randomstring.encode('utf8'))
        messageId = md5_hash.hexdigest().lower()
        timestamp = int(round(time.time()))
        timestamp = int(round(time()))

        # Hash the messageId, the key and the timestamp
        md5_hash = md5()
4 changes: 4 additions & 0 deletions meross_iot/model/exception.py
@@ -10,3 +10,7 @@ class CommandError(Exception):
    def __init__(self, error_payload: dict):
        super().__init__()
        self.error_payload = error_payload


class RateLimitExceeded(Exception):
    pass
20 changes: 16 additions & 4 deletions tests/test_limits.py
@@ -11,6 +11,7 @@
from meross_iot.http_api import MerossHttpClient
from meross_iot.manager import MerossManager
from meross_iot.model.enums import OnlineStatus
from meross_iot.model.exception import RateLimitExceeded

EMAIL = os.environ.get('MEROSS_EMAIL')
PASSWORD = os.environ.get('MEROSS_PASSWORD')
@@ -36,14 +37,25 @@ async def setUpAsync(self):

        self.test_sensors = manager.find_devices(device_class=ElectricityMixin)

    async def _perform_requests(self, sensor: ElectricityMixin, n_requests: int):
        tasks = []
        for i in range(n_requests):
            tasks.append(sensor.async_get_instant_metrics())
        await asyncio.gather(*tasks)

    @unittest_run_loop
    async def test_high_rate(self):
    async def test_sustainable_rate(self):
        if len(self.test_sensors) < 1:
            self.skipTest("No device found for this test")

        for d in self.test_sensors:
            for i in range(20):
                await d.async_get_instant_metrics()
        await self._perform_requests(sensor=self.test_sensors[0], n_requests=20)

    @unittest_run_loop
    async def test_unsustainable_rate(self):
        if len(self.test_sensors) < 1:
            self.skipTest("No device found for this test")
        with self.assertRaises(RateLimitExceeded):
            await self._perform_requests(sensor=self.test_sensors[0], n_requests=200)

    async def tearDownAsync(self):
        await self.meross_client.async_logout()
2 changes: 2 additions & 0 deletions tests/test_push_handler.py
@@ -15,6 +15,8 @@
if os.name == 'nt':
    import asyncio
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
else:
    import asyncio


class TestPushNotificationHandler(AioHTTPTestCase):
2 changes: 2 additions & 0 deletions tests/test_togglex.py
@@ -15,6 +15,8 @@
if os.name == 'nt':
    import asyncio
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
else:
    import asyncio


class TestToggleX(AioHTTPTestCase):
