Skip to content

Commit

Permalink
Merge pull request #19 from Attumm/add_decorator_workers
Browse files Browse the repository at this point in the history
Add decorators flow with python magic
  • Loading branch information
Attumm authored Aug 10, 2024
2 parents 9158174 + daad105 commit 7e52960
Show file tree
Hide file tree
Showing 10 changed files with 736 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
- name: Run Unit Tests
run: |
coverage run -m unittest discover
coverage run --omit="*test*" -m unittest discover -p "*test*.py"
- name: Upload coverage reports to Codecov, send only once
if: matrix.python-version == '3.12'
Expand Down
109 changes: 106 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,118 @@ produce(10)

Great, the placement of both scripts can be on any machine with connectivity to the redis instance.

## Example Usage

Let's use Python to make writing workers and producers more fun.
Here's a simple example demonstrating how to use Meesee the pythonic way.

```python
from meesee import Meesee

box = Meesee()

@box.worker()
def foobar(item, worker_id):
print('func: foobar, worker_id: {}, item: {}'.format(worker_id, item))

@box.produce()
def produce_to_foobar(items):
return items

if __name__ == '__main__':
items = [{"name": f"name{i}"} for i in range(10)]
produce_to_foobar(items)
box.push_button(workers=5, wait=1)
```

This example demonstrates:
1. Creating a Meesee instance
2. Defining a worker function using the `@box.worker()` decorator
3. Defining a producer function using the `@box.produce()` decorator
4. Producing items to the queue
5. Starting workers to process the items



Example output
```bash
worker 1 started. foobar listening to foobar
worker 2 started. foobar listening to foobar
worker 3 started. foobar listening to foobar
worker 4 started. foobar listening to foobar
func: foobar, worker_id: 1, item: {"name": "name0"}
func: foobar, worker_id: 1, item: {"name": "name1"}
worker 5 started. foobar listening to foobar
func: foobar, worker_id: 2, item: {"name": "name4"}
func: foobar, worker_id: 3, item: {"name": "name2"}
func: foobar, worker_id: 4, item: {"name": "name3"}
func: foobar, worker_id: 1, item: {"name": "name5"}
func: foobar, worker_id: 1, item: {"name": "name6"}
func: foobar, worker_id: 3, item: {"name": "name7"}
func: foobar, worker_id: 4, item: {"name": "name8"}
func: foobar, worker_id: 2, item: {"name": "name9"}
timeout reached worker 5 stopped
timeout reached worker 2 stopped
timeout reached worker 1 stopped
timeout reached worker 4 stopped
timeout reached worker 3 stopped
Clean shut down
```

This output shows:
- Workers starting and listening to the 'foobar' queue
- Items being processed by different workers
- Workers shutting down after the timeout is reached

## Usage explained


Producers produce items for workers, hence the name. A producer can either return an iterable of items or be a generator that yields the items itself. For instance:

```python
@box.produce()
def produce():
return [1, 2, 3]

# or

@box.produce()
def produce_yield():
yield from [1, 2, 3]
```

We can control which queue they will message to in two ways:

1. Specify the queue in the decorator:
```python
@box.produce(queue="foobar")
def produce_yield():
yield from [1, 2, 3]
```
This will produce to the "foobar" queue.

2. Use magic naming:
```python
@box.produce()
def produce_to_foobar():
yield from [1, 2, 3]
```
By naming our function `produce_to_foobar`, the function will also send the data to the "foobar" queue.

Workers are special in that they are started as separate processes using multiprocessing. Here's an example that starts 5 workers. Since we only registered one worker function, all 5 workers will run that function:

```python
box.push_button(workers=5, wait=1)
```

This will start 5 worker processes, each listening to the queue specified in the worker function.

### Installing

Create a virtualenv for your project.
Install meesee:

```
$ . /path/to/virtualenv/bin/activate
$ pip install meesee
$ pip install meesee
```

### Prerequisites
Expand Down
34 changes: 34 additions & 0 deletions examples/example_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import os
import sys

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from meesee import Meesee # noqa: E402

# Settings passed as ``config`` to ``Meesee.start_workers`` at the bottom of
# this script.
config = {
    "namespace": "removeme",
    "key": "tasks",
    "redis_config": {},
    "maxsize": 100,
    "timeout": 1,
}


