From 3d9f3450a2dfae68e7e5dfc14dce7062490ae802 Mon Sep 17 00:00:00 2001 From: Attumm Date: Fri, 2 Aug 2024 01:15:41 +0200 Subject: [PATCH] Added start of decorators --- examples/example_decorator.py | 35 ++++++++++++++++++++++++++++++++++ examples/example_multi_func.py | 4 ++++ examples/example_produce.py | 4 ++++ meesee.py | 26 +++++++++++++++++++++++-- 4 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 examples/example_decorator.py diff --git a/examples/example_decorator.py b/examples/example_decorator.py new file mode 100644 index 0000000..cb7ebb0 --- /dev/null +++ b/examples/example_decorator.py @@ -0,0 +1,35 @@ +import os +import sys + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from meesee import startapp +from meesee import Meesee + +config = { + "namespace": "removeme", + "key": "tasks", + "redis_config": {}, + "maxsize": 100, + "timeout": 1, +} + + +@Meesee.worker() +def func_a(item, worker_id): + print('func: {}, worker_id: {}, item: {}'.format('func_a', worker_id, item)) + + +@Meesee.worker() +def func_b(item, worker_id): + print('func: {}, worker_id: {}, item: {}'.format('func_b', worker_id, item)) + + +@Meesee.worker() +def func_c(item, worker_id): + print('func: {}, worker_id: {}, item: {}'.format('func_c', worker_id, item)) + + +if __name__ == '__main__': + workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10 + Meesee.start_workers(workers=workers, config=config) diff --git a/examples/example_multi_func.py b/examples/example_multi_func.py index 6343716..8874f2a 100644 --- a/examples/example_multi_func.py +++ b/examples/example_multi_func.py @@ -1,4 +1,8 @@ +import os import sys + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + from meesee import startapp config = { diff --git a/examples/example_produce.py b/examples/example_produce.py index 92b2617..63cf72e 100644 --- a/examples/example_produce.py +++ b/examples/example_produce.py @@ -1,4 +1,8 @@ +import os import sys + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + from meesee import RedisQueue config = { diff --git a/meesee.py b/meesee.py index e7d4969..42c20df 100644 --- a/meesee.py +++ b/meesee.py @@ -15,6 +15,7 @@ class RedisQueue: + def __init__(self, namespace, key, redis_config, maxsize=None, timeout=None): # TCP check if connection is alive # redis_config.setdefault('socket_timeout', 30) @@ -88,6 +89,27 @@ def __len__(self): return self.r.llen(self.list_key) +class Meesee: + worker_funcs = [] + + @classmethod + def worker(cls): + def decorator(func): + cls.worker_funcs.append(func) + return func + return decorator + + @classmethod + def start_workers(cls, workers=10, config=config): + n_workers = len(cls.worker_funcs) + if n_workers == 0: + print("No workers have been assigned with a decorator") + if n_workers > workers: + print(f"Not enough workers, increasing the workers started with: {workers} we need atleast: {n_workers}") + workers = n_workers + startapp(cls.worker_funcs, workers=workers, config=config) + + class InitFail(Exception): pass @@ -143,8 +165,8 @@ def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwarg def startapp(func, func_kwargs={}, workers=10, config=config, on_failure_func=None, init_kwargs={}): with Pool(workers) as p: - args = ((func, func_kwargs, on_failure_func, config, worker_id, init_kwargs) - for worker_id in range(1, workers + 1)) + args = [(func, func_kwargs, on_failure_func, config, worker_id, init_kwargs) + for worker_id in range(1, workers + 1)] try: p.starmap(run_worker, args) except (KeyboardInterrupt, SystemExit):