Domain RAG — `domain_rag` Module — Public API Reference
Table of Contents
- Module Overview
- DomainDefinition
- PRESET_DOMAINS
- DomainSpecificRAG
- DomainMetrics
- Response Schema
- Domain Instantiation Patterns
- Query Processing Pipeline
- Quick-Start Example
1. Module Overview
The domain_rag package wraps any standard RAG backend and adds domain-specific intelligence: vocabulary-aware query enrichment, relevance gating, domain-boosted result ranking, and language-locked generation. Instead of returning whatever the vector DB finds, the system measures how relevant a query is to the configured domain, optionally rejects off-topic questions, expands queries with domain synonyms, re-ranks retrieved documents by domain term density, and injects domain-specific system instructions and language constraints into every LLM call.
Key capabilities:
- Domain relevance scoring: two-layer (direct + partial) term matching with a configurable boost weight.
- Strict mode: hard rejection of queries below the `min_relevance` threshold, returning the domain's configured rejection message.
- Query enrichment: synonyms from the domain vocabulary are appended to the query before retrieval.
- Domain-boosted re-ranking: combines vector-DB similarity scores with per-document domain term density.
- Language locking: language instructions are injected into both the system prompt and the query string to maximise LLM compliance.
- LLM-powered auto term expansion: optional startup expansion of the domain vocabulary via the LLM.
- Five preset domains (`medical`, `legal`, `technical`, `financial`, `academic`), ready to use out of the box.
- Full async support: `aquery()`, `agenerate()`, `aretrieve()`, and token-level `astream()`.
- JSON persistence: save and load domain definitions to/from disk.
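The language-locking capability listed above can be illustrated with a small stand-alone sketch. It is not the library's code: only the Arabic instruction string is quoted from this reference, while the English string, the fallback wording, and the function names are assumptions made for illustration.

```python
# Instruction strings keyed by ISO 639-1 code. Only the "ar" text comes from
# this reference; the "en" text and the fallback wording are hypothetical.
LANG_INSTRUCTIONS = {
    "ar": "يجب أن تكون إجابتك باللغة العربية حصراً.",
    "en": "You must answer in English only.",
}


def language_instruction(language: str) -> str:
    """Return the lock instruction for a language code (fallback is assumed)."""
    return LANG_INSTRUCTIONS.get(language, f"You must answer in this language: {language}.")


def apply_language_lock(system_prompt: str, query: str, language: str):
    """Inject the instruction into both channels: system prompt and query."""
    instruction = language_instruction(language)
    # Assumption: an empty system prompt is replaced by the instruction alone.
    locked_prompt = f"{system_prompt}\n{instruction}" if system_prompt else instruction
    locked_query = f"{instruction}\n{query}"
    return locked_prompt, locked_query


prompt, query = apply_language_lock("You are a nutrition expert.", "How much protein per day?", "en")
print(prompt)
print(query)
```

Sending the instruction through both channels is redundant by design: if a backend ignores the system prompt, the copy embedded in the query still reaches the model.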
Publicly exported symbols:
from fennec_community.rag.types.domain_rag import DomainDefinition, PRESET_DOMAINS, DomainSpecificRAG, DomainMetrics
2. DomainDefinition
from fennec_community.rag.types.domain_rag import DomainDefinition
DomainDefinition is the configuration object that fully describes a single domain: its vocabulary, synonyms, system prompt, rejection message, relevance thresholds, and any arbitrary metadata. It is the single source of truth consumed by DomainSpecificRAG.
2.1 Constructor
DomainDefinition(
    name: str,
    terms: List[str],
    display_name: str = "",
    language: str = "ar",
    synonyms: Dict[str, List[str]] = {},
    system_prompt: str = "",
    rejection_message: str = "",
    min_relevance: float = 0.10,
    boost_weight: float = 1.5,
    metadata: Dict[str, Any] = {},
)
Purpose: Define a domain with its full vocabulary, LLM instructions, and scoring parameters. Only `name` and `terms` are required; everything else has a sensible default.
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `name` | `str` | Yes | - | Machine-readable domain identifier (e.g., `"nutrition"`, `"medical"`). Used as a key in result dicts and file names. |
| `terms` | `List[str]` | Yes | - | Primary vocabulary list. These are the anchor terms the relevance scorer uses to measure how on-topic a query is. |
| `display_name` | `str` | No | `name` | Human-readable label shown in responses and logs (e.g., `"الطب والصحة"`). Defaults to `name` if left empty. |
| `language` | `str` | No | `"ar"` | ISO 639-1 code of the domain vocabulary language. Controls which language-lock instruction is injected into the LLM (`"ar"`, `"en"`, `"fr"`, `"de"`, `"es"`, or any custom string). |
| `synonyms` | `Dict[str, List[str]]` | No | `{}` | Synonym map: `{"term": ["syn1", "syn2"]}`. Synonyms are appended to queries during enrichment and included in the full term set for relevance scoring. |
| `system_prompt` | `str` | No | `""` | Domain-specific instructions prepended to every LLM call. A language-lock instruction is always appended automatically. |
| `rejection_message` | `str` | No | auto | Message returned when `strict_mode=True` and the query scores below `min_relevance`. Auto-generated as `"your question is out of zone for this domain '<display_name>'."` if left empty. |
| `min_relevance` | `float` | No | `0.10` | Minimum relevance score, in the range [0.0, 1.0], required to accept a query in strict mode. |
| `boost_weight` | `float` | No | `1.5` | Multiplier applied to the raw relevance score and to the per-document domain bonus during ranking. Higher values make domain matching more influential. |
| `metadata` | `Dict[str, Any]` | No | `{}` | Free-form key-value metadata for user-defined purposes (version, author, creation date, etc.). Not used internally. |
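How `min_relevance` and `boost_weight` interact can be sketched with the scoring formula given in the pipeline section of this reference: direct hits count 1.0, partial hits count 0.5, the sum is divided by the number of query words, multiplied by `boost_weight`, and capped at 1.0. The function below is a minimal stand-alone approximation, not the library's `_measure_relevance`. The tokenisation (lower-casing, stripping ASCII punctuation), the three-character minimum for partial matches, and the helper names are assumptions.

```python
import string
from typing import List, Set, Tuple


def measure_relevance(query: str, all_terms: Set[str], boost_weight: float = 1.5) -> Tuple[float, List[str]]:
    """Score a query against a vocabulary (hypothetical stand-in)."""
    # Assumed tokenisation: split on whitespace, strip ASCII punctuation, lower-case.
    words = {w.strip(string.punctuation).lower() for w in query.split()}
    words.discard("")
    if not words:
        return 0.0, []
    # Layer 1: exact word/term matches.
    direct = words & all_terms
    # Layer 2: substring containment between the remaining words and terms.
    candidates = {w for w in words - direct if len(w) >= 3}
    partial = {
        term for term in all_terms - direct
        if any(term in w or w in term for w in candidates)
    }
    raw = (len(direct) * 1.0 + len(partial) * 0.5) / len(words)
    return min(raw * boost_weight, 1.0), sorted(direct | partial)


def is_accepted(score: float, min_relevance: float = 0.10, strict_mode: bool = True) -> bool:
    """In strict mode, queries scoring below min_relevance are rejected."""
    return not (strict_mode and score < min_relevance)


terms = {"dose", "drug", "symptom", "diagnosis"}
score, matched = measure_relevance("What dose of drugs?", terms)
print(score, matched, is_accepted(score))
```

With the default `boost_weight` of 1.5, a query needs only a small share of on-topic words to clear the default threshold of 0.10, so raising `min_relevance` is the more direct way to tighten strict mode.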
Example:
from fennec_community.rag.types.domain_rag import DomainDefinition
nutrition = DomainDefinition(
    name="nutrition",
    display_name="التغذية والصحة",
    language="ar",
    terms=["بروتين", "كربوهيدرات", "فيتامين", "سعرات", "حمية"],
    synonyms={"سعرات": ["كالوري", "طاقة"], "حمية": ["رجيم", "دايت"]},
    system_prompt="أنت خبير تغذية. أجب بدقة علمية وأضف تحذيرات عند الضرورة.",
    rejection_message="هذا السؤال خارج نطاق التغذية والصحة.",
    min_relevance=0.15,
    boost_weight=1.8,
)
2.2 Properties
all_terms
domain.all_terms -> Set[str]
Purpose: Return the complete, deduplicated, lower-cased vocabulary set: primary terms plus all values from `synonyms`. The result is computed lazily on first access and cached; calling `add_terms()` or `add_synonym()` invalidates the cache automatically.
Returns: Set[str] — all domain vocabulary in lower case.
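The lazy-compute-and-invalidate behaviour described for `all_terms` can be sketched as follows. `VocabSketch` is a hypothetical stand-in written for this illustration, not the actual `DomainDefinition`; it only assumes what the text states (lower-casing, primary terms plus synonym values, cache reset on mutation).

```python
from typing import Dict, List, Optional, Set


class VocabSketch:
    """Hypothetical stand-in showing a lazily cached vocabulary set."""

    def __init__(self, terms: List[str], synonyms: Optional[Dict[str, List[str]]] = None):
        self.terms = list(terms)
        self.synonyms = dict(synonyms or {})
        self._all_terms: Optional[Set[str]] = None  # cache, built on first access

    @property
    def all_terms(self) -> Set[str]:
        if self._all_terms is None:
            merged = {t.lower() for t in self.terms}
            for values in self.synonyms.values():
                merged.update(v.lower() for v in values)
            self._all_terms = merged
        return self._all_terms

    def add_terms(self, *terms: str) -> None:
        for term in terms:
            term = term.strip()
            # Blank strings and duplicates are silently ignored.
            if term and term not in self.terms:
                self.terms.append(term)
        self._all_terms = None  # invalidate so the next access rebuilds the set


vocab = VocabSketch(["Protein", "Calories"], {"Calories": ["kcal", "Energy"]})
print(sorted(vocab.all_terms))
vocab.add_terms(" Fiber ", "", "Protein")
print(sorted(vocab.all_terms))
```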
Example:
print(nutrition.all_terms)
# {'بروتين', 'كربوهيدرات', 'فيتامين', 'سعرات', 'حمية', 'كالوري', 'طاقة', 'رجيم', 'دايت'}
2.3 Term & Synonym Management
add_terms()
domain.add_terms(*terms: str) -> None
Purpose: Append one or more new terms to the domain vocabulary at runtime. Duplicate terms (already present in `self.terms`) and blank strings are silently ignored. The `all_terms` cache is invalidated so the next access reflects the new terms.
| Parameter | Type | Description |
|---|---|---|
| `*terms` | `str` | One or more term strings to add. Each is stripped of leading/trailing whitespace. |
Returns: None
Example:
domain.add_terms("أوميغا 3", "ألياف", "معادن")
print(len(domain.all_terms))  # updated count
add_synonym()
domain.add_synonym(term: str, *synonyms: str) -> None
Purpose: Register one or more synonyms for an existing or new term. Duplicate synonyms for the same term are silently ignored. The `all_terms` cache is invalidated.
| Parameter | Type | Description |
|---|---|---|
| `term` | `str` | The canonical term to attach synonyms to. |
| `*synonyms` | `str` | One or more synonym strings. |
Returns: None
Example:
domain.add_synonym("بروتين", "بروتينات", "ببتيد")
print(domain.synonyms["بروتين"])  # ['بروتينات', 'ببتيد']
2.4 Serialisation & Persistence
to_dict()
domain.to_dict() -> Dict
Purpose: Serialise the entire DomainDefinition to a plain Python dictionary suitable for JSON encoding. The internal cache field `_all_terms` is excluded.
Parameters: None.
Returns: Dict — all public fields as a plain dictionary.
Example:
import json
print(json.dumps(domain.to_dict(), ensure_ascii=False, indent=2))
from_dict() (class method)
DomainDefinition.from_dict(data: Dict) -> DomainDefinition
Purpose: Reconstruct a DomainDefinition instance from a plain dictionary (e.g., parsed from JSON). The `_all_terms` cache key is stripped before construction.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `data` | `Dict` | Yes | Dictionary previously produced by `to_dict()` or a manually constructed equivalent. |
Returns: DomainDefinition instance.
Example:
data = json.loads(raw_json)
domain = DomainDefinition.from_dict(data)
save()
domain.save(path: str) -> None
Purpose: Persist the domain definition to a UTF-8 encoded JSON file. Useful for sharing domain configurations between projects or storing user-defined domains alongside the application.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `path` | `str` | Yes | File path to write (e.g., `"domains/nutrition.json"`). The parent directory must already exist. |
Returns: None
Example:
domain.save("domains/nutrition.json")
load() (class method)
DomainDefinition.load(path: str) -> DomainDefinition
Purpose: Deserialise a DomainDefinition from a JSON file previously written by `save()`.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `path` | `str` | Yes | Path to the JSON file to read. |
Returns: DomainDefinition instance.
Raises: FileNotFoundError if the path does not exist. json.JSONDecodeError if the file is malformed.
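Because `save()` expects the parent directory to exist and `load()` raises on a missing file, callers may want small wrappers around both. The sketch below works on plain dictionaries (the shape produced by `to_dict()`) so that it runs without the library; `save_definition` and `load_definition` are hypothetical helper names, not part of the package.

```python
import json
import os
import tempfile
from typing import Any, Dict


def save_definition(data: Dict[str, Any], path: str) -> None:
    """Write a domain dict as UTF-8 JSON, creating the parent directory first."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as fh:
        # ensure_ascii=False keeps Arabic terms readable in the file.
        json.dump(data, fh, ensure_ascii=False, indent=2)


def load_definition(path: str) -> Dict[str, Any]:
    """Read a domain dict back; FileNotFoundError and JSONDecodeError propagate."""
    with open(path, "r", encoding="utf-8") as fh:
        return json.load(fh)


definition = {"name": "nutrition", "terms": ["بروتين", "فيتامين"], "min_relevance": 0.15}
with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "domains", "nutrition.json")
    save_definition(definition, target)
    print(load_definition(target) == definition)
    try:
        load_definition(os.path.join(tmp, "missing.json"))
    except FileNotFoundError:
        print("missing file reported")
```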
Example:
domain = DomainDefinition.load("domains/nutrition.json")
rag = DomainSpecificRAG(my_rag, domain=domain)
3. PRESET_DOMAINS
from fennec_community.rag.types.domain_rag import PRESET_DOMAINS
PRESET_DOMAINS is a Dict[str, DomainDefinition] containing five ready-made domain configurations. Each preset includes curated Arabic terminology, a strict system prompt, and a rejection message. Presets are copied (not mutated) when passed to DomainSpecificRAG, so the originals are always clean.
| Key | display_name | Focus |
|---|---|---|
| `"medical"` | الطب والصحة | Medications, diagnoses, symptoms, dosages. Always advises consulting a doctor. |
| `"legal"` | القانون والتشريع | Laws, articles, contracts, court rulings. Always advises consulting a lawyer. |
| `"technical"` | التقنية والبرمجة | Code, APIs, algorithms, databases. Quotes code verbatim from documents. |
| `"financial"` | المال والاستثمار | Stocks, trading, returns, assets. Always notes investment risk. |
| `"academic"` | الأبحاث والأكاديميا | Research, studies, hypotheses, methodology. Distinguishes direct results from inferences. |
Usage:
from fennec_community.rag.types.domain_rag import PRESET_DOMAINS, DomainDefinition, DomainSpecificRAG
# Use directly by key string
rag = DomainSpecificRAG(my_rag, domain="medical")
# Inspect a preset
print(PRESET_DOMAINS["legal"].terms)
# Extend a preset before use (round-trip through to_dict() to get an independent copy)
medical = DomainDefinition.from_dict(PRESET_DOMAINS["medical"].to_dict())
medical.add_terms("تصوير مقطعي", "رنين مغناطيسي")
rag = DomainSpecificRAG(my_rag, domain=medical)
4. DomainSpecificRAG
from fennec_community.rag.types.domain_rag import DomainSpecificRAG
DomainSpecificRAG is the main class of the package. It wraps any RAG backend with domain-aware retrieval, relevance gating, vocabulary-based re-ranking, and language-locked generation.
4.1 Constructor
DomainSpecificRAG(
    rag_system: Any,
    domain: Union[str, DomainDefinition],
    domain_terms: Optional[List[str]] = None,
    llm: Optional[Any] = None,
    strict_mode: bool = False,
    auto_expand_terms: bool = False,
)
Purpose: Initialise a domain-specialised RAG system bound to a specific knowledge domain.
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| `rag_system` | `Any` | Yes | - | Underlying RAG backend. Must expose `.retrieve(query, top_k=...)` and `.generate(query, context)`. |
| `domain` | `Union[str, DomainDefinition]` | Yes | - | Domain specification. Accepts: (1) a preset key string (`"medical"`, `"legal"`, etc.); (2) an arbitrary string, which creates a minimal `DomainDefinition` with a warning; (3) a fully configured `DomainDefinition` object. |
| `domain_terms` | `Optional[List[str]]` | No | `None` | Additional terms merged into the domain vocabulary after resolution. Useful for extending presets without subclassing. |
| `llm` | `Optional[Any]` | No | `None` | Language model with `.generate(prompt) -> str`. Falls back to `rag_system.llm` if `None`. Required for `auto_expand_terms=True`. |
| `strict_mode` | `bool` | No | `False` | When `True`, queries scoring below `domain.min_relevance` are rejected immediately and `domain.rejection_message` is returned without any retrieval or generation. In non-strict mode, all queries proceed but are still enriched. |
| `auto_expand_terms` | `bool` | No | `False` | When `True` (and `llm` is available), the LLM is called at startup to generate up to 10 additional domain terms and add them to the vocabulary. Runs synchronously via `asyncio.run()`. |
Returns: DomainSpecificRAG instance.
Raises: TypeError — if domain is neither str nor DomainDefinition.
Domain resolution logic:
| `domain` value | Resolved as |
|---|---|
| `"medical"` (or any preset key) | Deep copy of `PRESET_DOMAINS["medical"]` |
| `"mycustomdomain"` (unknown string) | New minimal `DomainDefinition(name="mycustomdomain", terms=[])` with a warning |
| `DomainDefinition(...)` | Used as-is (original object, not copied) |
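The resolution rules in the table above can be expressed as a short function. This is a sketch under stated assumptions: `MiniDomain`, `PRESETS`, and `resolve_domain` are hypothetical stand-ins written for illustration, and the warning text is invented; only the three branches and the `TypeError` come from this reference.

```python
import copy
import warnings
from dataclasses import dataclass, field
from typing import Dict, List, Union


@dataclass
class MiniDomain:
    """Hypothetical stand-in for DomainDefinition (name and terms only)."""
    name: str
    terms: List[str] = field(default_factory=list)


PRESETS: Dict[str, MiniDomain] = {"medical": MiniDomain("medical", ["dose", "drug"])}


def resolve_domain(domain: Union[str, MiniDomain]) -> MiniDomain:
    if isinstance(domain, MiniDomain):
        return domain  # used as-is, not copied
    if isinstance(domain, str):
        if domain in PRESETS:
            return copy.deepcopy(PRESETS[domain])  # keeps the preset clean
        warnings.warn(f"Unknown domain '{domain}'; creating a minimal definition.")
        return MiniDomain(name=domain, terms=[])
    raise TypeError(f"domain must be str or MiniDomain, got {type(domain).__name__}")


preset = resolve_domain("medical")
preset.terms.append("mri")
print(PRESETS["medical"].terms)  # the shared preset is unaffected by the append
```

One consequence of the as-is branch is that two `DomainSpecificRAG` instances built from the same `DomainDefinition` object share vocabulary changes; copying via `from_dict(to_dict())` avoids that.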
Example:
from fennec_community.rag.types.domain_rag import DomainSpecificRAG, DomainDefinition
# Pattern 1 — preset by key
rag = DomainSpecificRAG(my_rag, domain="medical", strict_mode=True)
# Pattern 2 — preset + extra terms
rag = DomainSpecificRAG(
    my_rag,
    domain="medical",
    domain_terms=["تصوير مقطعي", "رنين مغناطيسي"],
)
# Pattern 3: custom DomainDefinition
nutrition = DomainDefinition(
    name="nutrition",
    terms=["بروتين", "فيتامين", "سعرات"],
    language="ar",
)
rag = DomainSpecificRAG(my_rag, domain=nutrition, auto_expand_terms=True)
4.2 Core Query Methods
query()
rag.query(
    query: str,
    context: Optional[Dict] = None,
) -> Dict[str, Any]
Purpose: Synchronous domain-specific query. Executes the full domain pipeline: relevance scoring → optional rejection → query enrichment → retrieval → domain re-ranking → generation. This method is a synchronous wrapper that calls `aquery()` via `asyncio.run()`, so it cannot be called from code that is already running inside an event loop; use `aquery()` there.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | Natural-language question from the user. |
| `context` | `Optional[Dict]` | No | Arbitrary extra key-value pairs merged into the generation context passed to `rag_system.generate()`. |
Returns: Dict[str, Any] — see Response Schema for the full field reference.
Example:
result = rag.query("ما هي الجرعة اليومية الموصى بها من فيتامين د؟")
print(result["answer"])
print(f"Domain relevance : {result['domain_relevance']:.2%}")
print(f"Matched terms : {result['matched_terms']}")
print(f"Docs used : {result['docs_count']}")
print(f"Latency : {result['latency_ms']} ms")
aquery()
await rag.aquery(
    query: str,
    context: Optional[Dict] = None,
) -> Dict[str, Any]
Purpose: Async domain-specific query. Identical pipeline to `query()` but fully async: retrieval and generation are awaited without blocking the event loop. This is the preferred method in async applications.
Pipeline steps (in order):
- Measure domain relevance (`_measure_relevance`).
- If `strict_mode` and score < `min_relevance`, return the rejection response immediately.
- Enrich the query with domain synonyms (`_enhance_query`).
- Retrieve the top 15 documents asynchronously (`_retrieve`).
- Re-rank by combined RAG + domain score (`_rank_by_domain`).
- Generate the answer with the domain system prompt and language lock (`_generate`).
- Record metrics.
- Return the enriched response dict.
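The re-ranking step in the list above can be sketched from the formula given in the pipeline section: each document receives a bonus of `min(hits / len(all_terms), 0.4) * boost_weight` on top of its retrieval score, the list is sorted, strict mode drops documents with no domain hits, and the top 10 are kept. The function below is an illustrative approximation rather than the library's `_rank_by_domain`; the whitespace tokenisation and the `domain_hits` key are assumptions.

```python
from typing import Dict, List, Set


def rank_by_domain(docs: List[Dict], all_terms: Set[str], boost_weight: float = 1.5,
                   strict_mode: bool = False, top_n: int = 10) -> List[Dict]:
    """Re-rank documents by retrieval score plus a capped domain bonus."""
    ranked = []
    for doc in docs:
        # Assumed tokenisation: lower-case and split on whitespace.
        doc_words = set(doc["text"].lower().split())
        hits = len(doc_words & all_terms)
        bonus = min(hits / len(all_terms), 0.4) * boost_weight if all_terms else 0.0
        ranked.append({**doc, "domain_hits": hits, "score": doc["score"] + bonus})
    ranked.sort(key=lambda d: d["score"], reverse=True)
    if strict_mode:
        ranked = [d for d in ranked if d["domain_hits"] > 0]
    return ranked[:top_n]


terms = {"dose", "drug", "symptom", "diagnosis"}
docs = [
    {"text": "weather report today", "score": 0.9, "metadata": {}},
    {"text": "drug dose guidance", "score": 0.5, "metadata": {}},
    {"text": "common symptom list", "score": 0.6, "metadata": {}},
]
for doc in rank_by_domain(docs, terms, strict_mode=True):
    print(round(doc["score"], 3), doc["text"])
```

The 0.4 cap keeps a vocabulary-dense document from overwhelming the retrieval score entirely, while `boost_weight` scales how much the bonus matters.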
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | Natural-language question. |
| `context` | `Optional[Dict]` | No | Extra context merged into the generation call. |
Returns: Dict[str, Any] — see Response Schema.
Example:
import asyncio
async def main():
    result = await rag.aquery("ما هي أسباب ارتفاع ضغط الدم؟")
    print(result["answer"])
asyncio.run(main())
4.3 Document Ingestion
add_document()
rag.add_document(
    text: str,
    metadata: Optional[Dict] = None,
) -> Any
Purpose: Index a single document into the underlying RAG backend. Automatically injects `{"domain": domain.name}` into the metadata so documents can later be filtered or attributed to the correct domain.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `text` | `str` | Yes | Raw text content to index. Must be non-empty after stripping. |
| `metadata` | `Optional[Dict]` | No | Additional metadata key-value pairs merged with the auto-injected domain tag. |
Returns: Whatever rag_system.add_document() returns (implementation-dependent).
Raises: ValueError — if text is empty or whitespace-only.
Example:
rag.add_document(
    "فيتامين د ضروري لصحة العظام وامتصاص الكالسيوم. الجرعة اليومية الموصى بها للبالغين هي 600-800 وحدة دولية.",
    metadata={"source": "who_guidelines_2024", "page": 12},
)
4.4 Domain Vocabulary Management
add_terms()
rag.add_terms(*terms: str) -> None
Purpose: Extend the live domain vocabulary with new terms at runtime. Changes take effect immediately: the next `query()` or `aquery()` call will use the updated vocabulary for relevance scoring and query enrichment. Delegates to `domain_def.add_terms()`.
| Parameter | Type | Description |
|---|---|---|
| `*terms` | `str` | One or more term strings to add to the domain. |
Returns: None
Example:
rag.add_terms("أوميغا 3", "حمض الفوليك", "الكولاجين")
add_synonym()
rag.add_synonym(term: str, *synonyms: str) -> None
Purpose: Register one or more synonyms for a domain term at runtime. Synonyms are used in query enrichment (appended to the query before retrieval) and are included in the `all_terms` set for relevance scoring. Delegates to `domain_def.add_synonym()`.
| Parameter | Type | Description |
|---|---|---|
| `term` | `str` | The canonical term to attach synonyms to. |
| `*synonyms` | `str` | One or more synonym strings. |
Returns: None
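To make the effect of registered synonyms concrete, here is a sketch of the enrichment rule described in the pipeline section: with no matched terms the query is prefixed with `[display_name]`; otherwise up to two synonyms for each of the first three matched terms are appended, and the result is trimmed to 600 characters. `enhance_query` is a hypothetical stand-in, and details such as de-duplication and joining with spaces are assumptions.

```python
from typing import Dict, List


def enhance_query(query: str, matched_terms: List[str], synonyms: Dict[str, List[str]],
                  display_name: str, max_terms: int = 3, per_term: int = 2,
                  max_chars: int = 600) -> str:
    """Append a bounded number of synonyms to the query (illustrative only)."""
    if not matched_terms:
        # Nothing matched: tag the query with the domain label instead.
        return f"[{display_name}] {query}"[:max_chars]
    extras: List[str] = []
    for term in matched_terms[:max_terms]:
        for synonym in synonyms.get(term, [])[:per_term]:
            if synonym not in extras:
                extras.append(synonym)
    return " ".join([query] + extras)[:max_chars]


synonyms = {"calories": ["kcal", "energy", "joules"], "diet": ["regimen"]}
print(enhance_query("low calories diet", ["calories", "diet"], synonyms, "Nutrition"))
print(enhance_query("hello there", [], synonyms, "Nutrition"))
```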
Example:
rag.add_synonym("فيتامين د", "كوليكالسيفيرول", "D3")
4.5 Diagnostics & Persistence
get_metrics()
rag.get_metrics() -> Dict[str, Any]
Purpose: Return a comprehensive summary of all per-session domain performance metrics. Useful for monitoring, dashboards, and evaluating the effectiveness of the domain configuration.
Parameters: None.
Returns: Dict[str, Any] with the following keys:
| Key | Type | Description |
|---|---|---|
| `domain` | `str` | Domain identifier (`domain_def.name`). |
| `total_queries` | `int` | Total number of queries processed (including rejected ones). |
| `rejected` | `int` | Queries rejected due to `strict_mode` + low relevance. |
| `acceptance_rate` | `str` | Formatted percentage of accepted queries (e.g., `"85.7%"`). |
| `avg_relevance` | `float` | Mean domain relevance score across all queries, rounded to 3 decimal places. |
| `avg_docs_per_query` | `float` | Mean number of retrieved documents per accepted query, rounded to 1 decimal place. |
| `errors` | `int` | Count of retrieval or generation errors. |
| `total_terms` | `int` | Current size of the full vocabulary set (`len(domain_def.all_terms)`). |
Example:
import json
print(json.dumps(rag.get_metrics(), indent=2, ensure_ascii=False))
{
  "domain": "medical",
  "total_queries": 42,
  "rejected": 3,
  "acceptance_rate": "92.9%",
  "avg_relevance": 0.612,
  "avg_docs_per_query": 7.2,
  "errors": 0,
  "total_terms": 14
}
save_domain()
rag.save_domain(path: str) -> None
Purpose: Persist the current domain definition (including any runtime additions from `add_terms()` and `add_synonym()`) to a JSON file. Useful for saving an evolved domain for reuse in future sessions.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `path` | `str` | Yes | File path to write the domain JSON (e.g., `"domains/medical_extended.json"`). |
Returns: None
Example:
rag.add_terms("تصوير مقطعي", "رنين مغناطيسي")
rag.save_domain("domains/medical_extended.json")
# Later — reload
domain = DomainDefinition.load("domains/medical_extended.json")
rag2 = DomainSpecificRAG(my_rag, domain=domain)
4.6 Async API
agenerate()
await rag.agenerate(query: str, **kwargs) -> str
Purpose: Async convenience method that runs `aquery()` and returns only the answer string, discarding all metadata. Ideal for integrations that only need the answer text and do not require domain diagnostics.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | Natural-language question. |
| `**kwargs` | `Any` | No | Currently unused; reserved for future extension. |
Returns: str — the generated answer string, or "" if generation produced no answer.
Example:
answer = await rag.agenerate("ما هو تأثير الكافيين على ضغط الدم؟")
print(answer)
aretrieve()
await rag.aretrieve(query: str, **kwargs) -> List[Dict]
Purpose: Async retrieval-only method. Returns normalised documents without going through enrichment, re-ranking, or generation. Useful for inspecting raw retrieval results or building custom pipelines.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | Query string passed directly to the backend retriever (`top_k=15`). |
| `**kwargs` | `Any` | No | Currently unused; reserved for future extension. |
Returns: List[Dict] — normalised document list. Each dict has keys text, score, and metadata. Returns [] on retrieval failure.
Note: This bypasses query enrichment and re-ranking. For the full domain pipeline use aquery().
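The pipeline section states that retrieval results arriving as tuples, dicts, or objects are normalised into dicts with `text`, `score`, and `metadata`. A possible shape for that normalisation is sketched below. The accepted input layouts (a `(text, score[, metadata])` tuple, and objects exposing `text` or `page_content`) are assumptions, and `normalise_doc` is a hypothetical name.

```python
from types import SimpleNamespace
from typing import Any, Dict


def normalise_doc(raw: Any) -> Dict[str, Any]:
    """Coerce one backend result into {'text', 'score', 'metadata'} (illustrative)."""
    if isinstance(raw, dict):
        text = raw.get("text", "")
        score = raw.get("score", 0.0)
        metadata = raw.get("metadata")
    elif isinstance(raw, tuple):
        # Assumed tuple layout: (text, score) or (text, score, metadata).
        text = raw[0] if len(raw) > 0 else ""
        score = raw[1] if len(raw) > 1 else 0.0
        metadata = raw[2] if len(raw) > 2 else None
    else:
        # Assumed object attributes: text or page_content, score, metadata.
        text = getattr(raw, "text", None) or getattr(raw, "page_content", "")
        score = getattr(raw, "score", 0.0)
        metadata = getattr(raw, "metadata", None)
    return {"text": str(text), "score": float(score), "metadata": dict(metadata or {})}


results = [
    ("vitamin D supports bone health", 0.82),
    {"text": "calcium absorption", "score": 0.7, "metadata": {"page": 3}},
    SimpleNamespace(page_content="insulin resistance", metadata={"source": "diabetes.txt"}),
]
for item in results:
    print(normalise_doc(item))
```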
Example:
docs = await rag.aretrieve("فيتامين د والكالسيوم")
for doc in docs:
    print(f"[{doc['score']:.3f}] {doc['text'][:100]}")
astream()
async for token in rag.astream(
    query: str,
    context: Optional[Dict] = None,
):
    print(token, end="", flush=True)
Purpose: Token-by-token async streaming generation with the full domain pipeline applied before streaming. Enables real-time display in chat interfaces.
Pipeline:
- Measure domain relevance.
- If `strict_mode` and below threshold, stream `rejection_message` word by word and return.
- Enrich query, retrieve, and re-rank documents.
- Update metrics.
- If no documents are found, stream a "not found" message.
- Stream the answer via one of three strategies (in priority order):
  - Native async stream: if `rag_system` exposes `astream(query, context)`.
  - Sync stream wrapped: if `rag_system` exposes `stream(query, context)`.
  - Fallback word-by-word: generate the full answer, then yield one word at a time with `asyncio.sleep(0)` between each word to keep the event loop responsive.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `query` | `str` | Yes | Natural-language question. |
| `context` | `Optional[Dict]` | No | Extra context merged into the streaming generation call. |
Yields: str — individual tokens (native/sync stream) or words followed by a space (fallback mode).
Sentinel yields on rejection or no docs:
- `rejection_message` words streamed one by one: when `strict_mode` is on and relevance is below the threshold.
- `"not found docs: <display_name>."` words: when retrieval returns no documents.
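The word-by-word fallback, which is also how the sentinel messages above are emitted, can be sketched as a small async generator. This is an illustration of the described behaviour (each word followed by a space, with `asyncio.sleep(0)` between yields), not the library's implementation; `stream_words` and `collect` are hypothetical names.

```python
import asyncio
from typing import AsyncIterator, List


async def stream_words(answer: str) -> AsyncIterator[str]:
    """Yield a finished answer one word at a time, each followed by a space."""
    for word in answer.split():
        yield word + " "
        # sleep(0) hands control back to the event loop between words.
        await asyncio.sleep(0)


async def collect(answer: str) -> List[str]:
    """Gather all streamed tokens into a list (for demonstration)."""
    return [token async for token in stream_words(answer)]


tokens = asyncio.run(collect("hello domain world"))
print(tokens)
```

Since every token carries a trailing space in this mode, consumers that concatenate tokens may want to strip the final result.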
Example:
import asyncio
async def stream():
    print("Answer: ", end="")
    async for token in rag.astream("كيف يؤثر السكر على الجهاز المناعي؟"):
        print(token, end="", flush=True)
    print()
asyncio.run(stream())
4.7 Context Manager
DomainSpecificRAG supports the async context manager protocol:
async with DomainSpecificRAG(my_rag, domain="medical") as rag:
    result = await rag.aquery("ما هي أعراض السكري؟")
    print(result["answer"])
# __aexit__ is a no-op; the block only serves as a scope delimiter
The synchronous `with` statement is not supported. Use `async with` or instantiate directly.
4.8 Representation
__repr__()
repr(rag)
Purpose: Return a concise representation of the current system state for logging and REPL inspection.
Returns: str in the format:
DomainSpecificRAG(domain='medical', terms=14, strict=True, queries=42)
5. DomainMetrics
from fennec_community.rag.types.domain_rag import DomainMetrics
DomainMetrics is a lightweight statistics accumulator used internally by DomainSpecificRAG. It is accessible via `rag.metrics`, but its public interface is primarily consumed through `rag.get_metrics()`.
5.1 Attributes
| Attribute | Type | Description |
|---|---|---|
| `total_queries` | `int` | Total number of queries recorded (accepted + rejected). |
| `rejected_queries` | `int` | Number of queries rejected in strict mode. |
| `total_docs_used` | `int` | Cumulative count of documents returned across all accepted queries. |
| `errors` | `int` | Count of retrieval or generation exceptions (incremented directly by DomainSpecificRAG). |
| `avg_relevance` | `float` | Running average of all recorded relevance scores. |
5.2 Methods
record()
metrics.record(
    relevance: float,
    docs: int,
    rejected: bool = False,
) -> None
Purpose: Record the outcome of a single query: increments counters, appends the relevance score, and recomputes `avg_relevance`. Called automatically by DomainSpecificRAG at the end of every `aquery()` execution.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `relevance` | `float` | - | Domain relevance score for this query (0.0–1.0). |
| `docs` | `int` | - | Number of documents returned for this query. |
| `rejected` | `bool` | `False` | Pass `True` if the query was rejected in strict mode. |
Returns: None
summary()
metrics.summary() -> Dict[str, Any]
Purpose: Return all accumulated metrics as a plain dictionary. This is the data source for `DomainSpecificRAG.get_metrics()`.
Parameters: None.
Returns: Dict[str, Any] with keys: total_queries, rejected, acceptance_rate, avg_relevance, avg_docs_per_query, errors.
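How `record()` and `summary()` could fit together is sketched below. `MetricsSketch` is a hypothetical re-implementation based only on the attribute and key descriptions in this section; the handling of zero queries and the choice to average documents over accepted queries only are assumptions.

```python
from typing import Any, Dict, List


class MetricsSketch:
    """Hypothetical accumulator mirroring the documented attributes and keys."""

    def __init__(self) -> None:
        self.total_queries = 0
        self.rejected_queries = 0
        self.total_docs_used = 0
        self.errors = 0
        self.avg_relevance = 0.0
        self._scores: List[float] = []

    def record(self, relevance: float, docs: int, rejected: bool = False) -> None:
        self.total_queries += 1
        if rejected:
            self.rejected_queries += 1
        else:
            self.total_docs_used += docs
        self._scores.append(relevance)
        self.avg_relevance = sum(self._scores) / len(self._scores)

    def summary(self) -> Dict[str, Any]:
        accepted = self.total_queries - self.rejected_queries
        # Assumption: report 0.0 rather than dividing by zero before any query.
        rate = accepted / self.total_queries * 100 if self.total_queries else 0.0
        return {
            "total_queries": self.total_queries,
            "rejected": self.rejected_queries,
            "acceptance_rate": f"{rate:.1f}%",
            "avg_relevance": round(self.avg_relevance, 3),
            "avg_docs_per_query": round(self.total_docs_used / accepted, 1) if accepted else 0.0,
            "errors": self.errors,
        }


metrics = MetricsSketch()
for _ in range(8):
    metrics.record(relevance=0.6, docs=6)
for _ in range(2):
    metrics.record(relevance=0.05, docs=0, rejected=True)
print(metrics.summary())
```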
Example:
print(rag.metrics.summary())
# {'total_queries': 10, 'rejected': 2, 'acceptance_rate': '80.0%',
#  'avg_relevance': 0.523, 'avg_docs_per_query': 6.0, 'errors': 0}
6. Response Schema
Every call to `query()` and `aquery()` returns a Dict[str, Any]. All fields below are always present except `rejected` and `error`, which appear only in the cases noted:
| Field | Type | Description |
|---|---|---|
| `answer` | `str` | The LLM-generated answer (or the rejection/no-docs message). |
| `sources` | `List[Dict]` | Up to 5 source excerpts. Each has `text` (≤200 chars), `score` (combined domain score), and `metadata`. Present even on rejection (empty list). |
| `domain` | `str` | Domain machine identifier (e.g., `"medical"`). |
| `domain_display` | `str` | Domain human-readable name (e.g., `"الطب والصحة"`). |
| `domain_relevance` | `float` | Relevance score of the query to the domain, rounded to 3 decimal places. |
| `matched_terms` | `List[str]` | Domain vocabulary terms that matched in the query. Empty list on rejection. |
| `enhanced_query` | `str` | The query after synonym enrichment (what was actually sent to the retriever). Empty string on rejection. |
| `docs_count` | `int` | Number of domain-ranked documents used for generation. 0 on rejection or empty retrieval. |
| `latency_ms` | `float` | Total pipeline wall-clock time in milliseconds. 0.0 on immediate rejection. |
| `rejected` | `bool` | Present and `True` only on strict-mode rejections; absent otherwise. |
| `error` | `str` | Present only on generation exceptions; contains the exception message. |
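Because `rejected` and `error` are optional keys, callers should read them defensively. The helper below is a hypothetical consumer written for this illustration; it relies only on the field names in the table above, and the sample dictionaries are made up.

```python
from typing import Any, Dict


def describe_response(result: Dict[str, Any]) -> str:
    """Summarise a response dict, reading the optional keys defensively."""
    if result.get("rejected", False):
        return f"rejected (relevance {result['domain_relevance']:.3f})"
    if "error" in result:
        return f"error: {result['error']}"
    return f"answered from {result['docs_count']} docs, {len(result['sources'])} sources shown"


# Made-up sample responses shaped like the documented schema.
rejected = {"answer": "Out of scope.", "domain_relevance": 0.042, "docs_count": 0,
            "sources": [], "rejected": True}
accepted = {"answer": "...", "domain_relevance": 0.743, "docs_count": 8,
            "sources": [{"text": "t", "score": 1.2, "metadata": {}}]}
print(describe_response(rejected))
print(describe_response(accepted))
```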
Rejection response example:
{
    "answer": "سؤالك خارج نطاق الطب والصحة.",
    "domain": "medical",
    "domain_display": "الطب والصحة",
    "domain_relevance": 0.042,
    "matched_terms": [],
    "enhanced_query": "",
    "docs_count": 0,
    "latency_ms": 0.0,
    "rejected": True,
    "sources": [],
}
Successful response example:
{
    "answer": "الجرعة اليومية الموصى بها من فيتامين د للبالغين هي 600–800 وحدة دولية...",
    "domain": "medical",
    "domain_display": "الطب والصحة",
    "domain_relevance": 0.743,
    "matched_terms": ["فيتامين", "جرعة"],
    "enhanced_query": "ما هي الجرعة اليومية الموصى بها من فيتامين د؟ عقار",
    "docs_count": 8,
    "latency_ms": 312.5,
    "sources": [
        {"text": "فيتامين د ضروري لصحة العظام...", "score": 1.234, "metadata": {}},
        ...
    ],
}
7. Domain Instantiation Patterns
The following table summarises all four supported ways to instantiate a DomainSpecificRAG:
| Pattern | Code | Best for |
|---|---|---|
| Preset by key | `DomainSpecificRAG(rag, domain="medical")` | Quick start with built-in domains |
| Preset + extra terms | `DomainSpecificRAG(rag, domain="medical", domain_terms=["..."])` | Extending a preset without subclassing |
| Custom `DomainDefinition` | `DomainSpecificRAG(rag, domain=my_domain_def)` | Full control over all parameters |
| Load from JSON file | `DomainSpecificRAG(rag, domain=DomainDefinition.load("path.json"))` | Sharing domain configs across projects |
8. Query Processing Pipeline
rag.query(query, context) / await rag.aquery(query, context)
│
├─ _measure_relevance(query)
│ ├─ Layer 1: direct word-set intersection with all_terms
│ ├─ Layer 2: partial string containment check
│ └─ score = (direct_hits * 1.0 + partial_hits * 0.5) / query_words
│ * boost_weight → capped at 1.0
│
├─ strict_mode and score < min_relevance
│ └─ metrics.record(rejected=True) → return _rejection_response()
│
├─ _enhance_query(query, matched_terms)
│ ├─ no matched terms → prepend "[display_name]" to query
│ └─ matched terms → append up to 6 synonyms (2 per term, top-3 terms)
│ → trimmed to 600 chars
│
├─ _retrieve(enhanced_query) [async, top_k=15]
│ ├─ rag_system.retrieve() called in thread pool
│ ├─ normalises tuples, dicts, and object formats uniformly
│ └─ returns List[Dict{text, score, metadata}]
│
├─ _rank_by_domain(docs)
│ ├─ for each doc: domain_hits = doc_words ∩ all_terms
│ ├─ domain_bonus = min(hits / len(all_terms), 0.4) * boost_weight
│ ├─ combined_score = rag_score + domain_bonus
│ ├─ sort descending by combined_score
│ ├─ strict_mode: filter docs with domain_hits == 0
│ └─ return top-10 docs
│
├─ _generate(query, docs, context) [async]
│ ├─ build system_prompt: domain.system_prompt + language-lock instruction
│ ├─ inject language instruction directly into query string
│ ├─ call rag_system.generate(lang_query, ctx) in thread pool
│ └─ auto-build sources list if not in response
│
├─ metrics.record(relevance, docs_count)
│
└─ return enriched Dict (answer + 8 diagnostic fields)
Relevance scoring detail:
query = "ما هي جرعة الدواء؟"
all_terms = {"دواء", "علاج", "مرض", "أعراض", "تشخيص", "جرعة", "مريض", "طبيب", "عقار", "دواية"}
query_words = {"ما", "هي", "جرعة", "الدواء؟"}
direct_matches = {"جرعة", "دواء"} (normalised)
partial_matches = {"دواية"} (contains "دوا")
score_raw = (2 * 1.0 + 1 * 0.5) / 4 = 0.625
score = min(0.625 * 1.5, 1.0) = 0.9375 ✅ accepted
Language lock mechanism:
domain.language = "ar"
→ lang_instruction = "يجب أن تكون إجابتك باللغة العربية حصراً."
system_prompt = domain.system_prompt + "\n" + lang_instruction
enhanced_query = lang_instruction + "\n" + original_query
# Both channels used to maximise LLM compliance
9. Quick-Start Example
import asyncio
import json
from fennec_community.rag.types.domain_rag import DomainSpecificRAG, DomainDefinition, PRESET_DOMAINS
from fennec_community.rag.core import RAGSystem
my_rag = RAGSystem(
    vector_db=my_vector_db,
    llm=my_llm,
    chunker=my_chunker,
    context_manager=my_ctx_mgr,
    config=config,
    enable_query_expansion=True,
    query_expansion_variants=3,
)
# ── 1. Use a preset domain ────────────────────────────────────────────────
rag_medical = DomainSpecificRAG(
    rag_system=my_rag,
    domain="medical",
    strict_mode=True,
)
# ── 2. Index domain documents ─────────────────────────────────────────────
rag_medical.add_document(
    "فيتامين د ضروري لامتصاص الكالسيوم. الجرعة الموصى بها: 600–800 وحدة دولية يومياً.",
    metadata={"source": "who_guidelines", "page": 5},
)
rag_medical.add_document(
    "ارتفاع ضغط الدم (فوق 140/90) يستدعي تعديل نمط الحياة أو العلاج الدوائي.",
    metadata={"source": "cardiology_manual", "chapter": 3},
)
# ── 3. Synchronous query ──────────────────────────────────────────────────
result = rag_medical.query("ما هي الجرعة اليومية الموصى بها من فيتامين د؟")
print(result["answer"])
print(f"Relevance : {result['domain_relevance']:.2%}")
print(f"Matched : {result['matched_terms']}")
print(f"Docs used : {result['docs_count']}")
print(f"Latency : {result['latency_ms']} ms")
# ── 4. Off-topic query (strict_mode=True rejects it) ─────────────────────
result_rej = rag_medical.query("من هو مخترع الهاتف؟")
print(result_rej["answer"]) # → rejection_message
print(result_rej.get("rejected")) # → True
# ── 5. Custom domain ──────────────────────────────────────────────────────
nutrition = DomainDefinition(
    name="nutrition",
    display_name="التغذية والصحة",
    language="ar",
    terms=["بروتين", "كربوهيدرات", "فيتامين", "سعرات", "حمية"],
    synonyms={"سعرات": ["كالوري", "طاقة"], "حمية": ["رجيم", "دايت"]},
    system_prompt="أنت خبير تغذية. أجب بدقة علمية.",
    rejection_message="هذا السؤال خارج نطاق التغذية والصحة.",
    min_relevance=0.15,
)
rag_nutrition = DomainSpecificRAG(
    my_rag,
    domain=nutrition,
    strict_mode=False,
    auto_expand_terms=True,  # LLM adds up to 10 more terms at startup
)
# ── 6. Add runtime vocabulary ─────────────────────────────────────────────
rag_nutrition.add_terms("أوميغا 3", "حمض الفوليك")
rag_nutrition.add_synonym("فيتامين", "فيتامينات", "vitamins")
# ── 7. Async query ────────────────────────────────────────────────────────
async def async_demo():
    result = await rag_nutrition.aquery("كيف يؤثر البروتين على بناء العضلات؟")
    print(result["answer"])
asyncio.run(async_demo())
# ── 8. Async generate (answer only) ──────────────────────────────────────
async def gen_demo():
    answer = await rag_nutrition.agenerate("ما هي أفضل مصادر الكربوهيدرات؟")
    print(answer)
asyncio.run(gen_demo())
# ── 9. Streaming ──────────────────────────────────────────────────────────
async def stream_demo():
    print("Streaming: ", end="")
    async for token in rag_nutrition.astream("ما الفرق بين الكربوهيدرات البسيطة والمعقدة؟"):
        print(token, end="", flush=True)
    print()
asyncio.run(stream_demo())
# ── 10. Metrics ───────────────────────────────────────────────────────────
print(json.dumps(rag_medical.get_metrics(), indent=2, ensure_ascii=False))
print(json.dumps(rag_nutrition.get_metrics(), indent=2, ensure_ascii=False))
# ── 11. Save and reload domain ────────────────────────────────────────────
rag_nutrition.save_domain("domains/nutrition_v2.json")
loaded = DomainDefinition.load("domains/nutrition_v2.json")
rag_v2 = DomainSpecificRAG(my_rag, domain=loaded)
# ── 12. Inspect a preset ──────────────────────────────────────────────────
print(PRESET_DOMAINS["legal"].terms)
print(PRESET_DOMAINS["financial"].system_prompt)
# ── 13. Async context manager ─────────────────────────────────────────────
async def ctx_demo():
    async with DomainSpecificRAG(my_rag, domain="academic") as rag:
        result = await rag.aquery("ما هي الفرضية الأساسية في هذا البحث؟")
        print(result["answer"])
asyncio.run(ctx_demo())
# ── 14. Raw retrieval (no generation) ────────────────────────────────────
async def retrieve_only():
    docs = await rag_medical.aretrieve("أعراض السكري")
    for doc in docs[:3]:
        print(f"[{doc['score']:.3f}] {doc['text'][:80]}")
asyncio.run(retrieve_only())
Simple Real Example
from fennec_community.llm import MistralInterface
from fennec_community.document_loaders import TextLoader
from fennec_community.vector_database import FAISSVectorDatabase
from fennec_community.chunks import ArabicTextChunker
from fennec_community.context import ContextManager
from fennec_community.embeddings import OllamaEmbedder
from fennec_community.rag.core import RAGSystem, LoadedDocument
from fennec_community.rag.types.domain_rag import DomainSpecificRAG, DomainDefinition
import nest_asyncio
# Allow asyncio.run() inside an already running event loop (e.g., a notebook)
nest_asyncio.apply()
# --- 1. Base RAG system and documents ---
# vector_db, llm, chunker, and context_manager are assumed to have been built
# beforehand from the classes imported above; their construction is not shown here.
base_rag = RAGSystem(
    vector_db=vector_db, llm=llm,
    chunker=chunker, context_manager=context_manager,
)
l1 = LoadedDocument(
    page_content=" القلب هو العضو المركزي في الجهاز الدوري. يضخ الدم عبر الشرايين والأوردة. أمراض القلب تشمل: ارتفاع الضغط، الذبحة الصدرية، قصور القلب الاحتقاني.",
    metadata={"source": "heart.txt"})
l2 = LoadedDocument(
    page_content=" السكري مرض مزمن يؤثر على كيفية معالجة الجسم للسكر. النوع الأول يعتمد على الأنسولين، والنوع الثاني يرتبط بمقاومة الأنسولين.",
    metadata={"source": "diabetes.txt"})
base_rag.add_documents(docs=[l1, l2])
# --- 2. Approach 1: preset domain ---
medical_rag = DomainSpecificRAG(
    rag_system=base_rag,
    domain="medical",
    strict_mode=True,
)
# --- 3. Approach 2: fully custom domain ---
medical_domain = DomainDefinition(
    name="medical",
    display_name="الطب والصحة",
    terms=["قلب", "ضغط", "سكري", "جراحة", "أنسولين", "شرايين",
           "أوردة", "دم", "علاج", "دواء", "مستشفى", "طبيب"],
    system_prompt="أنت طبيب متخصص تُجيب بدقة علمية عالية.",
    min_relevance=0.15,
    boost_weight=1.5,
    rejection_message="عذرًا، تخصصي في الطب فقط.",
    synonyms={
        "قلب": ["فؤاد", "عضلة القلب"],
        "ضغط": ["ضغط الدم", "hypertension"],
    },
    metadata={"version": "2.0", "reviewed_by": "dr-ahmed"},
)
custom_rag = DomainSpecificRAG(
    rag_system=base_rag,
    domain=medical_domain,
    strict_mode=True,
)
custom_rag.add_terms("نبضات القلب", "الأوعية الدموية")
custom_rag.add_synonym("علاج", "معالجة", "مداواة")
# --- 4. Synchronous query ---
q1 = "ما هو علاج ارتفاع ضغط الدم؟"
result1 = custom_rag.query(q1)
print(f"❓ {q1}")
print(f"🎯 Relevance: {result1['domain_relevance']:.2%}")
print(f"🔍 Matched terms: {result1['matched_terms']}")
print(f"📝 Answer: {result1['answer'][:200]}")
# Off-topic question: rejected in strict_mode
q2 = "ما أفضل برنامج لتعلم البرمجة؟"
result2 = custom_rag.query(q2)
print(f"\n❓ {q2}")
print(f"❌ Rejected: {result2.get('rejected', False)}")
print(f"💬 {result2['answer']}")
# --- 5. Metrics ---
metrics = custom_rag.get_metrics()
print("\n📊 Domain metrics:")
for k, v in metrics.items():
    print(f"  {k}: {v}")
# --- 6. Save and reload the domain ---
custom_rag.save_domain("medical_domain.json")
domain_from_file = DomainDefinition.load("medical_domain.json")
rag_from_file = DomainSpecificRAG(rag_system=base_rag, domain=domain_from_file)