`core` in Rag Module — Public API Reference
Table of Contents
- Module Overview
- RAGConfig
- RAGSystem
- BaseRAGSystem
- PromptRouter
- Reranker
- RerankerConfig
- QueryExpander
- RetrievalCache
- Logging Utilities
- Exception Hierarchy
- Data Classes
- Module-level Helpers
- Enumerations
- Quick-Start Example
1. Module Overview
The core package is the heart of the Fennec Community library. It provides a complete, production-ready pipeline for Retrieval-Augmented Generation (RAG): indexing documents into a vector database, retrieving the most relevant passages for any given query, and generating grounded natural-language answers through an LLM.
Key capabilities:
- Plug-in architecture — bring your own vector DB, LLM, chunker, and context manager.
- Smart query expansion (LLM + curated synonym table, bilingual Arabic/English).
- Intelligent prompt routing — selects the optimal prompt template based on detected language and question type.
- Multi-mode reranking — heuristic, LLM-assisted, or hybrid.
- TTL-aware LRU retrieval cache for repeated queries.
- Full async support with streaming generation.
- Structured exception hierarchy for precise error handling.
- Centralized, structured logging with optional JSON output.
Publicly exported symbols:
from fennec_community.rag.core import (
RAGConfig,
BaseRAGSystem,
RAGSystem,
RerankerConfig,
RerankMode,
Reranker,
PromptRouter,
QuestionType,
)
2. RAGConfig
from fennec_community.rag.core import RAGConfig
RAGConfig is a dataclass that holds all tunable parameters for a RAGSystem instance. Every field has a sensible default, so you only need to override what you want to change.
Constructor
RAGConfig(
chunk_size: int = 512,
overlap: int = 128,
top_k: int = 5,
min_score: float = 0.0,
enable_reranking: bool = False,
rerank_mode: str = "heuristic",
rerank_top_k: Optional[int] = None,
rerank_original_weight: float = 0.40,
rerank_llm_weight: float = 0.40,
rerank_diversity_weight: float = 0.10,
rerank_length_weight: float = 0.10,
enable_prompt_routing: bool = True,
prompt_language: Optional[str] = None,
include_few_shot: bool = True,
context_max_length: int = 4000,
enable_retrieval_cache: bool = True,
cache_maxsize: int = 256,
cache_ttl: float = 300.0,
ingest_batch_size: int = 64,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `chunk_size` | `int` | `512` | Maximum number of tokens/characters per chunk. |
| `overlap` | `int` | `128` | Overlap between consecutive chunks (must be < chunk_size). |
| `top_k` | `int` | `5` | Number of chunks to retrieve per query. |
| `min_score` | `float` | `0.0` | Minimum similarity score threshold; chunks below this are filtered. |
| `enable_reranking` | `bool` | `False` | Activate the reranker after initial retrieval. |
| `rerank_mode` | `str` | `"heuristic"` | Reranking strategy: "heuristic", "llm", or "hybrid". |
| `rerank_top_k` | `Optional[int]` | `None` | Number of results to keep after reranking (None = same as top_k). |
| `rerank_original_weight` | `float` | `0.40` | Weight of the original vector DB similarity score in the composite score. |
| `rerank_llm_weight` | `float` | `0.40` | Weight of the LLM relevance score in the composite score. |
| `rerank_diversity_weight` | `float` | `0.10` | Weight of the diversity/novelty bonus. |
| `rerank_length_weight` | `float` | `0.10` | Weight of the length optimality bonus. |
| `enable_prompt_routing` | `bool` | `True` | Use PromptRouter to automatically select prompt templates. |
| `prompt_language` | `Optional[str]` | `None` | Force prompt language ("ar" or "en"); None = auto-detect. |
| `include_few_shot` | `bool` | `True` | Include few-shot examples in the routed prompt. |
| `context_max_length` | `int` | `4000` | Maximum character length of the context block passed to the LLM. |
| `enable_retrieval_cache` | `bool` | `True` | Cache retrieval results in an in-memory LRU cache. |
| `cache_maxsize` | `int` | `256` | Maximum number of distinct queries stored in the cache. |
| `cache_ttl` | `float` | `300.0` | Cache entry lifetime in seconds; 0 disables expiry. |
| `ingest_batch_size` | `int` | `64` | Number of chunks sent to the vector DB per write batch. |
Constraint: The four rerank weights (rerank_original_weight + rerank_llm_weight + rerank_diversity_weight + rerank_length_weight) must sum to exactly 1.0.
validate()
config.validate() -> None
Purpose: Validates all configuration fields and raises ValueError for any illegal combination (e.g., non-positive chunk_size, overlap ≥ chunk_size, invalid rerank_mode, weights that do not sum to 1.0).
Called automatically by BaseRAGSystem.__init__, so manual calls are rarely needed.
Parameters: None.
Returns: None
Raises: ValueError with a descriptive message for any invalid field.
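The checks listed for validate() can be illustrated with a small stand-alone sketch. `SketchConfig` is a hypothetical stand-in that carries only a few of the documented fields; it is not the library's RAGConfig, and the floating-point tolerance on the weight sum is an assumption of this example.

```python
import math
from dataclasses import dataclass


@dataclass
class SketchConfig:
    """Hypothetical stand-in for a few RAGConfig fields (illustration only)."""
    chunk_size: int = 512
    overlap: int = 128
    rerank_mode: str = "heuristic"
    rerank_original_weight: float = 0.40
    rerank_llm_weight: float = 0.40
    rerank_diversity_weight: float = 0.10
    rerank_length_weight: float = 0.10

    def validate(self) -> None:
        # Reject non-positive chunk sizes.
        if self.chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        # Overlap must stay strictly below the chunk size.
        if not 0 <= self.overlap < self.chunk_size:
            raise ValueError("overlap must be >= 0 and < chunk_size")
        if self.rerank_mode not in {"heuristic", "llm", "hybrid"}:
            raise ValueError(f"invalid rerank_mode: {self.rerank_mode!r}")
        total = (
            self.rerank_original_weight
            + self.rerank_llm_weight
            + self.rerank_diversity_weight
            + self.rerank_length_weight
        )
        # Assumption: compare with a small tolerance, since float sums
        # are rarely bit-exact.
        if not math.isclose(total, 1.0, abs_tol=1e-6):
            raise ValueError(f"rerank weights sum to {total}, expected 1.0")
```

Calling `SketchConfig(overlap=512).validate()` raises ValueError because the overlap is not smaller than the chunk size.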
3. RAGSystem
from fennec_community.rag.core import RAGSystem
RAGSystem is the main public class you interact with. It wires together a vector database, an LLM, a chunker, and a context manager into a complete RAG pipeline.
3.1 Constructor
RAGSystem(
vector_db: Any,
llm: Any,
chunker: Any,
context_manager: Any,
config: Optional[RAGConfig] = None,
prompt: Optional[PromptInput] = None,
enable_query_expansion: bool = True,
query_expansion_variants: int = 3,
)
Purpose: Instantiate a fully wired RAG system. All four core components are required.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `vector_db` | `Any` | (required) | Vector database. Must expose .add(chunks) and .search(query, top_k, score_threshold). |
| `llm` | `Any` | (required) | Language model. Must expose .generate(prompt) -> str. |
| `chunker` | `Any` | (required) | Text chunker. Must expose .chunk_text(text, doc_id) or .chunk(text, doc_id). |
| `context_manager` | `Any` | (required) | Context builder. Must expose .build(query, chunks) -> str. |
| `config` | `Optional[RAGConfig]` | `None` | Configuration object; defaults are applied when None. |
| `prompt` | `Optional[PromptInput]` | `None` | Custom prompt: a PromptTemplate, a callable(context, query) -> str, or a format string. Pass None to use PromptRouter. |
| `enable_query_expansion` | `bool` | `True` | Expand queries with LLM-generated variants and synonyms before searching. |
| `query_expansion_variants` | `int` | `3` | Maximum number of alternative queries to generate per user query. |
Returns: RAGSystem instance.
Raises: RAGInitializationError if any of the four required components is None.
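Because the four components are duck-typed, any objects exposing the methods listed in the table should be usable. The sketch below shows minimal in-memory stand-ins, which can be handy for tests. Every class name here is hypothetical, the `Chunk` shape (a `text` and a `doc_id`) is an assumption, and so is the idea that `build()` receives (chunk, score) pairs.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Chunk:
    """Hypothetical chunk type; the real chunk class may differ."""
    text: str
    doc_id: str


class WordChunker:
    """Splits text into fixed-size word windows (no overlap, for brevity)."""
    def __init__(self, size: int = 50) -> None:
        self.size = size

    def chunk_text(self, text: str, doc_id: str) -> List[Chunk]:
        words = text.split()
        return [Chunk(" ".join(words[i:i + self.size]), doc_id)
                for i in range(0, len(words), self.size)]


class InMemoryVectorDB:
    """Scores chunks by query-term overlap instead of real embeddings."""
    def __init__(self) -> None:
        self.chunks: List[Chunk] = []

    def add(self, chunks: List[Chunk]) -> None:
        self.chunks.extend(chunks)

    def search(self, query: str, top_k: int = 5,
               score_threshold: float = 0.0) -> List[Tuple[Chunk, float]]:
        terms = set(query.lower().split())
        scored = []
        for chunk in self.chunks:
            words = set(chunk.text.lower().split())
            score = len(terms & words) / len(terms) if terms else 0.0
            if score >= score_threshold:
                scored.append((chunk, score))
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return scored[:top_k]


class EchoLLM:
    """Returns a canned string; stands in for a real language model."""
    def generate(self, prompt: str, **kwargs) -> str:
        return f"(stub answer for a {len(prompt)}-character prompt)"


class JoinContextManager:
    """Assumes `chunks` is a list of (chunk, score) pairs."""
    def build(self, query: str, chunks) -> str:
        return "\n\n".join(chunk.text for chunk, _score in chunks)
```

Stubs like these keep unit tests independent of any external vector store or model endpoint.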
Example:
from fennec_community.rag.core import RAGSystem, RAGConfig
config = RAGConfig(top_k=8, enable_reranking=True, rerank_mode="hybrid")
rag = RAGSystem(
vector_db=my_vector_db,
llm=my_llm,
chunker=my_chunker,
context_manager=my_ctx_mgr,
config=config,
)
3.2 Document Ingestion
add_text()
rag.add_text(
text: str,
doc_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> int
Purpose: Index a single plain-text string directly. This is the simplest ingestion path for short texts (employee records, FAQ entries, notes, etc.).
| Parameter | Type | Required | Description |
|---|---|---|---|
| `text` | `str` | Yes | The raw text content to index. |
| `doc_id` | `Optional[str]` | No | Unique identifier. Auto-generated from timestamp if omitted. |
| `metadata` | `Optional[Dict[str, Any]]` | No | Arbitrary key-value metadata attached to all chunks of this document. |
Returns: int — number of chunks created from the text.
Example:
n = rag.add_text(
"Name: Ahmed Al-Farsi, Phone: +966 50 123 4567",
doc_id="emp_001",
metadata={"department": "Engineering"},
)
print(f"Indexed in {n} chunk(s)")
add_texts()
rag.add_texts(
texts: Dict[str, str],
metadata: Optional[Dict[str, Dict[str, Any]]] = None,
) -> Dict[str, int]
Purpose: Batch-index multiple documents from a {doc_id: text} dictionary. Preferred over calling add_text() in a loop because it performs a single vector DB write.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `texts` | `Dict[str, str]` | Yes | Mapping of document ID → raw text content. |
| `metadata` | `Optional[Dict[str, Dict[str, Any]]]` | No | Mapping of document ID → metadata dict. |
Returns: Dict[str, int] — mapping of document ID → number of chunks created (0 for failed or empty documents).
Example:
chunks = rag.add_texts(
{
"doc_ar_1": "اسم: محمد، الهاتف: 0501234567",
"doc_ar_2": "اسم: سارة، الهاتف: 0557654321",
},
metadata={
"doc_ar_1": {"team": "Sales"},
"doc_ar_2": {"team": "HR"},
},
)
# → {"doc_ar_1": 1, "doc_ar_2": 1}
add_documents()
rag.add_documents(docs: List[LoadedDocument]) -> Dict[str, int]
Purpose: Low-level ingestion entry point used internally by add_text and add_texts. Accepts a list of LoadedDocument objects, chunks each document, and writes all chunks to the vector DB in a single batch.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `docs` | `List[LoadedDocument]` | Yes | List of LoadedDocument instances. |
Returns: Dict[str, int] — {doc_id: num_chunks}. Documents that fail chunking or are empty receive a count of 0.
Notes:
- The retrieval cache is automatically invalidated after successful ingestion.
- Documents with empty or whitespace-only `page_content` are skipped with a warning.
3.3 Retrieval
retrieve()
rag.retrieve(
query: str,
top_k: Optional[int] = None,
) -> List[Tuple[Any, float]]
Purpose: Retrieve the most relevant document chunks for a query. Internally applies smart query expansion, merges results across all variants, and optionally reranks.
Pipeline:
- Check the retrieval cache — return immediately on a hit.
- Expand the query into semantically equivalent variants.
- Search the vector DB for each variant.
- Merge results, keeping the highest score per unique chunk.
- Optionally rerank the merged results.
- Store the result in the cache.
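The six pipeline steps can be sketched as a single function. This is an illustration under stated assumptions, not the library's implementation: `retrieve_sketch` and its parameters are hypothetical, the cache is a plain dict, the cache key normalisation is a guess, and chunks are assumed to be hashable so they can be deduplicated by identity.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Scored = List[Tuple[Any, float]]


def retrieve_sketch(
    query: str,
    top_k: int,
    search: Callable[[str, int], Scored],
    expand: Callable[[str], List[str]],
    cache: Dict[Tuple[str, int], Scored],
    rerank: Optional[Callable[[str, Scored], Scored]] = None,
) -> Scored:
    key = (query.strip().lower(), top_k)  # assumed key normalisation
    if key in cache:                      # step 1: cache hit
        return cache[key]
    best: Dict[Any, float] = {}
    for variant in expand(query):         # step 2: query variants
        for chunk, score in search(variant, top_k):   # step 3: search
            # Step 4: keep the highest score seen for each unique chunk.
            if score > best.get(chunk, float("-inf")):
                best[chunk] = score
    merged = sorted(best.items(), key=lambda pair: pair[1], reverse=True)[:top_k]
    if rerank is not None:                # step 5: optional rerank
        merged = rerank(query, merged)
    cache[key] = merged                   # step 6: store in cache
    return merged
```

A second call with the same query and top_k returns the cached list without calling `search` again.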
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | User question or search phrase. |
| `top_k` | `Optional[int]` | No | Override the configured top_k for this call only. |
Returns: List[Tuple[Any, float]] — list of (chunk, score) pairs sorted by score descending. Empty list if the query is blank or no results are found.
Raises: RAGRetrievalError on unexpected search failures.
Example:
results = rag.retrieve("ما رقم هاتف أحمد؟", top_k=3)
for chunk, score in results:
print(f"[{score:.2f}] {chunk.text}")
3.4 Generation
generate()
rag.generate(
query: str,
include_sources: bool = False,
language: Optional[str] = None,
**llm_kwargs: Any,
) -> str
Purpose: End-to-end RAG inference: retrieves relevant chunks, builds a context block, selects an appropriate prompt, and calls the LLM to produce a grounded answer.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | User question. |
| `include_sources` | `bool` | No | If True, appends a formatted source list (doc IDs + scores) to the answer. |
| `language` | `Optional[str]` | No | Override language for prompt selection ("ar" or "en"). Auto-detected from the query if None. |
| `**llm_kwargs` | `Any` | No | Extra keyword arguments forwarded verbatim to llm.generate() (e.g., temperature, max_tokens). |
Returns: str — the LLM's answer. Never raises — errors are returned as human-readable strings (e.g., "❌ Search error occurred.").
Behaviour on edge cases:
- Empty query → "⚠️ Please enter a valid question."
- No relevant chunks found → "No relevant information found."
- Retrieval error → "❌ Search error occurred."
Example:
answer = rag.generate(
"What is the phone number of Ahmed?",
include_sources=True,
temperature=0.2,
)
print(answer)
ask()
rag.ask(
question: str,
include_sources: bool = False,
language: Optional[str] = None,
**llm_kwargs: Any,
) -> str
Purpose: Friendly alias for generate(), with identical behaviour and a more conversational method name.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `question` | `str` | Yes | User question. |
| `include_sources` | `bool` | No | Append source attribution to the answer. |
| `language` | `Optional[str]` | No | Force language for prompt selection. |
| `**llm_kwargs` | `Any` | No | Extra kwargs forwarded to the LLM. |
Returns: str — the generated answer.
Example:
answer = rag.ask("ما اسم المدير العام؟")
3.5 Prompt Management
set_prompt()
rag.set_prompt(prompt: Optional[PromptInput]) -> None
Purpose: Replace or remove the custom prompt template at runtime, without recreating the entire RAGSystem. Passing None restores the default PromptRouter behaviour (if prompt routing is enabled in config).
| Parameter | Type | Required | Description |
|---|---|---|---|
| `prompt` | `Optional[PromptInput]` | Yes | A PromptTemplate with a .format(context, question) method, a callable(context, query) -> str, a Python format string with {context} and {question} placeholders, or None to restore the default. |
Returns: None
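To make the three accepted prompt forms concrete, here is one way such inputs could be normalised into a final prompt string. `render_prompt` is a hypothetical helper written for this illustration; how the library actually dispatches, and whether a template's `.format` takes positional or keyword arguments, are assumptions.

```python
from typing import Any


def render_prompt(prompt: Any, context: str, question: str) -> str:
    """Hypothetical dispatcher over the three documented prompt forms."""
    # Plain strings are checked first because str also has a .format method.
    if isinstance(prompt, str):
        return prompt.format(context=context, question=question)
    # Template objects: assumed to accept (context, question) positionally.
    if hasattr(prompt, "format"):
        return prompt.format(context, question)
    # Bare callables: callable(context, query) -> str.
    if callable(prompt):
        return prompt(context, question)
    raise TypeError(f"unsupported prompt type: {type(prompt).__name__}")
```

Checking for `str` before the generic `.format` attribute avoids treating a format string as a template object.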
Example:
# Use a custom format string
rag.set_prompt("Context:\n{context}\n\nQuestion: {question}\nAnswer:")
# Restore PromptRouter
rag.set_prompt(None)
3.6 Persistence
save()
rag.save(path: str) -> None
Purpose: Persist the vector database index and system statistics to disk so the system can be restored later without re-indexing all documents.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `path` | `str` | Yes | Directory path where the system state will be saved. Created automatically if it does not exist. |
Returns: None
Raises: VectorDBPersistenceError if the save operation fails.
Directory layout created:
<path>/
├── vector_db/ ← serialized vector database
└── stats.json ← query and document statistics
load() (class method)
RAGSystem.load(
path: str,
vector_db: Any,
llm: Any,
chunker: Any,
context_manager: Any,
config: Optional[RAGConfig] = None,
enable_query_expansion: bool = True,
) -> RAGSystem
Purpose: Reconstruct a previously saved RAGSystem from disk, restoring the vector index and statistics. No re-indexing is required.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `path` | `str` | Yes | Directory path previously passed to save(). |
| `vector_db` | `Any` | Yes | An empty vector DB instance; the saved index is loaded into it. |
| `llm` | `Any` | Yes | Language model instance. |
| `chunker` | `Any` | Yes | Text chunker instance. |
| `context_manager` | `Any` | Yes | Context builder instance. |
| `config` | `Optional[RAGConfig]` | No | Override configuration. |
| `enable_query_expansion` | `bool` | No | Whether to enable query expansion on the loaded system (default: True). |
Returns: RAGSystem — fully initialized and ready to serve queries.
Raises: FileNotFoundError if the path does not exist. VectorDBPersistenceError if the index cannot be loaded.
Example:
rag = RAGSystem.load(
path="./saved_rag",
vector_db=fresh_vector_db,
llm=my_llm,
chunker=my_chunker,
context_manager=my_ctx_mgr,
)
answer = rag.ask("What documents are in this system?")
remove_document()
rag.remove_document(doc_id: str) -> int
Purpose: Remove all chunks belonging to a specific document from the vector database, and update internal statistics accordingly.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `doc_id` | `str` | Yes | The document identifier used during ingestion. |
Returns: int — number of chunks removed. Returns 0 if the document was not found or if removal failed.
3.7 Cache Management
invalidate_cache()
rag.invalidate_cache() -> None
Purpose: Manually flush the entire retrieval cache. Useful when documents are updated or removed externally (e.g., via a direct vector DB operation that bypasses RAGSystem).
Parameters: None.
Returns: None
get_cache_stats()
rag.get_cache_stats() -> Dict[str, Any]
Purpose: Return runtime statistics for the retrieval cache to help diagnose performance and tune cache parameters.
Parameters: None.
Returns: Dict[str, Any] with the following keys:
| Key | Type | Description |
|---|---|---|
| `size` | `int` | Current number of entries in the cache. |
| `maxsize` | `int` | Maximum capacity. |
| `ttl` | `Optional[float]` | TTL in seconds; None = no expiry. |
| `hits` | `int` | Total cache hits since last clear. |
| `misses` | `int` | Total cache misses since last clear. |
| `hit_rate` | `float` | hits / (hits + misses), rounded to 3 decimal places. |
Returns an empty dict if the cache is disabled (enable_retrieval_cache=False).
3.8 Statistics & Diagnostics
get_stats()
rag.get_stats() -> Dict[str, Any]
Purpose: Return a snapshot of all system-wide statistics including document counts, query counts, component types, and cache metrics. Useful for monitoring and dashboards.
Parameters: None.
Returns: Dict[str, Any] containing:
| Key | Type | Description |
|---|---|---|
| `total_queries` | `int` | Total calls to generate() / ask(). |
| `successful_queries` | `int` | Queries that produced an answer. |
| `failed_queries` | `int` | Queries that returned an error or no results. |
| `total_documents` | `int` | Documents successfully indexed. |
| `total_chunks` | `int` | Total chunks stored in the vector DB. |
| `vector_db_size` | `int` | Current number of vectors in the DB (from vector_db.size). |
| `llm_type` | `str` | Class name of the LLM component. |
| `chunker_type` | `str` | Class name of the chunker component. |
| `query_expansion` | `bool` | Whether query expansion is active. |
| `reranking` | `bool` | Whether reranking is active. |
| `cache` | `dict` | Output of get_cache_stats(). |
validate_connection()
rag.validate_connection(test_query: str) -> Dict
Purpose: Health-check the entire pipeline by running a test retrieval and generation. Returns a structured result indicating which components are working.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `test_query` | `str` | Yes | A sample query used to exercise the pipeline. |
Returns: Dict with the following structure:
# Success
{
"success": True,
"reason": "✅ System working successfully",
"components": {"retrieval": True, "generation": True}
}
# Partial (retrieval works, generation unavailable)
{
"success": True,
"reason": "✅ Retrieval works, generation unavailable",
"components": {"retrieval": True, "generation": False, "generation_error": "..."}
}
# Failure
{
"success": False,
"reason": "❌ Error: ...",
"components": None
}
# No documents
{
"success": False,
"reason": "⚠️ No documents in the system",
"components": None
}
reset_stats()
rag.reset_stats() -> None
Purpose: Reset query-level counters (total_queries, successful_queries, failed_queries) back to zero. Document and chunk counts are preserved.
Parameters: None.
Returns: None
3.9 Async API
All async methods mirror their synchronous counterparts but are safe for use in asyncio event loops.
aadd_documents()
await rag.aadd_documents(docs: Dict[str, str]) -> Dict[str, int]
Purpose: Async document ingestion. Chunks all documents concurrently using asyncio.gather, then writes to the vector DB (using the native async method aadd if available, otherwise asyncio.to_thread).
| Parameter | Type | Description |
|---|---|---|
| `docs` | `Dict[str, str]` | {doc_id: text} mapping. |
Returns: Dict[str, int] — {doc_id: num_chunks}.
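The concurrency pattern described above (gather the chunking work, then prefer a native `aadd` and fall back to a worker thread) might look like the following. This is a stand-alone sketch: `aadd_documents_sketch` is a hypothetical function, and running the synchronous chunker through `asyncio.to_thread` is an assumption about how the chunking is made concurrent.

```python
import asyncio
from typing import Any, Dict, List


async def aadd_documents_sketch(
    docs: Dict[str, str], chunker: Any, vector_db: Any
) -> Dict[str, int]:
    async def chunk_one(doc_id: str, text: str) -> List[Any]:
        # Run the synchronous chunker in a worker thread (assumption).
        return await asyncio.to_thread(chunker.chunk_text, text, doc_id)

    doc_ids = list(docs)
    # Chunk every document concurrently; gather preserves input order.
    per_doc = await asyncio.gather(*(chunk_one(d, docs[d]) for d in doc_ids))
    all_chunks = [chunk for chunks in per_doc for chunk in chunks]

    if all_chunks:
        if hasattr(vector_db, "aadd"):
            await vector_db.aadd(all_chunks)      # native async write
        else:
            await asyncio.to_thread(vector_db.add, all_chunks)  # fallback

    return {d: len(chunks) for d, chunks in zip(doc_ids, per_doc)}
```

`asyncio.to_thread` requires Python 3.9 or later.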
aretrieve()
await rag.aretrieve(
query: str,
top_k: Optional[int] = None,
) -> List[Tuple[Any, float]]
Purpose: Async retrieval with concurrent query expansion. All variant searches are fired simultaneously via asyncio.gather.
| Parameter | Type | Description |
|---|---|---|
| `query` | `str` | User question. |
| `top_k` | `Optional[int]` | Override configured top_k. |
Returns: List[Tuple[Any, float]] — scored chunks, sorted descending.
agenerate()
await rag.agenerate(
query: str,
include_sources: bool = False,
language: Optional[str] = None,
**llm_kwargs: Any,
) -> str
Purpose: Async end-to-end generation. Uses the async generate method of the LLM (generate_async) if available.
Returns: str — the generated answer.
astream()
async for token in rag.astream(
query: str,
language: Optional[str] = None,
**llm_kwargs: Any,
):
print(token, end="", flush=True)
Purpose: Async streaming generation that yields tokens as they are produced by the LLM, enabling real-time display of long answers.
| Parameter | Type | Description |
|---|---|---|
| `query` | `str` | User question. |
| `language` | `Optional[str]` | Force language for prompt selection. |
| `**llm_kwargs` | `Any` | Extra kwargs forwarded to the LLM streaming method. |
Yields: str — individual tokens or word chunks.
3.10 Context Manager
RAGSystem supports both synchronous and asynchronous context managers:
# Synchronous
with RAGSystem(vector_db, llm, chunker, ctx_mgr) as rag:
rag.add_texts({"doc1": "some content"})
print(rag.ask("What is in doc1?"))
# cleanup() called automatically on exit
# Asynchronous
async with RAGSystem(vector_db, llm, chunker, ctx_mgr) as rag:
await rag.aadd_documents({"doc1": "some content"})
print(await rag.agenerate("What is in doc1?"))
cleanup()
rag.cleanup() -> None
Purpose: Release resources held by all four components (calls cleanup() on each if the method exists). Called automatically when exiting a context manager.
Parameters: None.
Returns: None
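The "call cleanup() on each component if it exists" behaviour, together with the sync and async context-manager protocols, can be sketched in a few lines. `ManagedSystem` is a hypothetical class for illustration only and makes no claim about how RAGSystem is written internally.

```python
from typing import Any


class ManagedSystem:
    """Hypothetical holder that cleans up its components on exit."""

    def __init__(self, *components: Any) -> None:
        self.components = components

    def cleanup(self) -> None:
        for component in self.components:
            # Only call cleanup() on components that actually define it.
            method = getattr(component, "cleanup", None)
            if callable(method):
                method()

    # Synchronous context manager protocol.
    def __enter__(self) -> "ManagedSystem":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.cleanup()
        return False  # do not suppress exceptions

    # Asynchronous context manager protocol.
    async def __aenter__(self) -> "ManagedSystem":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        self.cleanup()
        return False
```

Returning False from the exit methods lets any exception raised inside the `with` block propagate after cleanup has run.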
4. BaseRAGSystem
from fennec_community.rag.core import BaseRAGSystem
BaseRAGSystem is the abstract base class that all RAG system implementations must extend. It defines the required interface and provides default implementations of utility methods.
4.1 Abstract Methods
Subclasses must implement these:
| Method | Signature | Description |
|---|---|---|
| `add_documents` | `(docs: Dict[str, str]) -> Dict[str, int]` | Add and index documents. |
| `retrieve` | `(query: str, top_k: Optional[int]) -> List[Tuple]` | Retrieve relevant chunks. |
| `generate` | `(query: str, **kwargs) -> str` | Generate an answer. |
4.2 Concrete Methods
remove_document()
system.remove_document(doc_id: str) -> int
Purpose: Remove a document by ID. The base implementation raises NotImplementedError; override in subclasses.
Returns: int — number of deleted chunks.
save()
system.save(path: str) -> None
Purpose: Persist the system to disk. Raises NotImplementedError in the base class.
load() (class method)
BaseRAGSystem.load(path: str, **kwargs) -> BaseRAGSystem
Purpose: Load a previously saved system. Raises NotImplementedError in the base class.
get_stats()
system.get_stats() -> Dict[str, Any]
Purpose: Return a snapshot of system statistics.
Returns: Dict with keys: total_queries, successful_queries, failed_queries, total_documents, total_chunks.
reset_stats()
system.reset_stats() -> None
Purpose: Zero out query counters while preserving document and chunk counts.
validate_connection()
system.validate_connection(test_query: str) -> Dict
Purpose: Run a pipeline health-check. See §3.8 for the full return schema.
cleanup()
system.cleanup() -> None
Purpose: Release resources. Override in subclasses as needed.
4.3 Async Methods
| Method | Description |
|---|---|
| `aadd_documents(docs)` | Async wrapper around add_documents via asyncio.to_thread. |
| `aretrieve(query, top_k)` | Async wrapper around retrieve. |
| `agenerate(query, **kwargs)` | Async wrapper around generate. |
5. PromptRouter
from fennec_community.rag.core import PromptRouter, QuestionType
PromptRouter analyses each user query, detects its language and semantic type, and selects the best-fit prompt template. It supports bilingual operation (Arabic and English) with optional few-shot examples.
Constructor
PromptRouter(include_few_shot: bool = True)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `include_few_shot` | `bool` | `True` | Prepend a relevant few-shot example to every generated prompt. |
detect_language()
router.detect_language(text: str) -> str
Purpose: Determine whether a text is predominantly Arabic or English using Unicode character ratio analysis.
| Parameter | Type | Description |
|---|---|---|
| `text` | `str` | The text to examine. |
Returns: "ar" if the ratio of Arabic Unicode characters exceeds 25%, otherwise "en". Returns "ar" for empty strings.
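A character-ratio check of this kind can be approximated in a few lines. The sketch below is not the library's code: it assumes the ratio is taken over alphabetic characters only and that "Arabic" means the basic Arabic block U+0600 to U+06FF; the real implementation may count differently.

```python
def detect_language_sketch(text: str, threshold: float = 0.25) -> str:
    """Hypothetical re-implementation of the documented 25% rule."""
    letters = [ch for ch in text if ch.isalpha()]
    if not letters:
        # Mirrors the documented behaviour for empty input.
        return "ar"
    # Assumption: only the basic Arabic Unicode block is counted.
    arabic = sum(1 for ch in letters if "\u0600" <= ch <= "\u06FF")
    return "ar" if arabic / len(letters) > threshold else "en"
```

With these assumptions, a mixed query such as "RAG هو" is classified as Arabic because two of its five letters (40%) fall in the Arabic block.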
Example:
router = PromptRouter()
print(router.detect_language("ما هي عاصمة مصر؟")) # → "ar"
print(router.detect_language("What is the capital of Egypt?")) # → "en"
detect_question_type()
router.detect_question_type(
query: str,
language: Optional[str] = None,
) -> QuestionType
Purpose: Classify a query into one of eight semantic question types using regex pattern matching on language-specific keyword sets.
| Parameter | Type | Description |
|---|---|---|
| `query` | `str` | The user query. |
| `language` | `Optional[str]` | "ar" or "en"; auto-detected if None. |
Returns: QuestionType enum value. Falls back to QuestionType.GENERAL if no pattern matches.
Supported types:
| QuestionType | English triggers | Arabic triggers |
|---|---|---|
| `FACTUAL` | who, when, where, how many | من هو، متى، أين، كم |
| `ANALYTICAL` | why, analyze, explain, causes | لماذا، أسباب، تحليل، اشرح |
| `COMPARATIVE` | vs, versus, compare, difference | الفرق، مقارنة، أيهما، بين |
| `PROCEDURAL` | how to, steps to, guide to | كيف، خطوات، طريقة |
| `DEFINITIONAL` | what is, define, meaning of | ما هو، ما هي، تعريف |
| `EVALUATIVE` | should, evaluate, pros and cons | قيّم، هل يجب، ما أفضل |
| `CAUSAL` | cause, leads to, effect of | سبب، نتيجة، أدى إلى |
| `GENERAL` | (fallback) | (fallback) |
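Keyword-based classification of this kind is typically an ordered list of regular expressions with a fallback. The sketch below covers only four of the English trigger sets from the table; `QType`, `EN_PATTERNS`, and `classify_en` are hypothetical names, and the order in which patterns are tried is an assumption (the table does not say how overlapping matches are resolved).

```python
import re
from enum import Enum


class QType(Enum):
    """Hypothetical stand-in for a subset of QuestionType."""
    FACTUAL = "factual"
    COMPARATIVE = "comparative"
    PROCEDURAL = "procedural"
    DEFINITIONAL = "definitional"
    GENERAL = "general"


# Patterns are tried top to bottom; this priority is an assumption.
EN_PATTERNS = [
    (QType.COMPARATIVE, re.compile(r"\b(vs|versus|compare|difference)\b", re.I)),
    (QType.PROCEDURAL, re.compile(r"\b(how to|steps to|guide to)\b", re.I)),
    (QType.DEFINITIONAL, re.compile(r"\b(what is|define|meaning of)\b", re.I)),
    (QType.FACTUAL, re.compile(r"\b(who|when|where|how many)\b", re.I)),
]


def classify_en(query: str) -> QType:
    for qtype, pattern in EN_PATTERNS:
        if pattern.search(query):
            return qtype
    return QType.GENERAL  # no pattern matched
```

Ordering matters for queries that hit several trigger sets: "What is the difference between A and B?" contains both "what is" and "difference", and this sketch resolves it as comparative because that pattern is tried first.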
build()
router.build(
query: str,
context: str,
language: Optional[str] = None,
question_type: Optional[QuestionType] = None,
) -> str
Purpose: Assemble a complete, ready-to-send LLM prompt by combining the appropriate instruction, optional few-shot example, retrieved context, and the user's query.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | User question. |
| `context` | `str` | Yes | Retrieved context text from the vector DB. |
| `language` | `Optional[str]` | No | Force language ("ar" or "en"); auto-detected if None. |
| `question_type` | `Optional[QuestionType]` | No | Override type detection. |
Returns: str — a fully formatted prompt string ready to be passed to llm.generate().
Example:
router = PromptRouter(include_few_shot=True)
prompt = router.build(
query="How does RAG work?",
context="RAG combines retrieval with generation...",
)
answer = llm.generate(prompt)
6. Reranker
from fennec_community.rag.core import Reranker
Reranker re-scores and re-orders an initial list of retrieved (chunk, score) pairs to improve answer quality. It supports three modes and includes near-duplicate filtering.
Constructor
Reranker(
config: Optional[RerankerConfig] = None,
llm: Any = None,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `config` | `Optional[RerankerConfig]` | `None` | Reranking configuration. Defaults applied if None. |
| `llm` | `Any` | `None` | LLM instance required for RerankMode.LLM and RerankMode.HYBRID. If absent, falls back to HEURISTIC. |
rerank()
reranker.rerank(
query: str,
chunks: List[Tuple[Any, float]],
) -> List[Tuple[Any, float]]
Purpose: The main entry point. Applies near-duplicate filtering (if enabled), scores each chunk according to the configured mode, and returns the top results sorted by composite score.
Scoring modes:
- HEURISTIC: Combines original vector score with a length optimality bonus and a diversity/term-overlap bonus. Fast, no LLM calls.
- LLM: Calls the LLM in parallel to rate each chunk's relevance on a 0–10 scale, then combines with the original score and length bonus.
- HYBRID: Combines all factors — original score, LLM relevance, length bonus, and diversity bonus.
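As a rough illustration of the HYBRID combination, the composite can be written as a weighted sum. The formula below is an assumption made for this sketch: it supposes every component is normalised to the range 0 to 1 and that the 0 to 10 LLM rating is divided by 10. The weight keys follow the `orig`, `llm`, `len`, `div` naming used by effective_weights(); `composite_score` itself is a hypothetical helper.

```python
from typing import Dict


def composite_score(
    original: float,
    llm_rating: float,
    length_bonus: float,
    diversity_bonus: float,
    weights: Dict[str, float],
) -> float:
    """Weighted sum of the four factors (illustrative formula only)."""
    return (
        weights["orig"] * original
        + weights["llm"] * (llm_rating / 10.0)  # rescale 0-10 rating to 0-1
        + weights["len"] * length_bonus
        + weights["div"] * diversity_bonus
    )


# Default weights from the configuration: 0.40 / 0.40 / 0.10 / 0.10.
default_weights = {"orig": 0.40, "llm": 0.40, "len": 0.10, "div": 0.10}
example = composite_score(0.8, 5, 1.0, 0.5, default_weights)
# 0.4*0.8 + 0.4*0.5 + 0.1*1.0 + 0.1*0.5 = 0.67
```

Setting a weight to zero removes that factor from the sum.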
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | The user query (used for scoring and term-overlap computation). |
| `chunks` | `List[Tuple[Any, float]]` | Yes | Initial retrieval results: list of (chunk, score) pairs. |
Returns: List[Tuple[Any, float]] — re-ranked (chunk, composite_score) pairs, descending. Length is min(len(input), config.top_k).
Example:
from fennec_community.rag.core import Reranker, RerankerConfig, RerankMode
rc = RerankerConfig(mode=RerankMode.HYBRID, top_k=5)
reranker = Reranker(config=rc, llm=my_llm)
raw_results = vector_db.search("phone number of Ahmed", top_k=20)
reranked = reranker.rerank("phone number of Ahmed", raw_results)
clear_cache()
reranker.clear_cache() -> None
Purpose: Clear the internal LRU cache that stores LLM relevance scores for (query, text) pairs. Useful to free memory or force re-evaluation.
Parameters: None.
Returns: None
7. RerankerConfig
from fennec_community.rag.core import RerankerConfig, RerankMode
Dataclass holding all parameters for the Reranker.
Constructor
RerankerConfig(
mode: RerankMode = RerankMode.HEURISTIC,
top_k: Optional[int] = None,
original_score_weight: float = 0.40,
llm_score_weight: float = 0.40,
diversity_weight: float = 0.10,
length_weight: float = 0.10,
llm_eval_language: str = "ar",
ideal_min_words: int = 30,
ideal_max_words: int = 300,
dedup_enabled: bool = True,
dedup_threshold: float = 0.85,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `mode` | `RerankMode` | `HEURISTIC` | Reranking strategy. |
| `top_k` | `Optional[int]` | `None` | Max results to return; None = same as input size. |
| `original_score_weight` | `float` | `0.40` | Weight of the raw vector DB score in the composite formula. |
| `llm_score_weight` | `float` | `0.40` | Weight of the LLM-assigned relevance score (ignored in HEURISTIC mode; merged into original_score_weight). |
| `diversity_weight` | `float` | `0.10` | Weight of the diversity + term-overlap bonus. |
| `length_weight` | `float` | `0.10` | Weight of the length optimality bonus. |
| `llm_eval_language` | `str` | `"ar"` | Language for LLM scoring prompts ("ar" or "en"). |
| `ideal_min_words` | `int` | `30` | Lower bound of the optimal chunk word count. |
| `ideal_max_words` | `int` | `300` | Upper bound of the optimal chunk word count. |
| `dedup_enabled` | `bool` | `True` | Filter near-duplicate chunks before scoring. |
| `dedup_threshold` | `float` | `0.85` | Jaccard similarity threshold above which two chunks are considered duplicates. |
Constraint: original_score_weight + llm_score_weight + diversity_weight + length_weight must equal 1.0.
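The `dedup_threshold` parameter above is expressed as a Jaccard similarity. A minimal sketch of Jaccard-based near-duplicate filtering follows; computing the similarity over lowercase word sets and keeping the higher-scored member of each duplicate pair are assumptions of this example, and both function names are hypothetical.

```python
from typing import List, Tuple


def jaccard(a: str, b: str) -> float:
    """Jaccard similarity of two texts over lowercase word sets."""
    set_a, set_b = set(a.lower().split()), set(b.lower().split())
    if not set_a and not set_b:
        return 1.0  # two empty texts are treated as identical
    return len(set_a & set_b) / len(set_a | set_b)


def drop_near_duplicates(
    chunks: List[Tuple[str, float]], threshold: float = 0.85
) -> List[Tuple[str, float]]:
    kept: List[Tuple[str, float]] = []
    # Visit best-scored chunks first so the stronger duplicate survives.
    for text, score in sorted(chunks, key=lambda pair: pair[1], reverse=True):
        if all(jaccard(text, other) <= threshold for other, _ in kept):
            kept.append((text, score))
    return kept
```

For example, two chunks with identical wording have a similarity of 1.0, so only the higher-scored one is kept at the default threshold.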
effective_weights()
config.effective_weights(mode: RerankMode) -> Dict[str, float]
Purpose: Return the mode-adjusted weight dictionary. In HEURISTIC mode, llm_score_weight is folded into original_score_weight so that no weight is assigned to an LLM score that is never computed.
| Parameter | Type | Description |
|---|---|---|
| `mode` | `RerankMode` | The mode to compute weights for. |
Returns: Dict[str, float] — keys: orig, llm, len, div.
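The folding rule for HEURISTIC mode is simple enough to sketch directly. `effective_weights_sketch` is a hypothetical function using plain strings for the mode; only the HEURISTIC adjustment is documented, so the other modes are returned unchanged here as an assumption.

```python
from typing import Dict


def effective_weights_sketch(
    mode: str,
    orig: float = 0.40,
    llm: float = 0.40,
    div: float = 0.10,
    length: float = 0.10,
) -> Dict[str, float]:
    if mode == "heuristic":
        # No LLM score is computed, so its weight moves to the original score.
        return {"orig": orig + llm, "llm": 0.0, "len": length, "div": div}
    # Assumption: other modes use the configured weights as they are.
    return {"orig": orig, "llm": llm, "len": length, "div": div}
```

With the default weights, HEURISTIC mode therefore gives the original vector score a weight of 0.80 while the total still sums to 1.0.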
8. QueryExpander
from fennec_community.rag.core import QueryExpander
QueryExpander generates semantically equivalent rephrasings of a query to improve recall. It uses an LLM as the primary strategy and falls back to a curated bilingual synonym table when the LLM is unavailable.
Constructor
QueryExpander(
llm: Optional[Any] = None,
max_variants: int = 3,
use_llm: bool = True,
fallback_synonyms: bool = True,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `llm` | `Optional[Any]` | `None` | LLM instance with .generate(prompt) -> str. Pass None for synonym-only mode. |
| `max_variants` | `int` | `3` | Maximum number of alternative queries to generate (excluding the original). |
| `use_llm` | `bool` | `True` | Whether to attempt LLM expansion. Automatically disabled if llm is None. |
| `fallback_synonyms` | `bool` | `True` | Use the synonym table if LLM expansion fails or is disabled. |
expand()
expander.expand(query: str) -> List[str]
Purpose: Return the original query plus up to max_variants semantically equivalent alternatives. The first element is always the original query.
Strategy:
- If use_llm=True, call the LLM with a language-appropriate expansion prompt.
- On LLM failure, fall back to the synonym table.
- If both are disabled, return [original_query].
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | The original user query. |
Returns: List[str] — [original, variant_1, variant_2, ...]. Always contains at least one element.
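The LLM-first, synonym-fallback strategy can be sketched as below. Everything here is illustrative: `expand_sketch` is a hypothetical function, the prompt wording is invented, the synonym table is a plain dict supplied by the caller, and parsing one variant per output line is an assumption about the LLM response format.

```python
from typing import Any, Dict, List, Optional


def expand_sketch(
    query: str,
    llm: Optional[Any] = None,
    synonyms: Optional[Dict[str, List[str]]] = None,
    max_variants: int = 3,
) -> List[str]:
    variants: List[str] = []
    if llm is not None:
        try:
            raw = llm.generate(
                f"Rewrite this query in {max_variants} different ways, "
                f"one per line:\n{query}"
            )
            variants = [line.strip() for line in raw.splitlines() if line.strip()]
        except Exception:
            variants = []  # any LLM failure triggers the fallback below
    if not variants and synonyms:
        for word, alternatives in synonyms.items():
            if word in query:
                variants.extend(query.replace(word, alt) for alt in alternatives)
    result = [query]  # the original query always comes first
    for variant in variants:
        if variant not in result and len(result) <= max_variants:
            result.append(variant)
    return result
```

With no LLM and no synonym table, the function returns a one-element list containing only the original query.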
Example:
expander = QueryExpander(llm=my_llm, max_variants=3)
queries = expander.expand("رقم التليفون")
# → ["رقم التليفون", "رقم الهاتف", "رقم الجوال", "رقم الموبايل"]
merge_retrieval_results() (module-level function)
from fennec_community.rag.core import merge_retrieval_results
merge_retrieval_results(
results_per_query: List[List[Tuple[Any, float]]],
top_k: int,
) -> List[Tuple[Any, float]]
Purpose: Combine retrieval results from multiple query variants into a single deduplicated list. When the same chunk appears across multiple variant results, the highest score is kept.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `results_per_query` | `List[List[Tuple[Any, float]]]` | Yes | One list of (chunk, score) pairs per query variant. |
| `top_k` | `int` | Yes | Maximum number of results to return. |
Returns: List[Tuple[Any, float]] — merged, deduplicated, score-sorted list of up to top_k results.
Example:
results_a = vector_db.search("phone number", top_k=5)
results_b = vector_db.search("contact number", top_k=5)
merged = merge_retrieval_results([results_a, results_b], top_k=5)9. RetrievalCache
from fennec_community.rag.core import RetrievalCache
RetrievalCache is a thread-safe, TTL-aware LRU cache for storing retrieval results. It is managed internally by RAGSystem but can also be used standalone.
Constructor
RetrievalCache(
    maxsize: int = 256,
    ttl: Optional[float] = 300.0,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| maxsize | int | 256 | Maximum number of distinct query entries. Oldest entries are evicted when full. |
| ttl | Optional[float] | 300.0 | Entry lifetime in seconds. Pass None to disable expiry. |
get()
cache.get(query: str, top_k: int) -> Optional[List[Tuple[Any, float]]]
Purpose: Retrieve a cached result for a query and top_k combination. Returns None on a cache miss or if the entry has expired. Updates the LRU order on a hit.
| Parameter | Type | Description |
|---|---|---|
| query | str | The query string (normalized for the cache key). |
| top_k | int | The top_k value used during retrieval. |
Returns: Cached List[Tuple[Any, float]] on a hit, None on a miss or expiry.
set()
cache.set(
    query: str,
    top_k: int,
    results: List[Tuple[Any, float]],
) -> None
Purpose: Store a retrieval result in the cache. If the cache is at capacity, the oldest entry is evicted.
| Parameter | Type | Description |
|---|---|---|
| query | str | The query string. |
| top_k | int | The top_k value used during retrieval. |
| results | List[Tuple[Any, float]] | The retrieval results to cache. |
Returns: None
invalidate()
cache.invalidate(query: str, top_k: int) -> bool
Purpose: Remove a specific entry from the cache by query and top_k.
Returns: True if the entry existed and was removed; False otherwise.
clear()
cache.clear() -> None
Purpose: Evict all entries and reset hit/miss counters.
Returns: None
stats()
cache.stats() -> Dict[str, Any]
Purpose: Return current cache metrics.
Returns: Dict with keys size, maxsize, ttl, hits, misses, hit_rate.
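To make the TTL and LRU behaviour concrete, here is a minimal standalone sketch built on `collections.OrderedDict`. It is not the library's RetrievalCache: the class name `DemoTTLCache`, the key normalization (trim and lowercase), and the `hit_rate` formula (hits divided by total lookups) are all assumptions made for illustration.

```python
import threading
import time
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Tuple

Results = List[Tuple[Any, float]]


class DemoTTLCache:
    """Illustrative TTL-aware LRU cache; not the library's RetrievalCache."""

    def __init__(self, maxsize: int = 256, ttl: Optional[float] = 300.0) -> None:
        self.maxsize = maxsize
        self.ttl = ttl
        self._data: "OrderedDict[Tuple[str, int], Tuple[float, Results]]" = OrderedDict()
        self._lock = threading.Lock()
        self.hits = 0
        self.misses = 0

    @staticmethod
    def _key(query: str, top_k: int) -> Tuple[str, int]:
        # Assumed normalization: trim whitespace and lowercase.
        return (query.strip().lower(), top_k)

    def get(self, query: str, top_k: int) -> Optional[Results]:
        key = self._key(query, top_k)
        with self._lock:
            entry = self._data.get(key)
            if entry is not None:
                stored_at, results = entry
                if self.ttl is None or time.monotonic() - stored_at <= self.ttl:
                    self._data.move_to_end(key)  # mark as most recently used
                    self.hits += 1
                    return results
                del self._data[key]  # entry expired
            self.misses += 1
            return None

    def set(self, query: str, top_k: int, results: Results) -> None:
        key = self._key(query, top_k)
        with self._lock:
            self._data[key] = (time.monotonic(), results)
            self._data.move_to_end(key)
            while len(self._data) > self.maxsize:
                self._data.popitem(last=False)  # evict least recently used

    def stats(self) -> Dict[str, Any]:
        with self._lock:
            total = self.hits + self.misses
            return {
                "size": len(self._data), "maxsize": self.maxsize, "ttl": self.ttl,
                "hits": self.hits, "misses": self.misses,
                "hit_rate": self.hits / total if total else 0.0,
            }
```

Including top_k in the key keeps a result cached for top_k=5 from being served to a caller who asked for top_k=10.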
10. Logging Utilities
from fennec_community.rag.core.logger import get_logger, setup_logging, FennecLogger
setup_logging()
setup_logging(
    level: str = "INFO",
    log_file: Optional[str] = None,
    json_format: bool = False,
    use_colour: bool = True,
    max_bytes: int = 10_485_760,  # 10 MB
    backup_count: int = 3,
    propagate: bool = False,
) -> None
Purpose: Configure the library-wide fennec logger once at application startup. Sets up a colour-coded console handler and optionally a rotating file handler. Calling this more than once is safe: it clears and rebuilds handlers each time.
| Parameter | Type | Default | Description |
|---|---|---|---|
| level | str | "INFO" | Minimum log level: "DEBUG", "INFO", "WARNING", or "ERROR". |
| log_file | Optional[str] | None | Path for the rotating log file. Directory is created automatically. |
| json_format | bool | False | Write JSON-lines to the file (ideal for Datadog, ELK, Splunk). |
| use_colour | bool | True | Enable ANSI colour codes in console output (disabled automatically if the terminal doesn't support it). |
| max_bytes | int | 10_485_760 (10 MB) | Maximum file size in bytes before rotation. |
| backup_count | int | 3 | Number of rotated backup files to retain. |
| propagate | bool | False | Whether log records propagate to the root Python logger. |
Returns: None
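For readers who want to see what such a configuration amounts to in standard-library terms, the sketch below wires a rotating JSON-lines file handler onto a named logger. It is an approximation under stated assumptions, not the body of setup_logging(): the names `JsonLineFormatter`, `configure_demo_logging`, and the `fennec_demo` logger are hypothetical, and the JSON field names are illustrative.

```python
import json
import logging
import os
from logging.handlers import RotatingFileHandler


class JsonLineFormatter(logging.Formatter):
    """Format each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        return json.dumps(payload, ensure_ascii=False)


def configure_demo_logging(
    log_file: str,
    level: str = "INFO",
    max_bytes: int = 10_485_760,
    backup_count: int = 3,
) -> logging.Logger:
    """Hypothetical stand-in for setup_logging(); not the library's code."""
    logger = logging.getLogger("fennec_demo")
    logger.setLevel(getattr(logging, level.upper()))
    # Clear old handlers so repeated calls do not duplicate output.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        handler.close()
    # Create the log directory if it does not exist yet.
    os.makedirs(os.path.dirname(log_file) or ".", exist_ok=True)
    file_handler = RotatingFileHandler(
        log_file, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8"
    )
    file_handler.setFormatter(JsonLineFormatter())
    logger.addHandler(file_handler)
    logger.propagate = False  # keep records out of the root logger
    return logger
```

Removing existing handlers before adding new ones is what makes a second call safe; without that step every record would be written once per call.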
Example:
from fennec_community.rag.core.logger import setup_logging
setup_logging(
    level="DEBUG",
    log_file="logs/fennec.log",
    json_format=True,
)
get_logger()
get_logger(name: str) -> logging.Logger
Purpose: Return a child logger registered under the fennec hierarchy. Performs lazy bootstrap if setup_logging() has not been called yet. All library modules call this internally.
| Parameter | Type | Description |
|---|---|---|
| name | str | Typically __name__ of the calling module. |
Returns: logging.Logger bound under fennec.<name> (or fennec if name already starts with fennec).
Example:
from fennec_community.rag.core.logger import get_logger
logger = get_logger(__name__)
logger.info("RAG system initialised with %d documents", n)
logger.debug("Query variants: %s", variants)
FennecLogger (class)
A thin static wrapper for users who prefer a class-based logging API:
FennecLogger.setup(level="DEBUG", log_file="fennec.log")
log = FennecLogger.get("mymodule")
log.info("processing started")
| Method | Equivalent to |
|---|---|
| FennecLogger.setup(**kwargs) | setup_logging(**kwargs) |
| FennecLogger.get(name) | get_logger(name) |
11. Exception Hierarchy
from fennec_community.rag.core.exceptions import *
All exceptions inherit from FennecError, enabling callers to catch either the specific subclass or the general base.
FennecError
├── ConfigurationError
│ ├── MissingAPIKeyError(provider)
│ └── InvalidConfigValueError(field, value, reason)
├── DocumentError
│ ├── EmptyDocumentError(doc_id)
│ ├── DocumentNotFoundError(doc_id)
│ └── ChunkingError(doc_id, reason)
├── VectorDatabaseError
│ ├── VectorDBConnectionError(db_type, reason)
│ ├── VectorDBWriteError
│ ├── VectorDBSearchError
│ └── VectorDBPersistenceError(operation, path, reason)
├── EmbeddingError
│ ├── EmbeddingModelNotFoundError(model_name)
│ ├── EmbeddingDimensionMismatchError(expected, got)
│ └── EmbeddingAPIError(provider, reason)
├── LLMError
│ ├── LLMConnectionError(provider, reason)
│ ├── LLMGenerationError(provider, reason)
│ ├── LLMTimeoutError(provider, timeout_seconds)
│ └── LLMRateLimitError(provider)
├── RAGError
│ ├── RAGInitializationError(reason)
│ ├── RAGRetrievalError(query, reason)
│ ├── RAGGenerationError(reason)
│ ├── NoRelevantDocumentsError(query)
│ └── QueryExpansionError(reason)
├── PromptError
│ └── PromptFormattingError(template_name, missing_vars)
├── CacheError
│ ├── CacheReadError
│ └── CacheWriteError
└── LoaderError
  ├── UnsupportedFileTypeError(extension)
  └── FileReadError(filepath, reason)
Base class attributes:
| Attribute | Type | Description |
|---|---|---|
| message | str | Human-readable error description. |
| details | Any | Optional extra context. |
| code | str | Machine-readable error code (e.g., "RAG_RETRIEVAL_FAILED"). |
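The relationship between the base attributes and a subclass's constructor arguments can be sketched with a miniature hierarchy. The classes below are hypothetical stand-ins (prefixed `Demo`), written only to mirror the documented attributes message, details, and code; the default code value and the message wording are assumptions, and the library's own classes may differ.

```python
from typing import Any


class DemoFennecError(Exception):
    """Hypothetical base mirroring the documented message/details/code attributes."""

    def __init__(self, message: str, details: Any = None, code: str = "FENNEC_ERROR") -> None:
        super().__init__(message)
        self.message = message
        self.details = details
        self.code = code


class DemoRAGError(DemoFennecError):
    """Intermediate class, analogous to RAGError in the tree above."""


class DemoRAGRetrievalError(DemoRAGError):
    def __init__(self, query: str, reason: str) -> None:
        super().__init__(
            f"Retrieval failed for {query!r}: {reason}",
            details={"query": query, "reason": reason},
            code="RAG_RETRIEVAL_FAILED",
        )
        self.query = query
        self.reason = reason


def describe(error: Exception) -> str:
    """Handle the most specific class first, then the general base."""
    try:
        raise error
    except DemoRAGRetrievalError as exc:
        return f"retrieval: {exc.reason}"
    except DemoFennecError as exc:
        return f"fennec [{exc.code}]: {exc.message}"
```

Because Python tries except clauses in order, the specific handler must come before the base-class handler; reversing them would route every error to the general branch.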
Example error handling:
from fennec_community.rag.core.exceptions import FennecError, RAGRetrievalError, LLMRateLimitError
try:
    answer = rag.generate("What is X?")
except LLMRateLimitError as e:
    # Most specific handlers come first.
    print(f"Rate limited by {e.provider}, retry later")
except RAGRetrievalError as e:
    print(f"Retrieval failed: {e.reason}")
except FennecError as e:
    # Catch-all for any other library error.
    print(f"Unexpected Fennec error [{e.code}]: {e.message}")
12. Data Classes
LoadedDocument
from fennec_community.rag.core import LoadedDocument
Represents a single document ready for ingestion.
| Attribute | Type | Description |
|---|---|---|
| page_content | str | Raw text content of the document. |
| metadata | Dict[str, Any] | Arbitrary key-value metadata. |
| doc_id | Optional[str] | Unique identifier. Auto-generated from MD5 hash + timestamp if omitted. |
Methods:
to_dict() -> Dict[str, Any]: serialize to a plain dictionary.
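As an illustration of the attributes and the auto-generated identifier, the dataclass below reproduces the documented shape. It is a sketch, not the library's class: `DemoLoadedDocument` is a hypothetical name, and the exact doc_id format (MD5 hex digest, underscore, nanosecond timestamp) is an assumption, since the reference only says the id is derived from an MD5 hash plus a timestamp.

```python
import hashlib
import time
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Optional


@dataclass
class DemoLoadedDocument:
    """Illustrative stand-in for LoadedDocument."""

    page_content: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    doc_id: Optional[str] = None

    def __post_init__(self) -> None:
        if self.doc_id is None:
            digest = hashlib.md5(self.page_content.encode("utf-8")).hexdigest()
            # Assumed format: "<md5 hex>_<nanosecond timestamp>"
            self.doc_id = f"{digest}_{time.time_ns()}"

    def to_dict(self) -> Dict[str, Any]:
        """Serialize all fields to a plain dictionary."""
        return asdict(self)
```

Adding a timestamp to the hash means two documents with identical text still receive distinct identifiers, at the cost of ids that are not reproducible across runs.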
13. Module-level Helpers
merge_retrieval_results()
Documented in §8. Exported from core.query_expansion.
14. Enumerations
QuestionType
from fennec_community.rag.core import QuestionType
| Value | String | Description |
|---|---|---|
| FACTUAL | "factual" | Queries about specific facts. |
| ANALYTICAL | "analytical" | Requests for analysis or explanation. |
| COMPARATIVE | "comparative" | Comparison between two or more entities. |
| PROCEDURAL | "procedural" | Step-by-step how-to queries. |
| DEFINITIONAL | "definitional" | Requests for definitions or explanations of terms. |
| EVALUATIVE | "evaluative" | Evaluation or recommendation queries. |
| CAUSAL | "causal" | Cause-and-effect queries. |
| GENERAL | "general" | Fallback for unclassified queries. |
RerankMode
from fennec_community.rag.core import RerankMode
| Value | String | Description |
|---|---|---|
| HEURISTIC | "heuristic" | Fast scoring using only local signals (no LLM calls). |
| LLM | "llm" | LLM-assisted scoring for maximum relevance accuracy. |
| HYBRID | "hybrid" | Combination of heuristic and LLM scoring. |
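Since RAGConfig accepts rerank_mode as a plain string, it can help to see how a string-valued enum maps between the two forms. The sketch below redeclares the three modes locally as `DemoRerankMode` and adds a hypothetical `parse_mode` helper; whether the library's RerankMode subclasses str, or performs any case normalization, is an assumption here.

```python
from enum import Enum


class DemoRerankMode(str, Enum):
    """Local redeclaration for illustration; values match the table above."""

    HEURISTIC = "heuristic"
    LLM = "llm"
    HYBRID = "hybrid"


def parse_mode(value: str) -> DemoRerankMode:
    """Convert a user-supplied string to an enum member, with a clear error."""
    try:
        # Enum lookup by value: DemoRerankMode("hybrid") returns the HYBRID member.
        return DemoRerankMode(value.strip().lower())
    except ValueError:
        valid = ", ".join(mode.value for mode in DemoRerankMode)
        raise ValueError(f"Unknown rerank mode {value!r}; expected one of: {valid}") from None
```

Validating the string once at configuration time surfaces a typo such as "hybird" immediately, rather than at the first reranking call.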
15. Quick-Start Example
The following example demonstrates the complete workflow, from configuring logging to querying, persisting, and streaming:
from fennec_community.rag.core import RAGSystem, RAGConfig
from fennec_community.rag.core.logger import setup_logging
# 1. Configure logging
setup_logging(level="INFO", log_file="logs/app.log", json_format=False)
# 2. Build configuration
config = RAGConfig(
    top_k=8,
    enable_reranking=True,
    rerank_mode="hybrid",
    enable_retrieval_cache=True,
    cache_ttl=600.0,
)
# 3. Initialise the system (supply your own components)
rag = RAGSystem(
    vector_db=my_vector_db,
    llm=my_llm,
    chunker=my_chunker,
    context_manager=my_ctx_mgr,
    config=config,
    enable_query_expansion=True,
    query_expansion_variants=3,
)
# 4. Index documents
rag.add_texts({
    "hr_001": "Name: Ahmed Al-Farsi, Role: Software Engineer, Phone: +966 50 111 2222",
    "hr_002": "Name: Sara Al-Qahtani, Role: HR Manager, Phone: +966 55 333 4444",
})
# 5. Ask questions
answer = rag.ask("What is Ahmed's phone number?", include_sources=True)
print(answer)
# 6. Check system health
health = rag.validate_connection("test query")
print(health)
# 7. Inspect statistics
print(rag.get_stats())
# 8. Persist and reload
rag.save("./saved_rag")
rag2 = RAGSystem.load(
    "./saved_rag",
    vector_db=fresh_vector_db,
    llm=my_llm,
    chunker=my_chunker,
    context_manager=my_ctx_mgr,
)
# 9. Async streaming
import asyncio
async def stream_answer():
    async for token in rag.astream("Describe Ahmed's role"):
        print(token, end="", flush=True)
asyncio.run(stream_answer())
Simple Real Example
import os
from fennec_community.llm import GeminiInterface
from fennec_community.document_loaders import TextLoader
from fennec_community.vector_database import FAISSVectorDatabase
from fennec_community.chunks import ArabicTextChunker
from fennec_community.context import ContextManager
from fennec_community.embeddings import OllamaEmbedder
from fennec_community.rag.core import RAGSystem
# Assumption: the API key comes from an environment variable; the name GEMINI_API_KEY is illustrative.
llm_api = os.environ["GEMINI_API_KEY"]
# Load the source file into documents.
documents = TextLoader("./data_kn/faq.txt").load()
chunker = ArabicTextChunker(chunk_size=100, overlap=20)
embedder = OllamaEmbedder()
vector_db = FAISSVectorDatabase(embedder=embedder)
llm = GeminiInterface(api_key=llm_api)
context_manager = ContextManager()
rag_system = RAGSystem(
    llm=llm,
    vector_db=vector_db,
    chunker=chunker,
    context_manager=context_manager,
)
rag_system.add_documents(documents)
query = "ما هي طرق الدفع المتاحة؟"  # Arabic for "What payment methods are available?"
response = rag_system.generate(query, include_sources=True)
print("Response:", response)