Advanced Celery patterns including canvas workflows, priority queues, rate limiting, multi-queue routing, and production monitoring. Use when implementing complex task orchestration, task prioritization, or enterprise-grade background processing.
GitHub: yonatangross/orchestkit
ork-async
January 25, 2026
# Advanced Celery Patterns
Enterprise-grade task orchestration beyond basic background jobs.
## Overview
- Complex multi-step task workflows (ETL pipelines, order processing)
- Priority-based task processing (premium vs standard users)
- Rate-limited external API calls (API quotas, throttling)
- Multi-queue routing (dedicated workers per task type)
- Production monitoring and observability
- Task result aggregation and fan-out patterns
## Canvas Workflows
### Signatures (Task Invocation)
```python
from celery import signature, chain, group, chord

# Create a reusable task signature by task name
sig = signature("tasks.process_order", args=[order_id], kwargs={"priority": "high"})

# Immutable signature (ignores the result of the previous task)
immutable_sig = process_order.si(order_id)

# Partial signature (curry arguments)
partial_sig = send_email.s(subject="Order Update")
# Later: partial_sig.delay(to="user@example.com", body="...")
```
### Chains (Sequential Execution)
```python
from celery import chain

# Tasks execute sequentially; each result becomes the first argument of the next task
workflow = chain(
    extract_data.s(source_id),    # Returns raw_data
    transform_data.s(),           # Receives raw_data, returns clean_data
    load_data.s(destination_id),  # Receives clean_data, then destination_id
)
result = workflow.apply_async()

# Access intermediate results
chain_result = result.get()          # Final result
parent_result = result.parent.get()  # Previous task result

# Error handling in chains
@celery_app.task(bind=True)
def transform_data(self, raw_data):
    try:
        return do_transform(raw_data)
    except TransformError as exc:
        # Retry in 60 seconds; later tasks in the chain run only after this
        # task succeeds, and the chain stops if retries are exhausted
        raise self.retry(exc=exc, countdown=60)
```
### Groups (Parallel Execution)
```python
from celery import group
# Execute tasks in parallel
parallel = group(
    process_chunk.s(chunk) for chunk in chunks
)
group_result = parallel.apply_async()
# Wait for all to complete
results = group_result.get() # List of results
# Check comp