celery-advanced (verified)

Advanced Celery patterns including canvas workflows, priority queues, rate limiting, multi-queue routing, and production monitoring. Use when implementing complex task orchestration, task prioritization, or enterprise-grade background processing.

Marketplace: orchestkit (yonatangross/orchestkit)
Plugin: orchestkit-complete (development)
Repository: yonatangross/orchestkit (33 stars)
Source: ./skills/celery-advanced/SKILL.md
Last Verified: January 24, 2026

Install Skill

npx add-skill https://github.com/yonatangross/orchestkit/blob/main/./skills/celery-advanced/SKILL.md -a claude-code --skill celery-advanced

Installation path (Claude): .claude/skills/celery-advanced/

Instructions

# Advanced Celery Patterns

Enterprise-grade task orchestration beyond basic background jobs.

## Overview

- Complex multi-step task workflows (ETL pipelines, order processing)
- Priority-based task processing (premium vs standard users)
- Rate-limited external API calls (API quotas, throttling)
- Multi-queue routing (dedicated workers per task type; see the config sketch after this list)
- Production monitoring and observability
- Task result aggregation and fan-out patterns
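
The priority, rate-limiting, and routing items above map to a handful of Celery options. A minimal sketch, assuming a hypothetical `celery_app`, broker URL, task names, and queue names; `rate_limit`, `task_routes`, `task_queue_max_priority`, and the `priority` argument are real Celery options:

```python
from celery import Celery

# Hypothetical app and broker URL, for illustration only
celery_app = Celery("app", broker="amqp://localhost")

# Rate limiting: at most 10 executions per minute, per worker
@celery_app.task(name="tasks.call_external_api", rate_limit="10/m")
def call_external_api(payload):
    ...

@celery_app.task(name="tasks.process_order")
def process_order(order_id):
    ...

# Multi-queue routing: dedicate workers per task type
# (start one with: celery -A app worker -Q orders)
celery_app.conf.task_routes = {
    "tasks.call_external_api": {"queue": "external"},
    "tasks.process_order": {"queue": "orders"},
}

# Priority: 0-9 on RabbitMQ; queues must declare a max priority first
celery_app.conf.task_queue_max_priority = 10
process_order.apply_async(args=[42], priority=9)
```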

## Canvas Workflows

### Signatures (Task Invocation)

```python
from celery import signature, chain, group, chord

# Create a reusable task signature
sig = signature("tasks.process_order", args=[order_id], kwargs={"priority": "high"})

# Immutable signature (won't receive results from previous task)
sig = process_order.si(order_id)

# Partial signature (curry arguments)
partial_sig = send_email.s(subject="Order Update")
# Later: partial_sig.delay(to="user@example.com", body="...")
```
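
Once built, a signature runs like the task itself and composes with the `|` operator into the chains shown next. A quick sketch; the countdown and queue values are arbitrary, and `extract_data`/`transform_data` are the tasks from the chain example below:

```python
# Execute a signature
sig.delay()                                    # shortcut for apply_async()
sig.apply_async(countdown=10, queue="orders")  # run after 10s on an explicit queue

# Signatures compose into a chain with |
pipeline = extract_data.s(source_id) | transform_data.s()
pipeline.apply_async()
```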

### Chains (Sequential Execution)

```python
from celery import chain

# Tasks execute sequentially, passing results
workflow = chain(
    extract_data.s(source_id),      # Returns raw_data
    transform_data.s(),              # Receives raw_data, returns clean_data
    load_data.s(destination_id),     # Receives clean_data
)
result = workflow.apply_async()

# Access intermediate results
chain_result = result.get()  # Final result
parent_result = result.parent.get()  # Previous task result

# Error handling in chains
@celery_app.task(bind=True)
def transform_data(self, raw_data):
    try:
        return do_transform(raw_data)
    except TransformError as exc:
        # Retrying re-runs this task; the chain resumes if a retry succeeds.
        # Only once max_retries is exhausted does the chain stop, so
        # load_data never runs.
        raise self.retry(exc=exc, countdown=60)
```
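
To observe chain failures instead of silently losing them, Celery also accepts an error callback via `link_error`. A sketch; `handle_failure` is an assumed task name, but the three-argument errback signature is the one Celery's calling guide documents:

```python
import logging

logger = logging.getLogger(__name__)

@celery_app.task
def handle_failure(request, exc, traceback):
    # Error callbacks receive the failed task's request, exception, and traceback
    logger.error("Chain task %s failed: %r", request.id, exc)

workflow.apply_async(link_error=handle_failure.s())
```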

### Groups (Parallel Execution)

```python
from celery import group

# Execute tasks in parallel
parallel = group(
    process_chunk.s(chunk) for chunk in chunks
)
group_result = parallel.apply_async()

# Wait for all to complete
results = group_result.get()  # List of results

# Check completion without blocking
group_result.ready()            # True once every task has finished
group_result.completed_count()  # Number of tasks finished so far
group_result.successful()       # True if all tasks succeeded
```
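
Groups pair naturally with chords for the fan-out and aggregation pattern listed in the overview (`chord` is already imported above). A minimal sketch, assuming an `aggregate_results` task that accepts the list of chunk results; note that chords require a configured result backend:

```python
from celery import chord

@celery_app.task
def aggregate_results(results):
    # Receives the list of results from every process_chunk task, in order
    return sum(results)

# Header runs in parallel; the callback fires once all header tasks succeed
chord_result = chord(
    (process_chunk.s(chunk) for chunk in chunks),
    aggregate_results.s(),
).apply_async()
final = chord_result.get()
```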