langgraph-parallel

verified

LangGraph parallel execution patterns. Use when implementing fan-out/fan-in workflows, map-reduce over tasks, or running independent agents concurrently.

Marketplace

orchestkit

yonatangross/skillforge-claude-plugin

Plugin

ork

development

Repository

yonatangross/skillforge-claude-plugin
33 stars

plugins/ork/skills/langgraph-parallel/SKILL.md

Last Verified

January 25, 2026

Install Skill

npx add-skill https://github.com/yonatangross/skillforge-claude-plugin/blob/main/plugins/ork/skills/langgraph-parallel/SKILL.md -a claude-code --skill langgraph-parallel

Installation paths:

Claude
.claude/skills/langgraph-parallel/

Instructions

# LangGraph Parallel Execution

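Run independent nodes concurrently to cut end-to-end latency. Nodes scheduled in the same step run in parallel, and a reducer on each shared state key merges their writes.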

## Fan-Out/Fan-In Pattern

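```python
from operator import add
from typing import Annotated, Any, TypedDict

from langgraph.graph import END, START, StateGraph
from langgraph.types import Send

class State(TypedDict):
    tasks: list[dict]
    results: Annotated[list, add]  # Reducer merges writes from parallel workers
    final: Any

def fan_out(state: State):
    """Split work into parallel tasks."""
    # Return a partial update instead of mutating state in place
    return {"tasks": [{"id": 1}, {"id": 2}, {"id": 3}]}

def router(state: State):
    """Dispatch one worker invocation per task."""
    return [Send("worker", {"current_task": task}) for task in state["tasks"]]

def worker(state):
    """Process one task."""
    task = state["current_task"]  # Payload supplied by Send
    result = process(task)  # process() is a placeholder for your task logic
    return {"results": [result]}

def fan_in(state: State):
    """Combine parallel results."""
    combined = aggregate(state["results"])  # aggregate() is a placeholder
    return {"final": combined}

workflow = StateGraph(State)
workflow.add_node("fan_out", fan_out)
workflow.add_node("worker", worker)
workflow.add_node("fan_in", fan_in)

workflow.add_edge(START, "fan_out")
workflow.add_conditional_edges("fan_out", router, ["worker"])
workflow.add_edge("worker", "fan_in")  # fan_in runs once, after all workers finish
workflow.add_edge("fan_in", END)
```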
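
Parallel workers all write to the same `results` key, so that key needs a reducer that merges updates instead of overwriting them. The plain-Python sketch below imitates that merge with `operator.add`. It only illustrates the semantics and is not LangGraph's internal code: `apply_step` is a hypothetical helper, and its `ValueError` is a stand-in for the conflict error raised when several nodes write one key without a reducer.

```python
from operator import add
from typing import Callable


def apply_step(state: dict, updates: list[dict], reducers: dict[str, Callable]) -> dict:
    """Fold one step's partial updates into state (hypothetical helper)."""
    merged = dict(state)
    written = set()
    for update in updates:
        for key, value in update.items():
            if key in reducers:
                # Reducer combines the current value with the new one
                current = merged.get(key)
                merged[key] = value if current is None else reducers[key](current, value)
            elif key in written:
                # Two writers, no reducer: the conflict cannot be resolved
                raise ValueError(f"multiple writes to {key!r} in one step need a reducer")
            else:
                merged[key] = value
            written.add(key)
    return merged


# Three workers each return a one-element list, as worker() does above
worker_updates = [{"results": ["a"]}, {"results": ["b"]}, {"results": ["c"]}]

with_reducer = apply_step({"results": []}, worker_updates, {"results": add})
print(with_reducer)  # {'results': ['a', 'b', 'c']}

try:
    apply_step({"results": []}, worker_updates, {})
    conflict = None
except ValueError as exc:
    conflict = str(exc)
print(conflict)
```

This is why each worker returns a single-element list: the reducer concatenates the lists, so `fan_in` sees one entry per task.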

## Using Send API

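```python
from langgraph.types import Send  # Also importable from langgraph.constants in older releases

def router(state):
    """Route to multiple workers in parallel."""
    # Each Send invokes "worker" with its own private payload
    return [
        Send("worker", {"current_task": task})
        for task in state["tasks"]
    ]

# router is the path function on the edges leaving fan_out, not a node itself
workflow.add_conditional_edges("fan_out", router, ["worker"])
```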

## Parallel Agent Analysis

```python
import asyncio
from operator import add
from typing import Annotated, TypedDict

class AnalysisState(TypedDict):
    content: str
    findings: Annotated[list[dict], add]  # Reducer: returned findings are appended

async def run_parallel_agents(state: AnalysisState):
    """Run multiple agents in parallel."""
    # security_agent, tech_agent, and quality_agent are placeholders for
    # objects that expose an async analyze(content) method
    agents = [security_agent, tech_agent, quality_agent]

    # Run all concurrently
    tasks = [agent.analyze(state["content"]) for agent in agents]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Keep successful results; failed agents are dropped silently here
    findings = [r for r in results if not isinstance(r, Exception)]

    return {"findings": findings}
```
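
With `return_exceptions=True`, `asyncio.gather` hands back exceptions in place of results rather than aborting the batch, which is what lets the node above filter them out. A minimal stdlib sketch with stub agents follows. `StubAgent` and its `analyze` signature are assumptions standing in for the real agents, and collecting the error messages is one possible choice, not something the original node does.

```python
import asyncio


class StubAgent:
    """Hypothetical stand-in for a real agent; only analyze() is assumed."""

    def __init__(self, name: str, fail: bool = False):
        self.name = name
        self.fail = fail

    async def analyze(self, content: str) -> dict:
        await asyncio.sleep(0.01)  # Simulate I/O such as an LLM call
        if self.fail:
            raise RuntimeError(f"{self.name} failed")
        return {"agent": self.name, "length": len(content)}


async def run_agents(content: str, agents: list[StubAgent]) -> tuple[list[dict], list[str]]:
    """Run every agent concurrently and split successes from failures."""
    results = await asyncio.gather(
        *(agent.analyze(content) for agent in agents),
        return_exceptions=True,  # Failures come back as values, in input order
    )
    findings = [r for r in results if not isinstance(r, Exception)]
    errors = [str(r) for r in results if isinstance(r, Exception)]
    return findings, errors


agents = [StubAgent("security"), StubAgent("tech", fail=True), StubAgent("quality")]
findings, errors = asyncio.run(run_agents("some content", agents))
print(findings)
print(errors)
```

Keeping the error list makes partial failures visible, for example by logging it or writing it to a separate state key.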

## Map-Reduce Pattern

```python
def map_node(state):
    """Map: Process each item independently."""
    items = state["items"]
    results = []

    for item in items:
        result = process_item(item)
        results.append(result)

    return {"results": results}  # Assumed key; the source is truncated at this line
```
