Skip to content

Commit

Permalink
Merge pull request #273 from klausfmh/dvl/klausf/pypeman_tools
Browse files Browse the repository at this point in the history
tools: view_store / send_from_store
  • Loading branch information
klausfmh authored Mar 22, 2024
2 parents 93bfa14 + f0cadaa commit 051f6cb
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 4 deletions.
7 changes: 3 additions & 4 deletions pypeman/contrib/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ def __init__(self, url, *args, method=None, headers=None, auth=None,
self.method = method
self.headers = headers
self.cookies = cookies
if isinstance(auth, tuple):
auth = aiohttp.BasicAuth(*auth)
self.auth = auth
self.verify = verify
self.params = params
Expand Down Expand Up @@ -295,10 +297,7 @@ async def handle_request(self, msg):
else:
get_params.append((key, param))

if isinstance(self.auth, tuple):
basic_auth = aiohttp.BasicAuth(self.auth[0], self.auth[1])
else:
basic_auth = self.auth
basic_auth = self.auth

data = None
if method.lower() in ['put', 'post', 'patch']:
Expand Down
4 changes: 4 additions & 0 deletions pypeman/msgstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,10 @@ async def search(self, start=0, count=10, order_by='timestamp', start_dt=None, e
if not await self.is_regex_in_msg(mid, rtext):
continue
if start <= position < end:
# TODO: need to do processing of payload
# before filtering (HL7 / json-str)
# TODO: add filter here
# TODO: can we transfoer into a generator?
result.append(await self.get(mid))
position += 1
return result
Expand Down
62 changes: 62 additions & 0 deletions pypeman/tool_commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""
dispatcher command for pypeman tools
"""
import argparse
import importlib
import sys

from functools import partial


commands = [
("view_store", "read and search in file a pypeman store"),
("send_from_store", "send entry from store"),
]


def list_commands():
"""
list command and it's short description
"""
print("List of commands:")
for cmd, descr in commands:
print(f"{cmd}: {descr}")


def imp_mod_and_run(cmd, args):
"""
import module and run its main func
"""
mod = importlib.import_module(f"pypeman.tools.{cmd}")
sys.argv[1:] = args
mod.main()


def mk_parser():
"""
create argument parser
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"-l", "--list-commands", help="list commands with small description", action="store_true")
subparsers = parser.add_subparsers(dest="command", help="sub command to call")
for cmd, descr in commands:
sub_prs = subparsers.add_parser(cmd, description=descr, add_help=False)
sub_prs.set_defaults(func=partial(imp_mod_and_run, cmd))
return parser


def main():
parser = mk_parser()
options, args = parser.parse_known_args()
if options.list_commands:
list_commands()
return
if not options.command:
parser.print_help()
return
options.func(args)


if __name__ == "__main__":
main()
Empty file added pypeman/tools/__init__.py
Empty file.
69 changes: 69 additions & 0 deletions pypeman/tools/send_from_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import argparse

from functools import partial
from pathlib import Path

import requests

from pypeman.tools.view_store import Filter
from pypeman.tools.view_store import process_file_store


def http_send(url, store_id, msg):
"""
sends message to a given url
"""
print(f"send {store_id} to {url}")
print(f"msg = {msg}")
payload = msg.payload
ses = requests.session()
ses.post(url, json=payload)


def get_send_function(dst_str):
"""
determines function, that will send a message to a target
"""
if dst_str.startswith("http"):
return partial(http_send, dst_str)

raise Exception(f"can't determine send function for {dst_str}")


def mk_parser():
"""
create argument parser for send_from_store command
"""
descr = "send store contents to an endpoint"
parser = argparse.ArgumentParser(descr)
parser.add_argument("path", help="path to search messages")
parser.add_argument(
"--type", "-t",
default="json",
help="payload type (dflt = %(default)s)",
)
parser.add_argument(
"--filter", "-f",
help="filter (dot_separated_names=value)",
action="append",
)
parser.add_argument(
"--destination", "-d",
help="destination 'url' to send messages to",
default="http://localhost:8000",
)
return parser


def main():
options = mk_parser().parse_args()
print(f"options {options}")
path = Path(options.path)
filters = options.filter or []
filters = [Filter(filter_str) for filter_str in filters]
send_func = get_send_function(options.destination)
process_file_store(path, pl_type=options.type, filters=filters, action=send_func)


if __name__ == "__main__":
main()
128 changes: 128 additions & 0 deletions pypeman/tools/view_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import argparse
import ast
import asyncio
import json

from pathlib import Path

from pypeman.msgstore import FileMessageStore


class Filter:
"""
a simple filter to filter messages fetched from a storage
"""
def __init__(self, filter_str):
self.filter_str = filter_str
name, val = filter_str.split("=", 1)
self.name = name
self.val = ast.literal_eval(val)

def __str__(self):
return self.filter_str

def match(self, msg):
return msg.payload.get(self.name) == self.val

def __repr__(self):
return str(self)


def split_path_to_store_n_id(path):
"""
helper to split an absolute msg_store path to a store and its id-path
"""
path_wo_date = path.parents[3]
store_name = path_wo_date.name
store_path = path_wo_date.parent
# TODO: most refactor the stores to simplify access by uid or file name or bu
# TODO: absolute file name
store = FileMessageStore(str(store_path), store_name)
rel_path_w_date = path.relative_to(path_wo_date)
return store, rel_path_w_date


def print_msg(store_id, msg):
"""
just print store id and msg payload
"""
print(store_id, msg.payload)


def handle_store_entry(path, pl_type=None, filters=None, action=print):
"""
handles a store entry and print if not rejected by filters
"""
store, rel_path_w_date = split_path_to_store_n_id(path)
entry = asyncio.run(store.get(str(rel_path_w_date)))
msg = entry["message"]
msg_id = entry["id"]
payload = msg.payload
if pl_type == "json":
msg.payload = payload = json.loads(payload)

match = True
for filter in filters:
if not filter.match(msg):
match = False
break
if not match:
return
action(msg_id, msg)


def process_file_store(
path, pl_type=None, filters=None, action=print_msg):
"""
handles one or multiple file store messages
must later see how to make code handle also other stores
param: path: the store 'id' / path of the store entry
param: filters: list of filters to filter message objects
param: action: action to perform with entry
"""

if path.is_file():
handle_store_entry(
path, pl_type=pl_type, filters=filters, action=action)

all_entries = path.glob("**/*")
f_paths = (
path for path in all_entries
if path.is_file() and not path.suffix
)
for entry in f_paths:
handle_store_entry(
entry, pl_type=pl_type, filters=filters, action=action)


def mk_parser():
"""
argument parser for view_store command
"""
descr = "view store contents or search"
parser = argparse.ArgumentParser(descr)
parser.add_argument("path", help="path to search messages")
parser.add_argument(
"--type", "-t",
default="json",
help="payload type (dflt = %(default)s)",
)
parser.add_argument(
"--filter", "-f",
help="filter (names=value) at the moment only for payload fields",
action="append",
)
return parser


def main():
options = mk_parser().parse_args()
path = Path(options.path)
filters = options.filter or []
filters = [Filter(filter_str) for filter_str in filters]
process_file_store(path, pl_type=options.type, filters=filters)


if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"pypeman.tests.test_app",
"pypeman.tests.test_app_testing",
"pypeman.tests.settings",
"pypeman.tools",
],

package_data={
Expand All @@ -54,6 +55,7 @@
entry_points={
"console_scripts": [
"pypeman = pypeman.commands:cli",
"pypeman_tool = pypeman.tool_commands:main",
]
},

Expand Down

0 comments on commit 051f6cb

Please sign in to comment.