Skip to content

Commit

Permalink
Added producer_worker and changed config setup
Browse files Browse the repository at this point in the history
  • Loading branch information
Attumm committed Aug 3, 2024
1 parent 14508f8 commit 185bdf6
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 8 deletions.
13 changes: 9 additions & 4 deletions examples/example_decorator_magic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

config = {
"namespace": "removeme",
"key": "tasks",
"redis_config": {},
"key": "tasks", "redis_config": {},
"maxsize": 100,
"timeout": 1,
}
Expand Down Expand Up @@ -38,13 +37,19 @@ def produce_some_items(amount):


@box.produce()
def produce_to_foobar(items):
def produce_to_foo(items):
return items


@box.worker_producer(input_queue="foo", output_queue="foobar")
def foo(item, worker_id):
print(f"{worker_id} {item} foo pass it too foobar")
return [item,]


if __name__ == '__main__':
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
produce_some_items(10)
items = [{"name": f"name{i}"} for i in range(10)]
produce_to_foobar(items)
produce_to_foo(items)
box.push_button(workers, wait=1)
52 changes: 48 additions & 4 deletions meesee.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

from multiprocessing import Pool

from functools import wraps

config = {
"namespace": "main",
"key": "tasks",
Expand Down Expand Up @@ -92,16 +94,57 @@ def __len__(self):
class Meesee:
worker_funcs = {}

def __init__(self, workers=10, namespace="main", timeout=None, redis_config={}):
def __init__(self, workers=10, namespace="main", timeout=None, queue="main", redis_config={}):
self.workers = workers
self.namespace = namespace
self.timeout = timeout
self.queue = queue
self.redis_config = redis_config

@classmethod
def produce(cls, queue=None):
def create_produce_config(self):
return {
"key": self.queue,
"namespace": self.namespace,
"redis_config": self.redis_config,
}

def worker_producer(self, input_queue=None, output_queue=None):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Producer logic
config = self.create_produce_config()
if output_queue:
config["key"] = output_queue
elif "produce_to_" in func.__name__:
config["key"] = func.__name__[len("produce_to_"):]

redis_queue = RedisQueue(**config)
result = func(*args, **kwargs)

if isinstance(result, (list, tuple)):
for item in result:
if isinstance(item, (list, dict)):
item = json.dumps(item)
redis_queue.send(item)
elif result is not None:
if isinstance(result, (list, dict)):
result = json.dumps(result)
redis_queue.send(result)

return result

# Worker registration
parsed_name = input_queue if input_queue is not None else self.parse_func_name(func)
self.worker_funcs[parsed_name] = wrapper

return wrapper
return decorator

def produce(self, queue=None):
def decorator(func):
def wrapper(*args, **kwargs):
config = self.create_produce_config()
if queue:
config["key"] = queue
if "produce_to_" in func.__name__:
Expand Down Expand Up @@ -185,7 +228,8 @@ def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwarg
try:
func_kwargs = init_add(func_kwargs, init_items, init_kwargs)
r = RedisQueue(**config) # TODO rename r
sys.stdout.write('worker {worker_id} started\n'.format(worker_id=worker_id))
sys.stdout.write('worker {worker_id} started. {func_name} listening to {queue} \n'.format(
worker_id=worker_id, func_name=func.__name__, queue=config["key"]))
for key_name, item in r:
_, item = func(item.decode('utf-8'), worker_id, **func_kwargs), None
except InitFail:
Expand Down

0 comments on commit 185bdf6

Please sign in to comment.