diff --git a/examples/example_decorator_magic.py b/examples/example_decorator_magic.py index 279f42d..56d2136 100644 --- a/examples/example_decorator_magic.py +++ b/examples/example_decorator_magic.py @@ -8,8 +8,7 @@ config = { "namespace": "removeme", - "key": "tasks", - "redis_config": {}, + "key": "tasks", "redis_config": {}, "maxsize": 100, "timeout": 1, } @@ -38,13 +37,19 @@ def produce_some_items(amount): @box.produce() -def produce_to_foobar(items): +def produce_to_foo(items): return items +@box.worker_producer(input_queue="foo", output_queue="foobar") +def foo(item, worker_id): + print(f"{worker_id} {item} foo pass it too foobar") + return [item,] + + if __name__ == '__main__': workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10 produce_some_items(10) items = [{"name": f"name{i}"} for i in range(10)] - produce_to_foobar(items) + produce_to_foo(items) box.push_button(workers, wait=1) diff --git a/meesee.py b/meesee.py index aa2caa5..069c075 100644 --- a/meesee.py +++ b/meesee.py @@ -6,6 +6,8 @@ from multiprocessing import Pool +from functools import wraps + config = { "namespace": "main", "key": "tasks", @@ -92,16 +94,57 @@ def __len__(self): class Meesee: worker_funcs = {} - def __init__(self, workers=10, namespace="main", timeout=None, redis_config={}): + def __init__(self, workers=10, namespace="main", timeout=None, queue="main", redis_config={}): self.workers = workers self.namespace = namespace self.timeout = timeout + self.queue = queue self.redis_config = redis_config - @classmethod - def produce(cls, queue=None): + def create_produce_config(self): + return { + "key": self.queue, + "namespace": self.namespace, + "redis_config": self.redis_config, + } + + def worker_producer(self, input_queue=None, output_queue=None): + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + # Producer logic + config = self.create_produce_config() + if output_queue: + config["key"] = output_queue + elif "produce_to_" in func.__name__: + config["key"] = func.__name__[len("produce_to_"):] + + redis_queue = RedisQueue(**config) + result = func(*args, **kwargs) + + if isinstance(result, (list, tuple)): + for item in result: + if isinstance(item, (list, dict)): + item = json.dumps(item) + redis_queue.send(item) + elif result is not None: + if isinstance(result, (list, dict)): + result = json.dumps(result) + redis_queue.send(result) + + return result + + # Worker registration + parsed_name = input_queue if input_queue is not None else self.parse_func_name(func) + self.worker_funcs[parsed_name] = wrapper + + return wrapper + return decorator + + def produce(self, queue=None): def decorator(func): def wrapper(*args, **kwargs): + config = self.create_produce_config() if queue: config["key"] = queue if "produce_to_" in func.__name__: @@ -185,7 +228,8 @@ def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwarg try: func_kwargs = init_add(func_kwargs, init_items, init_kwargs) r = RedisQueue(**config) # TODO rename r - sys.stdout.write('worker {worker_id} started\n'.format(worker_id=worker_id)) + sys.stdout.write('worker {worker_id} started. {func_name} listening to {queue} \n'.format( + worker_id=worker_id, func_name=func.__name__, queue=config["key"])) for key_name, item in r: _, item = func(item.decode('utf-8'), worker_id, **func_kwargs), None except InitFail: