
Streaming RAG — `streaming_rag` Module — Public API Reference


Table of Contents

  1. Module Overview
  2. StreamConfig
  3. StreamingRAG
  4. StreamEvent
  5. EventType Enumeration
  6. Protocols
  7. Generation Strategy Selection
  8. Event Stream Lifecycle
  9. Error Handling Reference
  10. Quick-Start Example

1. Module Overview

The streaming_rag package wraps any existing RAG backend and adds real-time token streaming with a rich event model. Instead of waiting for a full answer to be assembled before returning it, the system emits each word or token as it is produced, interleaved with structured metadata events (retrieval started, retrieval done, generation started, generation done, error).

Key capabilities:

  • Three-strategy generation — native async LLM stream → sync LLM stream (thread-bridged) → simulated word-by-word fallback, selected automatically at runtime.
  • Dual surface API: stream() / stream_events() for synchronous callers; astream() / astream_events() for async callers. All share the same underlying async implementation.
  • Rich StreamEvent model — every event carries a type, payload, and latency timestamp, giving consumers fine-grained control.
  • Configurable timeouts — independent retrieval and generation deadlines with graceful ERROR events on timeout.
  • Callback hooks: on_chunk fires on every chunk and on_event on every event, enabling side-effect patterns (logging, database writes, WebSocket push) without modifying the consumer loop.
  • Runtime statistics — cumulative stream count, chunk count, error count, and average latency.
  • Thread-safe sync bridge: stream_events() spawns a private daemon thread and hands events back through a stdlib queue.Queue (blocking get, no busy-wait), so the async generator can be consumed safely from synchronous code.

Publicly exported symbols:

from fennec_community.rag.types.streaming_rag import (
    StreamingRAG,
    StreamConfig,
    SyncStreamableLLM,
    AsyncStreamableLLM,
    EventType,
)

2. StreamConfig

from fennec_community.rag.types.streaming_rag import StreamConfig

StreamConfig is a dataclass that centralises all tunable parameters for a StreamingRAG instance. All fields are optional — the defaults are production-ready for most use cases.

Constructor

StreamConfig(
    chunk_size: int = 10,
    sim_delay: float = 0.04,
    max_context_docs: int = 5,
    max_doc_chars: int = 500,
    emit_metadata: bool = True,
    retrieval_timeout: float = 10.0,
    generation_timeout: float = 60.0,
    prompt_template: str = <built-in>,
)

| Parameter | Type | Default | Description |
|---|---|---|---|
| chunk_size | int | 10 | Number of words per simulated chunk when the fallback (Strategy 3) word-by-word simulator is used. Not applied to native streaming LLMs. |
| sim_delay | float | 0.04 | Seconds of asyncio.sleep between simulated chunks. Controls the perceived streaming speed in fallback mode. |
| max_context_docs | int | 5 | Maximum number of retrieved documents injected into the prompt context. Documents beyond this limit are silently discarded. |
| max_doc_chars | int | 500 | Each retrieved document is truncated to this many characters before being inserted into the context block. |
| emit_metadata | bool | True | Reserved for future use; intended to gate whether non-CHUNK events are emitted. Currently all events are emitted regardless of this flag. |
| retrieval_timeout | float | 10.0 | Maximum seconds allowed for the entire retrieval step. Exceeding it emits an ERROR event and terminates the stream. |
| generation_timeout | float | 60.0 | Maximum wall-clock seconds allowed for the generation step. The deadline is enforced on every wait for the next token, not only when generation starts. |
| prompt_template | str | Built-in | Python format string with {context} and {query} placeholders. The built-in template instructs the LLM to answer strictly from the provided context and in the same language as the question. |

Built-in prompt template:

You are a precise RAG assistant.

Strict rules:
- Use ONLY the provided context.
- Do NOT guess.
- If missing info, say: 'no information to answer '.
- The answer MUST be in the SAME language as the question.
- NEVER switch language.

Context:
{context}

Question: {query}

Final Answer:
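The max_context_docs, max_doc_chars and prompt_template settings interact when the prompt is assembled. The following is a minimal sketch of that assembly, not the module's code: it assumes each retrieved document is either a plain string or a dict holding its text under a hypothetical "content" key, and that documents are joined with a blank line; build_prompt is an illustrative name.

```python
from typing import Any, Dict, List, Union

Doc = Union[str, Dict[str, Any]]

# Template taken from the StreamConfig example below; the built-in one is filled the same way.
TEMPLATE = "Context:\n{context}\n\nQuestion: {query}\n\nAnswer concisely:"


def build_prompt(query: str, docs: List[Doc], template: str = TEMPLATE,
                 max_context_docs: int = 5, max_doc_chars: int = 500) -> str:
    """Hypothetical reconstruction of the context/prompt step."""
    parts = []
    # Keep only the first max_context_docs documents; the rest are dropped.
    for doc in docs[:max_context_docs]:
        # Assumption: dict documents carry their text under "content".
        text = doc.get("content", "") if isinstance(doc, dict) else str(doc)
        # Truncate each document to max_doc_chars characters.
        parts.append(text[:max_doc_chars])
    context = "\n\n".join(parts)
    # str.format fills the {context} and {query} placeholders.
    return template.format(context=context, query=query)


prompt = build_prompt("What is RAG?", ["a" * 900, {"content": "b" * 10}, "c"],
                      max_context_docs=2, max_doc_chars=800)
print(prompt)
```

Because prompt_template goes through str.format, any literal braces in a custom template (for example an embedded JSON sample) must be doubled as {{ and }}.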

Example:

from fennec_community.rag.types.streaming_rag import StreamConfig

config = StreamConfig(
    chunk_size=5,
    max_context_docs=8,
    max_doc_chars=800,
    retrieval_timeout=15.0,
    generation_timeout=90.0,
    prompt_template=(
        "Context:\n{context}\n\n"
        "Question: {query}\n\n"
        "Answer concisely:"
    ),
)

3. StreamingRAG

from fennec_community.rag.types.streaming_rag import StreamingRAG

StreamingRAG is the main class of the package. It wraps any RAG backend and adds real-time streaming generation with a full event lifecycle.


3.1 Constructor

StreamingRAG(
    rag_system: Any,
    llm: Any = None,
    config: Optional[StreamConfig] = None,
    *,
    chunk_size: Optional[int] = None,
    on_chunk: Optional[Callable[[str], None]] = None,
    on_event: Optional[Callable[[StreamEvent], None]] = None,
)

Purpose: Instantiate a streaming RAG system by binding it to an existing RAG backend and optionally providing an LLM capable of streaming. The config object is shallow-copied at construction time so mutations to the original StreamConfig do not affect this instance.

| Parameter | Type | Required | Description |
|---|---|---|---|
| rag_system | Any | Yes | The underlying RAG backend. Must expose .retrieve(query) -> Any (sync or async). May optionally expose .generate(query, context) returning a str or Dict, used for fallback (Strategy 3) generation. |
| llm | Any | No | Language model instance, checked at runtime against the AsyncStreamableLLM (preferred) and SyncStreamableLLM protocols. Falls back to rag_system.llm if None. If no streaming LLM can be resolved, Strategy 3 (word-by-word simulation) is used. |
| config | Optional[StreamConfig] | No | Streaming configuration object. A StreamConfig() with all defaults is created if None. |
| chunk_size | Optional[int] | No | Keyword-only shortcut that overrides config.chunk_size after copying. Useful for one-off overrides without building a full StreamConfig. |
| on_chunk | Optional[Callable[[str], None]] | No | Callback invoked synchronously on every CHUNK token before it is yielded. Exceptions raised inside the callback are not caught, so the callback must handle its own errors. |
| on_event | Optional[Callable[[StreamEvent], None]] | No | Callback invoked synchronously on every StreamEvent (including non-CHUNK events), before the event is yielded to the consumer. |

Internal state initialised:

| Attribute | Type | Description |
|---|---|---|
| _total_streams | int | Count of astream_events() calls. |
| _total_chunks | int | Count of CHUNK events emitted across all streams. |
| _total_errors | int | Count of ERROR events emitted. |
| _total_latency | float | Cumulative elapsed seconds across all streams (used to compute avg_stream_latency_s). |
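The constructor behaviour described above (shallow copy of the config, keyword-only chunk_size override, zeroed counters) can be reproduced with the standard library. In this sketch MiniConfig and MiniStreamer are hypothetical stand-ins, and using copy.copy for the shallow copy is an assumption about how the module takes it.

```python
import copy
from dataclasses import dataclass
from typing import Optional


@dataclass
class MiniConfig:  # hypothetical two-field stand-in for StreamConfig
    chunk_size: int = 10
    sim_delay: float = 0.04


class MiniStreamer:  # hypothetical stand-in for StreamingRAG's constructor
    def __init__(self, config: Optional[MiniConfig] = None, *,
                 chunk_size: Optional[int] = None) -> None:
        # Shallow-copy so later edits to the caller's object have no effect here.
        self.config = copy.copy(config) if config is not None else MiniConfig()
        # The keyword-only shortcut overrides the copy, never the original.
        if chunk_size is not None:
            self.config.chunk_size = chunk_size
        # Cumulative counters, as in the internal-state table.
        self._total_streams = 0
        self._total_chunks = 0
        self._total_errors = 0
        self._total_latency = 0.0


shared = MiniConfig(chunk_size=8)
s = MiniStreamer(shared, chunk_size=5)
shared.chunk_size = 99
print(s.config.chunk_size, shared.chunk_size)  # 5 99
```

A shallow copy is enough here because every documented StreamConfig field is an int, float, bool or str; a mutable field such as a list would still be shared between the original and the copy.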

Example:

from fennec_community.rag.types.streaming_rag import StreamingRAG, StreamConfig

# Placeholders: db, my_rag and my_llm stand for your own storage layer, RAG backend and LLM.
def save_chunk(chunk: str):
    db.append_token(chunk)

config = StreamConfig(max_context_docs=6, retrieval_timeout=8.0)

rag = StreamingRAG(
    rag_system=my_rag,
    llm=my_llm,
    config=config,
    on_chunk=save_chunk,
)

3.2 Synchronous Streaming API

stream()

rag.stream(
    query: str,
    context: Optional[Dict] = None,
) -> Iterator[str]

Purpose: Synchronous generator that yields plain text chunks only. All metadata events (RETRIEVAL_START, RETRIEVAL_DONE, GENERATION_START, GENERATION_DONE, ERROR) are consumed internally and not yielded, but callbacks still fire: on_chunk for every chunk and on_event for every event. This is the simplest consumer API when only the answer text is needed.

Implementation: Internally calls stream_events() and filters for event.is_chunk.

| Parameter | Type | Required | Description |
|---|---|---|---|
| query | str | Yes | Natural-language question. |
| context | Optional[Dict] | No | Supplemental key-value context for the generation call. Used by Strategy 3 only, where it is forwarded to rag_system.generate(query, context). |

Yields: str — individual text chunks in the order they are produced by the LLM or simulator.

Example:

print("Answer: ", end="")
for chunk in rag.stream("What is retrieval-augmented generation?"):
    print(chunk, end="", flush=True)
print()  # newline after stream

stream_events()

rag.stream_events(
    query: str,
    context: Optional[Dict] = None,
) -> Iterator[StreamEvent]

Purpose: Synchronous generator that yields all StreamEvent objects — including metadata events and CHUNK events — in the order they occur. This is the richest synchronous consumer API, giving access to retrieval metadata, timing, and error events.

Implementation: Spawns a private daemon thread that runs a new asyncio event loop, inside which the async generator astream_events() is executed. Events are handed to the calling thread through a thread-safe stdlib queue.Queue read with a blocking get() (no polling, no busy-wait). A None sentinel is enqueued after the last event to signal completion.
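The same thread-plus-queue pattern can be written with asyncio, threading and queue from the standard library. This sketch illustrates the technique rather than reproducing stream_events(): iterate_in_thread and agen_factory (a zero-argument callable returning an async generator) are hypothetical names, and an exception inside the worker simply ends the iteration instead of becoming an ERROR event.

```python
import asyncio
import queue
import threading
from typing import Any, AsyncIterator, Callable, Iterator

_SENTINEL = None  # marks the end of the stream


def iterate_in_thread(agen_factory: Callable[[], AsyncIterator[Any]]) -> Iterator[Any]:
    """Consume an async generator from synchronous code."""
    q: "queue.Queue[Any]" = queue.Queue()

    async def pump() -> None:
        async for item in agen_factory():
            q.put(item)

    def worker() -> None:
        try:
            # asyncio.run creates a fresh event loop private to this thread.
            asyncio.run(pump())
        finally:
            # Always enqueue the sentinel so the consumer never blocks forever.
            q.put(_SENTINEL)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = q.get()  # blocking get(): no polling, no busy-wait
        if item is _SENTINEL:
            return
        yield item


async def demo_events() -> AsyncIterator[str]:
    for word in ("streaming ", "from ", "a thread"):
        await asyncio.sleep(0.01)
        yield word


print("".join(iterate_in_thread(demo_events)))  # streaming from a thread
```

If the consumer stops iterating early, the worker keeps running until the async generator finishes; because it is a daemon thread, it does not keep the interpreter alive at exit.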

| Parameter | Type | Required | Description |
|---|---|---|---|
| query | str | Yes | Natural-language question. |
| context | Optional[Dict] | No | Supplemental context forwarded to generation (Strategy 3 only). |

Yields: StreamEvent — all events in lifecycle order (see §8 Event Stream Lifecycle).

Example:

from fennec_community.rag.types.streaming_rag import EventType

for event in rag.stream_events("Explain neural networks"):
    if event.is_chunk:
        print(event.data, end="", flush=True)
    elif event.type == EventType.RETRIEVAL_DONE:
        print(f"\n[Retrieved {event.data['num_docs']} docs in {event.latency:.2f}s]")
    elif event.type == EventType.ERROR:
        print(f"\n[Error at {event.data['stage']}: {event.data['msg']}]")
        break

3.3 Non-Streaming Convenience

query()

rag.query(
    query: str,
    context: Optional[Dict] = None,
) -> Dict[str, Any]

Purpose: Non-streaming convenience method that collects the full stream internally and returns a single dictionary with the complete answer string and the full event list. Useful when you want the benefits of the streaming pipeline (timeouts, callbacks, event logging) without implementing a generator consumer loop.

Implementation: Internally calls stream_events(), buffers all chunks, and concatenates them.
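The buffering step reduces to keeping every event and joining the CHUNK payloads. A sketch, with a hypothetical Event tuple standing in for StreamEvent and a hard-coded list standing in for the output of stream_events():

```python
from typing import Any, Dict, Iterable, List, NamedTuple


class Event(NamedTuple):  # hypothetical stand-in for StreamEvent
    type: str
    data: Any
    latency: float = 0.0

    @property
    def is_chunk(self) -> bool:
        return self.type == "chunk"


def collect(events: Iterable[Event]) -> Dict[str, Any]:
    """Buffer every event and concatenate the chunk texts, as query() does."""
    trace: List[Event] = list(events)
    answer = "".join(ev.data for ev in trace if ev.is_chunk)
    return {"answer": answer, "events": trace}


result = collect([
    Event("retrieval_start", {"query": "q"}),
    Event("chunk", "Hello "),
    Event("chunk", "world"),
    Event("generation_done", {"total_latency": 0.1}),
])
print(result["answer"])  # Hello world
```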

| Parameter | Type | Required | Description |
|---|---|---|---|
| query | str | Yes | Natural-language question. |
| context | Optional[Dict] | No | Supplemental context forwarded to generation. |

Returns: Dict[str, Any] with the following fields:

| Key | Type | Description |
|---|---|---|
| answer | str | The complete concatenated answer text from all CHUNK events. Empty string if no chunks were emitted. |
| events | List[StreamEvent] | Ordered list of every StreamEvent emitted during the stream (metadata and chunks). |

Example:

from fennec_community.rag.types.streaming_rag import EventType

result = rag.query("What are the main benefits of RAG?")
print(result["answer"])

# Inspect the event trace
for ev in result["events"]:
    if ev.type == EventType.GENERATION_DONE:
        print(f"Total latency: {ev.data['total_latency']:.2f}s")

3.4 Asynchronous Streaming API

astream()

rag.astream(
    query: str,
    context: Optional[Dict] = None,
) -> AsyncIterator[str]

Purpose: Async generator that yields plain text chunks only; the async counterpart of stream(). Metadata events are consumed internally, but callbacks still fire. In async applications (FastAPI, Starlette, etc.) prefer it over the synchronous API, which would block the event loop while waiting for events.

Implementation: Delegates to astream_events() and filters for event.is_chunk.

| Parameter | Type | Required | Description |
|---|---|---|---|
| query | str | Yes | Natural-language question. |
| context | Optional[Dict] | No | Supplemental context forwarded to generation. |

Yields: str — individual text chunks.

Example:

import asyncio

async def main():
    print("Answer: ", end="")
    async for chunk in rag.astream("How does vector similarity search work?"):
        print(chunk, end="", flush=True)
    print()

asyncio.run(main())

astream_events()

rag.astream_events(
    query: str,
    context: Optional[Dict] = None,
) -> AsyncIterator[StreamEvent]

Purpose: The core implementation of the entire streaming system. All other methods (stream, stream_events, astream, query) delegate to this async generator. Yields all StreamEvent objects in lifecycle order with monotonic latency timestamps.

Full pipeline executed:

  1. Emit RETRIEVAL_START event.
  2. Await _retrieve(query) with retrieval_timeout. On timeout or exception → emit ERROR event and return.
  3. Emit RETRIEVAL_DONE event with num_docs and docs payload.
  4. Build the context string and format the prompt from StreamConfig.prompt_template.
  5. Emit GENERATION_START event with prompt_chars payload.
  6. Iterate _generate() with a sliding generation_timeout deadline (applied per token via asyncio.wait_for(__anext__())). On timeout or exception → emit ERROR event and return.
  7. For each token: fire on_chunk callback, emit CHUNK event.
  8. Always call gen.aclose() in finally to release LLM resources (e.g., open HTTP connections).
  9. Emit GENERATION_DONE event with total_latency payload.
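The nine steps can be condensed into a self-contained async generator. This is a simplified sketch, not the module's implementation: events are (type, data, latency) tuples instead of StreamEvent objects, retrieve and generate are injected callables, the prompt is a bare context-plus-question string, and generation_timeout is treated as one overall deadline re-checked before every token. If the module instead restarts the timer for each token, pass generation_timeout to wait_for directly instead of the remaining time.

```python
import asyncio
import time
from typing import Any, AsyncIterator, Awaitable, Callable, List, Optional, Tuple

Event = Tuple[str, Any, float]  # (type, data, latency): a simplified StreamEvent


async def pipeline(
    query: str,
    retrieve: Callable[[str], Awaitable[List[str]]],
    generate: Callable[[str], AsyncIterator[str]],
    retrieval_timeout: float = 10.0,
    generation_timeout: float = 60.0,
    on_chunk: Optional[Callable[[str], None]] = None,
) -> AsyncIterator[Event]:
    start = time.monotonic()

    def ev(kind: str, data: Any) -> Event:
        # Latency is measured from stream start on the monotonic clock.
        return (kind, data, round(time.monotonic() - start, 4))

    # Steps 1-3: retrieval under its own timeout.
    yield ev("retrieval_start", {"query": query})
    try:
        docs = await asyncio.wait_for(retrieve(query), timeout=retrieval_timeout)
    except asyncio.TimeoutError:
        yield ev("error", {"msg": "Retrieval timed out", "stage": "retrieval"})
        return
    except Exception as exc:
        yield ev("error", {"msg": str(exc), "stage": "retrieval"})
        return
    yield ev("retrieval_done", {"num_docs": len(docs), "docs": docs})

    # Steps 4-5: build the prompt (simplified) and announce generation.
    prompt = "Context:\n" + "\n\n".join(docs) + f"\n\nQuestion: {query}"
    yield ev("generation_start", {"prompt_chars": len(prompt)})

    # Steps 6-8: pull tokens one at a time, each wait bounded by the deadline.
    gen = generate(prompt)
    deadline = time.monotonic() + generation_timeout
    try:
        while True:
            remaining = max(deadline - time.monotonic(), 0.0)
            try:
                token = await asyncio.wait_for(gen.__anext__(), timeout=remaining)
            except StopAsyncIteration:
                break
            if on_chunk is not None:
                on_chunk(token)  # callback fires before the chunk is yielded
            yield ev("chunk", token)
    except asyncio.TimeoutError:
        yield ev("error", {"msg": "Generation timed out", "stage": "generation"})
        return
    except Exception as exc:
        yield ev("error", {"msg": str(exc), "stage": "generation"})
        return
    finally:
        await gen.aclose()  # release LLM resources even on error or early exit

    # Step 9.
    yield ev("generation_done", {"total_latency": round(time.monotonic() - start, 4)})


async def fake_retrieve(query: str) -> List[str]:
    await asyncio.sleep(0.01)  # stands in for a vector search
    return ["RAG grounds answers in retrieved text."]


async def fake_generate(prompt: str) -> AsyncIterator[str]:
    for word in ("RAG ", "grounds ", "answers."):
        await asyncio.sleep(0.01)  # stands in for network latency
        yield word


async def main() -> None:
    async for kind, data, latency in pipeline("What is RAG?", fake_retrieve, fake_generate):
        print(f"[{latency:.3f}s] {kind}: {data}")


asyncio.run(main())
```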

| Parameter | Type | Required | Description |
|---|---|---|---|
| query | str | Yes | Natural-language question. |
| context | Optional[Dict] | No | Supplemental context forwarded to generation (Strategy 3 only). |

Yields: StreamEvent — all events in lifecycle order.

Example:

import asyncio
from fennec_community.rag.types.streaming_rag import EventType

async def rich_consumer():
    async for event in rag.astream_events("Describe transformer architecture"):
        match event.type:
            case EventType.RETRIEVAL_START:
                print(f"🔍 Retrieving... (t={event.latency:.3f}s)")
            case EventType.RETRIEVAL_DONE:
                print(f"📄 Got {event.data['num_docs']} docs (t={event.latency:.3f}s)")
            case EventType.GENERATION_START:
                print(f"✍️  Generating {event.data['prompt_chars']} char prompt...")
            case EventType.CHUNK:
                print(event.data, end="", flush=True)
            case EventType.GENERATION_DONE:
                print(f"\n✅ Done in {event.data['total_latency']:.2f}s")
            case EventType.ERROR:
                print(f"\n❌ Error at [{event.data['stage']}]: {event.data['msg']}")

asyncio.run(rich_consumer())

3.5 Runtime Statistics

stats (property)

rag.stats -> Dict[str, Any]

Purpose: Return a snapshot of all cumulative streaming statistics since the instance was created. The counters are never reset; they live as long as the instance.

Parameters: None — accessed as a property (rag.stats, not rag.stats()).

Returns: Dict[str, Any] with the following fields:

| Key | Type | Description |
|---|---|---|
| total_streams | int | Total number of astream_events() calls initiated (including those that errored). |
| total_chunks | int | Total CHUNK events emitted across all streams. Only a rough measure of generated text volume: each chunk is one item yielded by the LLM with native streaming, or up to chunk_size words in simulation mode. |
| total_errors | int | Total ERROR events emitted (retrieval timeouts, generation timeouts and exceptions). |
| avg_stream_latency_s | float | Mean elapsed seconds per stream, rounded to 3 decimal places. Only streams that completed (reached GENERATION_DONE) contribute to this average. |

Example:

import json

# After several queries:
print(json.dumps(rag.stats, indent=2))
# {
#   "total_streams": 12,
#   "total_chunks": 847,
#   "total_errors": 1,
#   "avg_stream_latency_s": 2.341
# }

3.6 Representation

__repr__()

repr(rag)

Purpose: Return a concise representation of the instance's LLM capability and chunk size. Useful for logging and REPL inspection.

Returns: str in one of three forms:

| LLM capability | Output |
|---|---|
| Async LLM (AsyncStreamableLLM) | StreamingRAG(llm=async, chunk_size=10) |
| Sync LLM (SyncStreamableLLM) | StreamingRAG(llm=sync, chunk_size=10) |
| No LLM (simulation fallback) | StreamingRAG(llm=sim, chunk_size=10) |

Example:

print(repr(rag))
# StreamingRAG(llm=async, chunk_size=10)

4. StreamEvent

from fennec_community.rag.types.streaming_rag import StreamEvent

StreamEvent is a frozen dataclass representing a single event emitted during a stream. It is the universal carrier for both text chunks and lifecycle metadata.


4.1 Attributes

| Attribute | Type | Description |
|---|---|---|
| type | EventType | The event category. Determines how data should be interpreted. |
| data | Any | Event payload. A plain str for CHUNK events; a dict for all metadata events. |
| latency | float | Seconds elapsed since the stream started (monotonic clock), rounded to 4 decimal places. Always 0.0 for the very first event. |

Frozen: StreamEvent instances are immutable (frozen=True). All fields must be set at construction time.
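The attributes, the frozen flag and both helpers fit in a few lines of standard-library code. The sketch below is a hypothetical re-creation (Event and Kind are illustrative names, not module exports) showing why assigning to a field fails and why str() of a metadata event is empty.

```python
from dataclasses import FrozenInstanceError, dataclass
from enum import Enum
from typing import Any


class Kind(str, Enum):  # trimmed, hypothetical copy of EventType
    CHUNK = "chunk"
    ERROR = "error"


@dataclass(frozen=True)
class Event:  # hypothetical re-creation of StreamEvent
    type: Kind
    data: Any
    latency: float

    @property
    def is_chunk(self) -> bool:
        return self.type == Kind.CHUNK

    def __str__(self) -> str:
        # Only chunks render as text, so joining str(e) over all events yields the answer.
        return self.data if self.is_chunk else ""


ev = Event(Kind.CHUNK, "Hello", 0.0123)
try:
    ev.data = "changed"  # frozen: assignment raises
except FrozenInstanceError:
    print("immutable")

events = [ev, Event(Kind.ERROR, {"msg": "x", "stage": "generation"}, 0.5)]
print("".join(str(e) for e in events))  # Hello
```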


4.2 Properties

is_chunk

event.is_chunk -> bool

Purpose: Convenience predicate — True only when event.type is EventType.CHUNK. Allows consumers to distinguish text payload events from metadata events without importing EventType.

Returns: bool

Example:

for event in rag.stream_events("What is RAG?"):
    if event.is_chunk:
        print(event.data, end="")

4.3 Special Methods

__str__()

str(event)  # → str

Purpose: Return event.data when the event is a CHUNK, or "" (empty string) for all other event types. This enables a concise consumer pattern in which str() is applied to every event and the results are joined into the answer.

Returns: str

Example:

# Ultra-compact consumer — works because str(non-chunk event) == ""
answer = "".join(str(ev) for ev in rag.stream_events("Summarise the context"))
print(answer)

5. EventType Enumeration

from fennec_community.rag.types.streaming_rag import EventType

EventType is a str-based Enum representing all possible event categories emitted during a stream. Because it inherits from str, values can be compared directly to string literals.
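The str mixin is what makes the literal comparison work. A sketch with a hypothetical mirror of the six values (EventTypeSketch is an illustrative name):

```python
from enum import Enum


class EventTypeSketch(str, Enum):  # hypothetical mirror of EventType's values
    RETRIEVAL_START = "retrieval_start"
    RETRIEVAL_DONE = "retrieval_done"
    GENERATION_START = "generation_start"
    CHUNK = "chunk"
    GENERATION_DONE = "generation_done"
    ERROR = "error"


print(EventTypeSketch.CHUNK == "chunk")                   # True: compares as a str
print(EventTypeSketch("error") is EventTypeSketch.ERROR)  # True: lookup by value
print(EventTypeSketch.CHUNK.value)                        # chunk
```

Note that str() of a (str, Enum) member returns the qualified name (for example EventTypeSketch.CHUNK), and since Python 3.11 f-strings do the same; use .value when you need the bare string, for instance in logs.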

| Enum Value | String Value | data payload | Description |
|---|---|---|---|
| EventType.RETRIEVAL_START | "retrieval_start" | {"query": str} | Emitted immediately before the retriever is called. Marks the start of the retrieval step. |
| EventType.RETRIEVAL_DONE | "retrieval_done" | {"num_docs": int, "docs": List[Dict]} | Emitted after retrieval completes successfully. num_docs is the raw count before max_context_docs truncation. |
| EventType.GENERATION_START | "generation_start" | {"prompt_chars": int} | Emitted immediately before the LLM is called. prompt_chars is the total character length of the formatted prompt. |
| EventType.CHUNK | "chunk" | str | A text fragment produced by the LLM. With native streaming it is one item yielded by the LLM (typically a token); with the word-level simulator it is up to chunk_size words (the last chunk may be shorter). |
| EventType.GENERATION_DONE | "generation_done" | {"total_latency": float} | Emitted after the last chunk. total_latency is the full elapsed time since stream start, in seconds. |
| EventType.ERROR | "error" | {"msg": str, "stage": str} | Emitted on retrieval timeout, generation timeout, or any unhandled exception. stage is either "retrieval" or "generation". After an ERROR the generator returns and no further events are emitted. |

Usage with a match statement (Python 3.10+), inside an async function:

async def handle(query: str) -> None:
    async for event in rag.astream_events(query):
        match event.type:
            case EventType.RETRIEVAL_DONE:
                print(f"Found {event.data['num_docs']} documents")
            case EventType.CHUNK:
                print(event.data, end="")
            case EventType.ERROR:
                print(f"Error: {event.data['msg']}")

6. Protocols

from fennec_community.rag.types.streaming_rag import SyncStreamableLLM, AsyncStreamableLLM

Both protocols are decorated with @runtime_checkable, which means isinstance(obj, SyncStreamableLLM) works at runtime. StreamingRAG uses this to select the correct generation strategy automatically.
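One caveat: isinstance() against a @runtime_checkable protocol only checks that the named methods exist. It does not inspect signatures, return types, or whether a method is actually async. The sketch below redeclares the two protocols locally (same method names as the module's, so it runs without the package installed) to show this; NotReallyAsync and Both are illustrative classes.

```python
from typing import AsyncIterator, Iterator, Protocol, runtime_checkable


@runtime_checkable
class SyncStreamable(Protocol):  # local copy for illustration
    def stream(self, prompt: str) -> Iterator[str]: ...


@runtime_checkable
class AsyncStreamable(Protocol):  # local copy for illustration
    async def astream(self, prompt: str) -> AsyncIterator[str]: ...


class NotReallyAsync:
    def astream(self, prompt: str) -> list:  # plain sync method, wrong return type
        return [prompt]


class Both:
    def stream(self, prompt: str) -> Iterator[str]:
        yield prompt

    async def astream(self, prompt: str) -> AsyncIterator[str]:
        yield prompt


print(isinstance(NotReallyAsync(), AsyncStreamable))  # True: only the name is checked
print(isinstance(Both(), AsyncStreamable), isinstance(Both(), SyncStreamable))  # True True
```

A class that defines both methods satisfies both protocols; following the decision tree in §7, the async check runs first, so such an object would be driven through astream().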


6.1 SyncStreamableLLM

class SyncStreamableLLM(Protocol):
    def stream(self, prompt: str) -> Iterator[str]: ...

Purpose: Protocol that any synchronous streaming LLM must satisfy. Implement this protocol to use Strategy 2 (sync stream wrapped in a thread executor).

Required method:

| Method | Signature | Description |
|---|---|---|
| stream | (prompt: str) -> Iterator[str] | Accept a complete prompt string and yield text tokens one at a time as they are produced. |

Implementation example:

from typing import Iterator

from openai import OpenAI
from fennec_community.rag.types.streaming_rag import SyncStreamableLLM

openai_client = OpenAI()  # reads OPENAI_API_KEY from the environment

class MyOpenAILLM:  # implicitly satisfies SyncStreamableLLM
    def stream(self, prompt: str) -> Iterator[str]:
        for chunk in openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        ):
            # Some chunks carry no choices or an empty delta; skip them.
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

assert isinstance(MyOpenAILLM(), SyncStreamableLLM)  # → True
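Strategy 2 has to advance a blocking iterator without stalling the event loop. One common way to do that, sketched here as an assumption since the module's own bridge is not shown, is to call next() in the default thread pool with loop.run_in_executor and stop at a sentinel; bridge and slow_tokens are illustrative names.

```python
import asyncio
import time
from typing import AsyncIterator, Iterator

_DONE = object()  # sentinel for "sync iterator exhausted"


def _next_or_done(it: Iterator[str]) -> object:
    # StopIteration cannot be set on an asyncio Future (asyncio raises TypeError),
    # so exhaustion is signalled with a sentinel instead.
    return next(it, _DONE)


async def bridge(sync_iter: Iterator[str]) -> AsyncIterator[str]:
    loop = asyncio.get_running_loop()
    while True:
        # Each blocking next() runs in the default thread pool; the loop stays free.
        item = await loop.run_in_executor(None, _next_or_done, sync_iter)
        if item is _DONE:
            break
        yield item


def slow_tokens() -> Iterator[str]:
    for word in ("sync ", "tokens ", "bridged"):
        time.sleep(0.01)  # simulates a blocking network read
        yield word


async def main() -> str:
    return "".join([tok async for tok in bridge(slow_tokens())])


print(asyncio.run(main()))  # sync tokens bridged
```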

6.2 AsyncStreamableLLM

class AsyncStreamableLLM(Protocol):
    async def astream(self, prompt: str) -> AsyncIterator[str]: ...

Purpose: Protocol that any async streaming LLM must satisfy. Implement this protocol to use Strategy 1 (native async stream) — the highest-fidelity and lowest-latency option.

Required method:

| Method | Signature | Description |
|---|---|---|
| astream | async (prompt: str) -> AsyncIterator[str] | Accept a complete prompt and asynchronously yield tokens as they are produced. |

Implementation example:

from typing import AsyncIterator

from openai import AsyncOpenAI
from fennec_community.rag.types.streaming_rag import AsyncStreamableLLM

async_openai_client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

class MyAsyncLLM:  # implicitly satisfies AsyncStreamableLLM
    async def astream(self, prompt: str) -> AsyncIterator[str]:
        stream = await async_openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )
        async for chunk in stream:
            # Some chunks carry no choices or an empty delta; skip them.
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

assert isinstance(MyAsyncLLM(), AsyncStreamableLLM)  # → True

7. Generation Strategy Selection

StreamingRAG automatically selects the best available generation strategy at runtime, in priority order:

┌─────────────────────────────────────────────────────────────────┐
│              Generation Strategy Decision Tree                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  llm is not None                                                │
│  AND isinstance(llm, AsyncStreamableLLM)?                       │
│         │                                                       │
│         ├── YES → Strategy 1: Native async stream               │
│         │         Calls llm.astream(prompt)                     │
│         │         ✅ Best latency, true token-level streaming   │
│         │                                                       │
│         └── NO ──→ isinstance(llm, SyncStreamableLLM)?          │
│                          │                                      │
│                          ├── YES → Strategy 2: Sync stream      │
│                          │         Runs llm.stream(prompt)      │
│                          │         in a thread via executor     │
│                          │         Bridges sync → async safely  │
│                          │         ✅ Token-level, thread-safe  │
│                          │                                      │
│                          └── NO ──→ Strategy 3: Simulation      │
│                                    Calls rag_system.generate()  │
│                                    Splits answer into word      │
│                                    chunks of size=chunk_size    │
│                                    with sim_delay between each  │
│                                    ⚠️ Pseudo-streaming only     │
└─────────────────────────────────────────────────────────────────┘

| Strategy | Requires | LLM method used | Token-level? |
|---|---|---|---|
| 1: Native async | llm implements AsyncStreamableLLM | llm.astream(prompt) | ✅ Yes |
| 2: Sync wrapped | llm implements SyncStreamableLLM | llm.stream(prompt) in a thread | ✅ Yes |
| 3: Simulation | rag_system.generate() exists | rag_system.generate(query, context) | ⚠️ Word-level only |
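The decision tree reduces to two isinstance() checks in priority order, after the constructor's rag_system.llm fallback. A sketch with local stand-in protocols so it runs on its own; select_strategy, Backend and SyncLLM are hypothetical names, the return labels match those shown by repr(rag), and the AttributeError message wording is illustrative.

```python
from typing import Any, AsyncIterator, Iterator, Optional, Protocol, runtime_checkable


@runtime_checkable
class AsyncStreamable(Protocol):  # local stand-in for AsyncStreamableLLM
    async def astream(self, prompt: str) -> AsyncIterator[str]: ...


@runtime_checkable
class SyncStreamable(Protocol):  # local stand-in for SyncStreamableLLM
    def stream(self, prompt: str) -> Iterator[str]: ...


def select_strategy(llm: Optional[Any], rag_system: Any) -> str:
    # Constructor fallback: use rag_system.llm when no llm was passed.
    if llm is None:
        llm = getattr(rag_system, "llm", None)
    if llm is not None and isinstance(llm, AsyncStreamable):
        return "async"  # Strategy 1: native async stream
    if llm is not None and isinstance(llm, SyncStreamable):
        return "sync"   # Strategy 2: sync stream run in a thread
    if not hasattr(rag_system, "generate"):
        raise AttributeError(
            "No streaming LLM found and rag_system has no .generate(); "
            "provide an LLM implementing SyncStreamableLLM or AsyncStreamableLLM."
        )
    return "sim"        # Strategy 3: word-chunk simulation


class Backend:  # hypothetical backend offering only the fallback method
    def retrieve(self, query):
        return []

    def generate(self, query, context=None):
        return "plain answer"


class SyncLLM:  # hypothetical sync streaming LLM
    def stream(self, prompt):
        yield prompt


print(select_strategy(None, Backend()), select_strategy(SyncLLM(), Backend()))  # sim sync
```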

Strategy 3 error: If rag_system.generate is also absent, AttributeError is raised with a descriptive message instructing the caller to provide a proper LLM.
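Strategy 3 turns one complete answer into a paced pseudo-stream. A sketch, assuming generate() returns either a string or a dict holding the text under a hypothetical "answer" key, and that chunks are separated by single spaces; simulate_stream is an illustrative name, and the module's exact splitting and spacing may differ.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Union


async def simulate_stream(answer: Union[str, Dict[str, Any]],
                          chunk_size: int = 10,
                          sim_delay: float = 0.04) -> AsyncIterator[str]:
    # Assumption: dict results store the text under "answer".
    text = answer.get("answer", "") if isinstance(answer, dict) else answer
    words = text.split()
    for i in range(0, len(words), chunk_size):
        chunk = " ".join(words[i:i + chunk_size])
        # Re-insert the space lost by split(), except after the final chunk.
        if i + chunk_size < len(words):
            chunk += " "
        yield chunk
        await asyncio.sleep(sim_delay)  # paces the fake stream


async def main() -> List[str]:
    return [c async for c in simulate_stream("one two three four five six seven",
                                             chunk_size=3, sim_delay=0.0)]


print(asyncio.run(main()))  # ['one two three ', 'four five six ', 'seven']
```

Because str.split() collapses all whitespace, this sketch loses newlines and paragraph breaks that a real token stream would preserve.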


8. Event Stream Lifecycle

A complete, error-free stream always produces events in this exact order:

Time →
│
├─ [t=0.0000] RETRIEVAL_START  {"query": "..."}
│
│  [retrieval executes — up to retrieval_timeout seconds]
│
├─ [t=0.Xsec]  RETRIEVAL_DONE  {"num_docs": N, "docs": [...]}
│
│  [context built, prompt formatted]
│
├─ [t=0.Xsec]  GENERATION_START {"prompt_chars": N}
│
│  [LLM streaming begins]
│
├─ [t=0.Xsec]  CHUNK  "First few words "
├─ [t=0.Xsec]  CHUNK  "of the answer "
├─ [t=0.Xsec]  CHUNK  "..."...
├─ [t=0.Xsec]  CHUNK  "last chunk."
│
└─ [t=0.Xsec]  GENERATION_DONE {"total_latency": X.XXXX}


─── On retrieval error or timeout: ───────────────────────────────
│
├─ RETRIEVAL_START
└─ ERROR  {"msg": "Retrieval timed out", "stage": "retrieval"}
   [stream ends — no further events]


─── On generation error or timeout: ──────────────────────────────
│
├─ RETRIEVAL_START
├─ RETRIEVAL_DONE
├─ GENERATION_START
├─ CHUNK  (zero or more chunks may have been emitted before error)
└─ ERROR  {"msg": "Generation timed out", "stage": "generation"}
   [stream ends — no further events]
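Because the order is fixed, a consumer or a test can validate a recorded trace. A small sketch that encodes the three shapes above (also accepting a complete stream with zero chunks) as a regular expression over event-type values; the one-letter codes and is_valid_trace are illustrative, not part of the module.

```python
import re
from typing import Iterable

# One letter per event-type value.
CODES = {
    "retrieval_start": "S",
    "retrieval_done": "R",
    "generation_start": "G",
    "chunk": "C",
    "generation_done": "D",
    "error": "E",
}

# Retrieval failure: S E.  Otherwise: S R G, any number of chunks, then D or E.
LIFECYCLE = re.compile(r"S(?:E|RGC*[DE])")


def is_valid_trace(event_types: Iterable[object]) -> bool:
    # Accept plain strings or Enum members (their .value is the string).
    code = "".join(CODES[getattr(t, "value", t)] for t in event_types)
    return LIFECYCLE.fullmatch(code) is not None


print(is_valid_trace(["retrieval_start", "retrieval_done", "generation_start",
                      "chunk", "generation_done"]))  # True
print(is_valid_trace(["retrieval_start", "chunk"]))  # False
```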

9. Error Handling Reference

| Scenario | ERROR.data["stage"] | ERROR.data["msg"] | Recovery |
|---|---|---|---|
| Retrieval timed out (exceeded retrieval_timeout) | "retrieval" | "Retrieval timed out" | Stream ends. Retry with a shorter query or increase retrieval_timeout. |
| Retrieval raised an exception | "retrieval" | Exception message | Stream ends. Check the rag_system.retrieve() implementation. |
| Generation timed out (sliding deadline exceeded) | "generation" | "Generation timed out" | Stream ends. Increase generation_timeout or reduce max_context_docs. |
| Generation raised an exception | "generation" | Exception message | Stream ends. Check LLM connectivity and the stream() / astream() implementation. |
| Strategy 3: rag_system.generate missing | Raised before the stream starts | AttributeError with a descriptive message | Provide an LLM implementing SyncStreamableLLM or AsyncStreamableLLM. |

10. Quick-Start Example

import asyncio
import json
from fennec_community.rag.types.streaming_rag import StreamingRAG, StreamConfig, EventType
from fennec_community.rag.types.streaming_rag import SyncStreamableLLM, AsyncStreamableLLM

# ═══════════════════════════════════════════════════════════════════
# 1. Implement the AsyncStreamableLLM protocol (Strategy 1)
# ═══════════════════════════════════════════════════════════════════
class MyAsyncLLM:
    async def astream(self, prompt: str):
        # Replace with a real async LLM call (OpenAI, Anthropic, etc.)
        words = f"Answer based on context: {prompt[:30]}...".split()
        for word in words:
            yield word + " "
            await asyncio.sleep(0.02)

# ═══════════════════════════════════════════════════════════════════
# 2. Configure the streamer
# ═══════════════════════════════════════════════════════════════════
config = StreamConfig(
    max_context_docs=5,
    max_doc_chars=600,
    retrieval_timeout=10.0,
    generation_timeout=60.0,
)

# ═══════════════════════════════════════════════════════════════════
# 3. Callbacks (optional)
# ═══════════════════════════════════════════════════════════════════
log = []

def on_chunk(chunk: str):
    log.append(chunk)          # side-effect: accumulate tokens elsewhere

def on_event(event):
    if event.type == EventType.ERROR:
        print(f"[CALLBACK] ❌ Error: {event.data}")

# ═══════════════════════════════════════════════════════════════════
# 4. Instantiate
# ═══════════════════════════════════════════════════════════════════
rag = StreamingRAG(
    rag_system=my_rag_system,   # placeholder: your RAG backend (.retrieve(), optionally .generate())
    llm=MyAsyncLLM(),
    config=config,
    on_chunk=on_chunk,
    on_event=on_event,
)
print(repr(rag))
# StreamingRAG(llm=async, chunk_size=10)

# ═══════════════════════════════════════════════════════════════════
# 5. Synchronous plain text streaming
# ═══════════════════════════════════════════════════════════════════
print("Answer: ", end="")
for chunk in rag.stream("What is retrieval-augmented generation?"):
    print(chunk, end="", flush=True)
print()

# ═══════════════════════════════════════════════════════════════════
# 6. Synchronous rich event streaming
# ═══════════════════════════════════════════════════════════════════
for event in rag.stream_events("Explain transformer attention"):
    if event.type == EventType.RETRIEVAL_DONE:
        print(f"\n[Docs: {event.data['num_docs']}  Latency: {event.latency:.3f}s]")
    elif event.type == EventType.CHUNK:
        print(event.data, end="", flush=True)
    elif event.type == EventType.GENERATION_DONE:
        print(f"\n[Done in {event.data['total_latency']:.2f}s]")
    elif event.type == EventType.ERROR:
        print(f"\n[Error at {event.data['stage']}: {event.data['msg']}]")
        break

# ═══════════════════════════════════════════════════════════════════
# 7. Non-streaming (collect full answer)
# ═══════════════════════════════════════════════════════════════════
result = rag.query("Summarise the key points about BERT")
print(result["answer"])
print(f"Events captured: {len(result['events'])}")

# ═══════════════════════════════════════════════════════════════════
# 8. Async plain text streaming
# ═══════════════════════════════════════════════════════════════════
async def async_text():
    print("Async: ", end="")
    async for chunk in rag.astream("How does vector search work?"):
        print(chunk, end="", flush=True)
    print()

asyncio.run(async_text())

# ═══════════════════════════════════════════════════════════════════
# 9. Async rich event streaming
# ═══════════════════════════════════════════════════════════════════
async def async_events():
    async for event in rag.astream_events("Describe the RAG pipeline"):
        match event.type:
            case EventType.RETRIEVAL_START:
                print(f"🔍 Retrieving (t={event.latency:.3f}s)")
            case EventType.RETRIEVAL_DONE:
                print(f"📄 Retrieved {event.data['num_docs']} docs")
            case EventType.GENERATION_START:
                print(f"✍️  Generating ({event.data['prompt_chars']} chars)")
            case EventType.CHUNK:
                print(event.data, end="", flush=True)
            case EventType.GENERATION_DONE:
                print(f"\n✅ Total: {event.data['total_latency']:.2f}s")
            case EventType.ERROR:
                print(f"\n❌ [{event.data['stage']}] {event.data['msg']}")

asyncio.run(async_events())

# ═══════════════════════════════════════════════════════════════════
# 10. Runtime statistics
# ═══════════════════════════════════════════════════════════════════
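print(json.dumps(rag.stats, indent=2))
# Illustrative output. total_streams is 5 because sections 5 to 9 each start one
# stream; chunk and latency figures depend on your retriever and LLM.
# {
#   "total_streams": 5,
#   "total_chunks": 312,
#   "total_errors": 0,
#   "avg_stream_latency_s": 1.847
# }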

# ═══════════════════════════════════════════════════════════════════
# 11. Protocol compliance check
# ═══════════════════════════════════════════════════════════════════
from fennec_community.rag.types.streaming_rag import AsyncStreamableLLM, SyncStreamableLLM

print(isinstance(MyAsyncLLM(), AsyncStreamableLLM))  # → True

class MySyncLLM:
    def stream(self, prompt: str):
        for word in "Hello world from sync LLM".split():
            yield word + " "

print(isinstance(MySyncLLM(), SyncStreamableLLM))  # → True

# Switch to sync LLM
rag2 = StreamingRAG(
    rag_system=my_rag_system,
    llm=MySyncLLM(),
    chunk_size=5,            # override via constructor shortcut
)
print(repr(rag2))
# StreamingRAG(llm=sync, chunk_size=5)

# ═══════════════════════════════════════════════════════════════════
# 12. Fallback (no LLM — word-level simulation)
# ═══════════════════════════════════════════════════════════════════
rag3 = StreamingRAG(
    rag_system=my_rag_system,   # must expose .generate()
    # No llm passed. Strategy 3 (simulation) is used only if my_rag_system.llm
    # is also absent or implements neither streaming protocol.
    config=StreamConfig(chunk_size=3, sim_delay=0.05),
)
print(repr(rag3))
# StreamingRAG(llm=sim, chunk_size=3)

for chunk in rag3.stream("Quick question for simulation mode"):
    print(chunk, end="", flush=True)
print()

Simple Real Example

from fennec_community.llm import GeminiInterface
from fennec_community.document_loaders import TextLoader 
from fennec_community.vector_database import FAISSVectorDatabase
from fennec_community.chunks import ArabicTextChunker
from fennec_community.context import ContextManager
from fennec_community.embeddings import OllamaEmbedder
from fennec_community.rag.core import RAGSystem 
from fennec_community.rag.types.streaming_rag import StreamingRAG, StreamConfig, EventType

loader_1 = TextLoader("./data_kn/faq.txt").load()
chunker = ArabicTextChunker(chunk_size=100, overlap=20)
embedder = OllamaEmbedder()
vector_db = FAISSVectorDatabase(embedder=embedder)
llm = GeminiInterface(api_key=llm_api)  # llm_api: your Gemini API key (placeholder)
context_manager = ContextManager()
rag_system = RAGSystem(llm=llm, vector_db=vector_db, chunker=chunker, context_manager=context_manager)

rag_system.add_documents(loader_1)
config = StreamConfig(
    chunk_size=3,
    sim_delay=0.05,
    retrieval_timeout=15.0,
    generation_timeout=90.0,
    max_context_docs=5,
    max_doc_chars=800,
)

def on_chunk_side_effect(token: str):
    # Example: send over a WebSocket
    # websocket.send(token)
    pass  # ← no printing here

def on_event_side_effect(event):
    if event.type == EventType.ERROR:
        print(f"Error in {event.data['stage']}: {event.data['msg']}")


rag = StreamingRAG(
    rag_system=rag_system,
    llm=llm,
    config=config,
    on_chunk=on_chunk_side_effect,   # side effect only
    on_event=on_event_side_effect,
)
# Query (Arabic): "What payment methods are available?"
for token in rag.stream("ماهي طرق الدفع المتاحة؟"):
    print(token, end="", flush=True)   # ← the only place that prints
print()
Source: community/rag/streaming_rag.md