Skip to content

Commit

Permalink
Merge pull request #25 from Attumm/v1.6.0
Browse files Browse the repository at this point in the history
bumped version v1.6.0
  • Loading branch information
Attumm authored Aug 16, 2024
2 parents a59992e + 342e889 commit 2986827
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 36 deletions.
12 changes: 2 additions & 10 deletions examples/example_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,7 @@

from meesee import Meesee # noqa: E402

config = {
"namespace": "removeme",
"key": "tasks",
"redis_config": {},
"maxsize": 100,
"timeout": 1,
}

box = Meesee(config)
box = Meesee()


@box.worker()
Expand All @@ -33,4 +25,4 @@ def func_c(item, worker_id):

if __name__ == '__main__':
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
box.start_workers(workers=workers, config=config)
box.push_button(workers=workers)
9 changes: 1 addition & 8 deletions examples/example_decorator_magic.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,7 @@
from meesee import Meesee # noqa: E402


config = {
"namespace": "removeme",
"key": "tasks", "redis_config": {},
"maxsize": 100,
"timeout": 1,
}

box = Meesee(config)
box = Meesee()


@box.worker()
Expand Down
44 changes: 44 additions & 0 deletions examples/example_decorator_produce_to_multiple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import os
import sys

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from meesee import Meesee # noqa: E402

box = Meesee()


@box.produce_to()
def produce_multi(items):
return items


@box.worker()
def foo1(item, worker_id):
print(f"{worker_id} {item} foo1")
return [item,]


@box.worker()
def foo2(item, worker_id):
print(f"{worker_id} {item} foo2")
return [item,]


@box.worker()
def foo3(item, worker_id):
print(f"{worker_id} {item} foo3")
return [item,]


if __name__ == '__main__':
items = [
("foo1", "item1"),
("foo2", "item2"),
("foo3", "item3"),
("foo1", "item4"),
("foo2", "item5"),
("foo3", "item6"),
]
produce_multi(items)
box.push_button(wait=1)
65 changes: 58 additions & 7 deletions meesee.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __init__(self, workers=10, namespace="main", timeout=None, queue="main", red
self.timeout = timeout
self.queue = queue
self.redis_config = redis_config
self.worker_funcs = {}
self._worker_funcs = {}

def create_produce_config(self):
return {
Expand Down Expand Up @@ -135,7 +135,7 @@ def wrapper(*args, **kwargs):

return result
parsed_name = input_queue if input_queue is not None else self.parse_func_name(func)
self.worker_funcs[parsed_name] = wrapper
self._worker_funcs[parsed_name] = wrapper

return wrapper
return decorator
Expand All @@ -158,25 +158,76 @@ def wrapper(*args, **kwargs):
return wrapper
return decorator

def produce_to(self):
"""
Produce items to be sent to specific queues.
Send items to its corresponding queue using a RedisQueue.
The decorated function should yield tuples of (queue_name, item_value).
Example:
@box.produce_to()
def produce_multi(items):
return items
items = [
("foo1", "item1"),
("foo2", "item2"),
("foo3", "item3"),
("foo1", "item4"),
("foo2", "item5"),
("foo3", "item6"),
]
produce_multi(items)
In this example:
- Each tuple in the `items` list represents a (queue, value) pair.
- The first element of each tuple ("foo1", "foo2", "foo3") is the queue name.
- The second element of each tuple ("item1", "item2", etc.) is the value to be sent to the queue.
The decorator will process these items as follows:
1. "item1" will be sent to the "foo1" queue
2. "item2" will be sent to the "foo2" queue
3. "item3" will be sent to the "foo3" queue
4. "item4" will be sent to the "foo1" queue
5. "item5" will be sent to the "foo2" queue
6. "item6" will be sent to the "foo3" queue
Notes:
- If an item is a list or dict, it will be JSON-encoded before being sent to the queue.
"""
def decorator(func):
def wrapper(*args, **kwargs):
config = self.create_produce_config()
redis_queue = RedisQueue(**config)

for queue, item in func(*args, **kwargs):
if isinstance(item, (list, dict)):
item = json.dumps(item)
redis_queue.send_to(queue, item)

return wrapper
return decorator

def parse_func_name(self, func):
return func.__name__

def worker(self, queue=None):
def decorator(func):
parsed_name = queue if queue is not None else self.parse_func_name(func)
self.worker_funcs[parsed_name] = func
self._worker_funcs[parsed_name] = func
return func
return decorator

def start_workers(self, workers=10, config=config):
n_workers = len(self.worker_funcs)
n_workers = len(self._worker_funcs)
if n_workers == 0:
sys.stdout.write("No workers have been assigned with a decorator\n")
if n_workers > workers:
sys.stdout.write(f"Not enough workers, increasing the workers started with: {workers} we need atleast: {n_workers}\n")
workers = n_workers

startapp(list(self.worker_funcs.values()), workers=workers, config=config)
startapp(list(self._worker_funcs.values()), workers=workers, config=config)

def push_button(self, workers=None, wait=None):
if workers is not None:
Expand All @@ -186,13 +237,13 @@ def push_button(self, workers=None, wait=None):
"key": queue,
"namespace": self.namespace,
"redis_config": self.redis_config,
} for queue in self.worker_funcs.keys()
} for queue in self._worker_funcs.keys()
]
if self.timeout is not None or wait is not None:
for config in configs:
config["timeout"] = self.timeout or wait

startapp(list(self.worker_funcs.values()), workers=self.workers, config=configs)
startapp(list(self._worker_funcs.values()), workers=self.workers, config=configs)


class InitFail(Exception):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
long_description=long_description,
long_description_content_type='text/markdown',

