Skip to content

Commit

Permalink
Added python magic for decorators
Browse files Browse the repository at this point in the history
  • Loading branch information
Attumm committed Aug 2, 2024
1 parent 62f71e7 commit 14508f8
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 5 deletions.
50 changes: 50 additions & 0 deletions examples/example_decorator_magic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import os
import sys

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from meesee import Meesee # noqa: E402


config = {
"namespace": "removeme",
"key": "tasks",
"redis_config": {},
"maxsize": 100,
"timeout": 1,
}

box = Meesee(config)


@box.worker()
def foobar(item, worker_id):
print('func: foobar, worker_id: {}, item: {}'.format(worker_id, item))


@box.worker()
def name_of_the_function(item, worker_id):
print('func: name_of_the_function, worker_id: {}, item: {}'.format(worker_id, item))


@box.worker(queue="passed_name")
def passed_name_not_this_one(item, worker_id):
print('func: passed_name_not_this_one, worker_id: {}, item: {}'.format(worker_id, item))


@box.produce(queue="passed_name")
def produce_some_items(amount):
yield from range(amount)


@box.produce()
def produce_to_foobar(items):
return items


if __name__ == '__main__':
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
produce_some_items(10)
items = [{"name": f"name{i}"} for i in range(10)]
produce_to_foobar(items)
box.push_button(workers, wait=1)
56 changes: 51 additions & 5 deletions meesee.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from multiprocessing import Pool

config = {
"namespace": "removeme",
"namespace": "main",
"key": "tasks",
"redis_config": {},
"maxsize": 1000,
Expand Down Expand Up @@ -90,12 +90,41 @@ def __len__(self):


class Meesee:
worker_funcs = []
worker_funcs = {}

def __init__(self, workers=10, namespace="main", timeout=None, redis_config={}):
self.workers = workers
self.namespace = namespace
self.timeout = timeout
self.redis_config = redis_config

@classmethod
def worker(cls):
def produce(cls, queue=None):
def decorator(func):
cls.worker_funcs.append(func)
def wrapper(*args, **kwargs):
if queue:
config["key"] = queue
if "produce_to_" in func.__name__:
config["key"] = func.__name__[len("produce_to_"):]
redis_queue = RedisQueue(**config)

for item in func(*args, **kwargs):
if isinstance(item, (list, dict)):
item = json.dumps(item)
redis_queue.send(item)

return wrapper
return decorator

@staticmethod
def parse_func_name(func):
return func.__name__

@classmethod
def worker(cls, queue=None):
def decorator(func):
parsed_name = queue if queue is not None else cls.parse_func_name(func)
cls.worker_funcs[parsed_name] = func
return func
return decorator

Expand All @@ -107,7 +136,24 @@ def start_workers(cls, workers=10, config=config):
if n_workers > workers:
print(f"Not enough workers, increasing the workers started with: {workers} we need atleast: {n_workers}")
workers = n_workers
startapp(cls.worker_funcs, workers=workers, config=config)

startapp(list(cls.worker_funcs.values()), workers=workers, config=config)

def push_button(self, workers=None, wait=None):
if workers is not None:
self.workers = workers
configs = [
{
"key": queue,
"namespace": self.namespace,
"redis_config": self.redis_config,
} for queue in self.__class__.worker_funcs.keys()
]
if self.timeout is not None or wait is not None:
for config in configs:
config["timeout"] = self.timeout or wait

startapp(list(self.__class__.worker_funcs.values()), workers=self.workers, config=configs)


class InitFail(Exception):
Expand Down

0 comments on commit 14508f8

Please sign in to comment.