diff --git a/pypeman/channels.py b/pypeman/channels.py index 69dcd7b..3ba1176 100644 --- a/pypeman/channels.py +++ b/pypeman/channels.py @@ -915,7 +915,7 @@ def __init__(self, *args, basedir='', regex='.*', interval=1, binary_file=False, async def start(self): await super().start() - asyncio.create_task(self.watch_for_file()) + asyncio.create_task(self.infinite_watcher()) def file_status(self, filename): if filename in self.data: @@ -935,9 +935,14 @@ def _handle_callback(self, future): except Dropped: pass + async def infinite_watcher(self): + while not self.is_stopped(): + await self.watch_for_file() + await self.interruptable_sleeper.sleep(self.interval) + logger.warning("Won't watch anymore") + async def watch_for_file(self): # self.logger.debug("Will sleep") - await self.interruptable_sleeper.sleep(self.interval) # await asyncio.sleep(self.interval, loop=self.loop) # self.logger.debug("sleep done") try: @@ -978,16 +983,10 @@ async def watch_for_file(self): msg.payload = f.read() msg.meta['filename'] = filename msg.meta['filepath'] = str(filepath) - fut = asyncio.create_task(self.handle(msg)) - fut.add_done_callback(self._handle_callback) + await self.handle(msg) except Exception: # TODO: might explicitely silence some special cases. self.logger.exception("filewatcher problem") - finally: - if self.status not in (BaseChannel.STOPPING, BaseChannel.STOPPED,): - asyncio.create_task(self.watch_for_file()) - else: - logger.warning("Won't watch anymore") from pypeman.helpers import lazyload # noqa: E402 diff --git a/pypeman/tests/test_channel.py b/pypeman/tests/test_channel.py index 16da3f9..b500aab 100644 --- a/pypeman/tests/test_channel.py +++ b/pypeman/tests/test_channel.py @@ -85,15 +85,16 @@ def setUp(self): def tearDown(self): super().tearDown() - self.clean_loop() for end in endpoints.all_endpoints: self.loop.run_until_complete(end.stop()) # Stop all channels for chan in channels.all_channels: - if not chan.is_stopped: + if not chan.is_stopped(): + print(f"stopping chan {chan.name}") self.loop.run_until_complete(chan.stop()) + self.clean_loop() endpoints.reset_pypeman_endpoints() def test_base_channel(self): @@ -1190,7 +1191,6 @@ def fake_list_dir2(*args): chan2.watch_for_file = asyncio.coroutine(mock.Mock()) self.start_channels() self.loop.run_until_complete(chan2.tick()) - self.clean_loop() fake_ftp2.download_file.assert_called_once_with("testdir/file1.txt") channels.all_channels.remove(chan2) @@ -1208,7 +1208,6 @@ def test_fwatcher_channel(self): ok_fpath.touch() self.loop.run_until_complete(chan.watch_for_file()) self.assertEqual(n.last_input().payload, "testfilecontent") - self.clean_loop() def test_channel_stopped_dont_process_message(self): """ Whether BaseChannel handling return a good result """