Skip to content

ghilesmeddour/faktory_worker_python

Repository files navigation

Faktory Client Python (faktory_worker_python)

This repository provides Python Client (Consumer and Producer) for the Faktory background job server.

                   +--------------------+
                   |                    |
                   |     Faktory        |
                   |     Server         |
          +---->>>>|                    +>>>>----+
          |        |                    |        |
          |        |                    |        |
          |        +--------------------+        |
          |                                      |
          |                                      |
          |                                      |
          |                                      |
+-----------------------------------------------------------+     
|         .                Faktory               .          |            
|         .                Client                .          |  
|         .                                      .          |          
|   +-----------------+               +-----------------+   |
|   |                 |               |                 |   |
|   |    Producer     |               |    Consumer     |   |
|   |     pushes      |               |    (Worker)     |   |
|   |      jobs       |               |     fetches     |   |
|   |                 |               |       jobs      |   |
|   |                 |               |                 |   |
|   +-----------------+               +-----------------+   |          
|                                                           |            
+-----------------------------------------------------------+            
  • Server - the Faktory daemon which stores background jobs in queues to be processed by Workers.
  • Client - an entity that communicates with the Faktory server using the FWP. A single client can act as both a consumer and a producer.
  • Consumer (or Worker) - a client that fetches work units (jobs) from the server for execution.
  • Producer - a client that issues new work units to the server.

This library tries to implement the FWP as well as possible. If you notice any inconsistencies, please report them.

Installation

pip install pyfaktory

Usage

Faktory Server

If you have a Faktory server already running, make sure you have the correct url.

# Default url for a Faktory server running locally
faktory_server_url = 'tcp://localhost:7419'

For the installation of faktory, please refer to the official documentation.

After installation, you can run it locally.

$ /usr/bin/faktory
Faktory 1.8.0

You can use a password for the Faktory server via the environment variable FAKTORY_PASSWORD. Note if this value starts with a /, then it is considered a pointer to a file on the filesystem with the password. By default /etc/faktory/password is used.

The format of the Faktory URL is as follows:

tcp://:password@localhost:7419

You can access the Fakotry GUI.

To run Faktory in production:

/usr/bin/faktory -e production

Faktory in production mode requires a password by default since version 0.7.0.

Faktory Client

Import pyfaktory.

from pyfaktory import Client, Consumer, Job, Producer

A single client can act as both a consumer and a producer. (You can enable IPv6 by setting enable_ipv6 to True)

client = Client(faktory_url='tcp://localhost:7419')
client.connect()

# Now you can use the client

# At the end, disconnect the client
client.disconnect()

Client is a context manager, so you can use with statement.

with Client(faktory_url='tcp://localhost:7419') as client:
    # Use client

Use role argument to say how you want to use the client. This argument has three possible values: 'consumer', 'producer' or 'both'.

# A client that acts as both a consumer and a producer.
client = Client(faktory_url='tcp://localhost:7419', role='both')

Producer

Use the client to push jobs.

Push job

with Client(faktory_url='tcp://localhost:7419', role='producer') as client:
    producer = Producer(client=client)
    job_1 = Job(jobtype='adder', args=(5, 4), queue='default')
    producer.push(job_1)

Push bulk jobs

You can push several jobs at once. There is no limit, but 1000 at a time is recommended as a best practice.

with Client(faktory_url='tcp://localhost:7419', role='producer') as client:
    producer = Producer(client=client)
    job_2 = Job(jobtype='adder', args=(50, 41))
    job_3 = Job(jobtype='adder', args=(15, 68))
    res = producer.push_bulk([job_2, job_3])

Consumer (Worker)

Use a worker to pull jobs from Faktory server and execute them.

def adder(x, y):
    logging.info(f"{x} + {y} = {x + y}")

with Client(faktory_url='tcp://localhost:7419', role='consumer') as client:
    consumer = Consumer(client=client, queues=['default'], concurrency=1)
    consumer.register('adder', adder)
    consumer.run()

Use priority to indicates in which queue order the jobs should be fetched first.

# With strict priority, there is a risk of starvation
consumer = Consumer(client=client, queues=['critical', 'default', 'bulk'], priority='strict')
# Each queue has an equal chance of being fetched first
consumer = Consumer(client=client, queues=['critical', 'default', 'bulk'], priority='uniform')
# Weights must be specified
consumer = Consumer(client=client, queues=['critical', 'default', 'bulk'], priority='weighted', weights=[0.6, 0.3, 0.1])

Capture exceptions using Sentry

To capture exceptions using Sentry before failling jobs set sentry_capture_exception argument to True.

consumer = Consumer(client=client, sentry_capture_exception=True)

Info

You can get various information about the server using info Client method.

with Client(faktory_url='tcp://localhost:7419') as client:
    server_info = client.info()
    print(server_info)

Mutate

A wrapper for the Mutate API to script certain repairs or migrations.

