
Celery is the de-facto framework for running distributed tasks written in Python. Celery has strong support for Django and is well-documented for integration with other frameworks like Flask and FastAPI.
Concurrency Management
Celery uses prefork as it’s default concurrency manager. But using prefork , it has a significant downside for some applications. CPU-intensive tasks are more suitable with prefork . It uses python’s multiprocessing library under the hood. So it spawns processes and can be maxed up to the number of CPU cores you have.
But if your tasks are more I/O bounded then using Gevent or Greenlet would be more useful. These libraries use the event IO loop to achieve the same. For starting a worker with 20 concurrencies which consume from only queue1 and queue2, you can use -
celery -A app W -l INFO -P gevent/eventlet -c 20 -Q queue1,queue2
So one question is what is the number of concurrent tasks you should be running? There is no simple answer to that. If your tasks are CPU intensive then using prefork is a good choice. So the optimal choice would be the number of cores you have in the machine.
Broker Selection
Celery has 3 stable broker integrations namely RabbitMQ, Redis and Amazon SQS out of which only two have support for monitoring and remote control (Using celery inspect and control commands).
If you have a large message payload then it is preferable to use RabbitMQ as a broker. Both RabbitMQ and Redis work well with celery.
Result Backend
You can use RPC as your result backend then celery will store the result on RabbitMQ. It is good for local development only. For real-life applications, you should either any RDBMS of your choice or Redis as a backend.
If your application is running on Django and you want to use the Django backend as the result backend then you can use django-celery-results for your purpose.
For storing results you must have a pre-defined archival policy or deletion policy. Cause if you are running a large number of tasks in your system every day and storing all those results in your DB then you should define a good eviction policy for those results stored.
Parameter Tuning
Using Acks Late
The default behaviour for acks_late is True so that the consumer will acknowledge the message after processing it. It is useful for short-lived tasks. Suppose during the processing of the task the broker connection gets lost or the workers abruptly crashed then the message will stay in the defined queue until the next worker picks it up.
Now if your application doesn’t handle sensitive data then using acks_late=False would be helpful.
If you have a long-running job then it’s better to save internal states and using acks_late=False so that the next time the worker doesn’t pick up the same job from the beginning. But then it is required to define your own resume_job(task_id) function.
It is always helpful to divide your tasks into smaller chunks.
Retrying failed tasks
Sometimes you want to define a retry policy with different parameters. You can do it for each task or define your own retry policy project-wise.
Let’s look at how to use it on a task basis. Suppose you want to retry a specific task for TypeError / ValueError with max retries 3 and with a 15-minute gap for each retry.
@shared_task(
bind=True,
name='task_with_retry_logic',
# For any error you can pass Exception
auto_retry_for = (TypeError, ValueError, )
retry_kwargs={'max_retries': 3, 'countdown': 15 * 60}
)
def task_with_retry_logic(self, *args, **kwargs):
# process some logic
...
Suppose now you want this definition project-wise. So first define your retry policy then write a retry decorator as follows.
from functools import wraps
RETRY_POLICY = {
TypeError.__name__: {
"max_retries": 3,
"countdown": 15 * 60,
"queue": "errors_exe_cls_1"
},
ValueError.__name__: {
"max_retries": 3,
"countdown": 15 * 60,
"queue": "errors_exe_cls_2"
}
}
def retry(task):
@wraps(task)
def wrapper(self, *args, **kwargs):
try:
return task(self, *args, **kwargs)
except Exception as exc:
policy = RETRY_POLICY.get(exc.__class__.__name__, None)
if policy:
kwds = {"args": args, "kwargs": kwargs, **policy}
self.retry(**kwds)
else:
raise exc
return wrapper
Now define the same tasks with the retry decorator.
@shared_task(
bind=True,
name='task_with_retry_logic'
)
@retry
def task_with_retry_logic(self, *args, **kwargs):
# process some logic
...
Writing your own custom Task class
Here is an example of how to use a custom Task class as a base task and use the same property in different tasks.
from celery import Task
from django.db import connection
class CustomTask(Task):
# manage your requirements here
# you can define retry parameters here
# define some properties here use it task wise
@property
def conn(self):
if self._conn is None:
self._conn = connection
return self._conn
@shared_task(bind=True, base=CustomTask)
def exec_query(self):
with self.conn.cursor() as cursor:
cursor.execute("some query...)
Managing Custom states
Celery has some built-in custom states defined in celery.task.states but you can define your own custom states. But the naming should be entirely different from celery-defined names. Here IN_PROGRESS a custom state with updates the meta for every iteration.
@shared_task(bind=True)
def process_tweets(self, tweets):
for idx, tweet in enumerate(tweets):
process_tweet(tweet)
self.update_state(state='IN_PROGRESS',
meta={'processing': f'{idx / len(tweets):.0%}'})
Tracking Multi-level tasks with celery Signals
Now if you have multi-level nested tasks and you want to track the progress of the root task based on the completion of the child task, in that case, you can use celery signals. There are several signals provided by celery but we need only two of those task_prerun and task_postrun . Here is the code snippet overview of how to implement it.
client = redis.Client("your connection string")
@task_prerun.connect
def handle_state_before_execution(**kwargs):
try:
task_id = kwargs["task_id"]
task = kwargs["task"]
parent_id = task.request.parent_id
root_id = task.request.root_id
# only set for root task
if task_id == parent_id == root_id:
payload = {
'failure_child_ids': [],
'task_status': None,
'child_task_ids': [],
}
client.set(root_id, payload)
except Exception as exc:
logger.execption(exc)
@task_postrun.connect
def handle_state_after_execution(**kwargs):
try:
task_id = kwargs["task_id"]
task = kwargs["task"]
root_id = task.request.root_id
payload = client.get('root_id')
# implement your custom logic for updating status
upadate_status(payload, task)
client.set(root_id, payload)
except Exception as exc:
logger.exeception(exc)There are lots of things you can tune according to your application needs, I discussed some of them above.
