langgraph-functional

verified

LangGraph Functional API with @entrypoint and @task decorators. Use when building workflows with the modern LangGraph pattern, enabling parallel execution, persistence, and human-in-the-loop.

Marketplace

orchestkit

yonatangross/orchestkit

Plugin

ork

development

Repository

yonatangross/orchestkit
33 stars

plugins/ork/skills/langgraph-functional/SKILL.md

Last Verified

January 25, 2026

Install Skill

npx add-skill https://github.com/yonatangross/orchestkit/blob/main/plugins/ork/skills/langgraph-functional/SKILL.md -a claude-code --skill langgraph-functional

Installation paths:

Claude
.claude/skills/langgraph-functional/

Instructions

# LangGraph Functional API
Build workflows using decorators instead of explicit graph construction.

## Overview

Reach for the Functional API when you need:

- Sequential workflows with conditional branching
- Orchestrator-worker patterns with parallel execution
- Workflows that need persistence and checkpointing
- Human-in-the-loop approval flows
- A simpler alternative to explicit StateGraph construction

## Core Concepts

### Graph API vs Functional API
```
Graph API (explicit):           Functional API (implicit):
StateGraph → add_node →        @task functions +
add_edge → compile              @entrypoint orchestration
```

**When to use the Functional API**:
- Sequential workflows with conditional logic
- Orchestrator-worker patterns
- Simpler debugging (regular Python functions)
- Parallel task execution

## Quick Start

### Basic Pattern
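```python
from langgraph.func import entrypoint, task


# `process` and `transform` are placeholder helpers standing in for real work.
def process(data: str) -> str:
    return data.strip()


def transform(result: str) -> str:
    return result.upper()


@task
def step_one(data: str) -> str:
    """Calling a task returns a future; call .result() to block for the value."""
    return process(data)


@task
def step_two(result: str) -> str:
    return transform(result)


@entrypoint()
def my_workflow(input_data: str) -> str:
    # Each task call returns a future. Calling .result() right away
    # makes these two steps run one after the other.
    result1 = step_one(input_data).result()
    result2 = step_two(result1).result()
    return result2


# Invoke
output = my_workflow.invoke("hello")
```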

### Key Rules
1. Calling a **@task** function returns a future; call `.result()` to get the value
2. **@entrypoint** marks the workflow entry point, which orchestrates the tasks
3. Task calls made inside an entrypoint are tracked for persistence and streaming
4. Regular functions (no decorator) execute normally and return their value directly
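
The first rule is what makes parallelism possible: the order in which futures are created and resolved decides whether work overlaps. The sketch below illustrates this with Python's standard `concurrent.futures` as an analogy only; it is not how LangGraph schedules tasks internally, and `slow_double` is a hypothetical stand-in for an I/O-bound task.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(x: int) -> int:
    """Hypothetical stand-in for an I/O-bound task."""
    time.sleep(0.2)
    return x * 2


def run_sequential(pool: ThreadPoolExecutor, values: list[int]) -> list[int]:
    # Resolving each future immediately: the next call is not
    # submitted until the previous one has finished.
    return [pool.submit(slow_double, v).result() for v in values]


def run_parallel(pool: ThreadPoolExecutor, values: list[int]) -> list[int]:
    # Submit everything first, then resolve: the calls overlap.
    futures = [pool.submit(slow_double, v) for v in values]
    return [f.result() for f in futures]


def timed(fn, values: list[int]) -> tuple[list[int], float]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        start = time.perf_counter()
        out = fn(pool, values)
        return out, time.perf_counter() - start


seq_out, seq_time = timed(run_sequential, [1, 2, 3])
par_out, par_time = timed(run_parallel, [1, 2, 3])
```

Both variants produce the same results; only the elapsed time differs, because the second one resolves its futures after all of them have been started.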

## Parallel Execution

### Fan-Out Pattern
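```python
from langgraph.func import entrypoint, task


# `api_a` and `api_b` are placeholder clients for two external search services.
@task
def fetch_source_a(query: str) -> dict:
    return api_a.search(query)

@task
def fetch_source_b(query: str) -> dict:
    return api_b.search(query)

@task
def merge_results(results: list[dict]) -> dict:
    return {"combined": results}

@entrypoint()
def parallel_search(query: str) -> dict:
    # Launch in parallel - futures start immediately
    # NOTE: the source is cut off after this comment. The lines below are an
    # assumed completion based on the task signatures above.
    future_a = fetch_source_a(query)
    future_b = fetch_source_b(query)
    # Block only after both tasks have been launched
    results = [future_a.result(), future_b.result()]
    return merge_results(results).result()
```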
