Claude Agent Skill · by Wshobson

Embedding Strategies

Choose the right embedding model and chunking strategy for your vector search setup. Covers Voyage AI models (Anthropic's recommendation for Claude apps), OpenAI embeddings, and open-source alternatives for local deployment.

Install
```
npx skills add https://github.com/wshobson/agents --skill embedding-strategies
```
Works with Paperclip

How Embedding Strategies fits into a Paperclip company.

Embedding Strategies drops into any Paperclip agent that handles this kind of work. Assign it to a specialist inside a pre-configured PaperclipOrg company and the skill becomes available on every heartbeat — no prompt engineering, no tool wiring.

SaaS Factory (paired pack)

Pre-configured AI company: 18 agents, 18 skills, one-time purchase ($27, regularly $59).
Source file: SKILL.md (600 lines)
---
name: embedding-strategies
description: Select and optimize embedding models for semantic search and RAG applications. Use when choosing embedding models, implementing chunking strategies, or optimizing embedding quality for specific domains.
---

# Embedding Strategies

Guide to selecting and optimizing embedding models for vector search applications.

## When to Use This Skill

- Choosing embedding models for RAG
- Optimizing chunking strategies
- Fine-tuning embeddings for domains
- Comparing embedding model performance
- Reducing embedding dimensions
- Handling multilingual content

## Core Concepts

### 1. Embedding Model Comparison (2026)

| Model                      | Dimensions | Max Tokens | Best For                            |
| -------------------------- | ---------- | ---------- | ----------------------------------- |
| **voyage-3-large**         | 1024       | 32000      | Claude apps (Anthropic recommended) |
| **voyage-3**               | 1024       | 32000      | Claude apps, cost-effective         |
| **voyage-code-3**          | 1024       | 32000      | Code search                         |
| **voyage-finance-2**       | 1024       | 32000      | Financial documents                 |
| **voyage-law-2**           | 1024       | 32000      | Legal documents                     |
| **text-embedding-3-large** | 3072       | 8191       | OpenAI apps, high accuracy          |
| **text-embedding-3-small** | 1536       | 8191       | OpenAI apps, cost-effective         |
| **bge-large-en-v1.5**      | 1024       | 512        | Open source, local deployment       |
| **all-MiniLM-L6-v2**       | 384        | 256        | Fast, lightweight                   |
| **multilingual-e5-large**  | 1024       | 512        | Multi-language                      |

### 2. Embedding Pipeline

```
Document → Chunking         → Preprocessing       → Embedding Model → Vector
           [Overlap, Size]    [Clean, Normalize]    [API/Local]
```
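To make the flow concrete, here is a minimal sketch of the full path from document to vectors. It is illustrative rather than part of the skill: it assumes `VOYAGE_API_KEY` is set, reuses `chunk_by_tokens` from Template 4 below, and uses a deliberately trivial whitespace cleanup as the preprocessing step.

```python
# Minimal end-to-end sketch of the pipeline above (illustrative).
# Assumes VOYAGE_API_KEY is set; chunk_by_tokens is defined in Template 4.
from typing import List, Tuple
from langchain_voyageai import VoyageAIEmbeddings

def embed_document(text: str) -> List[Tuple[str, List[float]]]:
    """Document -> chunks -> cleaned chunks -> vectors."""
    # Chunking (Template 4)
    chunks = chunk_by_tokens(text, chunk_size=512, chunk_overlap=50)
    # Preprocessing: collapse whitespace (stand-in for real cleaning)
    chunks = [" ".join(c.split()) for c in chunks]
    # Embedding model (Template 1)
    model = VoyageAIEmbeddings(model="voyage-3")
    vectors = model.embed_documents(chunks)
    return list(zip(chunks, vectors))
```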
## Templates

### Template 1: Voyage AI Embeddings (Recommended for Claude)

```python
from langchain_voyageai import VoyageAIEmbeddings
from typing import List
import os

# Initialize Voyage AI embeddings (recommended by Anthropic for Claude)
embeddings = VoyageAIEmbeddings(
    model="voyage-3-large",
    voyage_api_key=os.environ.get("VOYAGE_API_KEY")
)

def get_embeddings(texts: List[str]) -> List[List[float]]:
    """Get embeddings from Voyage AI."""
    return embeddings.embed_documents(texts)

def get_query_embedding(query: str) -> List[float]:
    """Get single query embedding."""
    return embeddings.embed_query(query)

# Specialized models for domains
code_embeddings = VoyageAIEmbeddings(model="voyage-code-3")
finance_embeddings = VoyageAIEmbeddings(model="voyage-finance-2")
legal_embeddings = VoyageAIEmbeddings(model="voyage-law-2")
```

### Template 2: OpenAI Embeddings

```python
from openai import OpenAI
from typing import List, Optional

client = OpenAI()

def get_embeddings(
    texts: List[str],
    model: str = "text-embedding-3-small",
    dimensions: Optional[int] = None
) -> List[List[float]]:
    """Get embeddings from OpenAI with optional dimension reduction."""
    # Handle batching for large lists
    batch_size = 100
    all_embeddings = []

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]

        kwargs = {"input": batch, "model": model}
        if dimensions:
            # Matryoshka dimensionality reduction
            kwargs["dimensions"] = dimensions

        response = client.embeddings.create(**kwargs)
        embeddings = [item.embedding for item in response.data]
        all_embeddings.extend(embeddings)

    return all_embeddings

def get_embedding(text: str, **kwargs) -> List[float]:
    """Get single embedding."""
    return get_embeddings([text], **kwargs)[0]

# Dimension reduction with Matryoshka embeddings
def get_reduced_embedding(text: str, dimensions: int = 512) -> List[float]:
    """Get embedding with reduced dimensions (Matryoshka)."""
    return get_embedding(
        text,
        model="text-embedding-3-small",
        dimensions=dimensions
    )
```

### Template 3: Local Embeddings with Sentence Transformers

```python
from sentence_transformers import SentenceTransformer
from typing import List
import numpy as np

class LocalEmbedder:
    """Local embedding with sentence-transformers."""

    def __init__(
        self,
        model_name: str = "BAAI/bge-large-en-v1.5",
        device: str = "cuda"  # use "cpu" if no GPU is available
    ):
        self.model = SentenceTransformer(model_name, device=device)
        self.model_name = model_name

    def embed(
        self,
        texts: List[str],
        normalize: bool = True,
        show_progress: bool = False
    ) -> np.ndarray:
        """Embed texts with optional normalization."""
        embeddings = self.model.encode(
            texts,
            normalize_embeddings=normalize,
            show_progress_bar=show_progress,
            convert_to_numpy=True
        )
        return embeddings

    def embed_query(self, query: str) -> np.ndarray:
        """Embed a query with appropriate prefix for retrieval models."""
        # BGE and similar models benefit from a query prefix
        if "bge" in self.model_name.lower():
            query = f"Represent this sentence for searching relevant passages: {query}"
        return self.embed([query])[0]

    def embed_documents(self, documents: List[str]) -> np.ndarray:
        """Embed documents for indexing."""
        return self.embed(documents)

# E5 models expect instruction prefixes
class E5Embedder:
    def __init__(self, model_name: str = "intfloat/multilingual-e5-large"):
        self.model = SentenceTransformer(model_name)

    def embed_query(self, query: str) -> np.ndarray:
        """E5 requires 'query:' prefix for queries."""
        return self.model.encode(f"query: {query}")

    def embed_document(self, document: str) -> np.ndarray:
        """E5 requires 'passage:' prefix for documents."""
        return self.model.encode(f"passage: {document}")
```

### Template 4: Chunking Strategies

```python
from typing import List, Tuple
import re

def chunk_by_tokens(
    text: str,
    chunk_size: int = 512,
    chunk_overlap: int = 50,
    tokenizer=None
) -> List[str]:
    """Chunk text by token count."""
    import tiktoken
    tokenizer = tokenizer or tiktoken.get_encoding("cl100k_base")

    tokens = tokenizer.encode(text)
    chunks = []

    start = 0
    while start < len(tokens):
        end = start + chunk_size
        chunk_tokens = tokens[start:end]
        chunk_text = tokenizer.decode(chunk_tokens)
        chunks.append(chunk_text)
        start = end - chunk_overlap

    return chunks

def chunk_by_sentences(
    text: str,
    max_chunk_size: int = 1000
) -> List[str]:
    """Chunk text by sentences, respecting size limits."""
    import nltk  # requires the 'punkt' tokenizer data to be downloaded
    sentences = nltk.sent_tokenize(text)

    chunks = []
    current_chunk = []
    current_size = 0

    for sentence in sentences:
        sentence_size = len(sentence)

        if current_size + sentence_size > max_chunk_size and current_chunk:
            chunks.append(" ".join(current_chunk))
            current_chunk = []
            current_size = 0

        current_chunk.append(sentence)
        current_size += sentence_size

    if current_chunk:
        chunks.append(" ".join(current_chunk))

    return chunks

def chunk_by_semantic_sections(
    text: str,
    headers_pattern: str = r'^#{1,3}\s+.+$'
) -> List[Tuple[str, str]]:
    """Chunk markdown by headers, preserving hierarchy."""
    lines = text.split('\n')
    chunks = []
    current_header = ""
    current_content = []

    for line in lines:
        if re.match(headers_pattern, line, re.MULTILINE):
            if current_content:
                chunks.append((current_header, '\n'.join(current_content)))
            current_header = line
            current_content = []
        else:
            current_content.append(line)

    if current_content:
        chunks.append((current_header, '\n'.join(current_content)))

    return chunks

def recursive_character_splitter(
    text: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    separators: List[str] = None
) -> List[str]:
    """LangChain-style recursive splitter."""
    separators = separators or ["\n\n", "\n", ". ", " ", ""]

    def split_text(text: str, separators: List[str]) -> List[str]:
        if not text:
            return []

        separator = separators[0]
        remaining_separators = separators[1:]

        if separator == "":
            # Character-level split
            return [text[i:i + chunk_size]
                    for i in range(0, len(text), chunk_size - chunk_overlap)]

        splits = text.split(separator)
        chunks = []
        current_chunk = []
        current_length = 0

        for split in splits:
            split_length = len(split) + len(separator)

            if current_length + split_length > chunk_size and current_chunk:
                chunk_text = separator.join(current_chunk)

                # Recursively split if still too large
                if len(chunk_text) > chunk_size and remaining_separators:
                    chunks.extend(split_text(chunk_text, remaining_separators))
                else:
                    chunks.append(chunk_text)

                # Start new chunk with overlap
                overlap_splits = []
                overlap_length = 0
                for s in reversed(current_chunk):
                    if overlap_length + len(s) <= chunk_overlap:
                        overlap_splits.insert(0, s)
                        overlap_length += len(s)
                    else:
                        break
                current_chunk = overlap_splits
                current_length = overlap_length

            current_chunk.append(split)
            current_length += split_length

        if current_chunk:
            chunks.append(separator.join(current_chunk))

        return chunks

    return split_text(text, separators)
```

### Template 5: Domain-Specific Embedding Pipeline

```python
import re
from typing import List, Optional
from dataclasses import dataclass

from langchain_voyageai import VoyageAIEmbeddings  # as in Template 1

@dataclass
class EmbeddedDocument:
    id: str
    document_id: str
    chunk_index: int
    text: str
    embedding: List[float]
    metadata: dict

class DomainEmbeddingPipeline:
    """Pipeline for domain-specific embeddings."""

    def __init__(
        self,
        embedding_model: str = "voyage-3-large",
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        preprocessing_fn=None
    ):
        self.embeddings = VoyageAIEmbeddings(model=embedding_model)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.preprocess = preprocessing_fn or self._default_preprocess

    def _default_preprocess(self, text: str) -> str:
        """Default preprocessing."""
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters (customize for your domain)
        text = re.sub(r'[^\w\s.,!?-]', '', text)
        return text.strip()

    async def process_documents(
        self,
        documents: List[dict],
        id_field: str = "id",
        content_field: str = "content",
        metadata_fields: Optional[List[str]] = None
    ) -> List[EmbeddedDocument]:
        """Process documents for vector storage."""
        processed = []

        for doc in documents:
            content = doc[content_field]
            doc_id = doc[id_field]

            # Preprocess
            cleaned = self.preprocess(content)

            # Chunk (chunk_by_tokens is defined in Template 4)
            chunks = chunk_by_tokens(
                cleaned,
                self.chunk_size,
                self.chunk_overlap
            )

            # Create embeddings
            embeddings = await self.embeddings.aembed_documents(chunks)

            # Create records
            for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                metadata = {"document_id": doc_id, "chunk_index": i}

                # Add specified metadata fields
                if metadata_fields:
                    for field in metadata_fields:
                        if field in doc:
                            metadata[field] = doc[field]

                processed.append(EmbeddedDocument(
                    id=f"{doc_id}_chunk_{i}",
                    document_id=doc_id,
                    chunk_index=i,
                    text=chunk,
                    embedding=embedding,
                    metadata=metadata
                ))

        return processed

# Code-specific pipeline
class CodeEmbeddingPipeline:
    """Specialized pipeline for code embeddings."""

    def __init__(self):
        # Use Voyage's code-specific model
        self.embeddings = VoyageAIEmbeddings(model="voyage-code-3")

    def chunk_code(self, code: str, language: str) -> List[dict]:
        """Chunk code by functions/classes using tree-sitter."""
        try:
            import tree_sitter_languages
            parser = tree_sitter_languages.get_parser(language)
            tree = parser.parse(bytes(code, "utf8"))

            chunks = []
            # Extract function and class definitions
            self._extract_nodes(tree.root_node, code, chunks)
            return chunks
        except ImportError:
            # Fallback to simple chunking
            return [{"text": code, "type": "module"}]

    def _extract_nodes(self, node, source_code: str, chunks: list):
        """Recursively extract function/class definitions."""
        if node.type in ['function_definition', 'class_definition', 'method_definition']:
            # tree-sitter offsets are byte offsets, so slice bytes, not str
            source_bytes = source_code.encode("utf8")
            text = source_bytes[node.start_byte:node.end_byte].decode("utf8")
            chunks.append({
                "text": text,
                "type": node.type,
                "name": self._get_name(node),
                "start_line": node.start_point[0],
                "end_line": node.end_point[0]
            })
        for child in node.children:
            self._extract_nodes(child, source_code, chunks)

    def _get_name(self, node) -> str:
        """Extract name from function/class node."""
        for child in node.children:
            if child.type in ('identifier', 'name'):
                return child.text.decode('utf8')
        return "unknown"

    async def embed_with_context(
        self,
        chunk: str,
        context: str = ""
    ) -> List[float]:
        """Embed code with surrounding context."""
        if context:
            combined = f"Context: {context}\n\nCode:\n{chunk}"
        else:
            combined = chunk
        return await self.embeddings.aembed_query(combined)
```
### Template 6: Embedding Quality Evaluation

```python
import numpy as np
from typing import Callable, Dict, List

def evaluate_retrieval_quality(
    queries: List[str],
    relevant_docs: List[List[str]],   # relevant doc IDs per query
    retrieved_docs: List[List[str]],  # retrieved doc IDs per query
    k: int = 10
) -> Dict[str, float]:
    """Evaluate embedding quality for retrieval."""

    def precision_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        retrieved_k = retrieved[:k]
        relevant_retrieved = len(set(retrieved_k) & relevant)
        return relevant_retrieved / k if k > 0 else 0

    def recall_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        retrieved_k = retrieved[:k]
        relevant_retrieved = len(set(retrieved_k) & relevant)
        return relevant_retrieved / len(relevant) if relevant else 0

    def mrr(relevant: set, retrieved: List[str]) -> float:
        for i, doc in enumerate(retrieved):
            if doc in relevant:
                return 1 / (i + 1)
        return 0

    def ndcg_at_k(relevant: set, retrieved: List[str], k: int) -> float:
        dcg = sum(
            1 / np.log2(i + 2) if doc in relevant else 0
            for i, doc in enumerate(retrieved[:k])
        )
        ideal_dcg = sum(1 / np.log2(i + 2) for i in range(min(len(relevant), k)))
        return dcg / ideal_dcg if ideal_dcg > 0 else 0

    metrics = {
        f"precision@{k}": [],
        f"recall@{k}": [],
        "mrr": [],
        f"ndcg@{k}": []
    }

    for relevant, retrieved in zip(relevant_docs, retrieved_docs):
        relevant_set = set(relevant)
        metrics[f"precision@{k}"].append(precision_at_k(relevant_set, retrieved, k))
        metrics[f"recall@{k}"].append(recall_at_k(relevant_set, retrieved, k))
        metrics["mrr"].append(mrr(relevant_set, retrieved))
        metrics[f"ndcg@{k}"].append(ndcg_at_k(relevant_set, retrieved, k))

    return {name: float(np.mean(values)) for name, values in metrics.items()}

def compute_embedding_similarity(
    embeddings1: np.ndarray,
    embeddings2: np.ndarray,
    metric: str = "cosine"
) -> np.ndarray:
    """Compute similarity matrix between embedding sets."""
    if metric == "cosine":
        # Normalize and compute dot product
        norm1 = embeddings1 / np.linalg.norm(embeddings1, axis=1, keepdims=True)
        norm2 = embeddings2 / np.linalg.norm(embeddings2, axis=1, keepdims=True)
        return norm1 @ norm2.T
    elif metric == "euclidean":
        from scipy.spatial.distance import cdist
        return -cdist(embeddings1, embeddings2, metric='euclidean')
    elif metric == "dot":
        return embeddings1 @ embeddings2.T
    else:
        raise ValueError(f"Unknown metric: {metric}")

def compare_embedding_models(
    texts: List[str],
    models: Dict[str, Callable],
    queries: List[str],
    relevant_indices: List[List[int]],
    k: int = 5
) -> Dict[str, Dict[str, float]]:
    """Compare multiple embedding models on retrieval quality."""
    results = {}

    for model_name, embed_fn in models.items():
        # Embed all texts
        doc_embeddings = np.array(embed_fn(texts))

        retrieved_per_query = []
        for query in queries:
            query_embedding = np.array(embed_fn([query])[0])
            # Compute similarities
            similarities = compute_embedding_similarity(
                query_embedding.reshape(1, -1),
                doc_embeddings,
                metric="cosine"
            )[0]
            # Get top-k indices
            top_k_indices = np.argsort(similarities)[::-1][:k]
            retrieved_per_query.append([str(i) for i in top_k_indices])

        # Convert relevant indices to string IDs
        relevant_docs = [[str(i) for i in indices] for indices in relevant_indices]

        results[model_name] = evaluate_retrieval_quality(
            queries, relevant_docs, retrieved_per_query, k
        )

    return results
```
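An illustrative comparison run with toy data: the corpus and query below are made up, and `get_embeddings` is the Template 2 helper; any embedding function with the same list-in, list-out shape can be wrapped the same way.

```python
# Illustrative: compare one model on a toy corpus using the helpers above.
texts = ["reset your password", "billing cycle explained", "enable two-factor auth"]
queries = ["how do I change my password?"]
relevant_indices = [[0]]  # query 0 is answered by texts[0]

models = {
    # get_embeddings is defined in Template 2
    "openai-small": lambda ts: get_embeddings(ts, model="text-embedding-3-small"),
}

scores = compare_embedding_models(texts, models, queries, relevant_indices, k=2)
print(scores)  # e.g. {'openai-small': {'precision@2': 0.5, 'recall@2': 1.0, ...}}
```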
## Best Practices

### Do's

- **Match model to use case**: Code vs prose vs multilingual
- **Chunk thoughtfully**: Preserve semantic boundaries
- **Normalize embeddings**: For cosine similarity search
- **Batch requests**: More efficient than one-by-one
- **Cache embeddings**: Avoid recomputing for static content (see the caching sketch after these lists)
- **Use Voyage AI for Claude apps**: Recommended by Anthropic
### Don'ts

- **Don't ignore token limits**: Truncation loses information
- **Don't mix embedding models**: Incompatible vector spaces
- **Don't skip preprocessing**: Garbage in, garbage out
- **Don't over-chunk**: Lose important context
- **Don't forget metadata**: Essential for filtering and debugging
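A minimal sketch of the caching pattern from the Do's list. The `CachedEmbedder` class is illustrative, not part of the skill: it keys an in-memory dict by content hash and only sends cache misses to the underlying model; swap the dict for Redis or SQLite for persistence.

```python
# Illustrative embedding cache keyed by content hash (see Do's above).
import hashlib
from typing import Callable, Dict, List

class CachedEmbedder:
    def __init__(self, embed_fn: Callable[[List[str]], List[List[float]]]):
        self.embed_fn = embed_fn          # e.g. get_embeddings from Template 2
        self.cache: Dict[str, List[float]] = {}

    @staticmethod
    def _key(text: str) -> str:
        return hashlib.sha256(text.encode("utf-8")).hexdigest()

    def embed(self, texts: List[str]) -> List[List[float]]:
        # Only send cache misses to the model, then return vectors in order
        misses = [t for t in texts if self._key(t) not in self.cache]
        if misses:
            for text, vector in zip(misses, self.embed_fn(misses)):
                self.cache[self._key(text)] = vector
        return [self.cache[self._key(t)] for t in texts]
```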