temporal-io

Temporal.io workflow orchestration for durable, fault-tolerant distributed applications. Use when implementing long-running workflows, saga patterns, microservice orchestration, or systems requiring exactly-once execution guarantees.

Repository

yonatangross/orchestkit

./skills/temporal-io/SKILL.md

Last Verified

January 24, 2026

Install Skill

npx add-skill https://github.com/yonatangross/orchestkit/blob/main/./skills/temporal-io/SKILL.md -a claude-code --skill temporal-io

Installation path (Claude): .claude/skills/temporal-io/

Instructions

# Temporal.io Workflow Orchestration

Durable execution engine for reliable distributed applications.

## Overview

Typical use cases:

- Long-running business processes (days/weeks/months)
- Saga patterns requiring compensation/rollback
- Microservice orchestration with retries
- Systems requiring effectively-once workflow execution (activities run at least once, so keep them idempotent)
- Complex state machines with human-in-the-loop steps
- Scheduled and recurring workflows
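
The saga item above can be sketched without any Temporal-specific calls. This is a minimal illustration, not the skill's own implementation: `run_saga`, `Step`, and the step names in `demo` are hypothetical, and in a real workflow each step would presumably wrap a `workflow.execute_activity(...)` call.

```python
import asyncio
from typing import Awaitable, Callable

# A step is any zero-argument coroutine function (hypothetical alias).
Step = Callable[[], Awaitable[None]]


async def run_saga(steps: list[tuple[Step, Step]]) -> None:
    """Run (action, compensation) pairs in order.

    If an action raises, run the compensations of the already completed
    actions in reverse order, then re-raise the original error.
    """
    compensations: list[Step] = []
    try:
        for action, compensation in steps:
            await action()
            # Register the undo step only after its action has succeeded.
            compensations.append(compensation)
    except Exception:
        for compensation in reversed(compensations):
            await compensation()
        raise


async def demo() -> list[str]:
    """Simulate an order saga whose third step fails."""
    log: list[str] = []

    def record(name: str, fail: bool = False) -> Step:
        async def step() -> None:
            if fail:
                raise RuntimeError(f"{name} failed")
            log.append(name)
        return step

    steps = [
        (record("charge_payment"), record("refund_payment")),
        (record("reserve_inventory"), record("release_inventory")),
        (record("ship_order", fail=True), record("cancel_shipment")),
    ]
    try:
        await run_saga(steps)
    except RuntimeError:
        pass  # the saga has already rolled back
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this run the first two steps succeed and the third fails, so the log ends with `release_inventory` followed by `refund_payment`: compensations run newest-first. Compensation steps are themselves retried when they are activities, so they should be idempotent too.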

## Workflow Definition

```python
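import asyncio
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

# OrderInput, OrderResult, PaymentInput, InventoryInput, CancelInput and the
# activity functions used below are assumed to be defined elsewhere.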

@workflow.defn
class OrderWorkflow:
    def __init__(self):
        self._status = "pending"
        self._order_id: str | None = None

    @workflow.run
    async def run(self, order_data: OrderInput) -> OrderResult:
        self._order_id = await workflow.execute_activity(
            create_order, order_data,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3, initial_interval=timedelta(seconds=1)),
        )
        self._status = "processing"

        # Parallel activities
        payment, inventory = await asyncio.gather(
            workflow.execute_activity(process_payment, PaymentInput(order_id=self._order_id), start_to_close_timeout=timedelta(minutes=5)),
            workflow.execute_activity(reserve_inventory, InventoryInput(order_id=self._order_id), start_to_close_timeout=timedelta(minutes=2)),
        )

        self._status = "completed"
        return OrderResult(order_id=self._order_id, payment_id=payment.id)

    @workflow.query
    def get_status(self) -> str:
        return self._status

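    @workflow.signal
    async def cancel_order(self, reason: str) -> None:
        # The signal can arrive before create_order has returned, so wait for the order ID.
        await workflow.wait_condition(lambda: self._order_id is not None)
        self._status = "cancelling"
        await workflow.execute_activity(cancel_order_activity, CancelInput(order_id=self._order_id), start_to_close_timeout=timedelta(seconds=30))
        self._status = "cancelled"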
```
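
To make the `RetryPolicy(maximum_attempts=3, initial_interval=timedelta(seconds=1))` setting above concrete, the sketch below computes the waits between attempts under exponential backoff. It is an illustration only: `retry_delays` is a hypothetical helper, not part of the Temporal SDK, and it assumes Temporal's documented defaults of a backoff coefficient of 2.0 and a maximum interval of 100 times the initial interval.

```python
from datetime import timedelta
from typing import Optional


def retry_delays(
    initial_interval: timedelta,
    maximum_attempts: int,
    backoff_coefficient: float = 2.0,  # assumed default
    maximum_interval: Optional[timedelta] = None,
) -> list[timedelta]:
    """Return the wait before each retry (attempts 2, 3, ...).

    maximum_attempts counts the first attempt as well, so N attempts
    means at most N - 1 waits. Unlimited retries are not modelled here.
    """
    if maximum_interval is None:
        # Assumed default cap: 100x the initial interval.
        maximum_interval = initial_interval * 100
    delays: list[timedelta] = []
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, maximum_interval))
    return delays


if __name__ == "__main__":
    delays = retry_delays(timedelta(seconds=1), maximum_attempts=3)
    print([d.total_seconds() for d in delays])  # [1.0, 2.0]
```

Under these assumptions, `create_order` is tried at most three times, with waits of 1 s and 2 s between attempts, and each attempt is bounded by the 30-second `start_to_close_timeout`.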

## Activity Definition

```python
from temporalio import activity
from temporalio.exc
```
