Skip to content

Commit

Permalink
Merge pull request #283 from mhcomm/dvl/quentin/replace_filemsgstore_id
Browse files Browse the repository at this point in the history
FileMessageStore: Don't use path as id, only fname + add a param to not encode the payload for remoteadmin
  • Loading branch information
quentinql authored Jun 25, 2024
2 parents e2b4dc8 + 7bfa4ad commit 09f00fa
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 48 deletions.
14 changes: 11 additions & 3 deletions pypeman/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,24 @@ def add_context(self, key, msg):
payload=copy.deepcopy(msg.payload),
)

def to_dict(self):
def to_dict(self, encode_payload=True):
"""
Convert the current message object to a dict. Payload is pickled.
Convert the current message object to a dict.
Payload is pickled and b64 encoded if encode_payload not set to False
:return: A dict with an equivalent of message
"""
result = {}
result['timestamp'] = self.timestamp.strftime(DATE_FORMAT)
result['uuid'] = self.uuid
result['payload'] = base64.b64encode(pickle.dumps(self.payload)).decode('ascii')
if encode_payload:
result['payload'] = base64.b64encode(pickle.dumps(self.payload)).decode('ascii')
else:
try:
result['payload'] = str(self.payload)
except Exception:
default_logger.warning("Cannot convert to string payload %r, pickling it", self.payload)
result['payload'] = base64.b64encode(pickle.dumps(self.payload)).decode('ascii')
result['meta'] = self.meta
result['ctx'] = {}

Expand Down
73 changes: 40 additions & 33 deletions pypeman/msgstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ async def delete(self, id):
class FileMessageStoreFactory(MessageStoreFactory):
"""
Generate a FileMessageStore message store instance.
Store a file in `<base_path>/<store_id>/<month>/<day>/` hierachy.
Store a file in `<base_path>/<store_id>/<year>/<month>/<day>/` hierarchy.
"""

# TODO add an option to reguraly archive old file or delete them
Expand All @@ -364,7 +364,7 @@ def get_store(self, store_id):


class FileMessageStore(MessageStore):
""" Store a file in `<base_path>/<store_id>/<month>/<day>/` hierachy."""
""" Store a file in `<base_path>/<store_id>/<year>/<month>/<day>/` hierachy."""
# TODO file access should be done in another thread. Waiting for file backend.

def __init__(self, path, store_id):
Expand All @@ -374,7 +374,7 @@ def __init__(self, path, store_id):

# Match msg file name
self.msg_re = re.compile(
r'^(?P<msg_date>[0-9]{8})_(?P<msg_time>[0-9]{2}[0-9]{2})_(?P<msg_uid>[0-9abcdef]*)$')
r'^(?P<msg_date>[0-9]{8})_(?P<msg_time>[0-9]{2}[0-9]{2})_(?P<msg_uid>[0-9a-zA-Z]*)$')

try:
# Try to make dirs if necessary
Expand All @@ -384,6 +384,17 @@ def __init__(self, path, store_id):

self._total = 0

def id2path(self, id):
    """Resolve a message id to the file path where the message is stored.

    The id has the form ``YYYYMMDD_HHMM_<uid>``; the date prefix selects
    the ``<year>/<month>/<day>`` sub-directory under ``base_path``.

    :param id: The message id (also the stored file name).
    :return: A ``Path`` pointing at the message file.
    :raises ValueError: If *id* does not match the expected id pattern.
    """
    parsed = self.msg_re.match(id)
    if parsed is None:
        raise ValueError(f"Id '{id}' not a correct id")
    date_part = parsed.group("msg_date")
    year, month, day = date_part[:4], date_part[4:6], date_part[6:8]
    return Path(self.base_path) / year / month / day / id

async def start(self):
    """Prime the cached message total by counting the messages on disk."""
    msg_count = await self.count_msgs()
    self._total = msg_count

Expand All @@ -393,27 +404,18 @@ async def store(self, msg):

