Skip to content

Commit

Permalink
chore: initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Bogdanp committed May 30, 2017
0 parents commit 937bd3c
Show file tree
Hide file tree
Showing 19 changed files with 708 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/.cache
/.coverage
/htmlcov
__pycache__
15 changes: 15 additions & 0 deletions LICENSE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Dramatiq - Simple task queueing for Python
Copyright (C) 2017 CLEARTYPE SRL

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
7 changes: 7 additions & 0 deletions dramatiq/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .actor import Actor, actor # noqa
from .broker import Broker, BrokerError, ActorNotFound, QueueNotFound, get_broker, set_broker # noqa
from .message import Message # noqa
from .middleware import Middleware # noqa
from .worker import Worker # noqa

__version__ = "0.1.0"
82 changes: 82 additions & 0 deletions dramatiq/actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from .broker import get_broker
from .message import Message


def actor(fn, *, queue_name="default", actor_name=None, broker=None):
"""Declare an Actor.
Parameters:
fn(callable)
queue_name(str)
actor_name(str)
broker(Broker)
Returns:
Actor
"""
actor_name = actor_name or f"{fn.__module__}.{fn.__name__}"
broker = broker or get_broker()
return Actor(fn, queue_name=queue_name, actor_name=actor_name, broker=broker)


class Actor:
def __init__(self, fn, *, broker, queue_name, actor_name):
self.fn = fn
self.broker = broker
self.queue_name = queue_name
self.actor_name = actor_name
self.broker.declare_actor(self)

def send(self, *args, **kwargs):
"""Asynchronously send a message to this actor.
Note:
All arguments must be JSON-encodable.
Parameters:
\*args(tuple): Positional arguments to send to the actor.
\**kwargs(dict): Keyword arguments to send to the actor.
Returns:
Message: The enqueued message.
"""
return self.send_with_options(args, kwargs)

def send_with_options(self, args, kwargs, **options):
"""Asynchronously send a message to this actor, along with an
arbitrary set of processing options for the broker and
middleware.
Parameters:
args(tuple): Positional arguments that are passed to the actor.
kwargs(dict): Keyword arguments that are passed to the actor.
\**options(dict): Arbitrary options that are passed to the
broker and any registered middleware.
Returns:
Message: The enqueued message.
"""
message = Message(
queue_name=self.queue_name,
actor_name=self.actor_name,
args=args, kwargs=kwargs,
options=options,
)

self.broker.enqueue(message)
return message

def __call__(self, *args, **kwargs):
"""Synchronously call this actor.
Parameters:
\*args: Positional arguments to send to the actor.
\**kwargs: Keyword arguments to send to the actor.
Returns:
Whatever the underlying function backing this actor returns.
"""
return self.fn(*args, **kwargs)

def __str__(self):
return f"Actor({self.fn!r}, queue_name={self.queue_name!r}, actor_name={self.actor_name!r})"
174 changes: 174 additions & 0 deletions dramatiq/broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import logging

#: The global broker instance.
global_broker = None


def get_broker():
"""Get the global broker instance.
"""
global global_broker
if global_broker is None:
from .brokers import RabbitmqBroker
set_broker(RabbitmqBroker())
return global_broker


def set_broker(broker):
"""Configure the global broker instance.
"""
global global_broker
global_broker = broker


class Broker:
"""Base class for broker implementations.
Parameters:
middleware(list[Middleware]): The set of middleware that apply
to this broker.
"""

def __init__(self, middleware=None):
self.logger = logging.getLogger("Broker")
self.middleware = middleware or []

def _emit_before(self, signal, *args, **kwargs):
for middleware in self.middleware:
getattr(middleware, f"before_{signal}")(*args, **kwargs)

def _emit_after(self, signal, *args, **kwargs):
for middleware in reversed(self.middleware):
getattr(middleware, f"after_{signal}")(*args, **kwargs)

def add_middleware(self, middleware):
"""Add a middleware object to this broker.
Parameters:
middleware(Middleware)
"""
self.middleware.append(middleware)

def acknowledge(self, queue_name, ack_id): # pragma: no cover
"""Acknowledge that a message is done processing.
Raises:
QueueNotFound: If the given queue was never declared.
Parameters:
queue_name(str): The name of the queue the message was received on.
ack_id(str): The acknowledgement nonce for a particular message.
"""
raise NotImplementedError

def declare_actor(self, actor): # pragma: no cover
"""Declare a new actor on this broker. Declaring an Actor
twice replaces the first actor with the second by name.
Parameters:
actor(Actor)
"""
raise NotImplementedError

def declare_queue(self, queue_name): # pragma: no cover
"""Declare a queue on this broker. This method must be
idempotent.
Parameters:
queue_name(str)
"""
raise NotImplementedError

