Back to Skills

distributed-locks

verified

Distributed locking patterns with Redis and PostgreSQL for coordination across instances. Use when implementing exclusive access, preventing race conditions, or coordinating distributed resources.

View on GitHub

Marketplace

orchestkit

yonatangross/orchestkit

Plugin

ork

development

Repository

yonatangross/orchestkit
33stars

plugins/ork/skills/distributed-locks/SKILL.md

Last Verified

January 25, 2026

Install Skill

Select agents to install to:

Scope:
npx add-skill https://github.com/yonatangross/orchestkit/blob/main/plugins/ork/skills/distributed-locks/SKILL.md -a claude-code --skill distributed-locks

Installation paths:

Claude
.claude/skills/distributed-locks/
Powered by add-skill CLI

Instructions

# Distributed Locks

Coordinate exclusive access to resources across multiple service instances.

## Overview

- Preventing duplicate processing of jobs/events
- Coordinating singleton processes (cron, leaders)
- Protecting critical sections across instances
- Implementing leader election
- Rate limiting at distributed level

## Lock Types Comparison

| Lock Type | Durability | Latency | Use Case |
|-----------|-----------|---------|----------|
| Redis (single) | Low | ~1ms | Fast, non-critical |
| Redlock (multi) | High | ~5ms | Critical, HA required |
| PostgreSQL advisory | High | ~2ms | Already using PG, ACID |

## Quick Reference

### Redis Lock (Single Node)

```python
from uuid_utils import uuid7
import redis.asyncio as redis

class RedisLock:
    """Redis lock with Lua scripts for atomicity."""

    ACQUIRE = "if redis.call('set',KEYS[1],ARGV[1],'NX','PX',ARGV[2]) then return 1 end return 0"
    RELEASE = "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) end return 0"

    def __init__(self, client: redis.Redis, name: str, ttl_ms: int = 30000):
        self._client = client
        self._name = f"lock:{name}"
        self._owner = str(uuid7())
        self._ttl = ttl_ms

    async def acquire(self) -> bool:
        return await self._client.eval(self.ACQUIRE, 1, self._name, self._owner, self._ttl) == 1

    async def release(self) -> bool:
        return await self._client.eval(self.RELEASE, 1, self._name, self._owner) == 1

    async def __aenter__(self):
        if not await self.acquire():
            raise LockError(f"Failed to acquire {self._name}")
        return self

    async def __aexit__(self, *_):
        await self.release()
```

See [redis-locks.md](references/redis-locks.md) for complete implementation with retry/extend.

### PostgreSQL Advisory Lock

```python
from sqlalchemy import text

async def with_advisory_lock(session, lock_id: int):
    """PostgreSQL advisory lock (session-level)."""
    await session.execut

Validation Details

Front Matter
Required Fields
Valid Name Format
Valid Description
Has Sections
Allowed Tools
Instruction Length:
3845 chars