# The filename is the file id
filename = "{}_{}".format(msg.timestamp.strftime(DATE_FORMAT), msg.uuid)
dirs = os.path.join(str(msg.timestamp.year),
"%02d" % msg.timestamp.month,
"%02d" % msg.timestamp.day)

try:
# Try to make dirs if necessary
os.makedirs(os.path.join(self.base_path, dirs))
except FileExistsError:
pass

file_path = os.path.join(dirs, filename)
msg_path = self.id2path(filename)
msg_path.parent.mkdir(parents=True, exist_ok=True)

# Write message to file
with open(os.path.join(self.base_path, file_path), "w") as f:
with msg_path.open("w") as f:
f.write(msg.to_json())

await self.change_message_state(file_path, Message.PENDING)
await self.change_message_state(filename, Message.PENDING)

self._total += 1

return file_path
return filename

def _is_json_meta(self, id):
"""Check if the message meta file is a json. If it's not,
Expand All @@ -423,7 +425,8 @@ def _is_json_meta(self, id):
Args:
id (str): The id of the message
"""
meta_fpath = (Path(self.base_path) / id).with_suffix(".meta")
msg_fpath = self.id2path(id)
meta_fpath = msg_fpath.with_suffix(".meta")
if not meta_fpath.exists():
return False
with meta_fpath.open("r") as fin:
Expand All @@ -439,7 +442,8 @@ def _convert_meta_to_json(self, id):
Args:
id (str): The id of the message to convert
"""
meta_fpath = (Path(self.base_path) / id).with_suffix(".meta")
msg_fpath = self.id2path(id)
meta_fpath = msg_fpath.with_suffix(".meta")
with meta_fpath.open("r") as fin:
state = fin.read()
meta_data = {"state": state}
Expand All @@ -448,7 +452,8 @@ def _convert_meta_to_json(self, id):
return meta_data

async def get_message_meta_infos(self, id, meta_info_name=None):
meta_fpath = (Path(self.base_path) / id).with_suffix(".meta")
msg_fpath = self.id2path(id)
meta_fpath = msg_fpath.with_suffix(".meta")
if meta_fpath.exists():
is_new_meta_file = self._is_json_meta(id)
if not is_new_meta_file:
Expand All @@ -464,7 +469,8 @@ async def get_message_meta_infos(self, id, meta_info_name=None):
return meta_data

async def add_message_meta_infos(self, id, meta_info_name, info):
meta_fpath = (Path(self.base_path) / id).with_suffix(".meta")
msg_fpath = self.id2path(id)
meta_fpath = msg_fpath.with_suffix(".meta")
if meta_fpath.exists():
is_new_meta_file = self._is_json_meta(id)
if not is_new_meta_file:
Expand All @@ -486,10 +492,11 @@ async def get_message_state(self, id):
return await self.get_message_meta_infos(id, "state")

async def get(self, id):
if not os.path.exists(os.path.join(self.base_path, id)):
fpath = self.id2path(id)
if not fpath.exists():
raise IndexError

with open(os.path.join(self.base_path, id), "rb") as f:
with fpath.open("rb") as f:
msg = Message.from_json(f.read().decode('utf-8'))
return {
'id': id,
Expand Down Expand Up @@ -527,8 +534,8 @@ async def count_msgs(self):
for year in await self.sorted_list_directories(os.path.join(self.base_path)):
for month in await self.sorted_list_directories(os.path.join(self.base_path, year)):
for day in await self.sorted_list_directories(os.path.join(self.base_path, year, month)):
for msg_name in sorted(os.listdir(os.path.join(self.base_path, year, month, day))):
found = self.msg_re.match(msg_name)
for msg_id in sorted(os.listdir(os.path.join(self.base_path, year, month, day))):
found = self.msg_re.match(msg_id)
if found:
count += 1
return count
Expand Down Expand Up @@ -583,48 +590,48 @@ async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, e
if end_dt:
if msg_date > end_dt.date():
continue
for msg_name in sorted(
for msg_id in sorted(
os.listdir(os.path.join(
self.base_path, year, month, day)),
reverse=reverse):
found = self.msg_re.match(msg_name)
found = self.msg_re.match(msg_id)
if found:
msg_str_time = found.groupdict()["msg_time"]
hour = int(msg_str_time[:2])
minute = int(msg_str_time[2:4])
msg_time = datetime.time(hour=hour, minute=minute, second=0)
msg_dt = datetime.datetime.combine(msg_date, msg_time)
mid = os.path.join(year, month, day, msg_name)
if start_dt:
if msg_dt < start_dt:
continue
if end_dt:
if msg_dt > end_dt:
continue
if text:
if not await self.is_txt_in_msg(mid, text):
if not await self.is_txt_in_msg(msg_id, text):
continue
if rtext:
if not await self.is_regex_in_msg(mid, rtext):
if not await self.is_regex_in_msg(msg_id, rtext):
continue
if start <= position < end:
# TODO: need to do processing of payload
# before filtering (HL7 / json-str)
# TODO: add filter here
# TODO: can we transfoer into a generator?
result.append(await self.get(mid))
result.append(await self.get(msg_id))
position += 1
return result

async def total(self):
    """Return the cached count of messages held by this store."""
    return self._total

async def _delete_meta_file(self, id):
    """Delete the ``.meta`` sidecar file belonging to message *id*.

    :param id: The message id whose meta file must be removed.
    :raises FileNotFoundError: If no meta file exists for *id*.
    """
    meta_path = self.id2path(id).with_suffix(".meta")
    meta_path.unlink()

async def delete(self, id):
fpath = Path(self.base_path) / str(id)
fpath = self.id2path(id)
if not fpath.exists():
raise IndexError

Expand Down
31 changes: 26 additions & 5 deletions pypeman/plugins/remoteadmin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,18 @@ async def replay_msg(request, ws=None):
:params channel: The channel name.
:params msg_id: The message id to replay.
:queryparams:
encode_payload (Bool, default=False): Force pickling and encoding the payload or not
"""
channelname = request.match_info['channelname']
message_id = request.match_info['message_id']
args = request.rel_url.query
encode_payload = args.get("encode_payload", False)
chan = get_channel(channelname)
try:
msg_res = await chan.replay(message_id)
result = msg_res.to_dict()
result = msg_res.to_dict(encode_payload=encode_payload)
except IndexError:
message = f"Cannot replay msg, id {message_id} probably doesn't exists"
logger.error(message)
Expand All @@ -160,16 +165,22 @@ async def view_msg(request, ws=None):
:params channelname: The channel name.
:params message_id: The message id to view
:queryparams:
encode_payload (Bool, default=False): Force pickling and encoding the payload or not
"""

channelname = request.match_info['channelname']
message_id = request.match_info['message_id']

args = request.rel_url.query
encode_payload = args.get("encode_payload", False)

chan = get_channel(channelname)

try:
msg_res = await chan.message_store.get_msg_content(message_id)
result = msg_res.to_dict()
result = msg_res.to_dict(encode_payload=encode_payload)
except IndexError:
message = f"Cannot view msg, id {message_id} probably doesn't exists"
logger.error(message)
Expand All @@ -190,14 +201,20 @@ async def preview_msg(request, ws=None):
:params channelname: The channel name.
:params message_id: The message id to preview
:queryparams:
encode_payload (Bool, default=False): Force pickling and encoding the payload or not
"""
channelname = request.match_info['channelname']
message_id = request.match_info['message_id']

