Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new remote admin Plugin #256

Merged
merged 14 commits into from
Sep 15, 2023
44 changes: 44 additions & 0 deletions pypeman/msgstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from itertools import islice
from collections import OrderedDict
from pathlib import Path

from pypeman.message import Message

Expand Down Expand Up @@ -112,6 +113,17 @@ async def total(self):
:return: total count of messages
"""

async def delete(self, id):
"""
Delete one message in the store corresponding to given `id` with his status.
Useful for tests and maybe in the future to permits cleanup of message store

!CAUTION! : cannot be undone

:param id: Message id. Message store dependant.
:return: A dict `{'id':<message_id>, 'state': <message_state>, 'message': <message_object>}`.
"""


class NullMessageStoreFactory(MessageStoreFactory):
""" Return an NullMessageStore that do nothing at all. """
Expand Down Expand Up @@ -140,6 +152,9 @@ async def search(self, **kwargs):
async def total(self):
return 0

async def delete(self, id):
return None
klausfmh marked this conversation as resolved.
Show resolved Hide resolved


class FakeMessageStoreFactory(MessageStoreFactory):
""" Return an Fake message store """
Expand Down Expand Up @@ -175,6 +190,13 @@ async def search(self, **kwargs):
async def total(self):
return 0

async def delete(self, id):
"""
we delete nothing here, but return what would have been deleted
(for testing)
"""
return {'id': id, 'state': 'processed', 'message': None}
klausfmh marked this conversation as resolved.
Show resolved Hide resolved


class MemoryMessageStoreFactory(MessageStoreFactory):
""" Return a Memory message store. All message are lost at pypeman stop. """
Expand Down Expand Up @@ -281,6 +303,11 @@ async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, e
async def total(self):
return len(self.messages)

async def delete(self, id):
resp = dict(self.messages.pop(id))
resp['message'] = Message.from_dict(resp['message'])
return resp


class FileMessageStoreFactory(MessageStoreFactory):
"""
Expand Down Expand Up @@ -484,3 +511,20 @@ async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, e

async def total(self):
return self._total

async def _delete_meta_file(self, id):
meta_fpath = (Path(self.base_path) / str(id)).with_suffix(".meta")
klausfmh marked this conversation as resolved.
Show resolved Hide resolved
meta_fpath.unlink()

async def delete(self, id):
fpath = Path(self.base_path) / str(id)
if not fpath.exists():
raise IndexError

with fpath.open("rb") as f:
msg = Message.from_json(f.read().decode('utf-8'))

data_to_return = {'id': id, 'state': await self.get_message_state(id), 'message': msg}
fpath.unlink()
self._delete_meta_file(id=id)
return data_to_return
4 changes: 2 additions & 2 deletions pypeman/plugin_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ def init_plugins(self):
if self.plugins:
for plugin in self.plugins:
if plugin.status == plugin.STARTED:
self.plugin.do_stop()
plugin.do_stop()
for plugin in self.plugins:
if plugin.status == plugin.STOPPED:
self.plugin.do_destroy()
plugin.do_destroy()

# instantiate plugins
self.plugins = []
Expand Down
2 changes: 1 addition & 1 deletion pypeman/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,5 @@ def do_destroy(self):
self.destroy()

def destroy(self):
assert self.status in (self.INITIAL, self.STOPPED)
assert self.status in (self.INITIALIZED, self.STOPPED)
self.status = self.DESTROYED
Empty file.
26 changes: 26 additions & 0 deletions pypeman/plugins/remoteadmin/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from aiohttp import web

from pypeman.plugins.base import BasePlugin
from pypeman.plugins.remoteadmin import urls


class RemoteAdminPlugin(BasePlugin):

def __init__(self, host="127.0.0.1", port=8091, url_prefix=""):
super().__init__()
self.app = web.Application()
self.host = host
self.port = port
self.url_prefix = url_prefix
self.runner = web.AppRunner(self.app)

def ready(self):
urls.init_urls(self.app, prefix=self.url_prefix)

async def start(self):
await self.runner.setup()
site = web.TCPSite(self.runner, self.host, self.port)
await site.start()

async def stop(self):
await self.runner.cleanup()
25 changes: 25 additions & 0 deletions pypeman/plugins/remoteadmin/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from aiohttp import web

from pypeman.plugins.remoteadmin import views


def init_urls(app, prefix=""):
"""
Create the pypeman remoteadmin routing

Args:
app (aiohttp.web.Application): The aiohttp web app where the
url routings have to be added
"""
app.add_routes([
# API :
web.get(prefix + '/channels', views.list_channels),
web.get(prefix + '/channels/{channelname}/start', views.start_channel),
web.get(prefix + '/channels/{channelname}/stop', views.stop_channel),
web.get(prefix + '/channels/{channelname}/messages', views.list_msgs),
web.get(prefix + '/channels/{channelname}/messages/{message_id}/replay', views.replay_msg),
web.get(prefix + '/channels/{channelname}/messages/{message_id}/view', views.view_msg),
web.get(prefix + '/channels/{channelname}/messages/{message_id}/preview', views.preview_msg),
# WEBSOCKETS :
web.get(prefix + '/', views.backport_old_client),
])
Loading
Loading