Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ci #16

Merged
merged 4 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
name: CI

on:
push:
branches:
- "**"
pull_request:
branches:
- "**"

permissions:
contents: read

jobs:
build:
runs-on: ${{ matrix.os }}
services:
redis:
image: redis
ports:
- 6379:6379
strategy:
matrix:
os: [ubuntu-latest]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
pip install --upgrade pip setuptools
pip install -r requirements.txt

- name: Install test dependencies
run: |
pip install -r test-requirements.txt

- name: Run Pylama
run: |
pylama * -i E501
# - name: Run MyPy strict
# run: |
# mypy --strict
- name: Run Unit Tests
run: |
python -m unittest discover
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Meesee
[![Build Status](https://travis-ci.com/Attumm/meesee.svg?branch=main)](https://travis-ci.com/Attumm/meesee)
[![CI](https://github.com/Attumm/meesee/actions/workflows/ci.yml/badge.svg)](https://github.com/Attumm/meesee/actions/workflows/ci.yml)

Task queue, Long lived workers process parallelization, with Redis as backend.
The project is used in production by three different companies.
Expand Down
2 changes: 1 addition & 1 deletion examples/example_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ def my_func(item, worker_id):


if __name__ == '__main__':
workers = int(sys.argv[sys.argv.index('-w')+1]) if '-w' in sys.argv else 10
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
startapp(my_func, workers=workers, config=config)
3 changes: 1 addition & 2 deletions examples/example_failure_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ def my_failure_func(item, exception, r_instance, worker_id):
print('failure callback', item, str(exception), worker_id)
int_item = int(item.decode('utf-8'))
# handle item, and resend to queue with item minus one
r_instance.send(int_item-1)
r_instance.send(int_item - 1)


if __name__ == "__main__":
produce(10)
startapp(my_func, workers=10, config=config, on_failure_func=my_failure_func)

4 changes: 3 additions & 1 deletion examples/example_health_check.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import smtplib
from email.message import EmailMessage

from meesee import RedisQueue


def send_email(msg, to, from_, subject='default'):
msg_email = EmailMessage()
Expand All @@ -15,14 +17,14 @@ def send_email(msg, to, from_, subject='default'):
s.quit()
return True


config = {
"namespace": "removeme",
"key": "tasks",
"redis_config": {},
"maxsize": 100
}

from meesee import RedisQueue

# Note that max_allowed should be alteast one less then maxsize of the config
max_allowed = 10000
Expand Down
7 changes: 6 additions & 1 deletion examples/example_multi_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,31 @@
"timeout": 1,
}]


def func_a(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_a', worker_id, item))


def func_b(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_b', worker_id, item))


def func_c(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_c', worker_id, item))


funcs = [func_a, func_b, func_c]


def produce(items, configs):
r = RedisQueue(**configs[0])
for config in configs:
r.set_list_key(key=config['key'], namespace=config['namespace'])
for _ in range(items):
r.send(config['key'])


if __name__ == '__main__':
produce(3, configs)
workers = int(sys.argv[sys.argv.index('-w')+1]) if '-w' in sys.argv else 10
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
startapp(funcs, workers=workers, config=configs)
4 changes: 3 additions & 1 deletion examples/example_multi_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@
def func_a(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_a', worker_id, item))


def func_b(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_b', worker_id, item))


def func_c(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_c', worker_id, item))


funcs = [func_a, func_b, func_c]

if __name__ == '__main__':
workers = int(sys.argv[sys.argv.index('-w')+1]) if '-w' in sys.argv else 10
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
startapp(funcs, workers=workers, config=config)
2 changes: 1 addition & 1 deletion examples/example_produce.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ def produce(items):


if __name__ == "__main__":
amount = int(sys.argv[sys.argv.index('-p')+1]) if '-p' in sys.argv else 10
amount = int(sys.argv[sys.argv.index('-p') + 1]) if '-p' in sys.argv else 10
produce(amount)
5 changes: 2 additions & 3 deletions examples/example_sys_signal_doesnt_add_item.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import time
from meesee import RedisQueue
from meesee import startapp

Expand All @@ -22,13 +21,13 @@ def my_func(item, worker_id):
print('regression: item is None')
print('got item {}'.format(locals()))


def raise_sys_exit(item, worker_id):
print('raise sys_exit')
raise SystemExit
raise SystemExit


if __name__ == "__main__":
produce(1)
startapp(raise_sys_exit, workers=1, config=config)
startapp(my_func, workers=1, config=config)

2 changes: 1 addition & 1 deletion examples/example_timout_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ def my_func(item, worker_id):


if __name__ == '__main__':
workers = int(sys.argv[sys.argv.index('-w')+1]) if '-w' in sys.argv else 10
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
startapp(my_func, workers=workers, config=config)
13 changes: 7 additions & 6 deletions examples/example_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from meesee import RedisQueue, startapp

redis_config = {
"host": '127.0.0.1',
"host": "127.0.0.1",
"port": 6380,
"ssl": True,
"ssl_keyfile":'test_redis_key.pem',
"ssl_certfile":'test_redis_cert.pem',
"ssl_cert_reqs":'required',
"ssl_ca_certs":'test_redis_cert.pem'
"ssl_keyfile": "test_redis_key.pem",
"ssl_certfile": "test_redis_cert.pem",
"ssl_cert_reqs": "required",
"ssl_ca_certs": "test_redis_cert.pem",
}


Expand All @@ -28,7 +28,8 @@ def produce(items):


def my_func(item, worker_id):
print('worker: {worker_id} hello, look at me, msg: {item}'.format(worker_id=worker_id, item=item))
print("worker: {worker_id} hello, look at me, msg: {item}".format(worker_id=worker_id, item=item))


if __name__ == "__main__":
# Create self-signed certs
Expand Down
14 changes: 7 additions & 7 deletions meesee.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

class RedisQueue:
def __init__(self, namespace, key, redis_config, maxsize=None, timeout=None):
# TCP check if connection is alive, Sane defaults
redis_config.setdefault('socket_timeout', 30)
redis_config.setdefault('socket_keepalive', True)
# TCP check if connection is alive
# redis_config.setdefault('socket_timeout', 30)
# redis_config.setdefault('socket_keepalive', True)
# Ping check if connection is alive
# redis_config.setdefault('health_check_interval', 30)
self.r = redis.Redis(**redis_config)
Expand Down Expand Up @@ -105,7 +105,7 @@ def setup_init_items(func_kwargs, init_kwargs):
return {name: func_kwargs[name] for name in init_kwargs.keys()}


def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwargs):
def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwargs): # noqa:C901
if isinstance(func, list):
func = func[worker_id % len(func)]
if isinstance(config, list):
Expand Down Expand Up @@ -148,7 +148,7 @@ def startapp(func, func_kwargs={}, workers=10, config=config, on_failure_func=No
try:
p.starmap(run_worker, args)
except (KeyboardInterrupt, SystemExit):
sys.stdout.write('Starting Graceful exit\n')
p.close()
p.join()
sys.stdout.write('Starting Graceful exit\n')
p.close()
p.join()
sys.stdout.write('Clean shut down\n')
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pylama==8.4.1
37 changes: 18 additions & 19 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

def produce(amount):
r = RedisQueue(**example_config)
for i in range(1, amount+1):
for i in range(1, amount + 1):
r.send(i)


Expand Down Expand Up @@ -54,9 +54,9 @@ def tearDown(self):
def test_incr_key_equals_produces_single_worker(self):
expected = 100
produce(expected)

key = 'test:amount'
kwargs = {'key':key, 'r': RedisQueue}
kwargs = {'key': key, 'r': RedisQueue}
init_kwargs = {'r': example_config}

startapp(increment_by_one, workers=1, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs)
Expand All @@ -67,22 +67,22 @@ def test_incr_key_equals_produces_single_worker(self):
def test_incr_key_equals_produces_five_workers(self):
expected = 100
produce(expected)

key = 'test:amount'
kwargs = {'key':key, 'r': RedisQueue}
kwargs = {'key': key, 'r': RedisQueue}
init_kwargs = {'r': example_config}

startapp(increment_by_one, workers=5, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs)

result = int(redis_instance.get(key))
self.assertEqual(result, expected)

def test_incr_key_equals_produces_multiple_workers(self):
expected = 123
expected = 123
produce(expected)

key = 'test:amount'
kwargs = {'key':key, 'r': RedisQueue}
kwargs = {'key': key, 'r': RedisQueue}
init_kwargs = {'r': example_config}

startapp(increment_by_one, workers=7, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs)
Expand All @@ -93,13 +93,13 @@ def test_incr_key_equals_produces_multiple_workers(self):
def test_all_workers_are_present(self):
expected = 268
expected_workers_amount = 5
expected_workers = {i for i in range(1, expected_workers_amount+1)}
expected_workers = {i for i in range(1, expected_workers_amount + 1)}

produce(expected)

key = 'test:amount'
key_workerids = 'test:workerids'
kwargs = {'key':key, 'r': RedisQueue}
kwargs = {'key': key, 'r': RedisQueue}
init_kwargs = {'r': example_config}

startapp(incr_and_append_worker_id, workers=expected_workers_amount, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs)
Expand All @@ -109,9 +109,9 @@ def test_all_workers_are_present(self):

result_workers = redis_instance.lrange(key_workerids, 0, -1)
result_workers_set = {int(i) for i in sorted(result_workers)}

self.assertEqual(result_workers_set, expected_workers)


def append_item(item, worker_id, key, r, test_result_key):
r.r.lpush(test_result_key, item)
Expand All @@ -126,10 +126,10 @@ def tearDown(self):
def test_items_send_are_handled_single_worker(self):
expected = ['1', '2', '3']
produce_items(expected)

key = 'test:items'
result_key = 'test:result'
kwargs = {'key':key, 'r': RedisQueue, 'test_result_key': result_key}
kwargs = {'key': key, 'r': RedisQueue, 'test_result_key': result_key}
init_kwargs = {'r': example_config}

startapp(append_item, workers=5, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs)
Expand All @@ -140,10 +140,10 @@ def test_items_send_are_handled_single_worker(self):
def test_items_send_are_handled_multiple_worker(self):
expected = ['1', '2', '3']
produce_items(expected)

key = 'test:items'
result_key = 'test:result'
kwargs = {'key':key, 'r': RedisQueue, 'test_result_key': result_key}
kwargs = {'key': key, 'r': RedisQueue, 'test_result_key': result_key}
init_kwargs = {'r': example_config}

startapp(append_item, workers=5, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs)
Expand Down Expand Up @@ -178,4 +178,3 @@ def test_handle_sys(self):

if __name__ == '__main__':
unittest.main()