@Meesee.worker()
def func_a(item, worker_id):
    """Print the received item together with the id of the handling worker.

    Registered through ``Meesee.worker()`` without an explicit queue, so it is
    stored under its own function name.
    """
    template = 'func: {}, worker_id: {}, item: {}'
    print(template.format('func_a', worker_id, item))


@Meesee.worker()
def func_b(item, worker_id):
    """Print the received item together with the id of the handling worker.

    Registered through ``Meesee.worker()`` without an explicit queue, so it is
    stored under its own function name.
    """
    template = 'func: {}, worker_id: {}, item: {}'
    print(template.format('func_b', worker_id, item))


@Meesee.worker()
def func_c(item, worker_id):
    """Print the received item together with the id of the handling worker.

    Registered through ``Meesee.worker()`` without an explicit queue, so it is
    stored under its own function name.
    """
    template = 'func: {}, worker_id: {}, item: {}'
    print(template.format('func_c', worker_id, item))


if __name__ == '__main__':
    # Worker count comes from an optional "-w N" command-line flag; 10 otherwise.
    if '-w' in sys.argv:
        flag_position = sys.argv.index('-w')
        workers = int(sys.argv[flag_position + 1])
    else:
        workers = 10
    Meesee.start_workers(workers=workers, config=config)
55 changes: 55 additions & 0 deletions examples/example_decorator_magic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import os
import sys

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from meesee import Meesee # noqa: E402


# Example settings; only the entries that ``Meesee.__init__`` accepts are
# forwarded below ("maxsize" has no matching parameter there).
config = {
    "namespace": "removeme",
    "key": "tasks",
    "redis_config": {},
    "maxsize": 100,
    "timeout": 1,
}

# ``Meesee.__init__`` takes individual settings
# (workers, namespace, timeout, queue, redis_config), not a config dict.
# Passing the dict positionally would bind it to ``workers`` and leave the
# namespace/timeout/redis settings at their defaults, so pass them by keyword.
box = Meesee(
    namespace=config["namespace"],
    timeout=config["timeout"],
    queue=config["key"],
    redis_config=config["redis_config"],
)


@box.worker()
def foobar(item, worker_id):
    """Print the received item together with the id of the handling worker.

    No queue is passed to the decorator, so the function is registered under
    its own name, "foobar".
    """
    template = 'func: foobar, worker_id: {}, item: {}'
    print(template.format(worker_id, item))


@box.worker()
def name_of_the_function(item, worker_id):
    """Print the received item together with the id of the handling worker.

    No queue is passed to the decorator, so the function is registered under
    its own name, "name_of_the_function".
    """
    template = 'func: name_of_the_function, worker_id: {}, item: {}'
    print(template.format(worker_id, item))


@box.worker(queue="passed_name")
def passed_name_not_this_one(item, worker_id):
    """Print the received item together with the id of the handling worker.

    The explicit ``queue="passed_name"`` argument is used as the registration
    key instead of the function name.
    """
    template = 'func: passed_name_not_this_one, worker_id: {}, item: {}'
    print(template.format(worker_id, item))


@box.produce(queue="passed_name")
def produce_some_items(amount):
    """Yield the integers ``0 .. amount - 1``.

    The ``produce`` decorator iterates over the yielded values and sends each
    one to the "passed_name" queue.
    """
    for number in range(amount):
        yield number


@box.produce()
def produce_to_foo(items):
    """Return ``items`` unchanged so the ``produce`` decorator can send them.

    The ``produce_to_`` marker in the function name makes the decorator target
    the "foo" queue.
    """
    return items


@box.worker_producer(input_queue="foo", output_queue="foobar")
def foo(item, worker_id):
    """Log the item and pass it on unchanged.

    Registered as the worker for the "foo" queue; the returned one-element
    list is forwarded by the decorator to the "foobar" queue.
    """
    message = f"{worker_id} {item} foo pass it too foobar"
    print(message)
    forwarded = [item]
    return forwarded


