LangGraph supervisor-worker pattern. Use when building central coordinator agents that route to specialized workers, implementing round-robin or priority-based agent dispatch.
View on GitHubyonatangross/skillforge-claude-plugin
ork-langgraph-core
plugins/ork-langgraph-core/skills/langgraph-supervisor/SKILL.md
January 25, 2026
Select agents to install to:
npx add-skill https://github.com/yonatangross/skillforge-claude-plugin/blob/main/plugins/ork-langgraph-core/skills/langgraph-supervisor/SKILL.md -a claude-code --skill langgraph-supervisorInstallation paths:
.claude/skills/langgraph-supervisor/# LangGraph Supervisor Pattern
Coordinate multiple specialized agents with a central supervisor.
## Basic Supervisor
```python
from langgraph.graph import StateGraph, END
def supervisor(state: WorkflowState) -> WorkflowState:
"""Route to next worker based on state."""
if state["needs_analysis"]:
state["next"] = "analyzer"
elif state["needs_validation"]:
state["next"] = "validator"
else:
state["next"] = END
return state
def analyzer(state: WorkflowState) -> WorkflowState:
"""Specialized analysis worker."""
result = analyze(state["input"])
state["results"].append(result)
return state
# Build graph
workflow = StateGraph(WorkflowState)
workflow.add_node("supervisor", supervisor)
workflow.add_node("analyzer", analyzer)
workflow.add_node("validator", validator)
# Supervisor routes dynamically
workflow.add_conditional_edges(
"supervisor",
lambda s: s["next"],
{
"analyzer": "analyzer",
"validator": "validator",
END: END
}
)
# Workers return to supervisor
workflow.add_edge("analyzer", "supervisor")
workflow.add_edge("validator", "supervisor")
workflow.set_entry_point("supervisor")
app = workflow.compile()
```
## Round-Robin Supervisor
```python
ALL_AGENTS = ["security", "tech", "implementation", "tutorial"]
def supervisor_node(state: AnalysisState) -> AnalysisState:
"""Route to next available agent."""
completed = set(state["agents_completed"])
available = [a for a in ALL_AGENTS if a not in completed]
if not available:
state["next"] = "quality_gate"
else:
state["next"] = available[0]
return state
# Register all agent nodes
for agent_name in ALL_AGENTS:
workflow.add_node(agent_name, create_agent_node(agent_name))
workflow.add_edge(agent_name, "supervisor")
```
## Priority-Based Routing
```python
AGENT_PRIORITIES = {
"security": 1, # Run first
"tech": 2,
"implementation": 3,
"tutorial": 4 # R