Skip to content

Commit

Permalink
FIX logs in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
quentin on chickenita committed Oct 3, 2024
1 parent 1e88bdd commit 418bdc6
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 13 deletions.
17 changes: 8 additions & 9 deletions pypeman/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ def __init__(self, *args, basedir='', regex='.*', interval=1, binary_file=False,

async def start(self):
await super().start()
asyncio.create_task(self.watch_for_file())
asyncio.create_task(self.infinite_watcher())

def file_status(self, filename):
if filename in self.data:
Expand All @@ -935,9 +935,14 @@ def _handle_callback(self, future):
except Dropped:
pass

async def infinite_watcher(self):
while not self.is_stopped():
await self.watch_for_file()
await self.interruptable_sleeper.sleep(self.interval)
logger.warning("Won't watch anymore")

async def watch_for_file(self):
# self.logger.debug("Will sleep")
await self.interruptable_sleeper.sleep(self.interval)
# await asyncio.sleep(self.interval, loop=self.loop)
# self.logger.debug("sleep done")
try:
Expand Down Expand Up @@ -978,16 +983,10 @@ async def watch_for_file(self):
msg.payload = f.read()
msg.meta['filename'] = filename
msg.meta['filepath'] = str(filepath)
fut = asyncio.create_task(self.handle(msg))
fut.add_done_callback(self._handle_callback)
await self.handle(msg)

except Exception: # TODO: might explicitely silence some special cases.
self.logger.exception("filewatcher problem")
finally:
if self.status not in (BaseChannel.STOPPING, BaseChannel.STOPPED,):
asyncio.create_task(self.watch_for_file())
else:
logger.warning("Won't watch anymore")


from pypeman.helpers import lazyload # noqa: E402
Expand Down
7 changes: 3 additions & 4 deletions pypeman/tests/test_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,16 @@ def setUp(self):

def tearDown(self):
super().tearDown()
self.clean_loop()

for end in endpoints.all_endpoints:
self.loop.run_until_complete(end.stop())

# Stop all channels
for chan in channels.all_channels:
if not chan.is_stopped:
if not chan.is_stopped():
print(f"stopping chan {chan.name}")
self.loop.run_until_complete(chan.stop())
self.clean_loop()
endpoints.reset_pypeman_endpoints()

def test_base_channel(self):
Expand Down Expand Up @@ -1190,7 +1191,6 @@ def fake_list_dir2(*args):
chan2.watch_for_file = asyncio.coroutine(mock.Mock())
self.start_channels()
self.loop.run_until_complete(chan2.tick())
self.clean_loop()
fake_ftp2.download_file.assert_called_once_with("testdir/file1.txt")
channels.all_channels.remove(chan2)

Expand All @@ -1208,7 +1208,6 @@ def test_fwatcher_channel(self):
ok_fpath.touch()
self.loop.run_until_complete(chan.watch_for_file())
self.assertEqual(n.last_input().payload, "testfilecontent")
self.clean_loop()

def test_channel_stopped_dont_process_message(self):
""" Whether BaseChannel handling return a good result """
Expand Down

0 comments on commit 418bdc6

Please sign in to comment.