python / expert
Snippet
Django Celery Task Chaining with Result Expiry and Retry Policies
Celery task chains orchestrate multi-step workflows in which each task receives the previous task's return value as its first argument. The bind=True parameter gives a task access to self.retry, the hook for retry policies such as exponential backoff. Passing expires=7200 (2 hours) to apply_async revokes any task that has not started within that window; cleanup of stored results is a separate concern, governed by the result_expires setting. link_error attaches a callback that fires when a task in the chain fails, and group() dispatches the per-article workflows in parallel for bulk throughput.
snippet.py
from celery import shared_task, chain, group
from django.db import transaction
import logging

from myapp.models import Article  # assumption: adjust to your app's models module

logger = logging.getLogger(__name__)


@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def process_article(self, article_id):
    try:
        with transaction.atomic():
            article = Article.objects.get(id=article_id)
            article.status = 'processing'
            article.save()
        return {
            'article_id': article_id,
            'status': 'processed',
            # generate_thumbnail checks this flag; assumes Article has an image field
            'has_image': bool(article.image),
        }
    except Article.DoesNotExist as exc:
        # Retry in case the row has not been committed by the enqueuing transaction yet
        raise self.retry(exc=exc, countdown=120)


@shared_task(bind=True, max_retries=5)
def generate_thumbnail(self, article_data):
    if not article_data.get('has_image'):
        return {'skipped': True, 'reason': 'no_image'}
    # Simulate thumbnail generation
    return {'article_id': article_data['article_id'], 'thumbnail': 'generated'}


@shared_task
def notify_subscribers(article_data):
    # .get() because the thumbnail step may return a 'skipped' dict without an id
    logger.info('Notifying subscribers for article %s', article_data.get('article_id'))
    return {'notified': True}


def article_publishing_workflow(article_id):
    workflow = chain(
        process_article.s(article_id),
        generate_thumbnail.s(),
        notify_subscribers.s(),
    )
    result = workflow.apply_async(expires=7200, link_error=notify_failure.s())
    return result.id


def bulk_publish_articles(article_ids):
    # article_publishing_workflow is a plain function, not a task, so it has no
    # .si(); build one chain per article instead. .si() freezes the chain head
    # so the group cannot inject arguments into it.
    job = group(
        chain(process_article.si(aid), generate_thumbnail.s(), notify_subscribers.s())
        for aid in article_ids
    )
    return job.apply_async()


@shared_task
def notify_failure(request, exc, traceback):
    logger.error('Task %s failed: %s', request.id, exc)
    # Send alert to monitoring system
    return {'task_id': request.id, 'error': str(exc)}
django
Breakdown
1
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
bind=True exposes the task instance as self, enabling self.retry; max_retries=3 caps attempts, with default_retry_delay supplying a 60s delay when retry() gives no countdown
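The description mentions exponential backoff, but default_retry_delay is a fixed interval. A minimal sketch of doubling the delay per attempt, assuming Celery 4.2+ and a hypothetical download() helper:
@shared_task(bind=True, max_retries=3)
def fetch_feed(self, feed_url):
    try:
        return download(feed_url)  # hypothetical network call
    except ConnectionError as exc:
        # Waits 60s, 120s, 240s between attempts, then raises MaxRetriesExceededError
        raise self.retry(exc=exc, countdown=60 * 2 ** self.request.retries)

# Declarative form: retry_backoff=60 starts at 60s and doubles per retry;
# retry_jitter randomizes the delays to avoid thundering herds
@shared_task(autoretry_for=(ConnectionError,), retry_backoff=60,
             retry_backoff_max=600, retry_jitter=True, max_retries=3)
def fetch_feed_auto(feed_url):
    return download(feed_url)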
2
chain(process_article.s(article_id), generate_thumbnail.s(), notify_subscribers.s())
Sequential execution: each task's return value is passed as the next task's first argument
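A minimal sketch of this result threading, assuming two trivial tasks:
@shared_task
def add(x, y):
    return x + y

@shared_task
def double(n):
    return n * 2

# add(2, 3) returns 5, which chain() passes as double's first argument: result is 10
chain(add.s(2, 3), double.s()).apply_async()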
3
result = workflow.apply_async(expires=7200)
expires=7200 revokes any chain task that has not started within 2 hours; stored results are cleaned up separately via the result_expires setting
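The two expiry knobs are easy to conflate; a sketch of both, assuming a result backend that supports expiry (e.g. Redis):
# Task expiry: the worker revokes the task if it has not started within 2 hours
process_article.apply_async(args=[article_id], expires=7200)

# Result expiry: a global setting enforced by the backend (default: 1 day)
app.conf.result_expires = 7200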
4
job = group(chain(process_article.si(aid), generate_thumbnail.s(), notify_subscribers.s()) for aid in article_ids)
Parallel execution of per-article chains; si = immutable signature, so the group cannot inject results into each chain head
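A sketch contrasting the two signature types, plus a chord for collecting the parallel results (summarize is a hypothetical callback task):
notify_subscribers.s()   # mutable: the previous result is prepended to the args
process_article.si(42)   # immutable: always invoked as process_article(42)

# chord = group + callback; summarize receives a list with each chain's final result
from celery import chord
chord(
    chain(process_article.si(aid), generate_thumbnail.s(), notify_subscribers.s())
    for aid in article_ids
)(summarize.s())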
5
link_error=notify_failure.s()
Errback runs when any task in the chain fails, receiving the failed task's request, the exception, and the traceback
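An errback can also be scoped to a single signature via .on_error() instead of the whole chain; a sketch reusing the snippet's tasks:
workflow = chain(
    # notify_failure fires only if process_article itself fails
    process_article.s(article_id).on_error(notify_failure.s()),
    generate_thumbnail.s(),
    notify_subscribers.s(),
)
workflow.apply_async()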