
Domain RAG — `domain_rag` Module — Public API Reference


Table of Contents

  1. Module Overview
  2. DomainDefinition
  3. PRESET_DOMAINS
  4. DomainSpecificRAG
  5. DomainMetrics
  6. Response Schema
  7. Domain Instantiation Patterns
  8. Query Processing Pipeline
  9. Quick-Start Example

1. Module Overview

The domain_rag package wraps any standard RAG backend and adds domain-specific intelligence: vocabulary-aware query enrichment, relevance gating, domain-boosted result ranking, and language-locked generation. Instead of returning whatever the vector DB finds, the system measures how relevant a query is to the configured domain, optionally rejects off-topic questions, expands queries with domain synonyms, re-ranks retrieved documents by domain term density, and injects domain-specific system instructions and language constraints into every LLM call.

Key capabilities:

  • Domain relevance scoring — two-layer (direct + partial) term-matching with configurable boost weight.
  • Strict mode — hard rejection of queries below the min_relevance threshold, returning the domain's configured rejection message.
  • Query enrichment — synonyms from the domain vocabulary are appended to the query before retrieval.
  • Domain-boosted re-ranking — combines vector-DB similarity scores with per-document domain term density.
  • Language locking — language instructions are injected into both the system prompt and the query string to guarantee LLM compliance.
  • LLM-powered auto term expansion — optional startup expansion of the domain vocabulary via the LLM.
  • Five preset domains — medical, legal, technical, financial, academic — ready to use out of the box.
  • Full async support — aquery(), agenerate(), aretrieve(), and token-level astream().
  • JSON persistence — save and load domain definitions to/from disk.

Publicly exported symbols:

from fennec_community.rag.types.domain_rag import DomainDefinition, PRESET_DOMAINS, DomainSpecificRAG, DomainMetrics

2. DomainDefinition

from fennec_community.rag.types.domain_rag import DomainDefinition

DomainDefinition is the configuration object that fully describes a single domain — its vocabulary, synonyms, system prompt, rejection message, relevance thresholds, and any arbitrary metadata. It is the single source of truth consumed by DomainSpecificRAG.


2.1 Constructor

DomainDefinition(
    name: str,
    terms: List[str],
    display_name: str = "",
    language: str = "ar",
    synonyms: Dict[str, List[str]] = {},
    system_prompt: str = "",
    rejection_message: str = "",
    min_relevance: float = 0.10,
    boost_weight: float = 1.5,
    metadata: Dict[str, Any] = {},
)

Purpose: Define a domain with its full vocabulary, LLM instructions, and scoring parameters. Only name and terms are required; everything else has a sensible default.

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| name | str | Yes | | Machine-readable domain identifier (e.g., "nutrition", "medical"). Used as a key in result dicts and file names. |
| terms | List[str] | Yes | | Primary vocabulary list. These are the anchor terms the relevance scorer uses to measure how on-topic a query is. |
| display_name | str | No | name | Human-readable label shown in responses and logs (e.g., "الطب والصحة"). Defaults to name if left empty. |
| language | str | No | "ar" | ISO 639-1 code of the domain vocabulary language. Controls which language-lock instruction is injected into the LLM ("ar", "en", "fr", "de", "es", or any custom string). |
| synonyms | Dict[str, List[str]] | No | {} | Synonym map: {"term": ["syn1", "syn2"]}. Synonyms are appended to queries during enrichment and included in the full term set for relevance scoring. |
| system_prompt | str | No | "" | Domain-specific instructions prepended to every LLM call. A language-lock instruction is always appended automatically. |
| rejection_message | str | No | auto | Message returned when strict_mode=True and the query is below min_relevance. Auto-generated as "your question is out of zone for this domain '<display_name>'." if left empty. |
| min_relevance | float | No | 0.10 | Minimum relevance score [0.0, 1.0] required to accept a query in strict mode. |
| boost_weight | float | No | 1.5 | Multiplier applied to the raw relevance score and to the per-document domain bonus during ranking. Higher values make domain matching more influential. |
| metadata | Dict[str, Any] | No | {} | Free-form key-value metadata for user-defined purposes (version, author, creation date, etc.). Not used internally. |

Example:

from fennec_community.rag.types.domain_rag import DomainDefinition

nutrition = DomainDefinition(
    name="nutrition",
    display_name="التغذية والصحة",
    language="ar",
    terms=["بروتين", "كربوهيدرات", "فيتامين", "سعرات", "حمية"],
    synonyms={"سعرات": ["كالوري", "طاقة"], "حمية": ["رجيم", "دايت"]},
    system_prompt="أنت خبير تغذية. أجب بدقة علمية وأضف تحذيرات عند الضرورة.",
    rejection_message="هذا السؤال خارج نطاق التغذية والصحة.",
    min_relevance=0.15,
    boost_weight=1.8,
)

2.2 Properties

all_terms

domain.all_terms -> Set[str]

Purpose: Return the complete, deduplicated, lower-cased vocabulary set — primary terms plus all values from synonyms. The result is computed lazily on first access and cached; calling add_terms() or add_synonym() invalidates the cache automatically.

Returns: Set[str] — all domain vocabulary in lower case.

Example:

print(nutrition.all_terms)
# {'بروتين', 'كربوهيدرات', 'فيتامين', 'سعرات', 'حمية', 'كالوري', 'طاقة', 'رجيم', 'دايت'}
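The lazy caching and invalidation described for all_terms can be pictured with a small stand-in class. This is only a sketch: `VocabularySketch` is a hypothetical name written for illustration, not the library's implementation, and only the behaviour (lower-casing, deduplication, cache reset on mutation) is taken from the text above.

```python
from typing import Dict, List, Optional, Set


class VocabularySketch:
    """Hypothetical stand-in illustrating a lazily cached term set."""

    def __init__(self, terms: List[str],
                 synonyms: Optional[Dict[str, List[str]]] = None) -> None:
        self.terms = list(terms)
        self.synonyms = dict(synonyms or {})
        self._all_terms: Optional[Set[str]] = None  # cache, filled on first access

    @property
    def all_terms(self) -> Set[str]:
        if self._all_terms is None:
            merged = {t.lower() for t in self.terms}
            for values in self.synonyms.values():
                merged.update(v.lower() for v in values)
            self._all_terms = merged
        return self._all_terms

    def add_terms(self, *terms: str) -> None:
        for term in terms:
            term = term.strip()
            if term and term not in self.terms:  # skip blanks and duplicates
                self.terms.append(term)
        self._all_terms = None  # invalidate so the next access rebuilds the set


vocab = VocabularySketch(["Protein", "Vitamin"], {"Vitamin": ["Vit"]})
cached = vocab.all_terms                  # built once here
same_object = vocab.all_terms is cached   # second access reuses the cache
vocab.add_terms("Fiber", "  ", "Protein")  # blank and duplicate are ignored
```

Resetting the cache to None on every mutation keeps reads cheap while guaranteeing that a stale set is never returned.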

2.3 Term & Synonym Management

add_terms()

domain.add_terms(*terms: str) -> None

Purpose: Append one or more new terms to the domain vocabulary at runtime. Duplicate terms (already present in self.terms) and blank strings are silently ignored. The all_terms cache is invalidated so the next access reflects the new terms.

Parameter Type Description
*terms str One or more term strings to add. Each is stripped of leading/trailing whitespace.

Returns: None

Example:

domain.add_terms("أوميغا 3", "ألياف", "معادن")
print(len(domain.all_terms))  # updated count

add_synonym()

domain.add_synonym(term: str, *synonyms: str) -> None

Purpose: Register one or more synonyms for an existing or new term. Duplicate synonyms for the same term are silently ignored. The all_terms cache is invalidated.

Parameter Type Description
term str The canonical term to attach synonyms to.
*synonyms str One or more synonym strings.

Returns: None

Example:

domain.add_synonym("بروتين", "بروتينات", "ببتيد")
print(domain.synonyms["بروتين"])  # ['بروتينات', 'ببتيد']

2.4 Serialisation & Persistence

to_dict()

domain.to_dict() -> Dict

Purpose: Serialise the entire DomainDefinition to a plain Python dictionary suitable for JSON encoding. The internal cache field _all_terms is excluded.

Parameters: None.

Returns: Dict — all public fields as a plain dictionary.

Example:

import json
print(json.dumps(domain.to_dict(), ensure_ascii=False, indent=2))

from_dict() (class method)

DomainDefinition.from_dict(data: Dict) -> DomainDefinition

Purpose: Reconstruct a DomainDefinition instance from a plain dictionary (e.g., parsed from JSON). The _all_terms cache key is stripped before construction.

Parameter Type Required Description
data Dict Yes Dictionary previously produced by to_dict() or a manually constructed equivalent.

Returns: DomainDefinition instance.

Example:

data = json.loads(raw_json)
domain = DomainDefinition.from_dict(data)

save()

domain.save(path: str) -> None

Purpose: Persist the domain definition to a UTF-8 encoded JSON file. Useful for sharing domain configurations between projects or storing user-defined domains alongside the application.

Parameter Type Required Description
path str Yes File path to write (e.g., "domains/nutrition.json"). The parent directory must already exist.

Returns: None

Example:

domain.save("domains/nutrition.json")

load() (class method)

DomainDefinition.load(path: str) -> DomainDefinition

Purpose: Deserialise a DomainDefinition from a JSON file previously written by save().

Parameter Type Required Description
path str Yes Path to the JSON file to read.

Returns: DomainDefinition instance.

Raises: FileNotFoundError if the path does not exist. json.JSONDecodeError if the file is malformed.

Example:

domain = DomainDefinition.load("domains/nutrition.json")
rag = DomainSpecificRAG(my_rag, domain=domain)
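As a rough picture of what a UTF-8 JSON round trip involves, the sketch below writes and reads a plain dictionary. The helper names `save_definition` and `load_definition` are hypothetical and operate on a dict rather than a DomainDefinition; the point is the encoding and the requirement that the parent directory exists.

```python
import json
import os
import tempfile


def save_definition(data: dict, path: str) -> None:
    # ensure_ascii=False keeps Arabic terms readable in the file
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(data, fh, ensure_ascii=False, indent=2)


def load_definition(path: str) -> dict:
    # open() raises FileNotFoundError for a missing path;
    # json.load() raises json.JSONDecodeError for malformed content
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


definition = {"name": "nutrition", "terms": ["بروتين", "فيتامين"], "min_relevance": 0.15}

with tempfile.TemporaryDirectory() as tmp:
    target_dir = os.path.join(tmp, "domains")
    os.makedirs(target_dir, exist_ok=True)  # create the parent directory first
    path = os.path.join(target_dir, "nutrition.json")
    save_definition(definition, path)
    restored = load_definition(path)
```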

3. PRESET_DOMAINS

from fennec_community.rag.types.domain_rag import PRESET_DOMAINS

PRESET_DOMAINS is a Dict[str, DomainDefinition] containing five ready-made domain configurations. Each preset includes curated Arabic terminology, a strict system prompt, and a rejection message. Presets are copied (not mutated) when passed to DomainSpecificRAG, so the originals are always clean.

| Key | display_name | Focus |
| --- | --- | --- |
| "medical" | الطب والصحة | Medications, diagnoses, symptoms, dosages. Always advises consulting a doctor. |
| "legal" | القانون والتشريع | Laws, articles, contracts, court rulings. Always advises consulting a lawyer. |
| "technical" | التقنية والبرمجة | Code, APIs, algorithms, databases. Quotes code verbatim from documents. |
| "financial" | المال والاستثمار | Stocks, trading, returns, assets. Always notes investment risk. |
| "academic" | الأبحاث والأكاديميا | Research, studies, hypotheses, methodology. Distinguishes direct results from inferences. |

Usage:

from fennec_community.rag.types.domain_rag import PRESET_DOMAINS, DomainSpecificRAG, DomainDefinition

# Use directly by key string
rag = DomainSpecificRAG(my_rag, domain="medical")

# Inspect a preset
print(PRESET_DOMAINS["legal"].terms)

# Extend a preset before use
medical = DomainDefinition.from_dict(PRESET_DOMAINS["medical"].to_dict())
medical.add_terms("تصوير مقطعي", "رنين مغناطيسي")
rag = DomainSpecificRAG(my_rag, domain=medical)

4. DomainSpecificRAG

from fennec_community.rag.types.domain_rag import DomainSpecificRAG

DomainSpecificRAG is the main class of the package. It wraps any RAG backend with domain-aware retrieval, relevance gating, vocabulary-based re-ranking, and language-locked generation.


4.1 Constructor

DomainSpecificRAG(
    rag_system: Any,
    domain: Union[str, DomainDefinition],
    domain_terms: Optional[List[str]] = None,
    llm: Optional[Any] = None,
    strict_mode: bool = False,
    auto_expand_terms: bool = False,
)

Purpose: Initialise a domain-specialised RAG system bound to a specific knowledge domain.

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| rag_system | Any | Yes | | Underlying RAG backend. Must expose .retrieve(query, top_k=...) and .generate(query, context). |
| domain | Union[str, DomainDefinition] | Yes | | Domain specification. Accepts: (1) a preset key string ("medical", "legal", etc.); (2) an arbitrary string, which creates a minimal DomainDefinition with a warning; (3) a fully configured DomainDefinition object. |
| domain_terms | Optional[List[str]] | No | None | Additional terms merged into the domain vocabulary after resolution. Useful for extending presets without subclassing. |
| llm | Optional[Any] | No | None | Language model with .generate(prompt) -> str. Falls back to rag_system.llm if None. Required for auto_expand_terms=True. |
| strict_mode | bool | No | False | When True, queries scoring below domain.min_relevance are rejected immediately and domain.rejection_message is returned without any retrieval or generation. In non-strict mode, all queries proceed but are still enriched. |
| auto_expand_terms | bool | No | False | When True (and llm is available), the LLM is called at startup to generate up to 10 additional domain terms and add them to the vocabulary. Runs synchronously via asyncio.run(). |

Returns: DomainSpecificRAG instance.

Raises: TypeError — if domain is neither str nor DomainDefinition.

Domain resolution logic:

domain value Resolved as
"medical" (or any preset key) Deep copy of PRESET_DOMAINS["medical"]
"mycustomdomain" (unknown string) New minimal DomainDefinition(name="mycustomdomain", terms=[]) with a warning
DomainDefinition(...) Used as-is (original object, not copied)
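The resolution table can be read as a small dispatch function. The sketch below mirrors the three cases under stated assumptions: `DomainSketch`, `PRESETS_SKETCH`, and `resolve_domain` are hypothetical names, and the warning text is invented for illustration.

```python
import copy
import warnings
from dataclasses import dataclass, field
from typing import Dict, List, Union


@dataclass
class DomainSketch:
    """Hypothetical, reduced stand-in for a domain definition."""
    name: str
    terms: List[str] = field(default_factory=list)


PRESETS_SKETCH: Dict[str, DomainSketch] = {
    "medical": DomainSketch("medical", ["dose", "symptom"]),
}


def resolve_domain(domain: Union[str, DomainSketch]) -> DomainSketch:
    if isinstance(domain, DomainSketch):
        return domain  # used as-is: later mutations affect the caller's object
    if isinstance(domain, str):
        if domain in PRESETS_SKETCH:
            return copy.deepcopy(PRESETS_SKETCH[domain])  # preset stays clean
        warnings.warn(f"Unknown domain '{domain}': using a minimal definition with no terms.")
        return DomainSketch(name=domain)
    raise TypeError(f"domain must be str or DomainSketch, got {type(domain).__name__}")


preset = resolve_domain("medical")
preset.terms.append("extra")  # only the copy changes
```

Because a DomainDefinition instance is not copied, passing a preset object directly (rather than its key) would let runtime additions leak into the shared preset, which is why the earlier usage example rebuilds it through to_dict() and from_dict().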

Example:

from fennec_community.rag.types.domain_rag import DomainSpecificRAG, DomainDefinition

# Pattern 1 — preset by key
rag = DomainSpecificRAG(my_rag, domain="medical", strict_mode=True)

# Pattern 2 — preset + extra terms
rag = DomainSpecificRAG(
    my_rag,
    domain="medical",
    domain_terms=["تصوير مقطعي", "رنين مغناطيسي"],
)

# Pattern 3 — custom DomainDefinition
nutrition = DomainDefinition(
    name="nutrition",
    terms=["بروتين", "فيتامين", "سعرات"],
    language="ar",
)
rag = DomainSpecificRAG(my_rag, domain=nutrition, auto_expand_terms=True)

4.2 Core Query Methods

query()

rag.query(
    query: str,
    context: Optional[Dict] = None,
) -> Dict[str, Any]

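Purpose: Synchronous domain-specific query. Executes the full domain pipeline: relevance scoring → optional rejection → query enrichment → retrieval → domain re-ranking → generation. This method is a synchronous wrapper that calls aquery() via asyncio.run(). Because asyncio.run() raises RuntimeError when an event loop is already running, call aquery() instead from async code or notebook environments.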

Parameter Type Required Description
query str Yes Natural-language question from the user.
context Optional[Dict] No Arbitrary extra key-value pairs merged into the generation context passed to rag_system.generate().

Returns: Dict[str, Any] — see Response Schema for the full field reference.

Example:

result = rag.query("ما هي الجرعة اليومية الموصى بها من فيتامين د؟")

print(result["answer"])
print(f"Domain relevance : {result['domain_relevance']:.2%}")
print(f"Matched terms    : {result['matched_terms']}")
print(f"Docs used        : {result['docs_count']}")
print(f"Latency          : {result['latency_ms']} ms")
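To make the wrapper relationship concrete, here is a minimal sketch of a synchronous function that drives an async one with asyncio.run(), and of what happens when it is called from code that is already running inside an event loop. `aquery_sketch` and `query_sketch` are hypothetical stand-ins, not the library's methods.

```python
import asyncio


async def aquery_sketch(query: str) -> dict:
    await asyncio.sleep(0)  # placeholder for retrieval and generation
    return {"answer": f"echo: {query}"}


def query_sketch(query: str) -> dict:
    """Sync wrapper: runs the coroutine to completion on a fresh event loop."""
    coro = aquery_sketch(query)
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "never awaited" warning if the loop was busy
        raise


async def call_from_async_code() -> str:
    try:
        query_sketch("hello")  # fails: a loop is already running here
    except RuntimeError:
        result = await aquery_sketch("hello")  # await the async variant instead
        return result["answer"]
    return "unexpected"


sync_answer = query_sketch("hello")["answer"]
async_answer = asyncio.run(call_from_async_code())
```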

aquery()

await rag.aquery(
    query: str,
    context: Optional[Dict] = None,
) -> Dict[str, Any]

Purpose: Async domain-specific query. Identical pipeline to query() but fully async — retrieval and generation are awaited without blocking the event loop. This is the preferred method in async applications.

Pipeline steps (in order):

  1. Measure domain relevance (_measure_relevance).
  2. If strict_mode and score < min_relevance → return rejection response immediately.
  3. Enrich query with domain synonyms (_enhance_query).
  4. Retrieve top-15 documents asynchronously (_retrieve).
  5. Re-rank by combined RAG + domain score (_rank_by_domain).
  6. Generate answer with domain system prompt and language lock (_generate).
  7. Record metrics.
  8. Return enriched response dict.

Parameter Type Required Description
query str Yes Natural-language question.
context Optional[Dict] No Extra context merged into the generation call.

Returns: Dict[str, Any] — see Response Schema.

Example:

import asyncio

async def main():
    result = await rag.aquery("ما هي أسباب ارتفاع ضغط الدم؟")
    print(result["answer"])

asyncio.run(main())
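Step 3 of the pipeline (query enrichment) follows the rules listed in the Query Processing Pipeline section: prefix the display name when nothing matched, otherwise append up to two synonyms for each of the first three matched terms and trim to 600 characters. A minimal sketch, assuming a single space as separator and a hypothetical function name:

```python
from typing import Dict, List


def enhance_query_sketch(
    query: str,
    matched_terms: List[str],
    synonyms: Dict[str, List[str]],
    display_name: str,
    max_len: int = 600,
) -> str:
    if not matched_terms:
        # No vocabulary hit: steer retrieval with the domain label instead
        return f"[{display_name}] {query}"
    extras: List[str] = []
    for term in matched_terms[:3]:                 # first three matched terms
        extras.extend(synonyms.get(term, [])[:2])  # at most two synonyms each
    return " ".join([query, *extras])[:max_len]    # cap the enriched query


enriched = enhance_query_sketch(
    "daily calories",
    ["calories"],
    {"calories": ["kcal", "energy", "joules"]},
    "Nutrition",
)
fallback = enhance_query_sketch("who invented the phone", [], {}, "Nutrition")
```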

4.3 Document Ingestion

add_document()

rag.add_document(
    text: str,
    metadata: Optional[Dict] = None,
) -> Any

Purpose: Index a single document into the underlying RAG backend. Automatically injects {"domain": domain.name} into the metadata so documents can later be filtered or attributed to the correct domain.

Parameter Type Required Description
text str Yes Raw text content to index. Must be non-empty after stripping.
metadata Optional[Dict] No Additional metadata key-value pairs merged with the auto-injected domain tag.

Returns: Whatever rag_system.add_document() returns (implementation-dependent).

Raises: ValueError — if text is empty or whitespace-only.

Example:

rag.add_document(
    "فيتامين د ضروري لصحة العظام وامتصاص الكالسيوم. الجرعة اليومية الموصى بها للبالغين هي 600-800 وحدة دولية.",
    metadata={"source": "who_guidelines_2024", "page": 12},
)
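The validation and metadata tagging described for add_document() amount to a few lines. In this sketch the helper name is hypothetical, and the choice that the automatic domain tag overrides a caller-supplied "domain" key is an assumption, since the text does not say which one wins.

```python
from typing import Any, Dict, Optional


def build_document_metadata(
    text: str,
    domain_name: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    if not text or not text.strip():
        raise ValueError("text must be non-empty after stripping whitespace")
    merged = dict(metadata or {})   # copy so the caller's dict is not mutated
    merged["domain"] = domain_name  # assumption: the automatic tag takes precedence
    return merged


tagged = build_document_metadata(
    "Vitamin D supports calcium absorption.",
    "medical",
    {"source": "who_guidelines_2024", "page": 12},
)
```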

4.4 Domain Vocabulary Management

add_terms()

rag.add_terms(*terms: str) -> None

Purpose: Extend the live domain vocabulary with new terms at runtime. Changes take effect immediately — the next query() or aquery() call will use the updated vocabulary for relevance scoring and query enrichment. Delegates to domain_def.add_terms().

Parameter Type Description
*terms str One or more term strings to add to the domain.

Returns: None

Example:

rag.add_terms("أوميغا 3", "حمض الفوليك", "الكولاجين")

add_synonym()

rag.add_synonym(term: str, *synonyms: str) -> None

Purpose: Register one or more synonyms for a domain term at runtime. Synonyms are used in query enrichment (appended to the query before retrieval) and are included in the all_terms set for relevance scoring. Delegates to domain_def.add_synonym().

Parameter Type Description
term str The canonical term to attach synonyms to.
*synonyms str One or more synonym strings.

Returns: None

Example:

rag.add_synonym("فيتامين د", "كوليكالسيفيرول", "D3")

4.5 Diagnostics & Persistence

get_metrics()

rag.get_metrics() -> Dict[str, Any]

Purpose: Return a comprehensive summary of all per-session domain performance metrics. Useful for monitoring, dashboards, and evaluating the effectiveness of the domain configuration.

Parameters: None.

Returns: Dict[str, Any] with the following keys:

| Key | Type | Description |
| --- | --- | --- |
| domain | str | Domain identifier (domain_def.name). |
| total_queries | int | Total number of queries processed (including rejected ones). |
| rejected | int | Queries rejected due to strict_mode + low relevance. |
| acceptance_rate | str | Formatted percentage of accepted queries (e.g., "85.7%"). |
| avg_relevance | float | Mean domain relevance score across all queries, rounded to 3 decimal places. |
| avg_docs_per_query | float | Mean number of retrieved documents per accepted query, rounded to 1 decimal place. |
| errors | int | Count of retrieval or generation errors. |
| total_terms | int | Current size of the full vocabulary set (len(domain_def.all_terms)). |

Example:

import json
print(json.dumps(rag.get_metrics(), indent=2, ensure_ascii=False))

Output:

{
  "domain": "medical",
  "total_queries": 42,
  "rejected": 3,
  "acceptance_rate": "92.9%",
  "avg_relevance": 0.612,
  "avg_docs_per_query": 7.2,
  "errors": 0,
  "total_terms": 14
}

save_domain()

rag.save_domain(path: str) -> None

Purpose: Persist the current domain definition (including any runtime additions from add_terms() and add_synonym()) to a JSON file. Useful for saving an evolved domain for reuse in future sessions.

Parameter Type Required Description
path str Yes File path to write the domain JSON (e.g., "domains/medical_extended.json").

Returns: None

Example:

rag.add_terms("تصوير مقطعي", "رنين مغناطيسي")
rag.save_domain("domains/medical_extended.json")

# Later — reload
domain = DomainDefinition.load("domains/medical_extended.json")
rag2 = DomainSpecificRAG(my_rag, domain=domain)

4.6 Async API

agenerate()

await rag.agenerate(query: str, **kwargs) -> str

Purpose: Async convenience method that runs aquery() and returns only the answer string, discarding all metadata. Ideal for integrations that only need the answer text and do not require domain diagnostics.

Parameter Type Required Description
query str Yes Natural-language question.
**kwargs Any No Currently unused; reserved for future extension.

Returns: str — the generated answer string, or "" if generation produced no answer.

Example:

answer = await rag.agenerate("ما هو تأثير الكافيين على ضغط الدم؟")
print(answer)

aretrieve()

await rag.aretrieve(query: str, **kwargs) -> List[Dict]

Purpose: Async retrieval-only method — returns normalised documents without going through enrichment, re-ranking, or generation. Useful for inspecting raw retrieval results or building custom pipelines.

Parameter Type Required Description
query str Yes Query string passed directly to the backend retriever (top_k=15).
**kwargs Any No Currently unused; reserved for future extension.

Returns: List[Dict] — normalised document list. Each dict has keys text, score, and metadata. Returns [] on retrieval failure.

Note: This bypasses query enrichment and re-ranking. For the full domain pipeline use aquery().

Example:

docs = await rag.aretrieve("فيتامين د والكالسيوم")
for doc in docs:
    print(f"[{doc['score']:.3f}] {doc['text'][:100]}")

astream()

async for token in rag.astream(
    query: str,
    context: Optional[Dict] = None,
):
    print(token, end="", flush=True)

Purpose: Token-by-token async streaming generation with the full domain pipeline applied before streaming. Enables real-time display in chat interfaces.

Pipeline:

  1. Measure domain relevance.
  2. If strict_mode and below threshold → stream rejection_message word by word and return.
  3. Enrich query, retrieve, and re-rank documents.
  4. Update metrics.
  5. If no documents found → stream a "not found" message.
  6. Stream the answer via one of three strategies (in priority order):
    • Native async stream — if rag_system exposes astream(query, context).
    • Sync stream wrapped — if rag_system exposes stream(query, context).
    • Fallback word-by-word — generate the full answer, then yield one word at a time with asyncio.sleep(0) between each word to keep the event loop responsive.

Parameter Type Required Description
query str Yes Natural-language question.
context Optional[Dict] No Extra context merged into the streaming generation call.

Yields: str — individual tokens (native/sync stream) or words followed by a space (fallback mode).

Sentinel yields on rejection or no docs:

  • rejection_message words streamed one by one — when strict_mode and relevance below threshold.
  • "not found docs: <display_name>." words — when retrieval returns no documents.
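The three streaming strategies can be sketched as one async generator that probes the backend in priority order. This is an illustration under assumptions: the function name is hypothetical, the backend's generate() is assumed to return a plain string, and the sync branch simply iterates the backend's stream() in place, which may differ from how the library wraps it.

```python
import asyncio
from typing import Any, AsyncIterator


async def stream_answer_sketch(backend: Any, query: str, context: dict) -> AsyncIterator[str]:
    if hasattr(backend, "astream"):
        # 1. Native async stream
        async for token in backend.astream(query, context):
            yield token
    elif hasattr(backend, "stream"):
        # 2. Sync stream, yielding control between tokens
        for token in backend.stream(query, context):
            yield token
            await asyncio.sleep(0)
    else:
        # 3. Fallback: generate everything, then emit word by word
        answer = backend.generate(query, context)
        for word in answer.split():
            yield word + " "
            await asyncio.sleep(0)  # let other tasks run between words


class GenerateOnlyBackend:
    """Hypothetical backend with neither astream() nor stream()."""

    def generate(self, query: str, context: dict) -> str:
        return "vitamin D supports bones"


async def collect(backend: Any) -> list:
    return [tok async for tok in stream_answer_sketch(backend, "q", {})]


tokens = asyncio.run(collect(GenerateOnlyBackend()))
```

In fallback mode each yielded item is a word followed by a space, so joining the tokens reproduces the answer with one trailing space.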

Example:

import asyncio

async def stream():
    print("Answer: ", end="")
    async for token in rag.astream("كيف يؤثر السكر على الجهاز المناعي؟"):
        print(token, end="", flush=True)
    print()

asyncio.run(stream())

4.7 Context Manager

DomainSpecificRAG supports the async context manager protocol:

async with DomainSpecificRAG(my_rag, domain="medical") as rag:
    result = await rag.aquery("ما هي أعراض السكري؟")
    print(result["answer"])
# __aexit__ is a no-op — used as a scope delimiter

The synchronous with statement is not supported. Use async with or instantiate directly.
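For readers unfamiliar with the protocol, an object supports async with when it defines __aenter__ and __aexit__. The sketch below shows a no-op version of that pair; `ScopedSketch` is a hypothetical class, not the library's code.

```python
import asyncio


class ScopedSketch:
    """Hypothetical object whose async context manager does no cleanup."""

    async def __aenter__(self) -> "ScopedSketch":
        return self  # the value bound by "as"

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        return False  # nothing to release; exceptions propagate unchanged


async def demo() -> bool:
    async with ScopedSketch() as scoped:
        return isinstance(scoped, ScopedSketch)


entered = asyncio.run(demo())
```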


4.8 Representation

__repr__()

repr(rag)

Purpose: Return a concise representation of the current system state for logging and REPL inspection.

Returns: str in the format:

DomainSpecificRAG(domain='medical', terms=14, strict=True, queries=42)

5. DomainMetrics

from fennec_community.rag.types.domain_rag import DomainMetrics

DomainMetrics is a lightweight statistics accumulator used internally by DomainSpecificRAG. It is accessible via rag.metrics but its public interface is primarily consumed through rag.get_metrics().


5.1 Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| total_queries | int | Total number of queries recorded (accepted + rejected). |
| rejected_queries | int | Number of queries rejected in strict mode. |
| total_docs_used | int | Cumulative count of documents returned across all accepted queries. |
| errors | int | Count of retrieval or generation exceptions (incremented directly by DomainSpecificRAG). |
| avg_relevance | float | Running average of all recorded relevance scores. |

5.2 Methods

record()

metrics.record(
    relevance: float,
    docs: int,
    rejected: bool = False,
) -> None

Purpose: Record the outcome of a single query — increments counters, appends the relevance score, and recomputes avg_relevance. Called automatically by DomainSpecificRAG at the end of every aquery() execution.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| relevance | float | | Domain relevance score for this query (0.0–1.0). |
| docs | int | | Number of documents returned for this query. |
| rejected | bool | False | Pass True if the query was rejected in strict mode. |

Returns: None


summary()

metrics.summary() -> Dict[str, Any]

Purpose: Return all accumulated metrics as a plain dictionary. This is the data source for DomainSpecificRAG.get_metrics().

Parameters: None.

Returns: Dict[str, Any] with keys: total_queries, rejected, acceptance_rate, avg_relevance, avg_docs_per_query, errors.

Example:

print(rag.metrics.summary())
# {'total_queries': 10, 'rejected': 2, 'acceptance_rate': '80.0%',
#  'avg_relevance': 0.523, 'avg_docs_per_query': 6.0, 'errors': 0}
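The accumulator behaviour documented above (counters, a running average, and a formatted summary) can be approximated as follows. `MetricsSketch` is a hypothetical stand-in, and the handling of empty sessions (zero queries or zero accepted queries) is an assumption, as the text does not cover it.

```python
from typing import Any, Dict, List


class MetricsSketch:
    """Hypothetical accumulator mirroring the documented fields."""

    def __init__(self) -> None:
        self.total_queries = 0
        self.rejected_queries = 0
        self.total_docs_used = 0
        self.errors = 0
        self.avg_relevance = 0.0
        self._scores: List[float] = []

    def record(self, relevance: float, docs: int, rejected: bool = False) -> None:
        self.total_queries += 1
        self._scores.append(relevance)
        self.avg_relevance = sum(self._scores) / len(self._scores)
        if rejected:
            self.rejected_queries += 1
        else:
            self.total_docs_used += docs  # only accepted queries contribute docs

    def summary(self) -> Dict[str, Any]:
        accepted = self.total_queries - self.rejected_queries
        rate = accepted / self.total_queries if self.total_queries else 0.0
        docs_avg = self.total_docs_used / accepted if accepted else 0.0
        return {
            "total_queries": self.total_queries,
            "rejected": self.rejected_queries,
            "acceptance_rate": f"{rate:.1%}",
            "avg_relevance": round(self.avg_relevance, 3),
            "avg_docs_per_query": round(docs_avg, 1),
            "errors": self.errors,
        }


metrics = MetricsSketch()
for _ in range(8):
    metrics.record(relevance=0.6, docs=6)
for _ in range(2):
    metrics.record(relevance=0.05, docs=0, rejected=True)
report = metrics.summary()
```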

6. Response Schema

Every call to query() and aquery() returns a Dict[str, Any] with the fields below. All fields are always present except rejected and error, which appear only in the cases noted:

| Field | Type | Description |
| --- | --- | --- |
| answer | str | The LLM-generated answer (or the rejection/no-docs message). |
| sources | List[Dict] | Up to 5 source excerpts. Each has text (≤200 chars), score (combined domain score), and metadata. Present even on rejection (empty list). |
| domain | str | Domain machine identifier (e.g., "medical"). |
| domain_display | str | Domain human-readable name (e.g., "الطب والصحة"). |
| domain_relevance | float | Relevance score of the query to the domain, rounded to 3 decimal places. |
| matched_terms | List[str] | Domain vocabulary terms that matched in the query. Empty list on rejection. |
| enhanced_query | str | The query after synonym enrichment (what was actually sent to the retriever). Empty string on rejection. |
| docs_count | int | Number of domain-ranked documents used for generation. 0 on rejection or empty retrieval. |
| latency_ms | float | Total pipeline wall-clock time in milliseconds. 0.0 on immediate rejection. |
| rejected | bool | Present and True only on strict-mode rejections; absent otherwise. |
| error | str | Present only on generation exceptions; contains the exception message. |

Rejection response example:

{
    "answer": "سؤالك خارج نطاق الطب والصحة.",
    "domain": "medical",
    "domain_display": "الطب والصحة",
    "domain_relevance": 0.042,
    "matched_terms": [],
    "enhanced_query": "",
    "docs_count": 0,
    "latency_ms": 0.0,
    "rejected": True,
    "sources": [],
}

Successful response example:

{
    "answer": "الجرعة اليومية الموصى بها من فيتامين د للبالغين هي 600–800 وحدة دولية...",
    "domain": "medical",
    "domain_display": "الطب والصحة",
    "domain_relevance": 0.743,
    "matched_terms": ["فيتامين", "جرعة"],
    "enhanced_query": "ما هي الجرعة اليومية الموصى بها من فيتامين د؟ عقار",
    "docs_count": 8,
    "latency_ms": 312.5,
    "sources": [
        {"text": "فيتامين د ضروري لصحة العظام...", "score": 1.234, "metadata": {}},
        ...
    ],
}

7. Domain Instantiation Patterns

The following table summarises all four supported ways to instantiate a DomainSpecificRAG:

| Pattern | Code | Best for |
| --- | --- | --- |
| Preset by key | DomainSpecificRAG(rag, domain="medical") | Quick start with built-in domains |
| Preset + extra terms | DomainSpecificRAG(rag, domain="medical", domain_terms=["..."]) | Extending a preset without subclassing |
| Custom DomainDefinition | DomainSpecificRAG(rag, domain=my_domain_def) | Full control over all parameters |
| Load from JSON file | DomainSpecificRAG(rag, domain=DomainDefinition.load("path.json")) | Sharing domain configs across projects |

8. Query Processing Pipeline

rag.query(query, context) / await rag.aquery(query, context)
│
├─ _measure_relevance(query)
│    ├─ Layer 1: direct word-set intersection with all_terms
│    ├─ Layer 2: partial string containment check
│    └─ score = (direct_hits * 1.0 + partial_hits * 0.5) / query_words
│              * boost_weight   →  capped at 1.0
│
├─ strict_mode and score < min_relevance
│    └─ metrics.record(rejected=True) → return _rejection_response()
│
├─ _enhance_query(query, matched_terms)
│    ├─ no matched terms → prepend "[display_name]" to query
│    └─ matched terms   → append up to 6 synonyms (2 per term, top-3 terms)
│                         → trimmed to 600 chars
│
├─ _retrieve(enhanced_query)            [async, top_k=15]
│    ├─ rag_system.retrieve() called in thread pool
│    ├─ normalises tuples, dicts, and object formats uniformly
│    └─ returns List[Dict{text, score, metadata}]
│
├─ _rank_by_domain(docs)
│    ├─ for each doc: domain_hits = doc_words ∩ all_terms
│    ├─ domain_bonus = min(hits / len(all_terms), 0.4) * boost_weight
│    ├─ combined_score = rag_score + domain_bonus
│    ├─ sort descending by combined_score
│    ├─ strict_mode: filter docs with domain_hits == 0
│    └─ return top-10 docs
│
├─ _generate(query, docs, context)      [async]
│    ├─ build system_prompt: domain.system_prompt + language-lock instruction
│    ├─ inject language instruction directly into query string
│    ├─ call rag_system.generate(lang_query, ctx) in thread pool
│    └─ auto-build sources list if not in response
│
├─ metrics.record(relevance, docs_count)
│
└─ return enriched Dict (answer + 8 diagnostic fields)
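The _rank_by_domain step in the diagram can be expressed in a few lines. The sketch below applies the stated formulas (bonus capped at 0.4 before the boost, strict-mode filtering, top 10) to documents shaped as {text, score, metadata}; the function name and the whitespace tokenisation are assumptions made for illustration.

```python
from typing import Any, Dict, List, Set


def rank_by_domain_sketch(
    docs: List[Dict[str, Any]],
    all_terms: Set[str],
    boost_weight: float = 1.5,
    strict_mode: bool = False,
    top_n: int = 10,
) -> List[Dict[str, Any]]:
    ranked = []
    for doc in docs:
        doc_words = set(doc["text"].lower().split())
        hits = len(doc_words & all_terms)
        # Term density, capped at 0.4, then scaled by the domain boost
        bonus = min(hits / len(all_terms), 0.4) * boost_weight if all_terms else 0.0
        ranked.append({**doc, "score": doc["score"] + bonus, "domain_hits": hits})
    ranked.sort(key=lambda d: d["score"], reverse=True)
    if strict_mode:
        ranked = [d for d in ranked if d["domain_hits"] > 0]
    return ranked[:top_n]


terms = {"dose", "vitamin", "symptom", "drug"}
candidates = [
    {"text": "weather is sunny today", "score": 0.80, "metadata": {}},
    {"text": "vitamin dose for adults", "score": 0.70, "metadata": {}},
    {"text": "drug interactions", "score": 0.50, "metadata": {}},
]
ranked = rank_by_domain_sketch(candidates, terms)
strict_ranked = rank_by_domain_sketch(candidates, terms, strict_mode=True)
```

In this toy input the off-topic document has the best vector score but drops to last place once the domain bonus is added, and strict mode removes it entirely.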

Relevance scoring detail:

query = "ما هي جرعة الدواء؟"
all_terms = {"دواء", "علاج", "مرض", "أعراض", "تشخيص", "جرعة", "مريض", "طبيب", "عقار", "دواية"}

query_words = {"ما", "هي", "جرعة", "الدواء؟"}
direct_matches = {"جرعة", "دواء"}  (normalised)
partial_matches = {"دواية"}        (contains "دوا")

score_raw = (2 * 1.0 + 1 * 0.5) / 4 = 0.625
score = min(0.625 * 1.5, 1.0) = 0.9375  ✅ accepted
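The same two-layer formula can be tried out in code. The sketch below uses an English vocabulary so the arithmetic is easy to follow; the function name, the punctuation stripping, and the rule that a partial hit means "a domain term occurs inside a query word" are assumptions, since the exact normalisation used by the library is not spelled out here.

```python
from typing import List, Set, Tuple

PUNCTUATION = "?؟!.,،؛:;"


def measure_relevance_sketch(
    query: str,
    all_terms: Set[str],
    boost_weight: float = 1.5,
) -> Tuple[float, List[str]]:
    words = {w.strip(PUNCTUATION) for w in query.lower().split()}
    words.discard("")
    if not words:
        return 0.0, []
    # Layer 1: exact word matches
    direct = words & all_terms
    # Layer 2: terms contained in one of the remaining words
    partial = {t for w in words - direct for t in all_terms if t in w} - direct
    raw = (len(direct) * 1.0 + len(partial) * 0.5) / len(words)
    return min(raw * boost_weight, 1.0), sorted(direct | partial)


vocabulary = {"dose", "drug", "symptom"}
score, matched = measure_relevance_sketch("What dose of drugs?", vocabulary)
off_topic, _ = measure_relevance_sketch("Who invented the phone?", vocabulary)
```

Here "dose" is a direct hit and "drugs" yields a partial hit on "drug", so the raw score is (1.0 + 0.5) / 4 = 0.375, which the boost of 1.5 raises to 0.5625.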

Language lock mechanism:

domain.language = "ar"
→ lang_instruction = "يجب أن تكون إجابتك باللغة العربية حصراً."

system_prompt  = domain.system_prompt + "\n" + lang_instruction
lang_query     = lang_instruction + "\n" + query

# Both channels used to maximise LLM compliance
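A compact way to picture the two-channel lock is a helper that returns both the locked system prompt and the locked query. The sketch is illustrative only: the function and table names are hypothetical, the Arabic instruction is the one shown above, and the English wording and the generic fallback are invented for the example.

```python
from typing import Dict, Tuple

LANG_INSTRUCTIONS_SKETCH: Dict[str, str] = {
    "ar": "يجب أن تكون إجابتك باللغة العربية حصراً.",           # instruction shown above
    "en": "Your answer must be written exclusively in English.",  # hypothetical wording
}


def apply_language_lock(system_prompt: str, query: str, language: str) -> Tuple[str, str]:
    instruction = LANG_INSTRUCTIONS_SKETCH.get(
        language,
        f"Answer only in this language: {language}.",  # hypothetical fallback for custom codes
    )
    locked_prompt = system_prompt + "\n" + instruction  # channel 1: system prompt
    locked_query = instruction + "\n" + query           # channel 2: the query itself
    return locked_prompt, locked_query


prompt, lang_query = apply_language_lock(
    "You are a nutrition expert.", "What is protein?", "en"
)
```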

9. Quick-Start Example

import asyncio
import json
from fennec_community.rag.types.domain_rag import DomainSpecificRAG, DomainDefinition, PRESET_DOMAINS
from fennec_community.rag.core import RAGSystem

# my_vector_db, my_llm, my_chunker, my_ctx_mgr, and config are placeholders
# for components you have already configured; they are not defined here.
my_rag = RAGSystem(
    vector_db=my_vector_db,
    llm=my_llm,
    chunker=my_chunker,
    context_manager=my_ctx_mgr,
    config=config,
    enable_query_expansion=True,
    query_expansion_variants=3,
)

# ── 1. Use a preset domain ────────────────────────────────────────────────
rag_medical = DomainSpecificRAG(
    rag_system=my_rag,
    domain="medical",
    strict_mode=True,
)

# ── 2. Index domain documents ─────────────────────────────────────────────
rag_medical.add_document(
    "فيتامين د ضروري لامتصاص الكالسيوم. الجرعة الموصى بها: 600–800 وحدة دولية يومياً.",
    metadata={"source": "who_guidelines", "page": 5},
)
rag_medical.add_document(
    "ارتفاع ضغط الدم (فوق 140/90) يستدعي تعديل نمط الحياة أو العلاج الدوائي.",
    metadata={"source": "cardiology_manual", "chapter": 3},
)

# ── 3. Synchronous query ──────────────────────────────────────────────────
result = rag_medical.query("ما هي الجرعة اليومية الموصى بها من فيتامين د؟")
print(result["answer"])
print(f"Relevance  : {result['domain_relevance']:.2%}")
print(f"Matched    : {result['matched_terms']}")
print(f"Docs used  : {result['docs_count']}")
print(f"Latency    : {result['latency_ms']} ms")

# ── 4. Off-topic query (strict_mode=True rejects it) ─────────────────────
result_rej = rag_medical.query("من هو مخترع الهاتف؟")
print(result_rej["answer"])        # → rejection_message
print(result_rej.get("rejected"))  # → True

# ── 5. Custom domain ──────────────────────────────────────────────────────
nutrition = DomainDefinition(
    name="nutrition",
    display_name="التغذية والصحة",
    language="ar",
    terms=["بروتين", "كربوهيدرات", "فيتامين", "سعرات", "حمية"],
    synonyms={"سعرات": ["كالوري", "طاقة"], "حمية": ["رجيم", "دايت"]},
    system_prompt="أنت خبير تغذية. أجب بدقة علمية.",
    rejection_message="هذا السؤال خارج نطاق التغذية والصحة.",
    min_relevance=0.15,
)

rag_nutrition = DomainSpecificRAG(
    my_rag,
    domain=nutrition,
    strict_mode=False,
    auto_expand_terms=True,   # LLM adds up to 10 more terms at startup
)

# ── 6. Add runtime vocabulary ─────────────────────────────────────────────
rag_nutrition.add_terms("أوميغا 3", "حمض الفوليك")
rag_nutrition.add_synonym("فيتامين", "فيتامينات", "vitamins")

# ── 7. Async query ────────────────────────────────────────────────────────
async def async_demo():
    result = await rag_nutrition.aquery("كيف يؤثر البروتين على بناء العضلات؟")
    print(result["answer"])

asyncio.run(async_demo())

# ── 8. Async generate (answer only) ──────────────────────────────────────
async def gen_demo():
    answer = await rag_nutrition.agenerate("ما هي أفضل مصادر الكربوهيدرات؟")
    print(answer)

asyncio.run(gen_demo())

# ── 9. Streaming ──────────────────────────────────────────────────────────
async def stream_demo():
    print("Streaming: ", end="")
    async for token in rag_nutrition.astream("ما الفرق بين الكربوهيدرات البسيطة والمعقدة؟"):
        print(token, end="", flush=True)
    print()

asyncio.run(stream_demo())

# ── 10. Metrics ───────────────────────────────────────────────────────────
print(json.dumps(rag_medical.get_metrics(), indent=2, ensure_ascii=False))
print(json.dumps(rag_nutrition.get_metrics(), indent=2, ensure_ascii=False))

# ── 11. Save and reload domain ────────────────────────────────────────────
rag_nutrition.save_domain("domains/nutrition_v2.json")
loaded = DomainDefinition.load("domains/nutrition_v2.json")
rag_v2 = DomainSpecificRAG(my_rag, domain=loaded)

# ── 12. Inspect a preset ──────────────────────────────────────────────────
print(PRESET_DOMAINS["legal"].terms)
print(PRESET_DOMAINS["financial"].system_prompt)

# ── 13. Async context manager ─────────────────────────────────────────────
async def ctx_demo():
    async with DomainSpecificRAG(my_rag, domain="academic") as rag:
        result = await rag.aquery("ما هي الفرضية الأساسية في هذا البحث؟")
        print(result["answer"])

asyncio.run(ctx_demo())

# ── 14. Raw retrieval (no generation) ────────────────────────────────────
async def retrieve_only():
    docs = await rag_medical.aretrieve("أعراض السكري")
    for doc in docs[:3]:
        print(f"[{doc['score']:.3f}] {doc['text'][:80]}")

asyncio.run(retrieve_only())
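
The quick-start steps above call `asyncio.run()` once per demo, which starts and tears down a fresh event loop each time. One possible alternative is to drive all async calls from a single `main()` coroutine. The sketch below is only an illustration: `FakeRAG` is a hypothetical stand-in (not part of `fennec_community`) that mimics the method names `aquery`, `agenerate`, and `astream` so the snippet runs on its own. Whether concurrent calls are safe against a real backend is not stated in this reference, so treat the `asyncio.gather` usage as an assumption to verify.

```python
import asyncio


class FakeRAG:
    """Hypothetical stand-in that mimics the async method names used above."""

    async def aquery(self, question: str) -> dict:
        await asyncio.sleep(0)  # simulate an I/O wait
        return {"answer": f"answer to: {question}"}

    async def agenerate(self, question: str) -> str:
        await asyncio.sleep(0)
        return f"generated: {question}"

    async def astream(self, question: str):
        # Yield one "token" per whitespace-separated word.
        for token in question.split():
            await asyncio.sleep(0)
            yield token


async def main(rag) -> dict:
    # Run two independent calls concurrently on the same event loop.
    result, answer = await asyncio.gather(
        rag.aquery("q1"),
        rag.agenerate("q2"),
    )
    # Streaming is consumed sequentially, token by token.
    tokens = [tok async for tok in rag.astream("a b c")]
    return {"query": result["answer"], "generate": answer, "tokens": tokens}


# A single asyncio.run() call replaces the per-demo calls.
summary = asyncio.run(main(FakeRAG()))
print(summary)
```

With a real `DomainSpecificRAG` instance, the same `main()` shape should apply as long as its async methods behave like ordinary coroutines and async generators.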

Simple Real Example

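from fennec_community.llm import MistralInterface
from fennec_community.document_loaders import TextLoader
from fennec_community.vector_database import FAISSVectorDatabase
from fennec_community.chunks import ArabicTextChunker
from fennec_community.context import ContextManager
from fennec_community.embeddings import OllamaEmbedder
from fennec_community.rag.core import RAGSystem, LoadedDocument
from fennec_community.rag.types.domain_rag import DomainSpecificRAG, DomainDefinition

import nest_asyncio

# Allow nested event loops (needed in notebooks, where a loop is already running).
nest_asyncio.apply()

# --- 1. Build the base RAG system ---
# NOTE: vector_db, llm, chunker, and context_manager are not defined in this
# snippet. Construct them first, presumably from the classes imported above
# (FAISSVectorDatabase, MistralInterface, ArabicTextChunker, ContextManager);
# their constructor arguments are not covered in this reference.
base_rag = RAGSystem(
    vector_db=vector_db, llm=llm,
    chunker=chunker, context_manager=context_manager,
)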
l1 = LoadedDocument(
    page_content=" القلب هو العضو المركزي في الجهاز الدوري. يضخ الدم عبر الشرايين والأوردة. أمراض القلب تشمل: ارتفاع الضغط، الذبحة الصدرية، قصور القلب الاحتقاني.",
    metadata={"source": "heart.txt"})
l2 = LoadedDocument(
    page_content=" السكري مرض مزمن يؤثر على كيفية معالجة الجسم للسكر. النوع الأول يعتمد على الأنسولين، والنوع الثاني يرتبط بمقاومة الأنسولين.",
    metadata={"source": "diabetes.txt"})

base_rag.add_documents(docs=[l1, l2])

# --- 2. Method 1: preset domain ---
medical_rag = DomainSpecificRAG(
    rag_system=base_rag,
    domain="medical",
    strict_mode=True,
)

# --- 3. Method 2: fully custom domain ---
medical_domain = DomainDefinition(
    name="medical",
    display_name="الطب والصحة",
    terms=["قلب", "ضغط", "سكري", "جراحة", "أنسولين", "شرايين",
           "أوردة", "دم", "علاج", "دواء", "مستشفى", "طبيب"],
    system_prompt="أنت طبيب متخصص تُجيب بدقة علمية عالية.",
    min_relevance=0.15,
    boost_weight=1.5,
    rejection_message="عذرًا، تخصصي في الطب فقط.",
    synonyms={
        "قلب": ["فؤاد", "عضلة القلب"],
        "ضغط": ["ضغط الدم", "hypertension"],
    },
    metadata={"version": "2.0", "reviewed_by": "dr-ahmed"},
)

custom_rag = DomainSpecificRAG(
    rag_system=base_rag,
    domain=medical_domain,
    strict_mode=True,
)

custom_rag.add_terms("نبضات القلب", "الأوعية الدموية")
custom_rag.add_synonym("علاج", "معالجة", "مداواة")

# --- 4. Synchronous query ---
# q1 means "What is the treatment for high blood pressure?"
q1 = "ما هو علاج ارتفاع ضغط الدم؟"
result1 = custom_rag.query(q1)
print(f"❓ {q1}")
print(f"🎯 Relevance: {result1['domain_relevance']:.2%}")
print(f"🔍 Matched terms: {result1['matched_terms']}")
print(f"📝 Answer: {result1['answer'][:200]}")

# Off-topic question; it is rejected because strict_mode=True.
# q2 means "What is the best program for learning programming?"
q2 = "ما أفضل برنامج لتعلم البرمجة؟"
result2 = custom_rag.query(q2)
print(f"\n❓ {q2}")
print(f"❌ Rejected: {result2.get('rejected', False)}")
print(f"💬 {result2['answer']}")

# --- 5. Metrics ---
metrics = custom_rag.get_metrics()
print("\n📊 Domain metrics:")
for k, v in metrics.items():
    print(f"  {k}: {v}")

# --- 6. Save and reload the domain ---
custom_rag.save_domain("medical_domain.json")
domain_from_file = DomainDefinition.load("medical_domain.json")
rag_from_file = DomainSpecificRAG(rag_system=base_rag, domain=domain_from_file)
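
The strict-mode rejection in the example above depends on comparing a relevance score against `min_relevance`. The following toy gate is only a sketch of that idea and is not the library's scoring code. It assumes a hypothetical `toy_relevance` function in which a whole-word match counts fully, a substring match counts at half weight, and the total is normalized by the number of query words. The names `toy_relevance`, `gate`, and `partial_weight` are illustrative inventions. English terms are used here purely for readability.

```python
def toy_relevance(query: str, terms: list[str],
                  partial_weight: float = 0.5) -> tuple[float, list[str]]:
    """Return (score in [0, 1], matched terms) for a whitespace-tokenized query."""
    words = query.lower().split()
    if not words:
        return 0.0, []
    score = 0.0
    matched = []
    for term in terms:
        t = term.lower()
        if t in words:                      # direct, whole-word match
            score += 1.0
            matched.append(term)
        elif any(t in w for w in words):    # partial, substring match
            score += partial_weight
            matched.append(term)
    return min(score / len(words), 1.0), matched


def gate(query: str, terms: list[str], min_relevance: float,
         rejection_message: str) -> dict:
    """Reject the query when its toy relevance falls below the threshold."""
    relevance, matched = toy_relevance(query, terms)
    rejected = relevance < min_relevance
    return {
        "answer": rejection_message if rejected else None,  # None: pass on to retrieval
        "rejected": rejected,
        "domain_relevance": relevance,
        "matched_terms": matched,
    }


TERMS = ["heart", "insulin", "pressure"]
accepted = gate("what lowers blood pressure", TERMS, 0.15, "Out of scope.")
rejected = gate("best programming course", TERMS, 0.15, "Out of scope.")
print(accepted["domain_relevance"], accepted["matched_terms"])
print(rejected["rejected"], rejected["answer"])
```

A real scorer would also need to handle punctuation, morphology, and multi-word terms, none of which this sketch attempts.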