def enqueue(self, message): # pragma: no cover
"""Enqueue a message on this broker.
Parameters:
message(Message)
"""
raise NotImplementedError

def get_actor(self, actor_name): # pragma: no cover
"""Look up an actor by its name.
Raises:
ActorNotFound: If the actor was never declared.
Returns:
Actor: The actor.
"""
raise NotImplementedError

def get_consumer(self, queue_name, on_message): # pragma: no cover
"""Get an object that consumes messages from the queue and
calls on_message for every message that it finds.
Raises:
QueueNotFound: If the given queue was never declared.
Parameters:
queue_name(str): The name of the queue to consume messages off of.
on_message(callable): A function to be called whenever a
message is received. The function must take two parameters:
a Message object and an ack_id.
Returns:
Consumer: A consumer object.
"""
raise NotImplementedError

def get_declared_queues(self): # pragma: no cover
"""Returns a list of all the named queues declared on this broker.
"""
raise NotImplementedError

def process_message(self, message, ack_id):
"""Process a message and then acknowledge it.
Parameters:
message(Message)
ack_id(str)
"""
try:
self._emit_before("process_message", message)
actor = self.get_actor(message.actor_name)
res = actor(*message.args, **message.kwargs)
self._emit_after("process_message", message, result=res)

except BaseException as e:
self._emit_after("process_message", message, exception=e)

finally:
self.logger.info("Acknowledging message %r with %r.", message, ack_id)
self._emit_before("acknowledge", message, ack_id)
self.acknowledge(message.queue_name, ack_id)
self._emit_after("acknowledge", message, ack_id)


class Consumer:
"""Base class for consumer objects.
"""

def start(self): # pragma: no cover
"""Start this consumer.
"""
raise NotImplementedError

def stop(self): # pragma: no cover
"""Stop this consumer.
"""
raise NotImplementedError


class BrokerError(Exception):
"""Base class for broker-related errors.
"""


class ActorNotFound(BrokerError):
"""Raised when a message is sent to an actor that hasn't been declared.
"""


class QueueNotFound(BrokerError):
"""Raised when a message is sent to an queue that hasn't been declared.
"""
2 changes: 2 additions & 0 deletions dramatiq/brokers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .rabbitmq import RabbitmqBroker # noqa
from .stub import StubBroker # noqa
6 changes: 6 additions & 0 deletions dramatiq/brokers/rabbitmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from ..broker import Broker


class RabbitmqBroker(Broker):
"""A broker that can be used with RabbitMQ.
"""
93 changes: 93 additions & 0 deletions dramatiq/brokers/stub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import uuid

from queue import Queue, Empty

from ..broker import Broker, Consumer, ActorNotFound, QueueNotFound
from ..message import Message


class StubBroker(Broker):
"""A broker that can be used within unit tests.
"""

def __init__(self, middleware=None):
super().__init__(middleware=middleware)

self.actors = {}
self.queues = {}

def acknowledge(self, queue_name, ack_id):
try:
self._emit_before("acknowledge", queue_name, ack_id)
queue = self.queues[queue_name]
queue.task_done()
self._emit_after("acknowledge", queue_name, ack_id)
except KeyError:
raise QueueNotFound(queue_name)

def declare_actor(self, actor):
self._emit_before("declare_actor", actor)
self.declare_queue(actor.queue_name)
self.actors[actor.actor_name] = actor
self._emit_after("declare_actor", actor)

def declare_queue(self, queue_name):
if queue_name not in self.queues:
self._emit_before("declare_queue", queue_name)
self.queues[queue_name] = Queue()
self._emit_after("declare_queue", queue_name)

def enqueue(self, message):
self._emit_before("enqueue", message)
self.queues[message.queue_name].put(message.encode())
self._emit_after("enqueue", message)

def get_actor(self, actor_name):
try:
return self.actors[actor_name]
except KeyError:
raise ActorNotFound(actor_name)

def get_consumer(self, queue_name, on_message):
try:
queue = self.queues[queue_name]
return StubConsumer(queue, on_message)
except KeyError:
raise QueueNotFound(queue_name)

def get_declared_queues(self):
return self.queues.keys()

def join(self, queue_name):
"""Wait for all the messages on the given queue to be processed.
Raises:
QueueNotFound: If the given queue was never declared.
Parameters:
queue_name(str)
"""
try:
self.queues[queue_name].join()
except KeyError:
raise QueueNotFound(queue_name)


class StubConsumer(Consumer):
def __init__(self, queue, on_message):
self.running = False
self.queue = queue
self.on_message = on_message

def start(self):
self.running = True
while self.running:
try:
data = self.queue.get(timeout=5)
message = Message.decode(data)
self.on_message(message, str(uuid.uuid4()))
except Empty:
pass

def stop(self):
self.running = False
Loading

0 comments on commit 937bd3c

Please sign in to comment.