Retrieval Augmented Generation systems with vector search, document processing, and hybrid retrieval.
Install with:

`npx add-skill https://github.com/pluginagentmarketplace/custom-plugin-ai-engineer/blob/main/skills/rag-systems/SKILL.md -a claude-code --skill rag-systems`

Installation path: `.claude/skills/rag-systems/`

# RAG Systems
Build production-grade Retrieval Augmented Generation pipelines.
## Quick Start
### Simple RAG with LangChain
```python
# NOTE: these import paths target legacy LangChain (pre-0.1);
# newer releases moved them into langchain_community / langchain_openai.
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI

# 1. Load documents
loader = PyPDFLoader("document.pdf")
documents = loader.load()

# 2. Split into overlapping chunks
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200
)
chunks = splitter.split_documents(documents)

# 3. Embed chunks and store them in a vector database
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(chunks, embeddings)

# 4. Create the RAG chain (retrieve top-3 chunks, then generate)
qa_chain = RetrievalQA.from_chain_type(
    llm=OpenAI(),
    retriever=vectorstore.as_retriever(search_kwargs={"k": 3})
)

# 5. Query
answer = qa_chain.run("What is the main topic?")
```
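
### Hybrid Retrieval (BM25 + Vectors)

The skill description above also promises hybrid retrieval. Below is a minimal sketch under the same legacy-LangChain assumptions, fusing a sparse BM25 retriever with the dense retriever from the Quick Start; it reuses `chunks` and `vectorstore` from above and requires the `rank_bm25` package:

```python
from langchain.retrievers import BM25Retriever, EnsembleRetriever

# Sparse retriever: scores chunks by BM25 lexical overlap
bm25_retriever = BM25Retriever.from_documents(chunks)
bm25_retriever.k = 3

# Dense retriever: reuses the Chroma store built in the Quick Start
vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# EnsembleRetriever fuses both ranked lists via reciprocal rank fusion
hybrid_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, vector_retriever],
    weights=[0.5, 0.5],
)

docs = hybrid_retriever.get_relevant_documents("What is the main topic?")
```

Keyword scores catch exact terms (IDs, product names) that embeddings can miss, while dense scores catch paraphrases; the equal weights are a starting point to tune per corpus.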
## Core Components
### Document Processing Pipeline
```python
from typing import List
import hashlib


class DocumentProcessor:
    def __init__(self, chunk_size: int = 1000, overlap: int = 200):
        self.chunk_size = chunk_size
        self.overlap = overlap

    def process(self, documents: List[str]) -> List[dict]:
        processed = []
        for doc in documents:
            # Clean text
            cleaned = self._clean_text(doc)
            # Split into overlapping chunks
            chunks = self._chunk_text(cleaned)
            # Attach metadata to each chunk
            for i, chunk in enumerate(chunks):
                processed.append({
                    'id': self._generate_id(chunk),
                    'text': chunk,
                    'chunk_index': i,
                    'total_chunks': len(chunks),
                })
        return processed

    def _clean_text(self, text: str) -> str:
        # Minimal cleaning: collapse runs of whitespace into single spaces
        return " ".join(text.split())

    def _chunk_text(self, text: str) -> List[str]:
        chunks = []
        start = 0
        while start < len(text):
            end = start + self.chunk_size
            chunks.append(text[start:end])
            if end >= len(text):
                break
            # Step forward, carrying `overlap` characters of context
            start = end - self.overlap
        return chunks

    def _generate_id(self, chunk: str) -> str:
        # Deterministic content hash: identical chunks get identical IDs
        return hashlib.sha256(chunk.encode("utf-8")).hexdigest()[:16]
```
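
A quick usage sketch (the input document is a hypothetical placeholder; any long string works):

```python
# Repeat a sentence to get a document long enough to span several chunks
doc = "Retrieval Augmented Generation grounds answers in retrieved context. " * 40

processor = DocumentProcessor(chunk_size=500, overlap=100)
records = processor.process([doc])

for record in records[:3]:
    print(record["chunk_index"], record["id"], record["text"][:60])
```

Because `_generate_id` hashes chunk content, re-ingesting unchanged documents yields identical IDs, which makes upserts into a vector store idempotent.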