Kafka Connect integration expert. Covers source and sink connectors, JDBC, Elasticsearch, S3, Debezium CDC, SMT (Single Message Transforms), connector configuration, and data pipeline patterns. Activates for kafka connect, connectors, source connector, sink connector, jdbc connector, debezium, smt, data pipeline, cdc.
GitHub: anton-abyzov/specweave
sw-confluent
January 25, 2026
Install with: `npx add-skill https://github.com/anton-abyzov/specweave/blob/main/plugins/specweave-confluent/skills/confluent-kafka-connect/SKILL.md -a claude-code --skill confluent-kafka-connect`
Installation paths:
- `.claude/skills/confluent-kafka-connect/`
# Confluent Kafka Connect Skill
Expert knowledge of Kafka Connect for building data pipelines with source and sink connectors.
## What I Know
### Connector Types
**Source Connectors** (External System → Kafka):
- JDBC Source: Databases → Kafka
- Debezium: CDC (MySQL, PostgreSQL, MongoDB) → Kafka
- S3 Source: AWS S3 files → Kafka
- File Source: Local files → Kafka
**Sink Connectors** (Kafka → External System):
- JDBC Sink: Kafka → Databases
- Elasticsearch Sink: Kafka → Elasticsearch
- S3 Sink: Kafka → AWS S3
- HDFS Sink: Kafka → Hadoop HDFS
**Single Message Transforms (SMTs)**:
- Field operations: InsertField, MaskField, ReplaceField, TimestampConverter
- Routing: RegexRouter, TimestampRouter
- Filtering: Filter, applied conditionally through predicates (for example TopicNameMatches, RecordIsTombstone)
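As a rough illustration of how SMTs attach to a connector, the fragment below chains a `MaskField` and a `RegexRouter` transform. It is a sketch, not a recommended setup: the aliases `maskPii` and `renameTopic`, the field names `email` and `ssn`, and the `db.` target prefix are all hypothetical. The keys would be merged into the `config` object of an existing connector.

```json
{
  "transforms": "maskPii,renameTopic",
  "transforms.maskPii.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskPii.fields": "email,ssn",
  "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.renameTopic.regex": "postgres-(.*)",
  "transforms.renameTopic.replacement": "db.$1"
}
```

Transforms run in the order listed in `transforms`. With no replacement configured, `MaskField` swaps each listed field for a type-appropriate empty value (an empty string for strings, 0 for numbers).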
## When to Use This Skill
Activate me when you need help with:
- Connector setup ("Configure JDBC connector")
- CDC patterns ("Debezium MySQL CDC")
- Data pipelines ("Stream database changes to Kafka")
- SMT transforms ("Mask sensitive fields")
- Connector troubleshooting ("Connector task failed")
## Common Patterns
### Pattern 1: JDBC Source (Database → Kafka)
**Use Case**: Stream database table changes to Kafka
**Configuration**:
```json
{
  "name": "jdbc-source-users",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "postgres",
    "connection.password": "password",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "postgres-",
    "table.whitelist": "users,orders",
    "poll.interval.ms": "5000"
  }
}
```
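**Modes**:
- `incrementing`: Tracks a strictly increasing ID column; picks up new rows only, not updates or deletes
- `timestamp`: Tracks a last-modified timestamp column; picks up new and updated rows
- `timestamp+incrementing`: Uses both columns (most reliable for capturing inserts and updates)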
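For comparison, a minimal sketch of the same connector switched to `timestamp+incrementing` mode might look like the following. The connector name `jdbc-source-users-ts` and the `updated_at` column are assumptions for illustration; the column is assumed to exist on every whitelisted table and to be refreshed on each write.

```json
{
  "name": "jdbc-source-users-ts",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "postgres",
    "connection.password": "password",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "topic.prefix": "postgres-",
    "table.whitelist": "users,orders",
    "poll.interval.ms": "5000"
  }
}
```

Query-based modes like this one still cannot see deleted rows, which is one reason to consider log-based CDC when deletes matter.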
### Pattern 2: Debezium CDC (MySQL → Kafka)
**Use Case**: Capture all database changes (INSERT/UPDATE/DELETE)
**Configuration**:
```json
{
"name": "debezium-mysql-cdc",
"config": {
"connector.class": "io.debezi