python / expert
Snippet
Django Celery Task Chaining mit Result Expiry und Retry Policies
Celery Task Chains orchestrieren Multi-Step Workflows, wobei jede Task das Ergebnis der vorherigen Task erhält. Der bind=True Parameter gibt Tasks Zugriff auf self.retry für exponentielles Backoff. Chain-Ergebnisse laufen nach 7200 Sekunden (2 Stunden) ab und verhindern Memory Bloat, während link_error Exceptions global erfasst. Group dispatcht parallele Workflow-Ausführung für Bulk-Operationen und maximiert Durchsatz.
snippet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from celery import shared_task, chain, group, chordfrom celery.exceptions import MaxRetriesExceededErrorfrom celery.result import AsyncResultfrom django.db import transactionimport logginglogger = logging.getLogger(__name__)@shared_task(bind=True, max_retries=3, default_retry_delay=60)def process_article(self, article_id):try:article = Article.objects.get(id=article_id)article.status = 'processing'article.save()return {'article_id': article_id, 'status': 'processed'}except Article.DoesNotExist:raise self.retry(exc=Exception(f'Article {article_id} not found'), countdown=120)@shared_task(bind=True, max_retries=5)def generate_thumbnail(self, article_data):if not article_data.get('has_image'):return {'skipped': True, 'reason': 'no_image'}# Simulate thumbnail generationreturn {'article_id': article_data['article_id'], 'thumbnail': 'generated'}@shared_taskdef notify_subscribers(article_data):logger.info(f'Notifying subscribers for article {article_data["article_id"]}')return {'notified': True}def article_publishing_workflow(article_id):workflow = chain(process_article.s(article_id),generate_thumbnail.s(),notify_subscribers.s())result = workflow.apply_async(expires=7200, link_error=notify_failure.s())return result.iddef bulk_publish_articles(article_ids):job = group(article_publishing_workflow.si(aid) for aid in article_ids)return job.apply_async()@shared_taskdef notify_failure(request, exc, traceback):logger.error(f'Task {request.id} failed: {exc}')# Send alert to monitoring systemreturn {'task_id': request.id, 'error': str(exc)}
django
Erklärung
1
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
bind=True aktiviert self.retry, max_retries=3 mit 60s Verzögerung zwischen Versuchen
2
chain(process_article.s(article_id), generate_thumbnail.s(), notify_subscribers.s())
Sequentielle Ausführung: jede Task wartet auf vorheriges Ergebnis
3
result = workflow.apply_async(expires=7200)
Ergebnis läuft nach 2 Stunden ab um Memory Leaks zu verhindern
4
job = group(article_publishing_workflow.si(aid) for aid in article_ids)
Parallele Ausführung von Workflows, si=immutable Signature verhindert Ergebnisweiterleitung
5
link_error=notify_failure.s()
Error Callback führt bei jeder Task-Fehler in der Kette aus