Skip to content

Commit

Permalink
Added start of decorators
Browse files Browse the repository at this point in the history
  • Loading branch information
Attumm committed Aug 1, 2024
1 parent 9158174 commit 3d9f345
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 2 deletions.
35 changes: 35 additions & 0 deletions examples/example_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import os
import sys

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from meesee import startapp
from meesee import Meesee

config = {
"namespace": "removeme",
"key": "tasks",
"redis_config": {},
"maxsize": 100,
"timeout": 1,
}


@Meesee.worker()
def func_a(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_a', worker_id, item))


@Meesee.worker()
def func_b(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_b', worker_id, item))


@Meesee.worker()
def func_c(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_c', worker_id, item))


if __name__ == '__main__':
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
Meesee.start_workers(workers=workers, config=config)
4 changes: 4 additions & 0 deletions examples/example_multi_func.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import os
import sys

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from meesee import startapp

config = {
Expand Down
4 changes: 4 additions & 0 deletions examples/example_produce.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import os
import sys

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from meesee import RedisQueue

config = {
Expand Down
26 changes: 24 additions & 2 deletions meesee.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@


class RedisQueue:

def __init__(self, namespace, key, redis_config, maxsize=None, timeout=None):
# TCP check if connection is alive
# redis_config.setdefault('socket_timeout', 30)
Expand Down Expand Up @@ -88,6 +89,27 @@ def __len__(self):
return self.r.llen(self.list_key)


class Meesee:
worker_funcs = []

@classmethod
def worker(cls):
def decorator(func):
cls.worker_funcs.append(func)
return func
return decorator

@classmethod
def start_workers(cls, workers=10, config=config):
n_workers = len(cls.worker_funcs)
if n_workers == 0:
print("No workers have been assigned with a decorator")
if n_workers > workers:
print(f"Not enough workers, increasing the workers started with: {workers} we need atleast: {n_workers}")
workers = n_workers
startapp(cls.worker_funcs, workers=workers, config=config)


class InitFail(Exception):
pass

Expand Down Expand Up @@ -143,8 +165,8 @@ def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwarg

def startapp(func, func_kwargs={}, workers=10, config=config, on_failure_func=None, init_kwargs={}):
with Pool(workers) as p:
args = ((func, func_kwargs, on_failure_func, config, worker_id, init_kwargs)
for worker_id in range(1, workers + 1))
args = [(func, func_kwargs, on_failure_func, config, worker_id, init_kwargs)
for worker_id in range(1, workers + 1)]
try:
p.starmap(run_worker, args)
except (KeyboardInterrupt, SystemExit):
Expand Down

0 comments on commit 3d9f345

Please sign in to comment.