args = request.rel_url.query
encode_payload = args.get("encode_payload", False)

chan = get_channel(channelname)
try:
msg_res = await chan.message_store.get_preview_str(message_id)
result = msg_res.to_dict()
result = msg_res.to_dict(encode_payload=encode_payload)
except IndexError:
message = f"Cannot preview msg, id {message_id} probably doesn't exists"
logger.error(message)
Expand Down Expand Up @@ -248,11 +265,15 @@ async def backport_old_client(request):
elif cmd_method == "preview_msg":
message_id = params[1]
request.match_info["message_id"] = message_id
await preview_msg(request, ws=ws)
query_url = request.rel_url.with_query({"encode_payload": "True"})
new_req = request.clone(rel_url=query_url)
await preview_msg(request=new_req, ws=ws)
elif cmd_method == "view_msg":
message_id = params[1]
request.match_info["message_id"] = message_id
await view_msg(request=request, ws=ws)
query_url = request.rel_url.with_query({"encode_payload": "True"})
new_req = request.clone(rel_url=query_url)
await view_msg(request=new_req, ws=ws)
elif cmd_method == "replay_msg":
message_id = params[1]
request.match_info["message_id"] = message_id
Expand Down
21 changes: 14 additions & 7 deletions pypeman/tests/test_msgstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,12 @@ def test_file_message_store(self):

