Skip to content

Commit

Permalink
Implemented roller_shutter config management
Browse files Browse the repository at this point in the history
  • Loading branch information
albertogeniola committed Jul 8, 2022
1 parent 840f73b commit 2fc4f9d
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
23 changes: 21 additions & 2 deletions meross_iot/controller/mixins/roller_shutter.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ async def async_handle_push_notification(self, namespace: Namespace, data: dict)
state = RollerShutterState(roller_shutter['state']) # open (position=100, state=1), close (position=0, state=2), stop (position=-1, state=0)
self._shutter__state_by_channel[channel_index] = state
locally_handled = True

elif namespace == Namespace.ROLLER_SHUTTER_POSITION:
_LOGGER.debug(f"{self.__class__.__name__} handling push notification for namespace "
f"{namespace}")
Expand Down Expand Up @@ -120,7 +119,6 @@ async def async_fetch_config(self, timeout: Optional[float] = None, *args, **kwa
for d in config:
channel = d['channel']
channel_config = d.copy()
del channel_config['channel']
self._shutter__config_by_channel[channel] = channel_config

def get_open_timer_duration_millis(self, channel: int = 0, *args, **kwargs) -> int:
Expand All @@ -131,6 +129,27 @@ def get_close_timer_duration_millis(self, channel: int = 0, *args, **kwargs) ->
self.check_full_update_done()
return self._shutter__config_by_channel.get(channel).get("signalClose")

async def async_set_config(self, open_timer_seconds: int, close_timer_seconds: int, channel: int = 0, timeout: Optional[float] = None, *args, **kwargs) -> None:
"""
Sets the configuration parameters for the roller shutter on the given channel.
:param open_timer_seconds: open timer, min 10, max 120.
:param close_timer_seconds: close timer, min 10, max 120.
:param channel: channel to configure
:param timeout:
:return:
"""
if open_timer_seconds < 10 or open_timer_seconds > 120:
raise ValueError("Invalid open_timer_seconds timer, must be between 10 and 120 seconds.")
if close_timer_seconds < 10 or close_timer_seconds > 120:
raise ValueError("Invalid close_timer_seconds timer, must be between 10 and 120 seconds.")

config = {"channel": channel, "signalOpen": 1000*open_timer_seconds, "signalClose": 1000*close_timer_seconds}
res = await self._execute_command(method="SET",
namespace=Namespace.ROLLER_SHUTTER_CONFIG,
payload={"config": config},
timeout=timeout)
self._shutter__config_by_channel[channel] = config

def get_status(self, channel: int = 0, *args, **kwargs) -> RollerShutterState:
"""
The current roller shutter status. Returns 1 if the given roller shutter is open, 2 if it is close, 0 if it is stop.
Expand Down
20 changes: 18 additions & 2 deletions tests/test_roller_shutter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import random
from typing import Union, Dict

from aiohttp import web
Expand Down Expand Up @@ -146,7 +147,7 @@ async def test_get_opening_timer(self):
await self.test_device.async_update()

opening_timer = self.test_device.get_open_timer_duration_millis(channel=0)
self.assertEqual(opening_timer, 30000)
self.assertGreater(opening_timer, 0)

@unittest_run_loop
async def test_get_closing_timer(self):
Expand All @@ -158,7 +159,22 @@ async def test_get_closing_timer(self):
await self.test_device.async_update()

closing_timer = self.test_device.get_close_timer_duration_millis(channel=0)
self.assertEqual(closing_timer, 30000)
self.assertGreater(closing_timer, 0)

@unittest_run_loop
async def test_set_config(self):
if self.test_device is None:
self.skipTest("No RollerShutter device has been found to run this test on.")
print(f"Testing device {self.test_device.name}")

open_timer = random.randint(10,120)
close_timer = random.randint(10, 120)
await self.test_device.async_set_config(open_timer_seconds=open_timer, close_timer_seconds=close_timer, channel=0)
await self.test_device.async_fetch_config()
opening_timer = self.test_device.get_open_timer_duration_millis(channel=0)
self.assertEqual(opening_timer, open_timer*1000)
closing_timer = self.test_device.get_close_timer_duration_millis(channel=0)
self.assertEqual(closing_timer, close_timer*1000)

async def tearDownAsync(self):
if self.requires_logout:
Expand Down

0 comments on commit 2fc4f9d

Please sign in to comment.