version='1.5.0',
version='1.6.0',
py_modules=['meesee'],
install_requires=['redis==4.5.5'],
python_requires='>3.5',
Expand Down
93 changes: 83 additions & 10 deletions tests/mock_function_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ def test_func_none(input_data):

mock_redis_queue.assert_called()
mock_redis_queue.return_value.send.assert_called()
self.assertIn("foo", self.box.worker_funcs)
self.assertIn("bar", self.box.worker_funcs)
self.assertIn("produce_to_qux", self.box.worker_funcs)
self.assertIn("foo", self.box._worker_funcs)
self.assertIn("bar", self.box._worker_funcs)
self.assertIn("produce_to_qux", self.box._worker_funcs)

mock_redis_queue.return_value.send.assert_any_call(json.dumps({"key": "test_data"}))

Expand All @@ -76,7 +76,7 @@ def setUp(self):
@patch('meesee.startapp')
@patch('sys.stdout.write')
def test_start_workers_no_workers(self, mock_stdout_write, mock_startapp):
self.box.worker_funcs = {}
self.box._worker_funcs = {}
self.box.start_workers()
mock_stdout_write.assert_called_once_with("No workers have been assigned with a decorator\n")
mock_startapp.assert_called_once_with(
Expand All @@ -88,38 +88,38 @@ def test_start_workers_no_workers(self, mock_stdout_write, mock_startapp):
@patch('meesee.startapp')
@patch('sys.stdout.write')
def test_start_workers_enough_workers(self, mock_stdout_write, mock_startapp):
self.box.worker_funcs = {'worker1': MagicMock(), 'worker2': MagicMock()}
self.box._worker_funcs = {'worker1': MagicMock(), 'worker2': MagicMock()}
self.box.start_workers(workers=3)
mock_stdout_write.assert_not_called()
mock_startapp.assert_called_once_with(
list(self.box.worker_funcs.values()),
list(self.box._worker_funcs.values()),
workers=3,
config=config,
)

@patch('meesee.startapp')
@patch('sys.stdout.write')
def test_start_workers_not_enough_workers(self, mock_stdout_write, mock_startapp):
self.box.worker_funcs = {'worker1': MagicMock(), 'worker2': MagicMock(), 'worker3': MagicMock()}
self.box._worker_funcs = {'worker1': MagicMock(), 'worker2': MagicMock(), 'worker3': MagicMock()}
self.box.start_workers(workers=2)
mock_stdout_write.assert_called_once_with(
"Not enough workers, increasing the workers started with: 2 we need atleast: 3\n"
)
mock_startapp.assert_called_once_with(
list(self.box.worker_funcs.values()),
list(self.box._worker_funcs.values()),
workers=3,
config=config,
)

@patch('meesee.startapp')
@patch('sys.stdout.write')
def test_start_workers_custom_config(self, mock_stdout_write, mock_startapp):
self.box.worker_funcs = {'worker1': MagicMock()}
self.box._worker_funcs = {'worker1': MagicMock()}
custom_config = {'custom': 'config'}
self.box.start_workers(workers=1, config=custom_config)
mock_stdout_write.assert_not_called()
mock_startapp.assert_called_once_with(
list(self.box.worker_funcs.values()),
list(self.box._worker_funcs.values()),
workers=1,
config=custom_config
)
Expand Down Expand Up @@ -364,5 +364,78 @@ def test_len(self):
self.assertEqual(len(self.queue), 5)


class TestProduceToDecorator(unittest.TestCase):
def setUp(self):
self.box = Meesee(workers=5, namespace="test", timeout=2)

@patch('meesee.RedisQueue')
def test_produce_to_decorator(self, mock_redis_queue):
# Mock the RedisQueue instance
mock_queue_instance = MagicMock()
mock_redis_queue.return_value = mock_queue_instance

# Define a function decorated with produce_to
@self.box.produce_to()
def produce_multi(items):
return items

# Test data
items = [
("foo1", "item1"),
("foo2", {"key": "item2"}),
("foo3", ["item3", "item3b"]),
("foo1", "item4"),
("foo2", "item5"),
("foo3", "item6"),
]

# Call the decorated function
produce_multi(items)

# Assertions
self.assertEqual(mock_redis_queue.call_count, 1)
self.assertEqual(mock_queue_instance.send_to.call_count, len(items))

# Check if send_to was called with correct arguments for each item
expected_calls = [
(("foo1", "item1")),
(("foo2", json.dumps({"key": "item2"}))),
(("foo3", json.dumps(["item3", "item3b"]))),
(("foo1", "item4")),
(("foo2", "item5")),
(("foo3", "item6")),
]

for (queue, item), result in zip(expected_calls, mock_queue_instance.send_to.call_args_list):
self.assertEqual(result[0][0], queue)
self.assertEqual(result[0][1], item)

@patch('meesee.RedisQueue')
def test_produce_to_with_custom_function(self, mock_redis_queue):
mock_queue_instance = MagicMock()
mock_redis_queue.return_value = mock_queue_instance

@self.box.produce_to()
def custom_produce():
yield "queue1", "item1"
yield "queue2", {"key": "item2"}
yield "queue3", ["item3", "item3b"]

custom_produce()

self.assertEqual(mock_redis_queue.call_count, 1)
self.assertEqual(mock_queue_instance.send_to.call_count, 3)

expected_calls = [
(("queue1", "item1")),
(("queue2", json.dumps({"key": "item2"}))),
(("queue3", json.dumps(["item3", "item3b"]))),
]

for (queue, item), result in zip(expected_calls, mock_queue_instance.send_to.call_args_list):
self.assertEqual(result[0][0], queue)
self.assertEqual(result[0][1], item)


if __name__ == '__main__':
unittest.main()

0 comments on commit 2986827

Please sign in to comment.