Back to Skills

multi-agent-orchestration

verified

Multi-agent coordination and synthesis patterns. Use when orchestrating multiple specialized agents, implementing fan-out/fan-in workflows, or synthesizing outputs from parallel agents.

View on GitHub

Marketplace

orchestkit

yonatangross/orchestkit

Plugin

ork

development

Repository

yonatangross/orchestkit
33stars

plugins/ork/skills/multi-agent-orchestration/SKILL.md

Last Verified

January 25, 2026

Install Skill

Select agents to install to:

Scope:
npx add-skill https://github.com/yonatangross/orchestkit/blob/main/plugins/ork/skills/multi-agent-orchestration/SKILL.md -a claude-code --skill multi-agent-orchestration

Installation paths:

Claude
.claude/skills/multi-agent-orchestration/
Powered by add-skill CLI

Instructions

# Multi-Agent Orchestration

Coordinate multiple specialized agents for complex tasks.

## Fan-Out/Fan-In Pattern

```python
async def multi_agent_analysis(content: str) -> dict:
    """Fan-out to specialists, fan-in to synthesize."""
    agents = [
        ("security", security_agent),
        ("performance", performance_agent),
        ("code_quality", quality_agent),
        ("architecture", architecture_agent),
    ]

    # Fan-out: Run all agents in parallel
    tasks = [agent(content) for _, agent in agents]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter successful results
    findings = [
        {"agent": name, "result": result}
        for (name, _), result in zip(agents, results)
        if not isinstance(result, Exception)
    ]

    # Fan-in: Synthesize findings
    return await synthesize_findings(findings)
```

## Supervisor Pattern

```python
class Supervisor:
    """Central coordinator that routes to specialists."""

    def __init__(self, agents: dict):
        self.agents = agents  # {"security": agent, "performance": agent}
        self.completed = []

    async def run(self, task: str) -> dict:
        """Route task through appropriate agents."""
        # 1. Determine which agents to use
        plan = await self.plan_routing(task)

        # 2. Execute in dependency order
        results = {}
        for agent_name in plan.execution_order:
            if plan.can_parallelize(agent_name):
                # Run parallel batch
                batch = plan.get_parallel_batch(agent_name)
                batch_results = await asyncio.gather(*[
                    self.agents[name](task, context=results)
                    for name in batch
                ])
                results.update(dict(zip(batch, batch_results)))
            else:
                # Run sequential
                results[agent_name] = await self.agents[agent_name](
                    task, context=results
                )

        return re

Validation Details

Front Matter
Required Fields
Valid Name Format
Valid Description
Has Sections
Allowed Tools
Instruction Length:
5779 chars