Fennec Community community/rag/federated_rag.md

`federated_rag` — Enterprise API Reference


Table of Contents

  1. Overview
  2. Architecture
  3. Quick Start
  4. Enumerations
  5. Data Models
  6. Class: FederatedRAG
  7. Return Value Reference
  8. Streaming Strategies by AggregationMethod
  9. Backend Compatibility
  10. Circuit Breaker Behaviour
  11. Caching Behaviour
  12. Error Codes
  13. Complete Examples

Overview

federated_rag is a production-grade federation layer that sits on top of multiple Retrieval-Augmented Generation (RAG) backends and exposes a unified query interface. Instead of calling one RAG system and accepting its single answer, FederatedRAG queries every enabled, healthy backend in parallel, aggregates the responses according to a configurable strategy, and returns (or streams) the aggregated answer.

Key capabilities at a glance:

| Capability | Detail |
| --- | --- |
| Parallel querying | All enabled, healthy sources are queried concurrently via asyncio.gather |
| Aggregation strategies | Weighted, Voting, Ranking, Merge, Simple |
| Real-time streaming | Sync (stream) and async (astream) with per-chunk StreamChunk objects |
| Circuit breaking | Automatically skips repeatedly-failing sources; self-recovers after a timeout |
| TTL caching | Thread-safe in-memory cache; hits avoid redundant backend calls |
| Lifecycle hooks | Pre- and post-query callables for logging, metrics, and tracing |
| Zero-config fallback | If a backend has no stream() method, the federation emits word-level synthetic chunks |

Architecture

┌─────────────────────────────────────────────────────────┐
│                      FederatedRAG                        │
│                                                          │
│  ┌─────────┐   ┌──────────────┐   ┌───────────────────┐ │
│  │TTLCache │   │LifecycleHooks│   │AggregationEngine  │ │
│  └─────────┘   └──────────────┘   └───────────────────┘ │
│                                                          │
│  ┌────────────────────────────────────────────────────┐  │
│  │               FederatedSource  (×N)                │  │
│  │  name · rag_system · weight · timeout              │  │
│  │  CircuitBreaker · stats tracking                   │  │
│  └────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘
         │ parallel asyncio.gather / asyncio tasks
         ▼
  [ RAG Backend A ]  [ RAG Backend B ]  [ RAG Backend C ]

Quick Start

from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod

# 1. Instantiate the federation
fed = FederatedRAG(
    aggregation_method=AggregationMethod.WEIGHTED,
    cache_ttl=120.0,
)

# 2. Register your RAG backends (docs_rag, wiki_rag, arxiv_rag are your own backend objects)
fed.add_source("docs",   docs_rag,   weight=1.5, timeout=8.0)
fed.add_source("wiki",   wiki_rag,   weight=1.0, timeout=5.0)
fed.add_source("arxiv",  arxiv_rag,  weight=0.8, timeout=12.0)

# 3a. Buffered (synchronous)
result = fed.query("What is Retrieval-Augmented Generation?")
print(result["answer"])

# 3b. Buffered (async); must run inside an async function
result = await fed.query_async("What is RAG?")

# 3c. Streaming (sync)
for chunk in fed.stream("Explain transformers"):
    print(chunk.text, end="", flush=True)

# 3d. Streaming (async); must run inside an async function
async for chunk in fed.astream("Explain transformers"):
    print(chunk.text, end="", flush=True)

Enumerations

AggregationMethod

class AggregationMethod(str, Enum):
    WEIGHTED = "weighted"
    VOTING   = "voting"
    RANKING  = "ranking"
    SIMPLE   = "simple"
    MERGE    = "merge"

Controls how responses from multiple sources are combined into a single answer.

| Member | Value | Description |
| --- | --- | --- |
| WEIGHTED | "weighted" | Picks the source with the highest weight, breaking ties with score. Best for trusted primary sources. |
| VOTING | "voting" | Groups responses by their first 60 characters, then selects the group with the highest combined weight. Best for factual queries where consensus matters. |
| RANKING | "ranking" | Sorts all results by score × weight descending and returns the top result. Best when sources return numeric relevance scores. |
| SIMPLE | "simple" | Returns the answer from the first source that responds. Fastest, but applies no selection logic. |
| MERGE | "merge" | Concatenates all unique answers, each prefixed with [source_name]. Best for information-gathering queries where diversity is valued. |

Streaming note: VOTING cannot be truly streamed (it requires all answers before voting). When astream / stream is called with VOTING, the federation buffers everything via query_async and emits one single StreamChunk. All other methods stream natively.
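The selection rules in the table can be sketched in a few lines of plain Python. This is an illustrative reimplementation, not the library's code: the helper name `aggregate`, the tie-breaking inside a voting group, and the weight-descending order used for merge are assumptions made for the example.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


def aggregate(results: List[Dict], method: str) -> Tuple[str, Optional[str]]:
    """Return (answer, primary_source) for a list of per-source result dicts."""
    if not results:
        raise ValueError("no results to aggregate")
    if method == "simple":
        best = results[0]  # first source that responded
    elif method == "weighted":
        # Highest weight wins; score breaks ties.
        best = max(results, key=lambda r: (r["weight"], r["score"]))
    elif method == "ranking":
        best = max(results, key=lambda r: r["score"] * r["weight"])
    elif method == "voting":
        # Group by the first 60 characters, then pick the heaviest group.
        groups = defaultdict(list)
        for r in results:
            groups[r["answer"][:60]].append(r)
        winner = max(groups.values(), key=lambda g: sum(r["weight"] for r in g))
        best = max(winner, key=lambda r: r["weight"])  # assumption: heaviest member speaks
    elif method == "merge":
        seen, parts = set(), []
        # Assumption: merged answers are ordered by descending weight.
        for r in sorted(results, key=lambda r: r["weight"], reverse=True):
            if r["answer"] not in seen:
                seen.add(r["answer"])
                parts.append(f"[{r['source']}] {r['answer']}")
        return "\n\n".join(parts), None
    else:
        raise ValueError(f"unknown method: {method}")
    return best["answer"], best["source"]


sample = [
    {"source": "docs", "answer": "RAG combines retrieval with generation.", "weight": 1.5, "score": 0.6},
    {"source": "wiki", "answer": "RAG combines retrieval with generation.", "weight": 1.0, "score": 0.95},
    {"source": "arxiv", "answer": "RAG is a 2020 architecture.", "weight": 2.0, "score": 0.4},
]
for name in ("weighted", "voting", "ranking", "simple", "merge"):
    print(name, "->", aggregate(sample, name))
```

With this sample, weighted selection picks `arxiv` (highest weight), while voting picks the answer shared by `docs` and `wiki` because their combined weight (2.5) exceeds 2.0.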


Data Models

StreamChunk

A dataclass representing a single unit of streamed output. Every call to stream() or astream() yields a sequence of these objects.

@dataclass
class StreamChunk:
    text:     str
    source:   str
    done:     bool                     = False
    metadata: Optional[Dict[str, Any]] = None
    error:    Optional[str]            = None

| Field | Type | Description |
| --- | --- | --- |
| text | str | The incremental text fragment for this chunk. Empty string on terminal (done/error) chunks. |
| source | str | The name of the RAG backend that produced this chunk, or "__federation__" for system-level messages. |
| done | bool | True on the final chunk for a given source. Signals that the source has finished producing output. |
| metadata | Optional[Dict[str, Any]] | Optional per-chunk payload. May include score, latency_ms, weight, or aggregation_method. |
| error | Optional[str] | Non-None when this chunk signals a failure. Inspect this field to distinguish error sentinels from normal done signals. |

Usage pattern:

async for chunk in fed.astream("What is RAG?"):
    if chunk.error:
        print(f"\n[ERROR from {chunk.source}]: {chunk.error}")
    elif chunk.done:
        print(f"\n[{chunk.source} finished — {chunk.metadata}]")
    else:
        print(chunk.text, end="", flush=True)

str(chunk) returns chunk.text, so print(chunk) works naturally in simple pipelines.
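To experiment with chunk handling without a live federation, a local stand-in with the documented fields is enough. The sketch below mirrors the field list above and assumes `__str__` simply returns `text`, as stated; the `collect` helper is hypothetical and not part of the module.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Tuple


@dataclass
class StreamChunk:
    """Local stand-in that mirrors the documented fields."""
    text: str
    source: str
    done: bool = False
    metadata: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

    def __str__(self) -> str:
        return self.text


def collect(chunks: Iterable[StreamChunk]) -> Tuple[str, List[str]]:
    """Hypothetical helper: join content text and gather error messages."""
    parts, errors = [], []
    for chunk in chunks:
        if chunk.error is not None:  # check errors before done
            errors.append(f"{chunk.source}: {chunk.error}")
        elif not chunk.done:
            parts.append(str(chunk))
    return "".join(parts), errors


demo = [
    StreamChunk("RAG ", "docs"),
    StreamChunk("works.", "docs"),
    StreamChunk("", "docs", done=True, metadata={"latency_ms": 12.0}),
    StreamChunk("", "wiki", done=True, error="timeout"),
]
answer, errors = collect(demo)
print(answer, errors)
```

Checking `error` before `done` matters here because a terminal chunk may carry both.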


Class: FederatedRAG

The central class of the module. Manages source registration, parallel querying, aggregation, caching, and streaming.

from fennec_community.rag.types.federated_rag import FederatedRAG

Constructor — __init__

FederatedRAG(
    aggregation_method: AggregationMethod | str = AggregationMethod.WEIGHTED,
    min_sources:         int   = 1,
    cache_ttl:           float = 60.0,
    cache_max_size:      int   = 256,
    stream_race_timeout: float = 2.0,
    pre_query_hook:  Optional[Callable[[str, Optional[Dict]], None]] = None,
    post_query_hook: Optional[Callable[[Dict[str, Any]], None]]      = None,
)

Purpose: Creates a new FederatedRAG instance with the given configuration. No backends are registered at construction time; use add_source to add them.

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| aggregation_method | `AggregationMethod \| str` | AggregationMethod.WEIGHTED | Strategy used to merge responses from multiple sources. Accepts the enum member or its string value (e.g., "weighted"). |
| min_sources | int | 1 | Minimum number of active (enabled + circuit-closed) sources required before a query is executed. If the threshold is not met, the query returns an error dict instead of running. |
| cache_ttl | float | 60.0 | Time-to-live in seconds for cached query results. A value of 0.0 effectively disables caching. |
| cache_max_size | int | 256 | Maximum number of entries in the in-memory TTL cache. When full, the oldest entry is evicted. |
| stream_race_timeout | float | 2.0 | Seconds to wait during the RANKING streaming race. Sources that produce no first chunk within this window are excluded from the winner selection. |
| pre_query_hook | `Callable[[str, Optional[Dict]], None] \| None` | None | Called with (query, context) before every streamed query and before every buffered query that misses the cache. Use for logging, request-id injection, or rate-limit checks. |
| post_query_hook | `Callable[[Dict[str, Any]], None] \| None` | None | Called after every buffered query completes, receiving the full aggregated result dict. Not called on cache hits. Not called during streaming. |

Returns: FederatedRAG instance.

Example:

import logging
from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod

logging.basicConfig(level=logging.INFO)

def before(query, ctx):
    print(f"[PRE] query={query!r}  context={ctx}")

def after(result):
    print(f"[POST] sources={result['sources_used']}  cached={result['cached']}")

fed = FederatedRAG(
    aggregation_method=AggregationMethod.RANKING,
    min_sources=2,
    cache_ttl=300.0,
    cache_max_size=512,
    stream_race_timeout=1.5,
    pre_query_hook=before,
    post_query_hook=after,
)

Source Management

add_source

fed.add_source(
    name:       str,
    rag_system: Any,
    weight:     float = 1.0,
    timeout:    float = 10.0,
    metadata:   Optional[Dict] = None,
) -> FederatedRAG

Purpose: Registers a new RAG backend with the federation. The backend is immediately available for querying. Returns self to support method chaining.

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | required | Unique identifier for this source within the federation. Used as the source field in results and StreamChunk objects. |
| rag_system | Any | required | The backend RAG object. Must expose at minimum a generate(query) method (see Backend Compatibility for full requirements). |
| weight | float | 1.0 | Relative priority of this source. Must be strictly positive (> 0). Higher weight means this source is preferred in WEIGHTED, RANKING, and MERGE strategies, and is tried first in streaming. |
| timeout | float | 10.0 | Per-source timeout in seconds. If the backend does not respond within this window, the source is skipped for this query and its circuit breaker records a failure. |
| metadata | Optional[Dict] | None | Arbitrary key-value pairs attached to the source. Passed as extra keyword arguments to backends that declare **kwargs. Never used as the query argument. |

Returns: FederatedRAG — the same instance, enabling chaining.

Raises: ValueError if weight <= 0.

Example:

# Chained registration
fed = (
    FederatedRAG()
    .add_source("primary_docs", docs_rag,   weight=2.0, timeout=5.0)
    .add_source("wiki",         wiki_rag,   weight=1.0, timeout=8.0)
    .add_source("arxiv",        arxiv_rag,  weight=0.5, timeout=15.0,
                metadata={"domain": "academic"})
)

remove_source

fed.remove_source(name: str) -> None

Purpose: Permanently removes a registered source from the federation. After removal, the source is no longer queried, and all its accumulated statistics are discarded.

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| name | str | The name of the source to remove, as originally passed to add_source. |

Returns: None

Raises: KeyError if the source name does not exist.

Example:

fed.remove_source("arxiv")
# "arxiv" will no longer be queried

enable_source

fed.enable_source(name: str, enabled: bool = True) -> None

Purpose: Temporarily enables or disables a source without removing it. Disabled sources are excluded from all queries (buffered and streaming) but retain their configuration and statistics. This is useful for maintenance windows or A/B testing.

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | required | The name of the source to toggle, as originally passed to add_source. |
| enabled | bool | True | Pass True to re-enable the source or False to disable it. |

Returns: None

Raises: KeyError if the source name does not exist.

Example:

# Disable a source during maintenance
fed.enable_source("arxiv", enabled=False)

result = fed.query("What is RAG?")  # Only docs and wiki are queried

# Re-enable after maintenance
fed.enable_source("arxiv", enabled=True)

get_stats

fed.get_stats() -> Dict[str, Dict]

Purpose: Returns per-source operational statistics accumulated since the FederatedRAG instance was created (or since the source was registered). Useful for observability dashboards, alerting, and debugging slow or unreliable backends.

Parameters: None.

Returns: Dict[str, Dict] — a dictionary keyed by source name. Each value is a stats dictionary with the following fields:

| Field | Type | Description |
| --- | --- | --- |
| total_queries | int | Total number of times this source has been called (including failures). |
| total_errors | int | Number of calls that resulted in a timeout or exception. |
| avg_latency_ms | float | Mean response latency across all calls, in milliseconds. |
| error_rate | float | Ratio of total_errors / total_queries, in the range [0.0, 1.0]. |
| circuit_state | str | Current circuit breaker state: "closed", "open", or "half_open". |

Example:

stats = fed.get_stats()
for source_name, s in stats.items():
    print(
        f"{source_name}: "
        f"{s['total_queries']} calls | "
        f"avg {s['avg_latency_ms']} ms | "
        f"error rate {s['error_rate']:.1%} | "
        f"circuit {s['circuit_state']}"
    )

Output example:

primary_docs: 120 calls | avg 243.7 ms | error rate 1.7% | circuit closed
wiki:          89 calls | avg 510.2 ms | error rate 5.6% | circuit closed
arxiv:         45 calls | avg 1420.0 ms | error rate 20.0% | circuit open
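The derived fields (avg_latency_ms and error_rate) follow directly from three running counters. The sketch below shows one way such per-source tracking could work; the class name `SourceStats`, the `record` method, and the rounding to one decimal place are assumptions for illustration, not the module's internals.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class SourceStats:
    """Hypothetical per-source counters behind a get_stats()-style report."""
    total_queries: int = 0
    total_errors: int = 0
    total_latency_ms: float = 0.0

    def record(self, latency_ms: float, ok: bool) -> None:
        # Every call counts, including failures and timeouts.
        self.total_queries += 1
        self.total_latency_ms += latency_ms
        if not ok:
            self.total_errors += 1

    def snapshot(self) -> Dict[str, float]:
        n = self.total_queries
        return {
            "total_queries": n,
            "total_errors": self.total_errors,
            "avg_latency_ms": round(self.total_latency_ms / n, 1) if n else 0.0,
            "error_rate": self.total_errors / n if n else 0.0,
        }


stats = SourceStats()
stats.record(100.0, ok=True)
stats.record(300.0, ok=False)
print(stats.snapshot())
```

Guarding the divisions keeps a freshly registered source (zero calls) from raising ZeroDivisionError.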

Buffered Query API

These methods wait for all sources to respond, aggregate the results, and return a single complete response dictionary. Use them when you need the full answer before rendering.


query

fed.query(
    query:   str,
    context: Optional[Dict] = None,
    top_k:   int = 5,
) -> Dict[str, Any]

Purpose: Synchronous, buffered multi-source query. Internally runs query_async in the appropriate asyncio context, so it can be called both from plain synchronous code and from within an already-running event loop (e.g., Jupyter notebooks, FastAPI background tasks).

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| query | str | required | The natural-language question or search query to send to all active sources. |
| context | Optional[Dict] | None | Optional contextual metadata forwarded to backends that declare a context parameter. A "language" key is handled specially: its value is forwarded to backends that declare a language parameter. |
| top_k | int | 5 | Number of documents each source should retrieve before generating an answer. Forwarded to backends that declare a top_k parameter. |

Returns: Dict[str, Any] — See Return Value Reference.

Example:

result = fed.query(
    "What are the main types of neural networks?",
    context={"language": "en", "user_id": "u-42"},
    top_k=10,
)

print(result["answer"])
print("Sources used:", result["sources_used"])
print("Cached?", result["cached"])

query_async / aquery

await fed.query_async(
    query:   str,
    context: Optional[Dict] = None,
    top_k:   int = 5,
) -> Dict[str, Any]

# Alias:
await fed.aquery(query, context, top_k)

Purpose: Async coroutine equivalent of query. Queries all active, circuit-closed sources concurrently via asyncio.gather, then aggregates and caches the result. This is the core query implementation; query delegates to it.

Parameters: Identical to query.

Returns: Dict[str, Any] — See Return Value Reference.

Behaviour details:

  1. Checks the TTL cache — returns immediately if a hit exists ("cached": True).
  2. Calls pre_query_hook(query, context) if configured.
  3. Calls all active, circuit-healthy sources in parallel.
  4. Aggregates successful results using the configured AggregationMethod.
  5. Writes the result to the TTL cache.
  6. Calls post_query_hook(result) if configured.
  7. Returns the aggregated dict.
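Step 3 of the list above (parallel calls with per-source timeouts, failures skipped) can be sketched with `asyncio.gather` and `asyncio.wait_for`. This is a minimal illustration under assumptions: the helper names `call_with_timeout` and `fan_out`, and the three toy backends, are hypothetical and do not come from the module.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional, Tuple

Backend = Callable[[str], Awaitable[str]]


async def call_with_timeout(name: str, fn: Backend, query: str, timeout: float) -> Optional[dict]:
    """Return a result dict, or None if the source timed out or raised."""
    try:
        answer = await asyncio.wait_for(fn(query), timeout=timeout)
        return {"source": name, "answer": answer}
    except Exception:  # includes the timeout error raised by wait_for
        return None


async def fan_out(sources: Dict[str, Tuple[Backend, float]], query: str) -> List[dict]:
    """Query every source concurrently and keep only successful results."""
    tasks = [call_with_timeout(n, fn, query, t) for n, (fn, t) in sources.items()]
    results = await asyncio.gather(*tasks)
    return [r for r in results if r is not None]


async def fast(query: str) -> str:
    await asyncio.sleep(0.01)
    return f"fast answer to {query}"


async def slow(query: str) -> str:
    await asyncio.sleep(1.0)  # exceeds the timeout configured below
    return "too late"


async def broken(query: str) -> str:
    raise RuntimeError("backend down")


demo_sources = {"fast": (fast, 0.5), "slow": (slow, 0.05), "broken": (broken, 0.5)}
print(asyncio.run(fan_out(demo_sources, "RAG")))
```

Because each call is wrapped individually, one slow or failing source never blocks or fails the whole batch.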

Example:

import asyncio
from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod

async def main():
    fed = FederatedRAG(aggregation_method=AggregationMethod.MERGE)
    fed.add_source("src_a", rag_a)
    fed.add_source("src_b", rag_b)

    result = await fed.query_async("Explain attention mechanisms", top_k=8)
    print(result["answer"])

asyncio.run(main())

aquery note: aquery is a direct alias for query_async. Both are identical in behaviour. Use whichever reads more naturally in your codebase.


Streaming API

These methods yield StreamChunk objects incrementally as the backend produces tokens. Use them when you want to display text to the user in real-time (e.g., chat UIs, terminals, SSE endpoints).


stream

fed.stream(
    query:   str,
    context: Optional[Dict] = None,
    top_k:   int = 5,
) -> Generator[StreamChunk, None, None]

Purpose: Synchronous streaming query. Yields StreamChunk objects one at a time as they arrive from the winning source. Safe to use in both synchronous and async contexts (e.g., inside Jupyter cells).

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| query | str | required | The natural-language query to stream answers for. |
| context | Optional[Dict] | None | Optional context dict forwarded to backends. See query for details. |
| top_k | int | 5 | Number of documents to retrieve per source. |

Returns: Generator[StreamChunk, None, None] — A standard Python generator. Each iteration yields one StreamChunk. The stream ends when all active sources have produced a terminal chunk (done=True).

Internal mechanism: Detects whether an event loop is already running. If yes, it collects all chunks in a background thread and yields them after collection. If no loop is running, it drives the async generator manually chunk-by-chunk using a fresh event loop.
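The "no loop running" branch described above, driving an async generator one item at a time from synchronous code, can be sketched as follows. This illustrates the general technique only; `drive_sync` and `fake_astream` are hypothetical names, and the running-loop branch (background thread) is not shown.

```python
import asyncio
from typing import AsyncIterator, Iterator


async def fake_astream(text: str) -> AsyncIterator[str]:
    """Hypothetical async source that yields one word at a time."""
    for word in text.split():
        await asyncio.sleep(0)
        yield word


def drive_sync(agen: AsyncIterator[str]) -> Iterator[str]:
    """Yield items from an async generator using a private event loop."""

    async def next_item() -> str:
        return await agen.__anext__()

    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                # Run the loop just long enough to obtain one item.
                yield loop.run_until_complete(next_item())
            except StopAsyncIteration:
                break
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


for word in drive_sync(fake_astream("streamed one word at a time")):
    print(word)
```

Each `run_until_complete` call advances the generator by exactly one chunk, so the caller sees output incrementally rather than after full collection.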

Example:

print("Answer: ", end="")
for chunk in fed.stream("Describe the transformer architecture", top_k=5):
    if chunk.error:
        print(f"\n[ERROR: {chunk.error}]")
    elif not chunk.done:
        print(chunk.text, end="", flush=True)
print()  # newline at end

astream

async def astream(
    query:   str,
    context: Optional[Dict] = None,
    top_k:   int = 5,
) -> AsyncGenerator[StreamChunk, None]

Purpose: Async streaming query. The primary streaming implementation. Yields StreamChunk objects asynchronously as the backend produces output. The behaviour depends on the configured AggregationMethod (see Streaming Strategies).

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| query | str | required | The natural-language query to stream answers for. |
| context | Optional[Dict] | None | Optional context dict forwarded to backends. See query for details. |
| top_k | int | 5 | Number of documents to retrieve per source. |

Returns: AsyncGenerator[StreamChunk, None] — An async generator. Use with async for.

Behaviour details:

  1. Calls pre_query_hook(query, context) if configured.
  2. Checks that enough active sources exist; yields an error chunk and returns if not.
  3. Routes to the appropriate internal streaming strategy based on AggregationMethod.
  4. For each source: tries the native astream → stream → generate_stream methods, in that order. Falls back to _call_generate + word-level synthetic chunks if the backend has no stream method.
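The fallback in step 4 can be pictured as two small pieces: pick the first streaming method the backend exposes, and otherwise split a buffered answer into word-level chunks. The sketch below is an assumption-laden illustration; `pick_stream_method`, `synthetic_chunks`, and the exact whitespace handling are hypothetical, not the module's code.

```python
import asyncio
from typing import AsyncIterator, List, Optional

STREAM_METHODS = ("astream", "stream", "generate_stream")


def pick_stream_method(backend: object) -> Optional[str]:
    """Return the first documented streaming method the backend exposes."""
    for name in STREAM_METHODS:
        if callable(getattr(backend, name, None)):
            return name
    return None  # caller falls back to buffered generation


async def synthetic_chunks(answer: str) -> AsyncIterator[str]:
    """Emit a buffered answer word by word (assumed: single-space joins)."""
    words = answer.split()
    for i, word in enumerate(words):
        yield word if i == len(words) - 1 else word + " "


class GenerateOnly:
    def generate(self, query: str) -> str:
        return f"echo {query}"


class Streaming(GenerateOnly):
    def stream(self, query: str):
        yield from query.split()


async def demo() -> List[str]:
    backend = GenerateOnly()
    if pick_stream_method(backend) is None:
        return [c async for c in synthetic_chunks(backend.generate("hello world"))]
    return []


print(asyncio.run(demo()))
```

Concatenating the synthetic chunks reproduces the buffered answer, so consumers can treat both paths the same way.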

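FastAPI streaming example (plain-text response; a strict SSE endpoint would use media_type="text/event-stream" with data: framing):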

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/stream")
async def stream_answer(q: str):
    async def generate():
        async for chunk in fed.astream(q):
            if not chunk.done and not chunk.error:
                yield chunk.text
    return StreamingResponse(generate(), media_type="text/plain")

Metadata inspection example:

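async for chunk in fed.astream("What is BERT?"):
    if chunk.error:
        # Check errors first: an error chunk may also be a terminal chunk
        print(f"\n✗ {chunk.source}: {chunk.error}")
    elif chunk.done:
        meta = chunk.metadata or {}
        print(f"\n✓ {chunk.source}: {meta.get('latency_ms')} ms")
    else:
        print(chunk.text, end="", flush=True)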

Return Value Reference

Both query and query_async return a Dict[str, Any] with the following guaranteed keys plus aggregation-method-specific extras.

Guaranteed keys (all methods)

| Key | Type | Description |
| --- | --- | --- |
| answer | str | The final answer text selected or combined by the aggregation strategy. |
| sources_used | List[str] | Names of all sources that responded successfully. |
| num_sources | int | Number of sources that responded successfully. |
| cached | bool | True if the result was served from the TTL cache, False if freshly computed. |

Error response keys

Returned when the federation itself fails. Failures of individual sources are skipped silently and do not produce an error response.

| Key | Type | Description |
| --- | --- | --- |
| answer | str | Human-readable error message. |
| error | str | Machine-readable error code. See Error Codes. |
| sources_used | List | Always an empty list on federation-level errors. |
| num_sources | int | Always 0 on federation-level errors. |

Extra keys by AggregationMethod

| Method | Extra Keys |
| --- | --- |
| WEIGHTED | primary_source (str), all_results (List[Dict]), aggregation_method (str) |
| VOTING | primary_source (str), vote_counts (Dict[str, int]), all_results (List[Dict]), aggregation_method (str) |
| RANKING | primary_source (str), ranked_results (List[Dict]), aggregation_method (str) |
| MERGE | all_results (List[Dict]), aggregation_method (str) |
| SIMPLE | primary_source (str), all_results (List[Dict]), aggregation_method (str) |

Per-source result dict (inside all_results / ranked_results)

Each element represents one backend's contribution:

| Key | Type | Description |
| --- | --- | --- |
| answer | str | The raw answer text from this backend. |
| source | str | The backend's registered name. |
| weight | float | The weight assigned to this source. |
| score | float | Relevance score from the backend's retriever (0.0 if not available). |
| latency_ms | float | Round-trip time to this source in milliseconds. |

Streaming Strategies by AggregationMethod

| AggregationMethod | Streaming Behaviour |
| --- | --- |
| WEIGHTED | Sources are sorted by weight (highest first). The federation streams from the first source that produces content without error. |
| SIMPLE | Same as WEIGHTED. |
| RANKING | All sources race for stream_race_timeout seconds. The source whose first chunk has the highest score × weight wins, and its remaining chunks are then streamed. |
| MERGE | Sources are streamed sequentially in descending weight order. Each source's output is prefixed with [source_name] and separated by \n\n. |
| VOTING | Cannot stream by nature. Falls back to query_async, then emits one single StreamChunk with the full answer. |
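The RANKING race is the least obvious of these strategies, so a sketch may help. It assumes each source is an async generator yielding dicts with "text" and "score" keys; `ranked_race`, `source_stream`, and the handling of losers are hypothetical choices for this illustration rather than the module's implementation.

```python
import asyncio
from typing import AsyncIterator, Dict, List, Tuple


async def source_stream(delay: float, score: float, words: List[str]) -> AsyncIterator[dict]:
    """Hypothetical backend: waits `delay` seconds, then yields scored chunks."""
    await asyncio.sleep(delay)
    for word in words:
        yield {"text": word, "score": score}


async def _first(gen: AsyncIterator[dict]) -> dict:
    return await gen.__anext__()


async def ranked_race(
    streams: Dict[str, Tuple[AsyncIterator[dict], float]], race_timeout: float
) -> AsyncIterator[Tuple[str, str]]:
    """Yield (source, text) pairs from the source whose first chunk ranks highest."""
    tasks = {
        asyncio.ensure_future(_first(gen)): (name, gen, weight)
        for name, (gen, weight) in streams.items()
    }
    # Wait for the whole race window; late sources stay in `pending`.
    done, pending = await asyncio.wait(tasks, timeout=race_timeout)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    best = None
    for task in done:
        if task.exception() is not None:
            continue  # source failed or was empty before its first chunk
        name, gen, weight = tasks[task]
        chunk = task.result()
        rank = chunk["score"] * weight
        if best is None or rank > best[0]:
            best = (rank, name, gen, chunk)
    if best is None:
        return
    _, name, gen, first = best
    yield name, first["text"]
    async for chunk in gen:  # stream the winner's remaining chunks
        yield name, chunk["text"]


async def demo() -> List[Tuple[str, str]]:
    streams = {
        "a": (source_stream(0.0, 0.9, ["alpha", "one"]), 1.0),
        "b": (source_stream(0.0, 0.5, ["beta"]), 1.5),
        "slow": (source_stream(0.5, 1.0, ["late"]), 5.0),
    }
    return [item async for item in ranked_race(streams, race_timeout=0.1)]


print(asyncio.run(demo()))
```

In the demo, `slow` would have ranked highest but misses the 0.1 s window, so `a` (0.9 × 1.0) beats `b` (0.5 × 1.5) and is streamed to completion.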

Backend Compatibility

FederatedRAG uses signature-aware introspection to call backends. It never passes a parameter the backend's method does not declare, preventing TypeError on mixed-capability backends.

Minimum required interface

class MyRAGBackend:
    def generate(self, query: str) -> str | dict:
        ...

Full interface (all capabilities unlocked)

class FullRAGBackend:
    def retrieve(self, query: str, top_k: int = 5) -> List[Tuple[Document, float]]:
        """Returns list of (document, score) tuples."""
        ...

    async def aretrieve(self, query: str, top_k: int = 5) -> List[Tuple[Document, float]]:
        """Async variant of retrieve."""
        ...

    def generate(self, query: str, language: str = "en") -> str | dict:
        """Generate an answer. Return str or dict with at least {"answer": str}."""
        ...

    async def agenerate(self, query: str, language: str = "en") -> str | dict:
        """Async variant of generate."""
        ...

    def stream(self, query: str, top_k: int = 5) -> Iterable[str | dict]:
        """Yield text fragments or {"text": str, "score": float} dicts."""
        ...

    async def astream(self, query: str, top_k: int = 5) -> AsyncIterable[str | dict]:
        """Async variant of stream."""
        ...

Method resolution priority

| Operation | Priority order |
| --- | --- |
| Async generation | agenerate → generate (coroutine) → generate (sync, run in thread) |
| Async retrieval | aretrieve (awaited) → retrieve (in executor) |
| Streaming | astream → stream → generate_stream → buffered fallback |
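The generation priority order can be expressed with `inspect.iscoroutinefunction` and `asyncio.to_thread`. The sketch below is one plausible reading of that order; `call_generate` and the three toy backends are hypothetical names, and keyword forwarding is omitted for brevity.

```python
import asyncio
import inspect


async def call_generate(backend: object, query: str):
    """Resolve agenerate, then coroutine generate, then sync generate in a thread."""
    agenerate = getattr(backend, "agenerate", None)
    if agenerate is not None:
        return await agenerate(query)
    generate = getattr(backend, "generate")
    if inspect.iscoroutinefunction(generate):
        return await generate(query)
    # Sync method: run it in a worker thread so the event loop stays free.
    return await asyncio.to_thread(generate, query)


class WithAgenerate:
    async def agenerate(self, query: str) -> str:
        return "agenerate"

    def generate(self, query: str) -> str:
        return "sync generate (should not be used)"


class CoroutineGenerate:
    async def generate(self, query: str) -> str:
        return "coroutine generate"


class SyncGenerate:
    def generate(self, query: str) -> str:
        return "sync generate"


async def demo() -> list:
    backends = [WithAgenerate(), CoroutineGenerate(), SyncGenerate()]
    return [await call_generate(b, "q") for b in backends]


print(asyncio.run(demo()))
```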

context and language forwarding

  • If a backend's method declares a context parameter and context is not None, it is forwarded.
  • If a backend's method declares a language parameter and context is a dict with a "language" key, that value is extracted and forwarded as language=<value>.
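Signature-aware forwarding of this kind is typically built on `inspect.signature`. The sketch below shows the idea under assumptions: `build_kwargs` is a hypothetical helper, and it covers only the three parameters discussed in this reference (top_k, context, language).

```python
import inspect
from typing import Any, Callable, Dict, Optional


def build_kwargs(method: Callable, context: Optional[Dict[str, Any]], top_k: int) -> Dict[str, Any]:
    """Return only the keyword arguments that `method` actually declares."""
    params = inspect.signature(method).parameters
    kwargs: Dict[str, Any] = {}
    if "top_k" in params:
        kwargs["top_k"] = top_k
    if "context" in params and context is not None:
        kwargs["context"] = context
    if "language" in params and isinstance(context, dict) and "language" in context:
        kwargs["language"] = context["language"]
    return kwargs


class MinimalBackend:
    def generate(self, query: str) -> str:
        return query


class LanguageBackend:
    def generate(self, query: str, language: str = "en") -> str:
        return f"{language}:{query}"


ctx = {"language": "ar", "user_id": "u-42"}
minimal, multilingual = MinimalBackend(), LanguageBackend()
print(minimal.generate("q", **build_kwargs(minimal.generate, ctx, 5)))
print(multilingual.generate("q", **build_kwargs(multilingual.generate, ctx, 5)))
```

The minimal backend receives no extra arguments at all, which is what prevents a TypeError when backends with different capabilities share one federation.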

Circuit Breaker Behaviour

Each registered source has its own independent CircuitBreaker. The federation automatically skips sources whose circuit is open.

State machine

CLOSED ──(≥ threshold failures)──▶ OPEN ──(recovery_timeout elapsed)──▶ HALF_OPEN
   ▲                                                                          │
   └────────────────(success in HALF_OPEN)──────────────────────────────────┘
                              │
                    (failure in HALF_OPEN)
                              ▼
                            OPEN

Default thresholds

| Parameter | Default | Description |
| --- | --- | --- |
| failure_threshold | 3 | Consecutive failures before the circuit opens. |
| recovery_timeout | 30.0 s | Seconds before an open circuit transitions to HALF_OPEN and allows a probe request. |

Effect on queries

  • CLOSED: Source is queried normally.
  • OPEN: Source is excluded from this query entirely. No network call is made.
  • HALF_OPEN: One probe request is allowed. Success → CLOSED; failure → OPEN (timer resets).

Monitor circuit states via get_stats():

for name, s in fed.get_stats().items():
    if s["circuit_state"] == "open":
        print(f"⚠️  {name} is circuit-open — check that backend!")
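The state machine above is small enough to sketch in full. The class below is an illustration using the documented defaults, not the module's CircuitBreaker: the method names and the injectable `clock` are assumptions, and for simplicity it does not limit HALF_OPEN to a single concurrent probe.

```python
import time
from typing import Callable


class SketchCircuitBreaker:
    """Illustrative closed / open / half_open breaker."""

    def __init__(
        self,
        failure_threshold: int = 3,
        recovery_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self._state = "closed"
        self._failures = 0
        self._opened_at = 0.0

    @property
    def state(self) -> str:
        # An open circuit becomes half_open once the recovery timeout elapses.
        if self._state == "open" and self._clock() - self._opened_at >= self.recovery_timeout:
            self._state = "half_open"
        return self._state

    def allow_request(self) -> bool:
        return self.state != "open"

    def record_success(self) -> None:
        self._failures = 0
        self._state = "closed"

    def record_failure(self) -> None:
        if self.state == "half_open":
            self._trip()  # failed probe: reopen and restart the timer
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._trip()

    def _trip(self) -> None:
        self._state = "open"
        self._opened_at = self._clock()
        self._failures = 0


now = [0.0]  # fake clock so the demo needs no real waiting
breaker = SketchCircuitBreaker(clock=lambda: now[0])
for _ in range(3):
    breaker.record_failure()
print(breaker.state)  # open after three consecutive failures
now[0] = 30.0
print(breaker.state)  # half_open once recovery_timeout has elapsed
```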

Caching Behaviour

FederatedRAG uses a thread-safe, in-memory TTL cache (TTLCache) to avoid redundant backend calls for identical queries.

Cache key

The cache key is a SHA-256 hash of f"{query}|{sorted(context.items())}". Two calls are cache-equivalent only if both query and all context key-value pairs match exactly.
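A key of this shape can be reproduced with `hashlib`. The sketch below follows the documented format string; treating a missing context as an empty dict and encoding as UTF-8 are assumptions, and `cache_key` is a hypothetical helper name.

```python
import hashlib
from typing import Any, Dict, Optional


def cache_key(query: str, context: Optional[Dict[str, Any]] = None) -> str:
    """Hash the query together with the sorted context items."""
    items = sorted((context or {}).items())  # assumption: None behaves like {}
    raw = f"{query}|{items}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


key_a = cache_key("What is RAG?", {"language": "en", "user_id": "u-42"})
key_b = cache_key("What is RAG?", {"user_id": "u-42", "language": "en"})
key_c = cache_key("What is RAG?", {"language": "ar", "user_id": "u-42"})
print(key_a == key_b, key_a == key_c)
```

Sorting the items makes the key independent of dict insertion order, while any differing value produces a different key.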

Cache hit behaviour

  • Hits are returned immediately with "cached": True.
  • post_query_hook is not called on cache hits.
  • pre_query_hook is not called on cache hits.
  • Streaming (stream / astream) does not check the cache — streams are always live.

Invalidation

There is no public method to invalidate individual entries. The entire cache can be cleared via:

fed._cache.invalidate()   # clears all entries

Entries also expire naturally after cache_ttl seconds from when they were written.

Eviction

When the cache reaches cache_max_size entries, the entry with the oldest insertion timestamp is evicted. This is FIFO eviction (by insertion time), not LRU: reading an entry does not refresh its position.
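The combination of TTL expiry and insertion-order eviction can be sketched with an `OrderedDict` guarded by a lock. This is an illustrative stand-in, not the module's TTLCache: the class name, the `get`/`set` method names, and the injectable clock are assumptions, and it expects max_size of at least 1.

```python
import threading
import time
from collections import OrderedDict
from typing import Any, Callable, Optional


class SketchTTLCache:
    """Illustrative thread-safe TTL cache with insertion-order eviction."""

    def __init__(self, ttl: float, max_size: int, clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl
        self._max_size = max_size
        self._clock = clock
        self._lock = threading.Lock()
        self._data: "OrderedDict[str, tuple]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            entry = self._data.get(key)
            if entry is None:
                return None
            value, written_at = entry
            if self._clock() - written_at >= self._ttl:
                del self._data[key]  # expired; with ttl=0.0 every entry expires at once
                return None
            return value  # note: reads do not change eviction order

    def set(self, key: str, value: Any) -> None:
        with self._lock:
            if key in self._data:
                del self._data[key]  # rewrite counts as a fresh insertion
            elif len(self._data) >= self._max_size:
                self._data.popitem(last=False)  # evict the oldest insertion
            self._data[key] = (value, self._clock())

    def invalidate(self) -> None:
        with self._lock:
            self._data.clear()


now = [0.0]
cache = SketchTTLCache(ttl=10.0, max_size=2, clock=lambda: now[0])
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")      # reading "a" does not protect it from eviction
cache.set("c", 3)   # evicts "a", the oldest insertion
print(cache.get("a"), cache.get("b"), cache.get("c"))
```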


Error Codes

| Code | Trigger condition | answer content |
| --- | --- | --- |
| "insufficient_sources" | Fewer active sources available than min_sources. | "Need ≥ N active source(s), found M" |
| "all_sources_failed" | All sources returned None (timed out or raised exceptions). | "All sources failed or timed out" |

Error dicts have the shape:

{
    "answer":       "<human-readable message>",
    "error":        "<error_code>",
    "sources_used": [],
    "num_sources":  0,
}
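Because federation-level failures are returned as dicts rather than raised, callers that prefer exceptions need a thin adapter. The sketch below is one way to write it; `FederationError` and `answer_or_raise` are hypothetical names, and the sample dicts simply follow the shapes documented above.

```python
from typing import Any, Dict


class FederationError(RuntimeError):
    """Hypothetical exception type carrying the documented error code."""

    def __init__(self, code: str, message: str) -> None:
        super().__init__(f"{code}: {message}")
        self.code = code


def answer_or_raise(result: Dict[str, Any]) -> str:
    """Return the answer, or raise if the dict is a federation-level error."""
    if "error" in result:
        raise FederationError(result["error"], result["answer"])
    return result["answer"]


ok = {"answer": "42", "sources_used": ["docs"], "num_sources": 1, "cached": False}
failed = {
    "answer": "All sources failed or timed out",
    "error": "all_sources_failed",
    "sources_used": [],
    "num_sources": 0,
}

print(answer_or_raise(ok))
try:
    answer_or_raise(failed)
except FederationError as exc:
    print("federation error:", exc.code)
```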

Complete Examples

Example 1 — Basic multi-source query with stats

import logging
from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod

logging.basicConfig(level=logging.INFO)

fed = FederatedRAG(aggregation_method=AggregationMethod.RANKING)
fed.add_source("docs",  docs_rag,  weight=2.0, timeout=5.0)
fed.add_source("wiki",  wiki_rag,  weight=1.0, timeout=8.0)

result = fed.query("What is retrieval-augmented generation?", top_k=5)

if "error" in result:
    print("Federation error:", result["error"])
else:
    print("Answer:", result["answer"])
    print("Primary source:", result.get("primary_source"))
    print("Ranked sources:", [r["source"] for r in result.get("ranked_results", [])])

print("\nSource Stats:")
for src, s in fed.get_stats().items():
    print(f"  {src}: {s['avg_latency_ms']} ms avg, {s['error_rate']:.0%} error rate")

Example 2 — Async streaming in a FastAPI SSE endpoint

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod

app = FastAPI()

fed = FederatedRAG(aggregation_method=AggregationMethod.WEIGHTED)
fed.add_source("primary", primary_rag, weight=2.0)
fed.add_source("fallback", fallback_rag, weight=0.5)

@app.get("/ask")
async def ask(q: str, lang: str = "en"):
    async def token_stream():
        ctx = {"language": lang}
        async for chunk in fed.astream(q, context=ctx, top_k=5):
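            # Check errors first: an error chunk may also carry done=True
            if chunk.error:
                yield f"\n[error from {chunk.source}: {chunk.error}]"
            elif chunk.done:
                yield f"\n[done: {chunk.source}]"
            else:
                yield chunk.text

    # Streams plain text; a strict SSE endpoint would use
    # media_type="text/event-stream" and "data: ...\n\n" framing.
    return StreamingResponse(token_stream(), media_type="text/plain")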

Example 3 — MERGE strategy for research aggregation

import asyncio
from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod

async def research(question: str):
    fed = FederatedRAG(aggregation_method=AggregationMethod.MERGE)
    fed.add_source("papers",    papers_rag,    weight=3.0)
    fed.add_source("textbooks", textbooks_rag, weight=2.0)
    fed.add_source("web",       web_rag,       weight=1.0)

    result = await fed.query_async(question, top_k=10)
    # result["answer"] will contain [papers] ...\n\n[textbooks] ...\n\n[web] ...
    return result["answer"]

answer = asyncio.run(research("Explain the attention mechanism in transformers"))
print(answer)

Example 4 — Dynamic source management

from fennec_community.rag.types.federated_rag import FederatedRAG

fed = FederatedRAG()
fed.add_source("src_a", rag_a, weight=1.0)
fed.add_source("src_b", rag_b, weight=1.5)
fed.add_source("src_c", rag_c, weight=0.8)

# Disable a source for maintenance
fed.enable_source("src_c", enabled=False)
result = fed.query("Who invented the internet?")   # Only src_a and src_b queried

# Re-enable after maintenance
fed.enable_source("src_c", enabled=True)

# Add a new source at runtime
fed.add_source("src_d", rag_d, weight=2.0)

# Remove a decommissioned source
fed.remove_source("src_a")

print(f"Active federation: {fed}")  # FederatedRAG(sources=['src_b', 'src_c', 'src_d'], ...)
print(f"Total sources: {len(fed)}")

Example 5 — Lifecycle hooks for observability

import time
from fennec_community.rag.types.federated_rag import FederatedRAG

request_log = []

def pre_hook(query: str, context):
    request_log.append({"query": query, "ts": time.time(), "status": "started"})
    print(f"[METRIC] query started: {query[:50]!r}")

def post_hook(result: dict):
    request_log.append({
        "sources": result["sources_used"],
        "cached": result["cached"],
        "ts": time.time(),
        "status": "done",
    })
    print(f"[METRIC] query done | sources={result['sources_used']} cached={result['cached']}")

fed = FederatedRAG(
    pre_query_hook=pre_hook,
    post_query_hook=post_hook,
)
fed.add_source("main", my_rag)
fed.query("What is machine learning?")

Simple Real Example

import os

from fennec_community.llm import MistralInterface
from fennec_community.vector_database import FAISSVectorDatabase
from fennec_community.chunks import ArabicTextChunker
from fennec_community.context import ContextManager
from fennec_community.embeddings import OllamaEmbedder
from fennec_community.rag.core import RAGSystem
from fennec_community.rag.types.federated_rag import FederatedRAG, AggregationMethod

# Assumption: the Mistral API key is supplied via this environment variable
llm_api = os.environ["MISTRAL_API_KEY"]

# Components shared by all three RAG systems
chunker = ArabicTextChunker(chunk_size=100, overlap=20)
embedder = OllamaEmbedder()
llm = MistralInterface(api_key=llm_api)
context_manager = ContextManager()

vdb1     = FAISSVectorDatabase(embedder=embedder)
rag_tech = RAGSystem(vector_db=vdb1, llm=llm, chunker=chunker, context_manager=context_manager)
# English gloss of the Arabic sample text: "Python is a high-level, general-purpose
# programming language designed to be easy to read and write. It supports procedural,
# object-oriented, and functional programming. It is widely used in data science,
# artificial intelligence, web development, and task automation. It has a large standard
# library and an active community that provides thousands of ready-made packages via PyPI."
rag_tech.add_text(text= "بايثون لغة برمجة عالية المستوى متعددة الاستخدامات، صُمِّمت لتكون سهلة القراءة والكتابة. "
            "تدعم البرمجة الإجرائية والكائنية والوظيفية. تُستخدم على نطاق واسع في علم البيانات، "
            "والذكاء الاصطناعي، وتطوير الويب، وأتمتة المهام. تتميز بمكتبة قياسية ضخمة ومجتمع "
            "نشط يوفر آلاف الحزم الجاهزة عبر PyPI.",doc_id="tech_python", metadata={"source": "tech_docs", "category": "programming"})
        
 
vdb2     = FAISSVectorDatabase(embedder=embedder)
rag_wiki = RAGSystem(vector_db=vdb2, llm=llm, chunker=chunker, context_manager=context_manager)
# English gloss of the Arabic sample text: "Artificial intelligence research formally
# began at the Dartmouth conference in 1956. The field went through two AI winters
# caused by funding cuts. It recovered in the early 2000s thanks to big data and
# computing power, then boomed with the emergence of large language models."
rag_wiki.add_text(text="بدأت بحوث الذكاء الاصطناعي رسمياً في مؤتمر دارتموث عام 1956. "
            "مرّ المجال بفترتين من شتاء الذكاء الاصطناعي بسبب توقف التمويل. "
            "انتعش مجدداً مطلع الألفية الثالثة بفضل البيانات الضخمة والقدرة الحوسبية, "
            "ثم شهد طفرة مع ظهور نماذج اللغة الكبيرة.", doc_id="wiki_ai_history", metadata={"source": "wikipedia", "category": "history"})
 
vdb3         = FAISSVectorDatabase(embedder=embedder)
rag_internal = RAGSystem(vector_db=vdb3, llm=llm, chunker=chunker, context_manager=context_manager)
# English gloss of the Arabic sample text: "Our company, TechAI Solutions, was founded
# in 2020 and specializes in artificial intelligence solutions. We serve more than 150
# clients in the finance, healthcare, and retail sectors. Our team has 80 employees,
# including 45 engineers and researchers."
rag_internal.add_text(text="شركتنا TechAI Solutions تأسست عام 2020 وتتخصص في حلول الذكاء الاصطناعي. "
            "نخدم أكثر من 150 عميلاً في قطاعات المال والصحة والتجزئة. "
            "يضم فريقنا 80 موظفاً منهم 45 مهندساً وباحثاً.", doc_id="internal_company_overview", metadata={"source": "internal", "category": "company_info"})
 

fed = FederatedRAG(
        aggregation_method  = AggregationMethod.WEIGHTED,
        min_sources         = 1,
        cache_ttl           = 120.0,
        stream_race_timeout = 3.0,
    )
fed.add_source("tech_docs", rag_tech,     weight=2.0, timeout=15.0) \
       .add_source("wikipedia", rag_wiki,     weight=1.0, timeout=10.0) \
       .add_source("internal",  rag_internal, weight=1.5, timeout=8.0)
 
print(f"Number of sources: {len(fed)}")
print(repr(fed))

# Query in Arabic: "What is the Python language?"
result = fed.query("ما هي لغة بايثون؟", top_k=5)
print(result)

Source: community/rag/federated_rag.md