⚠️ MUTATE commands can be slow and/or resource intensive. They should not be used as part of your application logic.

from pyfaktory import Client, JobFilter, MutateOperation

client = Client(faktory_url='tcp://localhost:7419')
client.connect()

# Find all scheduled jobs with type `QuickbooksSyncJob` and discard them
op = MutateOperation(
    cmd='discard', 
    target='scheduled', 
    filter=JobFilter(jobtype="QuickbooksSyncJob")
)
client.mutate(op)

# Clear the Retry queue completely
op = MutateOperation(
    cmd='discard', 
    target='retries', 
    filter=JobFilter(regexp="*")
)
client.mutate(op)

# Clear the Retry queue completely
op = MutateOperation(
    cmd='discard', 
    target='retries'
)
client.mutate(op)

# Send a two specific JIDs in the Retry queue to the Dead queue
op = MutateOperation(
    cmd='kill', 
    target='retries', 
    filter=JobFilter(jobtype="QuickbooksSyncJob", jids=["123456789", "abcdefgh"])
)
client.mutate(op)

# Enqueue all retries with a first argument matching "bob"
op = MutateOperation(
    cmd='requeue', 
    target='retries', 
    filter=JobFilter(regexp="*\"args\":[\"bob\"*")
)
client.mutate(op)

client.disconnect()

Queues

Pausing

Queues may be paused (no job can be fetched from them while paused) or unpaused (resume fetching).

# Pause a list of queues
client.queue_pause(queues=['bulk', 'another_queue'])

# Pause all queues
client.queue_pause(all_queues=True)


# Unpause a list of queues
client.queue_unpause(queues=['bulk'])

# Unpause all queues
client.queue_unpause(all_queues=True)

Remove

Queues can be removed which deletes all jobs within them. It does not stop currently executing jobs from those queues.

# Remove a list of queues
client.queue_remove(queues=['bulk'])

# Remove all queues
client.queue_remove(all_queues=True)

Batch (untested)

Refer to documentation.

from pyfaktory import Batch, Client, Job, Producer, TargetJob

client = Client(faktory_url='tcp://localhost:7419')
client.connect()

producer = Producer(client=client)

batch = Batch(
    description="An optional description for the Web UI",
    success=TargetJob(jobtype="MySuccessCallback", args=[123], queue="critical"),
    complete=TargetJob(jobtype="MyCompleteCallback", args=['aa'], queue="critical")
)

# Create a new batch
resp = producer.batch_new(batch)

# Push as many jobs as necessary for the batch
# You may nest batches
# The initial batch data has a TTL of 30 minutes and will expire if batch is not commited
producer.push(Job(jobtype='SomeJob', args=(5, 4), custom={"bid": bid}))
producer.push(Job(jobtype='SomeOtherJob', args=(0, 15), custom={"bid": bid}))

# Commit the batch
producer.batch_commit(bid)

client.disconnect()

Use batch_open to open a created batch.

producer.batch_open(bid)

Use parent_bid for child batch.

child_batch = Batch(
    parent_bid=bid,
    description="An optional description for the Web UI",
    success=TargetJob(jobtype="MySuccessCallback", args=[123], queue="critical"),
    complete=TargetJob(jobtype="MyCompleteCallback", args=['aa'], queue="critical")
)

Custom command

If you want to use a Faktory command that is not yet implemented in this client library, you can send custom commands.

from pyfaktory import Client

my_command = 'INFO\r\n'

with Client(faktory_url='tcp://localhost:7419') as client:
    resp = client._send_and_receive(my_command)
    print(resp)

Outer multiprocessing

You can use outer multiprocessing.

def run_worker():
    try:
        with Client(faktory_url=faktory_server_url, role='consumer') as client:
            import multiprocessing
            custom_context = multiprocessing.get_context("spawn")
            consumer = Consumer(client=client, queues=['t_test'], concurrency=2, context=custom_context)
            consumer.register('task_001', task_001)
            consumer.run()
    except Exception as e:
        print(f"task running error: {str(e)}")

Example

Find examples in ./examples.

  • Start the Faktory server.
/usr/bin/faktory
  • Launch a producer.
python examples/fproducer.py
  • Launch a consumer.
python examples/fconsumer.py

Contribute

Issues

If you encounter a problem, please report it.

In addition to the description of your problem, report the server and client versions.

pyfaktory.__version__
/usr/bin/faktory -v

Reproduce your problem while increasing the level of debugging for both the server and the client, and report the logs.

import logging

logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s',
                    level=logging.DEBUG,
                    datefmt='%Y-%m-%d %H:%M:%S')
/usr/bin/faktory -l debug

PRs

Please, feel free to create pull requests.

License

See the LICENSE file for license rights and limitations (MIT).

About

Faktory Client Python (Producer and Consumer/Worker)

Resources

License

Stars

Watchers

Forks

Packages

No packages published