if __name__ == '__main__':
    # Worker count comes from an optional "-w N" command-line flag; 10 otherwise.
    if '-w' in sys.argv:
        flag_position = sys.argv.index('-w')
        workers = int(sys.argv[flag_position + 1])
    else:
        workers = 10

    # Fill the queues first, then start the workers that drain them.
    produce_some_items(10)
    items = [{"name": f"name{i}"} for i in range(10)]
    produce_to_foo(items)
    box.push_button(workers, wait=1)
7 changes: 6 additions & 1 deletion examples/example_multi_func.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import os
import sys
from meesee import startapp

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from meesee import startapp # noqa: E402


config = {
"namespace": "removeme",
Expand Down
6 changes: 5 additions & 1 deletion examples/example_produce.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import os
import sys
from meesee import RedisQueue

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from meesee import RedisQueue # noqa: E402

config = {
"namespace": "removeme",
Expand Down
116 changes: 114 additions & 2 deletions meesee.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@

from multiprocessing import Pool

from functools import wraps

# Module-level default settings; used as the default ``config`` argument of
# ``Meesee.start_workers``.
# The "namespace" key was listed twice ("removeme", then "main"); only the
# last value is ever kept in a dict literal, so the dead entry is removed.
config = {
    "namespace": "main",
    "key": "tasks",
    "redis_config": {},
    "maxsize": 1000,
}


class RedisQueue:

def __init__(self, namespace, key, redis_config, maxsize=None, timeout=None):
# TCP check if connection is alive
# redis_config.setdefault('socket_timeout', 30)
Expand Down Expand Up @@ -88,6 +91,113 @@ def __len__(self):
return self.r.llen(self.list_key)


class Meesee:
    """Decorator-based registry for queue workers and producers.

    ``worker`` and ``worker_producer`` register callables under a queue name,
    ``produce`` wraps a callable so the items it returns are sent to a queue,
    and ``start_workers`` / ``push_button`` hand the registered callables to
    ``startapp``.
    """

    # Queue name -> worker callable. This is a class attribute, so the registry
    # is shared by every instance and by the classmethod decorators.
    worker_funcs = {}

    # Marker for "magic" naming: ``produce_to_<queue>`` targets ``<queue>``.
    _PRODUCE_MARKER = "produce_to_"

    def __init__(self, workers=10, namespace="main", timeout=None, queue="main", redis_config=None):
        """Store the settings used to build queue configs.

        Args:
            workers: Number of workers ``push_button`` starts by default.
            namespace: Namespace placed in every generated queue config.
            timeout: Optional timeout placed in the worker configs.
            queue: Default queue key for producers.
            redis_config: Dict placed in every generated queue config under
                "redis_config". ``None`` means a fresh empty dict per
                instance (a ``{}`` default would be shared by all instances).
        """
        self.workers = workers
        self.namespace = namespace
        self.timeout = timeout
        self.queue = queue
        self.redis_config = {} if redis_config is None else redis_config

    def create_produce_config(self):
        """Return the ``RedisQueue`` keyword arguments for the default queue."""
        return {
            "key": self.queue,
            "namespace": self.namespace,
            "redis_config": self.redis_config,
        }

    def _target_config(self, explicit_queue, func):
        """Return the queue config a decorated ``func`` should send to.

        An explicitly passed queue wins. Otherwise a ``produce_to_<queue>``
        marker in the function name selects ``<queue>``; without either, the
        instance's default queue is used.
        """
        target = self.create_produce_config()
        if explicit_queue:
            target["key"] = explicit_queue
        else:
            # partition() takes the text after the marker wherever it occurs;
            # slicing by the marker length is only right when it is a prefix.
            _, marker, queue_name = func.__name__.partition(self._PRODUCE_MARKER)
            if marker:
                target["key"] = queue_name
        return target

    @staticmethod
    def _serialize(item):
        """Return ``item`` JSON-encoded if it is a list or dict, else unchanged."""
        if isinstance(item, (list, dict)):
            return json.dumps(item)
        return item

    def worker_producer(self, input_queue=None, output_queue=None):
        """Register a worker whose return value is sent on to another queue.

        Args:
            input_queue: Key the worker is registered under; defaults to the
                function name.
            output_queue: Queue the results are sent to; falls back to the
                ``produce_to_`` name marker, then to the default queue.

        A list/tuple result is sent item by item, any other non-``None``
        result is sent as a single item, and ``None`` sends nothing.
        """
        def decorator(func):

            @wraps(func)
            def wrapper(*args, **kwargs):
                redis_queue = RedisQueue(**self._target_config(output_queue, func))
                result = func(*args, **kwargs)

                if isinstance(result, (list, tuple)):
                    for item in result:
                        redis_queue.send(self._serialize(item))
                elif result is not None:
                    # NOTE: a dict result is returned in its serialized form,
                    # as callers of the wrapper have always received it.
                    result = self._serialize(result)
                    redis_queue.send(result)

                return result

            parsed_name = input_queue if input_queue is not None else self.parse_func_name(func)
            self.worker_funcs[parsed_name] = wrapper

            return wrapper
        return decorator

    def produce(self, queue=None):
        """Wrap a function so every item it returns or yields is sent to a queue.

        Args:
            queue: Target queue. When omitted, a ``produce_to_<queue>`` marker
                in the function name selects the queue; otherwise the default
                queue is used.

        The wrapped function returns ``None``.
        """
        def decorator(func):

            @wraps(func)
            def wrapper(*args, **kwargs):
                redis_queue = RedisQueue(**self._target_config(queue, func))

                for item in func(*args, **kwargs):
                    redis_queue.send(self._serialize(item))

            return wrapper
        return decorator

    @staticmethod
    def parse_func_name(func):
        """Return the registry key derived from ``func``: its ``__name__``."""
        return func.__name__

    @classmethod
    def worker(cls, queue=None):
        """Register the decorated function as a worker and return it unchanged.

        Args:
            queue: Key to register under; defaults to the function name.
        """
        def decorator(func):
            parsed_name = queue if queue is not None else cls.parse_func_name(func)
            cls.worker_funcs[parsed_name] = func
            return func
        return decorator

    @classmethod
    def start_workers(cls, workers=10, config=config):
        """Start the registered worker functions through ``startapp``.

        Prints a notice when nothing is registered, and raises ``workers`` to
        the number of registered functions when it is smaller than that.
        """
        n_workers = len(cls.worker_funcs)
        if n_workers == 0:
            print("No workers have been assigned with a decorator")
        if n_workers > workers:
            print(f"Not enough workers, increasing the workers started with: {workers} we need atleast: {n_workers}")
            workers = n_workers

        startapp(list(cls.worker_funcs.values()), workers=workers, config=config)

    def push_button(self, workers=None, wait=None):
        """Start the registered workers, one queue config per registered key.

        Args:
            workers: Overrides (and stores) the instance's worker count.
            wait: Timeout used when the instance has no truthy ``timeout``.
        """
        if workers is not None:
            self.workers = workers

        registry = type(self).worker_funcs
        configs = [
            {
                "key": queue,
                "namespace": self.namespace,
                "redis_config": self.redis_config,
            } for queue in registry
        ]
        if self.timeout is not None or wait is not None:
            # The instance timeout takes precedence over ``wait``.
            timeout = self.timeout or wait
            for worker_config in configs:
                worker_config["timeout"] = timeout

        startapp(list(registry.values()), workers=self.workers, config=configs)


class InitFail(Exception):
    """Exception with a dedicated ``except InitFail`` handler in ``run_worker``.

    NOTE(review): presumably raised when worker initialisation fails; confirm
    against the code that raises it.
    """

Expand Down Expand Up @@ -117,7 +227,8 @@ def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwarg
try:
func_kwargs = init_add(func_kwargs, init_items, init_kwargs)
r = RedisQueue(**config) # TODO rename r
sys.stdout.write('worker {worker_id} started\n'.format(worker_id=worker_id))
sys.stdout.write('worker {worker_id} started. {func_name} listening to {queue} \n'.format(
worker_id=worker_id, func_name=func.__name__, queue=config["key"]))
for key_name, item in r:
_, item = func(item.decode('utf-8'), worker_id, **func_kwargs), None
except InitFail:
Expand All @@ -138,6 +249,7 @@ def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwarg
time.sleep(0.1) # Throttle restarting

if config.get('timeout') is not None:
sys.stdout.write('timeout reached worker {worker_id} stopped\n'.format(worker_id=worker_id))
break


Expand Down
Loading

0 comments on commit 7e52960

Please sign in to comment.