# Test processed message
dict_msg = self.loop.run_until_complete(
chan.message_store.get('1982/11/28/19821128_1235_%s' % msg3.uuid))
chan.message_store.get('19821128_1235_%s' % msg3.uuid))
self.assertEqual(dict_msg['state'], 'processed', "Message %s should be in processed state!" % msg3)

# Test failed message
dict_msg = self.loop.run_until_complete(
chan.message_store.get('1982/11/12/19821112_1435_%s' % msg5.uuid))
chan.message_store.get('19821112_1435_%s' % msg5.uuid))
self.assertEqual(dict_msg['state'], 'error', "Message %s should be in error state!" % msg5)

self.assertTrue(os.path.exists("%s/%s/1982/11/28/19821128_1235_%s"
Expand All @@ -320,12 +320,12 @@ def test_file_message_store(self):

# Test view message
msg_content = self.loop.run_until_complete(chan.message_store.get_msg_content(
'1982/11/12/19821112_1435_%s' % msg5.uuid))
'19821112_1435_%s' % msg5.uuid))
self.assertEqual(msg_content.payload, msg5.payload, "Failure of message %s view!" % msg5)

# Test preview message
msg_content = self.loop.run_until_complete(chan.message_store.get_preview_str(
'1982/11/12/19821112_1435_%s' % msg5.uuid))
'19821112_1435_%s' % msg5.uuid))
self.assertEqual(msg_content.payload, msg5.payload[:1000], "Failure of message %s preview!" % msg5)

self.clean_loop()
Expand All @@ -340,13 +340,20 @@ def test_file_message_store_meta(self):
new_meta_tst_path = data_tst_path / "new_meta.meta"
with new_meta_tst_path.open("r") as fin:
new_meta_data = json.load(fin)
msg_id = "msgid"
msg_uid = "msgid"
msg_year = "2024"
msg_month = "06"
msg_day = "13"
msg_date = f"{msg_year}{msg_month}{msg_day}"
msg_time = "0000"
msg_id = f"{msg_date}_{msg_time}_{msg_uid}"

with tempfile.TemporaryDirectory() as tempdir:
print(tempdir)
store_factory = msgstore.FileMessageStoreFactory(path=tempdir)
store = store_factory.get_store(store_id="")
meta_dst_path = Path(tempdir) / f"{msg_id}.meta"
meta_dst_folder_path = Path(tempdir) / msg_year / msg_month / msg_day
meta_dst_folder_path.mkdir(parents=True, exist_ok=True)
meta_dst_path = meta_dst_folder_path / f"{msg_id}.meta"
shutil.copy(old_meta_tst_path, meta_dst_path)
# Tests that msgstore could read an old meta file
msg_state = asyncio.run(store.get_message_state(msg_id))
Expand Down

0 comments on commit 09f00fa

Please sign in to comment.