Fennec Logo Fennec
Fennec Memory memory/cache_module_docs.md

cache_module_docs

Multi-Level Intelligent LLM Cache System


Table of Contents

  1. Overview
  2. System Architecture
  3. Core Concepts
  4. Quick Start Guide
  5. Configuration Reference
  6. Public API Reference
  7. Security Model
  8. Storage Backends
  9. Observability & Metrics
  10. Advanced Usage
  11. Edge Cases & Failure Handling
  12. MultiLevelCache — General-Purpose Cache Layer
  13. Low-Level Data Models
  14. CacheManager vs MultiLevelCache

1. Overview

The Fennec Cache Module is a production-grade, multi-level intelligent caching system designed specifically for LLM pipelines. Rather than paying the cost—in latency and money—of an LLM call for every request, Fennec intercepts queries before they reach the model, serves cached responses when semantically equivalent answers already exist, and learns over time which responses are most valuable to keep.

The system combines three complementary lookup strategies (exact key matching, persistent storage lookup, and FAISS-powered semantic vector search) with a Reinforcement Learning eviction policy that continuously improves cache quality based on user and system feedback. All of this is wrapped in a multi-tenant security layer with per-tenant quota enforcement, PII scrubbing, and HMAC content integrity verification.

Why It Exists

LLM APIs charge per token and incur hundreds of milliseconds of latency per call. Real-world applications—chatbots, RAG pipelines, APIs—frequently receive semantically identical or near-identical queries that do not need a fresh LLM response. Fennec exploits this by storing and reusing responses intelligently, reducing cost by orders of magnitude while delivering sub-millisecond latency for cache hits.

Real-World Use Cases

Scenario Benefit
Chatbots / Q&A systems Serve repeated questions instantly; eliminate redundant LLM spend
RAG pipelines Cache retrieved responses alongside embeddings for repeated document queries
Multi-tenant SaaS Full tenant isolation with per-tenant quotas, shared-cache opt-in
High-traffic APIs Reduce p99 latency from seconds to milliseconds for hot query paths
Cost monitoring Real-time USD savings tracking and ROI reporting per tenant

2. System Architecture

Pipeline Overview

Every get() call traverses the following ordered pipeline, short-circuiting at the first hit:

Query
  │
  ▼
SecurityGuard          ← validate query; reject injections, length violations
  │
  ▼
TenantManager          ← check & charge RPM quota
  │
  ▼
QueryNormalizer        ← Unicode NFC, lowercase, punctuation strip,
  │                      synonym expansion → canonical form + SHA-256 key
  ▼
L1 Exact Cache         ← in-process OrderedDict LRU (fastest path, ~μs)
  │  MISS
  ▼
Storage Exact Lookup   ← Redis / SQLite / Memory (persistent exact match)
  │  MISS                → promote hit to L1
  ▼
EmbeddingIndex         ← embed query (coalesced for concurrent requests)
  │                      → FAISS cosine similarity search
  ▼
PolicyLearner          ← Thompson Sampling re-ranks candidates by expected reward
  │
  ▼
DecisionEngine         ← cost-aware utility function; decides SEMANTIC_HIT vs LLM_FALLBACK
  │
  ├── CACHE HIT  → return CacheLookupResult (hit=True)
  │
  └── LLM_FALLBACK → caller invokes LLM → put() → persist + embed + index

Store Pipeline (put())

SecurityGuard          ← validate query & response
  │
  ▼
PII Scrubber           ← redact sensitive data (if enabled)
  │
  ▼
QuotaCheck             ← entry count + memory headroom
  │
  ▼
QueryNormalizer        ← normalize + compute exact key
  │
  ▼
CachedEmbedder         ← embed normalized query vector
  │
  ▼
CostModel              ← compute cost record (embedding + LLM costs)
  │
  ▼
IntelligentCacheEntry  ← build entry with RL bandit arm, HMAC hash, metadata
  │
  ▼
L1 put + Storage.set + EmbeddingIndex.add
  │
  ▼
QuotaAccounting        ← increment entry count + charge memory

Component Responsibilities

Component Responsibility
_L1ExactCache In-process LRU OrderedDict; sub-microsecond exact lookup
BaseStorage (+ backends) Persistent exact-match store; cross-process durability
EmbeddingIndex FAISS vector index; semantic nearest-neighbour search
CachedEmbedder Embedding model wrapper with in-process LRU for vectors
_InflightCoalescer Deduplicates concurrent embedding calls for identical queries
QueryNormalizer Canonical form transformation; deterministic SHA-256 key generation
CachePolicyLearner Thompson Sampling bandit; ranks candidates; drives eviction
DecisionEngine Cost/quality/latency utility function; makes final routing decision
TenantManager Registration, quota enforcement, namespace isolation
SecurityGuard Injection detection, PII scrubbing, HMAC integrity verification
CacheMetricsCollector Thread-safe hit/miss/latency/cost counters

Design Philosophy

Fennec is async-first: the full pipeline is built around asyncio with thread-safe primitives for background tasks (eviction timer, coalescer). Sync wrappers (get_sync, put_sync, feedback_sync) are provided for non-async callers. The system is pluggable—storage backends, embedding providers, and eviction policies are all swappable without changing application code. All quota and access decisions are fail-closed: a quota breach or security violation returns LLM_FALLBACK rather than raising, so the application always has a safe path forward.


3. Core Concepts

3.1 Multi-Level Caching

Fennec implements three cache layers with different speed/durability tradeoffs:

L1 — In-Process Exact Cache An OrderedDict-backed LRU cache keyed on the SHA-256 hash of the normalized query, scoped per tenant. Lookups are in-process memory accesses (~microseconds). Capacity is bounded by l1_max_items (default: 512). Entries are automatically promoted from Storage on a hit to warm the L1.

Storage Exact Lookup Persistent exact-match lookup against the configured backend (Redis, SQLite, or Memory). Used when the L1 is cold or the entry was evicted. A hit here promotes the entry back to L1 for subsequent requests.

L2 — Semantic Search (FAISS) When exact match fails, the query is embedded into a dense vector and searched against a FAISS index using cosine similarity. Candidates above semantic.similarity_threshold * 0.80 are loaded, filtered for tenant accessibility and integrity, and re-ranked by the RL policy. The DecisionEngine then decides whether the best candidate is good enough to serve or whether an LLM call is required.

3.2 Semantic Search & Embeddings

Query embedding is performed by CachedEmbedder, which wraps the configured embedding provider with an in-process LRU (embedding_cache_size, default: 4096 vectors). Concurrent requests for the same query are coalesced by _InflightCoalescer so the embedding model is called exactly once per unique query in flight, regardless of concurrency.

Supported embedding providers:

Provider Value Notes
OpenAI EmbeddingProvider.OPENAI Requires OPENAI_API_KEY; default model text-embedding-3-small
HuggingFace EmbeddingProvider.HUGGINGFACE Sentence-transformers; runs locally
Ollama EmbeddingProvider.OLLAMA Local inference at ollama_url
Mock EmbeddingProvider.MOCK Deterministic random vectors; for tests and offline use

3.3 RL-Based Caching (Thompson Sampling)

Every IntelligentCacheEntry carries a BanditArm—a Beta distribution parameterised by (alpha, beta) representing accumulated successes and failures. The system uses Thompson Sampling: each arm is scored by drawing a sample from Beta(alpha, beta), and candidates are ranked by this sampled reward. This provides principled exploration (low-confidence entries can still win) while favouring entries with a strong positive feedback history.

Feedback signals update the bandit arm:

  • positive=Truealpha += magnitude
  • positive=Falsebeta += magnitude

The adaptive_threshold feature adjusts the cosine similarity threshold automatically: positive feedback relaxes it (finds more hits), negative feedback tightens it (reduces false positives).

3.4 Multi-Tenancy & Isolation

Every entry is tagged with a tenant_id. The TenantManager enforces three independent quota dimensions per tenant: memory (MB), entry count, and requests per minute (RPM). Namespace keys are prefixed as tenant_id::key, providing hard storage-level separation. A "default" tenant is always registered for single-tenant deployments.

Cross-tenant shared cache access is opt-in: a tenant must have allow_shared_read=True to read entries marked is_shared=True, and allow_shared_write=True to publish shared entries. The is_accessible() method on TenantManager enforces these rules uniformly across all lookup paths.

3.5 Quota Enforcement

Three quota types are enforced:

Quota Unit Enforcement Point
request_quota_rpm Requests per minute get() entry (before any work)
max_entries Entry count put() before storing
memory_quota_mb Megabytes put() before storing

When any quota is exceeded, QuotaExceeded is raised internally and the operation returns LLM_FALLBACK (for get()) or None (for put()). An optional quota_event_hook callback fires on each violation for integration with external alerting.

Unknown tenant IDs are auto-registered with default quotas and a WARNING log entry. For production multi-tenant deployments, register tenants explicitly before first use.

QuotaExceeded Exception

The QuotaExceeded exception exposes structured context for alerting and logging:

class QuotaExceeded(Exception):
    tenant_id: str   # identifier of the tenant that exceeded quota
    reason: str      # human-readable reason (e.g., "RPM quota 1000 exceeded")
from fennec_memory.cache import QuotaExceeded

try:
    tenant_manager.check_and_charge_request("acme")
except QuotaExceeded as e:
    print(f"Tenant {e.tenant_id} exceeded quota: {e.reason}")

3.6 Eviction

A background threading.Timer runs the eviction cycle every eviction.check_interval_s seconds (default: 300). Each cycle:

  1. Removes expired entries (TTL exceeded).
  2. Removes entries exceeding eviction.max_age_s (if configured).
  3. Evicts the bottom 5% of entries by composite eviction_score (or more if storage exceeds the soft cap).
  4. Applies RL reward decay (usage_decay) to all surviving entries.

The composite eviction score combines bandit expected reward, normalized usage count, and recency, weighted by reward_weight, usage_weight, and recency_weight.


4. Quick Start Guide

Installation

pip install fennec-memory

Minimal Working Example

import asyncio
from fennec_memory.cache import CacheManager, CacheConfig, StorageBackend
from fennec_memory.cache import EmbeddingConfig, EmbeddingProvider

async def main():
    # 1. Create manager (async factory; warms up embedding model)
    config = CacheConfig(
        storage_backend=StorageBackend.SQLITE,
        embedding=EmbeddingConfig(provider=EmbeddingProvider.OPENAI),
        default_ttl_s=3600.0,
    )
    manager = await CacheManager.create(config)

    query = "What is retrieval-augmented generation?"

    # 2. Attempt cache lookup
    result = await manager.get(query)

    if result.hit:
        answer = result.response
        print(f"Cache hit ({result.decision.value}), saved ${result.cost_saved_usd:.4f}")
    else:
        # 3. LLM fallback
        answer = await your_llm_call(query)

        # 4. Store for future requests
        await manager.put(query, answer, response_tokens=350)

    # 5. Teardown
    await manager.aclose()

asyncio.run(main())

Sync Usage (Flask, scripts)

# No async context needed
result = manager.get_sync("What is RAG?")
if not result.hit:
    answer = your_llm_call_sync("What is RAG?")
    manager.put_sync("What is RAG?", answer)

5. Configuration Reference

All configuration is handled through a single root CacheConfig dataclass. Pass one instance to CacheManager.create().

CacheConfig — Root Configuration

from fennec_memory.cache import CacheConfig, StorageBackend

config = CacheConfig(
    storage_backend=StorageBackend.SQLITE,   # storage backend selection
    l1_max_items=512,                        # L1 in-process LRU capacity
    default_ttl_s=3600.0,                    # default entry TTL in seconds
    log_level="INFO",                        # logging level
)
Field Type Default Description
storage_backend StorageBackend SQLITE Persistent backend: REDIS, SQLITE, or MEMORY
l1_max_items int 512 Maximum entries in the in-process L1 LRU
l2_max_items int 4096 Maximum entries in the L2 in-process cache
default_ttl_s Optional[float] 3600.0 Default TTL for stored entries (seconds). None = no expiry
log_level str "INFO" Python logging level
embedding EmbeddingConfig see below Embedding provider settings
semantic SemanticConfig see below Similarity thresholds and index settings
cost CostConfig see below Token pricing and utility weights
rl RLConfig see below Thompson Sampling hyperparameters
tenant TenantConfig see below Default tenant quotas
security SecurityConfig see below Security and PII settings
eviction EvictionConfig see below Eviction policy and scheduling
redis RedisConfig see below Redis connection settings
sqlite SQLiteConfig see below SQLite file and performance settings

from_env() — Environment Variable Loading

config = CacheConfig.from_env()
Environment Variable Config Field Default
CACHE_L1_MAX_ITEMS l1_max_items 100
CACHE_L2_MAX_ITEMS l2_max_items 1000
CACHE_L3_MAX_ITEMS l3_max_items 10000
CACHE_DIR cache_dir ./cache_storage
CACHE_DEFAULT_TTL default_ttl_s None
OPENAI_API_KEY embedding.openai_api_key
REDIS_HOST redis.host localhost
REDIS_PORT redis.port 6379
REDIS_DB redis.db 0
REDIS_PASSWORD redis.password
REDIS_SSL redis.ssl false
SQLITE_PATH sqlite.db_path ./fennec_cache.db
FENNEC_HMAC_SECRET security.hmac_secret
LOG_LEVEL log_level INFO

to_dict() — Serialisation

d = config.to_dict()

Converts the full CacheConfig to a plain dictionary. Useful for logging, auditing, or persisting configuration state.


EmbeddingConfig

Controls the embedding model used for semantic search.

from fennec_memory.cache import EmbeddingConfig, EmbeddingProvider

EmbeddingConfig(
    provider=EmbeddingProvider.OPENAI,
    model_name="text-embedding-3-small",
    dimension=1536,
    batch_size=64,
    cache_embeddings=True,
    embedding_cache_size=4096,
    request_timeout=10.0,
)
Field Type Default Description
provider EmbeddingProvider MOCK Embedding backend
model_name str "text-embedding-3-small" Model identifier (for OpenAI)
dimension int 1536 Vector dimension; must match the chosen model
batch_size int 64 Embedding batch size
cache_embeddings bool True Enable in-process LRU for computed vectors
embedding_cache_size int 4096 LRU capacity for cached vectors
openai_api_key Optional[str] $OPENAI_API_KEY API key (auto-read from env)
hf_model_name str "sentence-transformers/all-MiniLM-L6-v2" HuggingFace model name
ollama_url str "http://localhost:11434" Ollama server URL
request_timeout float 10.0 HTTP timeout for embedding requests (seconds)

SemanticConfig

Controls the FAISS vector index and similarity threshold behaviour.

Field Type Default Description
similarity_threshold float 0.85 Minimum cosine similarity to accept a semantic hit
adaptive_threshold bool True Auto-tune threshold based on feedback signals
threshold_min float 0.70 Minimum adaptive threshold floor
threshold_max float 0.97 Maximum adaptive threshold ceiling
threshold_step float 0.01 Step size for each threshold adjustment
index_type str "flat" FAISS index type: "flat", "ivf", or "hnsw"
max_index_size int 100_000 Maximum vectors in the FAISS index

SecurityConfig

Field Type Default Description
max_query_length int 8192 Maximum allowed query character length
max_response_length int 65536 Maximum allowed response character length
enable_injection_check bool True Enable prompt injection detection
injection_patterns_path Optional[str] None Path to a file of custom regex patterns (one per line)
enable_pii_scrub bool False Enable PII redaction before storage
hmac_secret Optional[str] $FENNEC_HMAC_SECRET HMAC signing key for entry integrity verification

TenantConfig

Defines default quotas applied to auto-registered or unspecified tenants.

Field Type Default Description
default_memory_quota_mb float 512.0 Default memory quota per tenant (MB)
default_request_quota_rpm int 1000 Default requests-per-minute limit
default_max_entries int 10_000 Default maximum entry count per tenant
enable_shared_cache bool False Grant default tenant shared read/write access
isolation_strict bool True Enforce hard namespace separation

RLConfig

Thompson Sampling hyperparameters for the bandit policy.

Field Type Default Description
prior_alpha float 1.0 Beta distribution prior for successes (uniform prior = no prior knowledge)
prior_beta float 1.0 Beta distribution prior for failures
exploration_bonus float 0.05 Additional reward bonus for under-explored entries
positive_feedback_reward float 1.0 Reward magnitude added on positive feedback
negative_feedback_penalty float 1.0 Penalty magnitude added on negative feedback
similarity_bonus_scale float 0.3 Reward multiplier for high-similarity hits
usage_decay float 0.995 Multiplicative reward decay applied each eviction cycle
min_reward_to_keep float 0.05 Entries with expected reward below this are eligible for eviction

EvictionConfig

Field Type Default Description
policy EvictionPolicy REWARD_LRU Eviction strategy: REWARD_LRU, LRU, LFU, or TTL
check_interval_s int 300 Seconds between eviction cycle runs
max_age_s Optional[int] None Hard maximum age for any entry; None = unlimited
reward_weight float 0.50 Weight of bandit reward in composite eviction score
usage_weight float 0.30 Weight of usage count in composite eviction score
recency_weight float 0.20 Weight of recency (time since last access) in composite score

RedisConfig

Field Type Default / Env
host str $REDIS_HOSTlocalhost
port int $REDIS_PORT6379
db int $REDIS_DB0
password Optional[str] $REDIS_PASSWORD
ssl bool $REDIS_SSLfalse
socket_timeout float 2.0
max_connections int 50
key_prefix str "fennec:"

SQLiteConfig

Field Type Default / Env
db_path str $SQLITE_PATH./fennec_cache.db
wal_mode bool True (Write-Ahead Logging; better concurrency)
cache_size_kb int 65536 (64 MB page cache)

PerformanceConfig

Controls low-level async and concurrency behaviour. These settings tune how the system handles inflight requests, parallel embedding calls, and I/O threading. In most cases the defaults are appropriate; adjust only when profiling indicates a bottleneck.

from fennec_memory.cache import PerformanceConfig

PerformanceConfig(
    enable_async=True,
    coalescing_window_ms=10,
    embedding_batch_size=32,
    io_threads=4,
    max_concurrent_llm_calls=20,
)
Field Type Default Description
enable_async bool True Enable async execution mode. Set to False only for purely synchronous deployments where no event loop is ever present.
coalescing_window_ms int 10 Time window in milliseconds during which concurrent identical embedding requests are deduplicated by _InflightCoalescer. Higher values increase deduplication efficiency at the cost of added latency.
embedding_batch_size int 32 Number of texts to embed in a single provider call. Larger batches reduce HTTP overhead; smaller batches reduce per-request latency variance.
io_threads int 4 Size of the thread pool used for storage I/O operations executed via asyncio.to_thread. Increase for high-concurrency deployments with slow storage.
max_concurrent_llm_calls int 20 Maximum number of in-flight LLM calls the system will allow simultaneously. Requests above this limit queue until a slot is free.

Note: PerformanceConfig is nested inside CacheConfig as the performance field and is automatically constructed with defaults. Pass an explicit instance only when you need non-default values.

from fennec_memory.cache import CacheConfig, PerformanceConfig

config = CacheConfig(
    performance=PerformanceConfig(
        coalescing_window_ms=20,   # longer window for very high concurrency
        io_threads=8,
        max_concurrent_llm_calls=50,
    )
)

6. Public API Reference

CacheManager.create

Async factory method. The preferred way to instantiate CacheManager. Builds all subsystems and warms up the embedding model with a no-op call.

@classmethod
async def create(cls, config: Optional[CacheConfig] = None) -> "CacheManager"

Parameters

Name Type Required Description
config CacheConfig No Full configuration object. Uses defaults if None.

Returns A fully initialised CacheManager instance, ready for use.

Behaviour Constructs all subsystems (normalizer, security guard, embedder, storage, policy learner, cost model, tenant manager, metrics collector, coalescer, L1 cache, vector index). Starts the background eviction timer. Sends a "warmup" string to the embedding model to pre-load it. If warmup fails, a WARNING is logged and initialisation continues normally.

Important: Always use CacheManager.create() rather than calling __init__ directly. The factory guarantees embedding model warmup and proper subsystem wiring.

from fennec_memory.cache import CacheManager, CacheConfig, StorageBackend
from fennec_memory.cache import EmbeddingConfig, EmbeddingProvider

config = CacheConfig(
    storage_backend=StorageBackend.SQLITE,
    embedding=EmbeddingConfig(
        provider=EmbeddingProvider.OPENAI,
        model_name="text-embedding-3-small",
    ),
    default_ttl_s=3600.0,
)

manager = await CacheManager.create(config)

CacheManager.get

Primary cache lookup. Traverses all cache layers in order, returning a routing decision on every call.

async def get(
    self,
    query: str,
    tenant_id: str = "default",
    top_k: int = 1,
) -> CacheLookupResult

Parameters

Name Type Required Description
query str Yes Raw query string from the user or application
tenant_id str No Tenant namespace. Defaults to "default".
top_k int No Number of semantic candidates to retrieve from FAISS (default: 1)

Returns CacheLookupResult

Field Type Description
hit bool True if a cached response was found
decision RoutingDecision EXACT_HIT, SEMANTIC_HIT, or LLM_FALLBACK
entry Optional[IntelligentCacheEntry] The matched entry, or None on a miss
similarity float Cosine similarity score (1.0 for exact hits)
latency_ms float Total lookup time in milliseconds
cost_saved_usd float Estimated USD saved by avoiding an LLM call
response Any Shortcut property: entry.response if entry is not None

Internal Lookup Sequence

  1. SecurityGuard.validate_query() — rejects injections; returns LLM_FALLBACK on violation without raising.
  2. TenantManager.check_and_charge_request() — deducts one RPM token; returns LLM_FALLBACK if quota exceeded.
  3. QueryNormalizer.normalize()exact_cache_key() — produce canonical form and SHA-256 key.
  4. _L1ExactCache.get() — in-process LRU lookup; verifies HMAC integrity; evicts corrupted entries.
  5. Storage.get() — persistent exact lookup; promotes hit to L1.
  6. CachedEmbedder.embed_single() (coalesced) — embed the normalized query.
  7. EmbeddingIndex.search() — FAISS nearest-neighbour search; filters by similarity_threshold * 0.80.
  8. Load and filter candidates: expired, inaccessible, and integrity-failed entries are discarded.
  9. CachePolicyLearner.rank_candidates() — Thompson Sampling re-ranks surviving candidates.
  10. DecisionEngine.decide() — applies cost/quality/latency utility function; returns SEMANTIC_HIT or LLM_FALLBACK.
  11. Promote hit to L1; record metrics.
result = await manager.get("What is retrieval-augmented generation?", tenant_id="acme")

if result.hit:
    print(f"[{result.decision.value}] similarity={result.similarity:.2f}")
    print(f"Saved: ${result.cost_saved_usd:.4f} | latency: {result.latency_ms:.1f}ms")
    answer = result.response
else:
    # Call your LLM here
    answer = await your_llm(query)
    await manager.put(query, answer, tenant_id="acme")

CacheManager.put

Stores a query-response pair. Call this immediately after a successful LLM call when get() returns hit=False.

async def put(
    self,
    query: str,
    response: Any,
    tenant_id: str = "default",
    ttl_s: Optional[float] = None,
    quality_score: float = 1.0,
    response_tokens: int = 0,
    is_shared: bool = False,
    input_tokens: int = 0,
) -> Optional[IntelligentCacheEntry]

Parameters

Name Type Required Description
query str Yes Original raw query string
response Any Yes LLM response to cache
tenant_id str No Tenant namespace. Defaults to "default".
ttl_s Optional[float] No Entry TTL in seconds. Falls back to config.default_ttl_s if None.
quality_score float No External quality signal in [0, 1]. Used to seed the bandit arm.
response_tokens int No Approximate output token count (for cost tracking)
is_shared bool No Mark entry as readable by other tenants with allow_shared_read=True
input_tokens int No Approximate input token count (for cost tracking)

Returns IntelligentCacheEntry on success, or None if rejected by security or quota.

Internal Store Sequence

  1. SecurityGuard.validate_query() and validate_response().
  2. SecurityGuard.scrub_pii() on query and response (if enable_pii_scrub=True).
  3. TenantManager.check_entry_quota() and check_memory_quota().
  4. Normalize query → compute exact key.
  5. Embed normalized query (no coalescing; each put() computes its own vector).
  6. Build CostRecord from token counts and configured pricing.
  7. Construct IntelligentCacheEntry with bandit arm, content hash, and metadata.
  8. Write to L1, Storage, and FAISS index.
  9. Update tenant memory and entry count quotas.
entry = await manager.put(
    query="Explain transformer attention mechanism",
    response=llm_answer,
    tenant_id="acme",
    ttl_s=7200.0,
    quality_score=0.95,
    response_tokens=350,
    input_tokens=12,
)

if entry:
    print(f"Stored: {entry.entry_id[:8]}... cost={entry.cost_record.total_usd:.6f} USD")

CacheManager.feedback

Records a quality signal for a cached entry. Updates the Thompson Sampling bandit arm and adjusts the adaptive similarity threshold. Use this whenever you have a signal about response quality—user ratings, LLM-as-judge scores, or implicit engagement metrics.

async def feedback(
    self,
    entry_id: str,
    positive: bool,
    magnitude: float = 1.0,
    tenant_id: str = "default",
    source: str = "user",
) -> None

Parameters

Name Type Required Description
entry_id str Yes entry_id from CacheLookupResult.entry.entry_id
positive bool Yes True = response was good; False = response was wrong or unhelpful
magnitude float No Signal strength in [0, ∞). Default 1.0. Use higher values for high-confidence signals.
tenant_id str No Tenant namespace
source str No Signal origin: "user", "llm_eval", or "auto"

Returns None. Fire-and-forget; does not raise on unknown entry_id.

Internal Behaviour

  1. Loads the entry from Storage or L1.
  2. Calls CachePolicyLearner.record_feedback() → updates bandit_arm.alpha (positive) or bandit_arm.beta (negative).
  3. Updates confidence_score on the entry.
  4. Adjusts the adaptive similarity threshold: positive feedback relaxes it by threshold_step * 0.5; negative feedback tightens it by threshold_step.
  5. Persists the updated entry back to Storage.

If entry_id is not found (e.g., TTL expired), logs a WARNING and returns silently.

result = await manager.get(query, tenant_id="acme")

if result.hit:
    answer = result.response
    # After user interaction...
    user_satisfied = True  # e.g., from thumbs-up button

    await manager.feedback(
        entry_id=result.entry.entry_id,
        positive=user_satisfied,
        magnitude=1.0,
        tenant_id="acme",
        source="user",
    )

CacheManager.get_sync / put_sync / feedback_sync

Synchronous wrappers for non-async callers. Suitable for use in Flask views, Django handlers, Celery tasks, scripts, and Jupyter notebooks.

def get_sync(self, query: str, tenant_id: str = "default") -> CacheLookupResult

def put_sync(
    self,
    query: str,
    response: Any,
    tenant_id: str = "default",
    **kwargs,       # same keyword arguments as put()
) -> Optional[IntelligentCacheEntry]

def feedback_sync(
    self,
    entry_id: str,
    positive: bool,
    tenant_id: str = "default",
) -> None

Behaviour Each method calls _run_sync(), which detects the calling context:

  • If a running event loop exists in the current thread (FastAPI, Jupyter): submits via asyncio.run_coroutine_threadsafe() and blocks on the returned Future.
  • Otherwise (plain script, thread pool, Celery worker): uses asyncio.run() to create an isolated loop for the duration of the call.

Warning: Do not call sync wrappers from inside an async def function. If you are already in an async context, use the async methods directly.

# Flask route
@app.route("/ask")
def ask():
    query = request.args["q"]
    result = manager.get_sync(query, tenant_id="webapp")

    if result.hit:
        return jsonify({"answer": result.response, "cached": True})

    answer = call_llm_sync(query)
    manager.put_sync(query, answer, tenant_id="webapp", response_tokens=300)
    return jsonify({"answer": answer, "cached": False})

CacheManager.register_tenant

Registers a new tenant with custom quotas and permissions.

def register_tenant(self, reg: TenantRegistration) -> None

Parameters

Name Type Required Description
reg TenantRegistration Yes Tenant registration data

TenantRegistration Fields

Field Type Default Description
tenant_id str Required Unique tenant identifier
display_name str "" Human-readable tenant name
memory_quota_mb float 512.0 Memory quota in MB
max_entries int 10_000 Maximum entry count
request_quota_rpm int 1_000 Requests per minute limit
allow_shared_read bool False Can this tenant read is_shared=True entries from other tenants?
allow_shared_write bool False Can this tenant publish is_shared=True entries?
custom_ttl_s Optional[float] None Override default TTL for this tenant's entries
metadata Dict[str, str] {} Arbitrary metadata for billing or routing

If a tenant_id already exists, the registration is updated and a warning is logged.

from fennec_memory.cache import TenantRegistration

manager.register_tenant(TenantRegistration(
    tenant_id="enterprise_client_a",
    display_name="ACME Corp",
    memory_quota_mb=2048.0,
    max_entries=50_000,
    request_quota_rpm=5_000,
    allow_shared_read=True,
))

CacheManager.flush_tenant

Evicts all cache entries belonging to a tenant. Removes from Storage, L1, and the FAISS vector index. Executes synchronously.

def flush_tenant(self, tenant_id: str) -> int

Parameters

Name Type Required Description
tenant_id str Yes Tenant whose entries should be removed

Returns int — number of entries deleted.

removed = manager.flush_tenant("enterprise_client_a")
print(f"Flushed {removed} entries")

CacheManager.get_metrics

Returns a full system-wide metrics snapshot. All counters are cumulative since the CacheManager was created.

def get_metrics(self) -> Dict[str, object]

Returned Keys

Key Type Description
total_requests int Total get() calls
overall_hit_rate float Fraction of requests served from cache (0–1)
exact_hit_rate float Fraction served by exact match
semantic_hit_rate float Fraction served by semantic match
llm_fallback_rate float Fraction routed to LLM
total_saved_usd float Cumulative USD saved by cache hits
roi_multiplier float total_saved_usd / total_spent_usd
latency_overall dict Histogram with p50_ms, p90_ms, p99_ms
latency_exact dict Latency histogram for exact-hit requests
latency_semantic dict Latency histogram for semantic-hit requests
vector_index_size int Number of vectors in the FAISS index
l1_size int Current entry count in L1
sim_threshold float Current adaptive similarity threshold
tenants list Per-tenant stats (see get_tenant_metrics)
errors dict Error counts keyed by type (e.g., "security_violation", "quota_exceeded")
decision_engine dict Decision engine internal stats
policy_learner dict RL policy stats (reward mean, p10, p90, feedback rate)
metrics = manager.get_metrics()

print(f"Hit rate:  {metrics['overall_hit_rate']:.1%}")
print(f"ROI:       {metrics['roi_multiplier']}x")
print(f"p99 latency: {metrics['latency_overall']['p99_ms']:.1f}ms")
print(f"Saved: ${metrics['total_saved_usd']:.2f}")

if metrics["errors"].get("security_violation", 0) > 100:
    alert("High rate of injection attempts detected")

CacheManager.get_tenant_metrics

Per-tenant metrics snapshot.

def get_tenant_metrics(self, tenant_id: str) -> Dict[str, object]

Returned Keys

Key Type Description
tenant_id str Tenant identifier
rpm float Requests in the last 60 seconds
cost_saved float USD saved for this tenant
memory_used_mb float Current memory usage
memory_quota_mb float Configured memory limit
memory_pct float Memory utilisation percentage (0–100)
entry_count int Number of entries owned by this tenant
max_entries int Configured entry limit
requests_this_min int Requests in the current minute window
rpm_quota int Configured RPM limit

CacheManager.close / aclose

Releases all resources. Stops the background eviction timer and closes the storage connection.

def close(self) -> None
async def aclose(self) -> None

Supports use as a context manager:

# Sync context manager
with manager:
    result = manager.get_sync("question")

# Async context (manual)
await manager.aclose()

QueryNormalizer

Transforms raw query strings into a canonical form used as the cache key and embedding input. Ensures that minor surface variations (casing, punctuation, synonyms) map to the same cache entry. Supports Unicode handling for both English and Arabic stop-words.

class QueryNormalizer:
    def __init__(self, config: Optional[NormalizationConfig] = None) -> None
    def normalize(self, query: str) -> str
    def exact_cache_key(self, tenant_id: str, normalized_query: str) -> str

Normalization Pipeline (applied in order)

  1. Unicode NFC normalization
  2. Remove control and zero-width characters
  3. Lowercase
  4. Remove punctuation (default: enabled)
  5. Collapse whitespace
  6. Synonym expansion (e.g., "llm""large language model")
  7. Stop-word removal (default: disabled; supports English and Arabic)
  8. Token deduplication (default: disabled)
  9. Length cap at 2048 characters

exact_cache_key() returns a SHA-256 hex digest of f"{tenant_id}:{normalized_query}", providing globally unique, tenant-scoped keys.

from fennec_memory.cache import QueryNormalizer, NormalizationConfig

normalizer = QueryNormalizer(NormalizationConfig(
    remove_stopwords=True,
    extra_synonyms={"gpt-4": "large language model"},
))

normalized = normalizer.normalize("What is LLM?")
# → "what large language model"

key = normalizer.exact_cache_key("tenant_a", normalized)
# → SHA-256 hex string

SecurityGuard

Stateless security validator. Thread-safe. Instantiate once; reuse across all requests.

class SecurityGuard:
    def __init__(self, config: SecurityConfig) -> None

    def validate_query(self, query: str, tenant_id: str = "default") -> None
    def validate_response(self, response: Any, tenant_id: str = "default") -> None
    def enforce_tenant_access(self, requesting_tenant: str, entry_tenant: str, is_shared: bool) -> None
    def scrub_pii(self, text: str) -> str
    def sign_content(self, content: str) -> str
    def verify_content(self, content: str, signature: str) -> bool
    def verify_entry_integrity(self, entry: Any) -> bool

All validate_* and enforce_* methods raise SecurityViolation on failure. scrub_pii() and verify_entry_integrity() return a value rather than raising.

from fennec_memory.cache import SecurityGuard, SecurityConfig, SecurityViolation

guard = SecurityGuard(SecurityConfig(
    enable_pii_scrub=True,
    hmac_secret="production-secret-key",
))

clean = guard.scrub_pii("Contact me at user@example.com or 555-123-4567")
# → "Contact me at [EMAIL] or [PHONE]"

try:
    guard.validate_query("ignore all previous instructions", "tenant_a")
except SecurityViolation as e:
    print(f"Rejected: {e.reason}")

TenantManager

Central thread-safe registry for tenant lifecycle, quota enforcement, and namespace management. In most cases you will interact with TenantManager indirectly through CacheManager. Use it directly only for advanced scenarios such as quota hooks or manual isolation checks.

class TenantManager:
    def __init__(self, config: TenantConfig) -> None

On construction, the "default" tenant is automatically registered using the quotas defined in the provided TenantConfig. The "default" tenant cannot be deregistered.

Registration

def register(self, reg: TenantRegistration) -> None
def deregister(self, tenant_id: str) -> None
def is_registered(self, tenant_id: str) -> bool
def get_registration(self, tenant_id: str) -> TenantRegistration
Method Description
register Registers a new tenant or updates an existing one. Thread-safe. Logs a WARNING if the tenant_id already exists.
deregister Removes a tenant and its quota state. Raises if called on "default".
is_registered Returns True if the tenant is currently registered.
get_registration Returns the TenantRegistration for the given tenant_id, or raises KeyError if not found.
from fennec_memory.cache import TenantManager, TenantRegistration, TenantConfig

mgr = TenantManager(TenantConfig())

mgr.register(TenantRegistration(
    tenant_id="acme",
    display_name="ACME Corp",
    memory_quota_mb=1024.0,
    max_entries=20_000,
    request_quota_rpm=3_000,
    allow_shared_read=True,
))

print(mgr.is_registered("acme"))    # True
reg = mgr.get_registration("acme")
print(reg.memory_quota_mb)           # 1024.0

mgr.deregister("acme")

Quota Enforcement

def check_and_charge_request(self, tenant_id: str) -> None   # raises QuotaExceeded
def check_memory_quota(self, tenant_id: str) -> None          # raises QuotaExceeded
def check_entry_quota(self, tenant_id: str) -> None           # raises QuotaExceeded
def charge_memory(self, tenant_id: str, size_bytes: int) -> None
def release_memory(self, tenant_id: str, size_bytes: int) -> None
def increment_entries(self, tenant_id: str) -> None
def decrement_entries(self, tenant_id: str) -> None
Method Description
check_and_charge_request Verifies RPM quota and deducts one request. Raises QuotaExceeded if the limit is reached.
check_memory_quota Raises QuotaExceeded if the tenant's memory usage has reached memory_quota_mb.
check_entry_quota Raises QuotaExceeded if the tenant's entry count has reached max_entries.
charge_memory Increments the tenant's tracked memory usage by size_bytes.
release_memory Decrements tracked memory usage; floors at 0.
increment_entries Increments the entry counter by 1.
decrement_entries Decrements the entry counter by 1; floors at 0.

Namespace / Key Management

def namespace_key(self, tenant_id: str, key: str) -> str
def extract_tenant(self, namespaced_key: str) -> str
Method Description
namespace_key Returns a globally unique key in the form tenant_id::key.
extract_tenant Parses the tenant_id from a namespaced key. Returns "default" if the key contains no :: separator.
ns_key = mgr.namespace_key("acme", "query_abc123")
# → "acme::query_abc123"

tenant = mgr.extract_tenant("acme::query_abc123")
# → "acme"

tenant = mgr.extract_tenant("orphan_key")
# → "default"

Cross-Tenant Shared Cache

def can_read_shared(self, tenant_id: str) -> bool
def can_write_shared(self, tenant_id: str) -> bool
def is_accessible(self, requesting_tenant: str, entry: IntelligentCacheEntry) -> bool
Method Description
can_read_shared Returns True if the tenant has allow_shared_read=True.
can_write_shared Returns True if the tenant has allow_shared_write=True.
is_accessible Enforces the full isolation ruleset against a specific entry.

is_accessible() Rules

  1. The owning tenant always has access to its own entries.
  2. Entries marked is_shared=True are accessible to any tenant with allow_shared_read=True.
  3. All other combinations are denied.
entry = storage.get("acme::some_key")
if mgr.is_accessible(requesting_tenant="beta_corp", entry=entry):
    return entry
else:
    raise PermissionError("Cross-tenant access denied")

Monitoring & Stats

def set_quota_event_hook(self, hook: Callable[[str, str], None]) -> None
def get_tenant_stats(self, tenant_id: str) -> Dict[str, object]
def get_all_tenant_stats(self) -> List[Dict[str, object]]
def list_tenant_ids(self) -> List[str]
Method Description
set_quota_event_hook Registers a callback invoked on each quota violation. Arguments: (tenant_id: str, event_type: str) where event_type is e.g. "rpm_exceeded".
get_tenant_stats Returns a snapshot of a single tenant's resource usage.
get_all_tenant_stats Returns stats for all registered tenants.
list_tenant_ids Returns a list of all currently registered tenant IDs.

get_tenant_stats Fields

Key Type Description
tenant_id str Tenant identifier
memory_used_mb float Current memory usage
memory_quota_mb float Configured memory limit
memory_pct float Memory utilisation percentage (0–100)
entry_count int Current number of entries
max_entries int Configured entry limit
requests_this_min int Requests in the current minute window
rpm_quota int Configured RPM limit
def on_quota_event(tenant_id: str, event: str) -> None:
    alert_system.send(f"[QUOTA] tenant={tenant_id} event={event}")

manager._tenant_mgr.set_quota_event_hook(on_quota_event)

# Inspect a single tenant
stats = manager._tenant_mgr.get_tenant_stats("acme")
print(f"Memory: {stats['memory_pct']:.1f}%")
print(f"RPM: {stats['requests_this_min']} / {stats['rpm_quota']}")

# Enumerate all tenants
for tid in manager._tenant_mgr.list_tenant_ids():
    print(tid)

Storage Backends

All backends implement BaseStorage. Use build_storage(config) as the factory; direct instantiation is also supported.

from fennec_memory.cache import build_storage

storage = build_storage(config)   # preferred

# or directly:
from fennec_memory.cache import MemoryStorage, SQLiteStorage, RedisStorage, RedisConfig
mem    = MemoryStorage()
sqlite = SQLiteStorage(db_path="./cache.db", wal=True, cache_size_kb=65536)
redis  = RedisStorage(RedisConfig(host="redis-host", port=6379))

BaseStorage Interface

def get(self, key: str) -> Optional[IntelligentCacheEntry]
def set(self, key: str, entry: IntelligentCacheEntry, ttl_s: Optional[float] = None) -> None
def delete(self, key: str) -> bool
def exists(self, key: str) -> bool
def keys_by_tenant(self, tenant_id: str) -> List[str]
def all_keys(self) -> List[str]
def total_size_bytes(self) -> int
def flush_tenant(self, tenant_id: str) -> int
def close(self) -> None

7. Security Model

Prompt Injection Detection

SecurityGuard compiles a set of regex patterns to detect cache poisoning and prompt override attempts. Detection runs on every get() and put() call before any data is stored or returned.

Built-in patterns detect:

  • Prompt override phrases: "ignore all previous instructions", jailbreak persona requests
  • System prompt exfiltration: "print your system prompt", "reveal hidden instructions"
  • Classic LLM delimiters: [INST], [/INST], <|im_start|>, <system> tags
  • SQL/code injection: DROP TABLE, exec(, eval(, __import__(
  • Cross-tenant data hints: tenant_id=..., namespace=...

Custom patterns can be loaded at startup from a regex file (one pattern per line, # for comments) by setting SecurityConfig.injection_patterns_path.

On detection, SecurityViolation is raised, the error counter is incremented, and LLM_FALLBACK is returned. No partial data is stored.

PII Scrubbing

When SecurityConfig.enable_pii_scrub=True, the following patterns are redacted before storage:

Pattern Replacement
Credit card numbers (16 digits, various separators) [CARD_NUMBER]
US Social Security Numbers (NNN-NN-NNNN) [SSN]
Email addresses [EMAIL]
US phone numbers [PHONE]

PII scrubbing uses simple regex matching and is suitable for basic compliance requirements. For production environments handling sensitive data, integrate a dedicated library such as Microsoft Presidio by processing text before passing it to put().

HMAC Content Integrity

Every stored entry carries a SHA-256 hash of f"{normalized_query}:{response}" in content_hash. Before returning any entry from L1 or Storage, verify_entry_integrity() recomputes this hash and compares it.

If SecurityConfig.hmac_secret is set (via FENNEC_HMAC_SECRET environment variable), sign_content() and verify_content() use Python's hmac module with SHA-256 for cryptographic signing, providing tamper detection even against an adversary with write access to the storage backend. Without it, integrity verification falls back to SHA-256 hash comparison, which detects accidental corruption but not adversarial modification.

Behaviour on integrity failure: The entry is evicted from L1 and discarded from the result; the "integrity_fail_l1" or "integrity_fail_semantic" error counter is incremented; lookup continues to the next layer.

Tenant Isolation

Every entry is stored with a tenant_id tag. Namespace keys follow the format tenant_id::key, preventing key collisions across tenants at the storage level. The TenantManager.is_accessible() check enforces read permissions on every entry returned from semantic search, ensuring a tenant can never receive another tenant's private entries regardless of vector similarity.

Shared entries (is_shared=True) are opt-in at both the writer side (allow_shared_write=True) and reader side (allow_shared_read=True). The "default" tenant cannot be deregistered.


8. Storage Backends

MemoryStorage

Pure in-process dictionary. Data is lost when the process exits. No external dependencies.

Use when: Running tests, ephemeral workloads, development environments, or single-process applications where persistence is not required.

Tradeoffs: Fastest possible access; zero serialisation overhead; no durability; not shareable across processes.

SQLiteStorage

SQLite file-backed storage with WAL mode enabled by default for improved write concurrency. The 64 MB page cache reduces I/O on repeated access patterns.

Use when: Single-node production deployments, applications that need persistence across restarts, or when Redis is unavailable. Default backend.

Tradeoffs: Durable; no external service dependency; limited horizontal scalability; single-writer concurrency (WAL allows concurrent readers).

SQLiteStorage.purge_expired

SQLiteStorage exposes one additional method not present in the BaseStorage interface: a direct SQL-level purge of expired rows. Unlike the eviction timer, which scores and removes entries gradually, purge_expired deletes all rows whose expires_at timestamp has passed in a single DELETE statement and returns the count of removed rows immediately.

def purge_expired(self) -> int

Returns int — number of rows deleted.

When to use: Call this manually after a bulk put() operation, at application startup to clear stale data from a previous run, or from a maintenance script to reclaim disk space without waiting for the next eviction cycle.

from fennec_memory.cache import SQLiteStorage, SQLiteConfig

storage = SQLiteStorage(db_path="./fennec_cache.db")

removed = storage.purge_expired()
print(f"Purged {removed} expired entries from SQLite")

Note: purge_expired is only available on SQLiteStorage. It is not part of the BaseStorage interface and is not available on MemoryStorage or RedisStorage (Redis handles TTL expiry natively through key expiry at the server level).

RedisStorage

Redis-backed storage with configurable connection pooling, SSL, and key prefixing. Supports TTL natively through Redis key expiry.

Use when: Distributed deployments with multiple application nodes sharing a cache, high-availability requirements, or when you need Redis's rich operational tooling (monitoring, replication, clustering).

Tradeoffs: External service dependency; network round-trip latency per operation (~1–5ms); highest horizontal scalability; supports shared state across multiple CacheManager instances.

Comparison

MemoryStorage SQLiteStorage RedisStorage
Persistence None Yes Yes
Cross-process sharing No No Yes
External dependency None None Redis server
Latency ~μs ~100μs ~1–5ms
Horizontal scale Single process Single node Multi-node
Best for Tests / ephemeral Single-node production Distributed production

9. Observability & Metrics

All metrics are accessible through manager.get_metrics() and manager.get_tenant_metrics(tenant_id). Counters are cumulative from startup; no time-windowing is applied at the SDK level.

Hit Rate Metrics

metrics = manager.get_metrics()

# System-wide
overall    = metrics["overall_hit_rate"]    # fraction served from cache
exact_r    = metrics["exact_hit_rate"]      # fraction from exact match
semantic_r = metrics["semantic_hit_rate"]   # fraction from semantic search
fallback_r = metrics["llm_fallback_rate"]   # fraction requiring LLM call

Latency Percentiles

p99 = metrics["latency_overall"]["p99_ms"]    # 99th percentile overall
p50 = metrics["latency_exact"]["p50_ms"]      # median for exact hits
p90 = metrics["latency_semantic"]["p90_ms"]   # 90th percentile semantic hits

Cost & ROI

saved = metrics["total_saved_usd"]    # cumulative USD saved
roi   = metrics["roi_multiplier"]     # saved / spent (e.g., 45.3 → 45x ROI)

RL Policy Stats

rl = metrics["policy_learner"]

System Health

print(metrics["vector_index_size"])   # entries in FAISS
print(metrics["l1_size"])             # entries in L1 LRU
print(metrics["sim_threshold"])       # current adaptive threshold
print(metrics["errors"])              # dict of error type → count

Per-Tenant Monitoring

for tenant in metrics["tenants"]:
    print(
        f"{tenant['tenant_id']}: "
        f"memory {tenant['memory_pct']:.1f}% | "
        f"rpm {tenant['requests_this_min']}/{tenant['rpm_quota']} | "
        f"entries {tenant['entry_count']}/{tenant['max_entries']}"
    )

Alerting Integration

# Wire quota violations to your alerting system
def quota_alert(tenant_id: str, event: str) -> None:
    pagerduty.trigger(f"Cache quota: tenant={tenant_id}, event={event}")

manager._tenant_mgr.set_quota_event_hook(quota_alert)

# Check for security anomalies
metrics = manager.get_metrics()
if metrics["errors"].get("security_violation", 0) > 50:
    security_team.alert("Elevated injection attempt rate")

10. Advanced Usage

Multi-Tenant Setup

import asyncio
from fennec_memory.cache import (
    CacheManager, CacheConfig, TenantRegistration,
    StorageBackend, EmbeddingProvider, EmbeddingConfig,
    SecurityConfig,
)

async def setup():
    config = CacheConfig(
        storage_backend=StorageBackend.REDIS,
        embedding=EmbeddingConfig(provider=EmbeddingProvider.OPENAI),
        security=SecurityConfig(enable_pii_scrub=True),
    )
    manager = await CacheManager.create(config)

    # Register tenants with differentiated quotas
    manager.register_tenant(TenantRegistration(
        tenant_id="free_tier",
        memory_quota_mb=128.0,
        max_entries=1_000,
        request_quota_rpm=100,
    ))
    manager.register_tenant(TenantRegistration(
        tenant_id="enterprise",
        memory_quota_mb=8192.0,
        max_entries=500_000,
        request_quota_rpm=10_000,
        allow_shared_read=True,
        allow_shared_write=True,
    ))
    return manager

RL Feedback Loop

The feedback loop is the primary mechanism for improving cache quality over time. Positive signals lower the similarity threshold (allowing more hits), while negative signals raise it (demanding higher confidence before reuse).

# In your request handler
result = await manager.get(query, tenant_id=tenant)

if result.hit:
    response = result.response

    # After user engagement (e.g., session end, explicit rating)
    async def record_feedback(entry_id, liked):
        await manager.feedback(
            entry_id=entry_id,
            positive=liked,
            magnitude=1.0,
            tenant_id=tenant,
            source="user",
        )

    # Schedule async feedback recording without blocking the response
    asyncio.create_task(record_feedback(result.entry.entry_id, user_clicked_helpful))

else:
    response = await call_llm(query)
    entry = await manager.put(
        query, response, tenant_id=tenant,
        quality_score=0.9,
        response_tokens=350,
        input_tokens=15,
    )

Shared Cache Configuration

Shared entries allow common knowledge to be stored once and served to multiple tenants, reducing duplication and cost for universal content (e.g., product FAQs, legal boilerplate).

# Publisher tenant writes a shared entry
await manager.put(
    query="What are your refund terms?",
    response="Standard refund policy...",
    tenant_id="content_team",
    is_shared=True,       # marks entry as cross-tenant readable
    quality_score=1.0,
)

# Consumer tenant reads it (must have allow_shared_read=True)
result = await manager.get("What is your return policy?", tenant_id="customer_facing")
# Semantic similarity can match "refund terms" ↔ "return policy"

End-to-End Example

The following example illustrates the complete lifecycle: manager creation, tenant registration, cache lookup, LLM fallback with store, feedback recording, and metrics inspection.

import asyncio
from fennec_memory.cache import (
    CacheManager, CacheConfig, TenantRegistration,
    StorageBackend, EmbeddingProvider, EmbeddingConfig, SecurityConfig,
)

async def main():
    config = CacheConfig(
        storage_backend=StorageBackend.SQLITE,
        embedding=EmbeddingConfig(
            provider=EmbeddingProvider.OPENAI,
            model_name="text-embedding-3-small",
        ),
        security=SecurityConfig(enable_pii_scrub=True),
        default_ttl_s=7200.0,
    )

    manager = await CacheManager.create(config)

    manager.register_tenant(TenantRegistration(
        tenant_id="my_app",
        memory_quota_mb=1024.0,
        request_quota_rpm=2000,
    ))

    query = "Explain transformer attention mechanism"
    result = await manager.get(query, tenant_id="my_app")

    if result.hit:
        print(f"[CACHE HIT] {result.decision.value}")
        print(f"Similarity: {result.similarity:.2f} | Saved: ${result.cost_saved_usd:.4f}")
        answer = result.response
    else:
        print("[CACHE MISS] calling LLM...")
        answer = await call_llm(query)

        entry = await manager.put(
            query=query,
            response=answer,
            tenant_id="my_app",
            quality_score=0.9,
            response_tokens=420,
            input_tokens=10,
        )

    if result.hit and result.entry:
        await manager.feedback(
            entry_id=result.entry.entry_id,
            positive=True,
            tenant_id="my_app",
            source="user",
        )

    metrics = manager.get_metrics()
    print(f"Hit rate: {metrics['overall_hit_rate']:.1%}")
    print(f"ROI: {metrics['roi_multiplier']}x")

    await manager.aclose()

asyncio.run(main())

Production Deployment Notes

Environment variables over code: Use CacheConfig.from_env() combined with a secrets manager to keep API keys and HMAC secrets out of source code.

Pre-register all tenants: Do not rely on auto-registration in production. Auto-registered tenants receive default quotas and generate WARNING log entries. Register all tenants explicitly at startup with appropriate limits.

Eviction tuning: Reduce eviction.check_interval_s (e.g., to 60) for high-churn workloads. Set eviction.max_age_s to enforce a hard upper bound on entry age independent of TTL.

Redis in production: Set REDIS_PASSWORD and REDIS_SSL=true. Set socket_timeout and socket_connect_timeout conservatively (2 seconds is the default) to prevent cache failures from blocking the application thread.

Embedding costs: With OpenAI embeddings, every cache miss and every put() call incurs an embedding API cost. Monitor metrics["policy_learner"]["reward_mean"] to confirm the cache is returning quality responses and the embedding spend is justified.

HMAC integrity: Set FENNEC_HMAC_SECRET in production to enable cryptographic tamper detection. Without it, integrity verification falls back to SHA-256 hash comparison, which detects accidental corruption but not adversarial modification.

Graceful shutdown: Call manager.close() or await manager.aclose() at application shutdown to stop the eviction timer and close storage connections cleanly.


11. Edge Cases & Failure Handling

Embedding Service Failure

If the embedding provider is unreachable or returns an error during get(), the exception propagates through _InflightCoalescer and is surfaced to the caller. The L1 and storage exact-match layers complete before embedding is attempted, so an exact-match hit is still served even when the embedding service is down. For put(), an embedding failure prevents the entry from being indexed in FAISS; the entry is still written to L1 and Storage for exact-match retrieval.

Mitigation: Use EmbeddingProvider.MOCK in testing. For production, configure request_timeout and implement retry logic at the embedding provider level.

Redis / SQLite Failure

Storage failures during get() cause the affected layer to return None, and the lookup continues to the next layer (semantic search). Storage failures during put() are logged as errors and the method returns None. The L1 cache remains unaffected and continues to serve exact hits.

Mitigation: For Redis, configure connection pooling and socket_timeout. For SQLite, ensure the database file is on a local, low-latency filesystem.

Quota Exceeded

When any quota is breached (RPM, memory_quota_mb, or max_entries):

  • get() returns CacheLookupResult(hit=False, decision=LLM_FALLBACK) without raising.
  • put() returns None without raising.
  • The quota_event_hook fires (if registered) with the tenant ID and event type.
  • Error counters are incremented in metrics.

The application should treat quota-exceeded responses the same as a cache miss and proceed with an LLM call.

Corrupted Entries

If verify_entry_integrity() detects a hash mismatch on an L1 entry, the entry is invalidated from L1 and the "integrity_fail_l1" counter is incremented. If the mismatch occurs on a semantic candidate, that candidate is skipped. In both cases, lookup continues normally. Corrupted entries are never returned to the caller.

Missing or Expired Entry in feedback()

If the entry_id passed to feedback() no longer exists in Storage or L1 (e.g., it was evicted or its TTL expired), the method logs a WARNING and returns silently without raising an exception. The feedback signal is lost; this is by design for fire-and-forget usage.

Unknown Tenant

An unregistered tenant_id in get(), put(), or feedback() causes TenantManager to auto-register the tenant with the system default quotas (TenantConfig.default_*) and log a WARNING. While this allows simple deployments to work without explicit registration, it is not recommended in production because auto-registered tenants receive default quotas regardless of their actual entitlement.

Async / Sync Mismatch

Calling get_sync(), put_sync(), or feedback_sync() from inside an async def coroutine that is itself running on an event loop is not supported and will produce a deadlock or RuntimeError. Always use the async variants (get(), put(), feedback()) inside async contexts.

Eviction Timer

The background eviction timer runs on a daemon thread. If close() is not called before the process exits, the timer will be terminated abruptly. On a clean shutdown, always call manager.close() or use the context manager protocol to ensure the timer is cancelled and the storage connection is flushed.

Warmup Failure

If the embedding model fails to warm up during CacheManager.create(), a WARNING is logged and the manager is returned in a functional state. Subsequent embedding calls will attempt to initialise the model on demand. This means the first real get() or put() call may experience higher latency.


12. MultiLevelCache — General-Purpose Cache Layer

MultiLevelCache is the general-purpose, LLM-agnostic cache layer that sits beneath the intelligent pipeline. While CacheManager is the recommended interface for LLM workloads (adding semantic search, RL eviction, tenancy, and security), MultiLevelCache can be used standalone for any key-value caching need — for example, caching computed results, API responses, or deserialized configuration objects — without any dependency on embedding models or FAISS.

Architecture

MultiLevelCache implements a three-level memory hierarchy:

get(key)
  │
  ▼
L1 — In-process OrderedDict LRU/LFU (smallest, fastest: ~μs)
  │  MISS + auto-promote on threshold hit
  ▼
L2 — In-process OrderedDict LRU/LFU (medium, fast: ~μs)
  │  MISS + demote on eviction
  ▼
L3 — Disk-backed pickle files (largest, slower: ~ms)
  │  MISS
  ▼
return None

On a cache hit at L2 or L3, the entry is automatically promoted toward L1 based on its hit count. On eviction from L1, entries are demoted to L2; from L2, they cascade to L3. L3 files survive process restarts when persist_l3=True (the default).

Eviction Strategies — CacheStrategy

MultiLevelCache supports five eviction strategies, selected at construction time via the strategy parameter.

Strategy Value Behaviour
LRU "lru" Least Recently Used — evicts the entry accessed least recently. Default.
LFU "lfu" Least Frequently Used — evicts the entry with the fewest total accesses.
FIFO "fifo" First In First Out — evicts the oldest-created entry regardless of access.
TTL "ttl" Evicts the first expired entry found; falls back to oldest if none are expired.
ADAPTIVE "adaptive" Scores entries by hits / (idle_time + 1); evicts the lowest-scoring entry. Balances frequency and recency.
from fennec_memory.cache import MultiLevelCache, CacheStrategy

cache = MultiLevelCache(strategy=CacheStrategy.ADAPTIVE)

Constructor

MultiLevelCache(
    l1_max_items: Optional[int] = None,
    l2_max_items: Optional[int] = None,
    l3_max_items: Optional[int] = None,
    strategy: CacheStrategy = CacheStrategy.LRU,
    config: Optional[CacheConfig] = None,
    persist_l3: bool = True,
)
Parameter Type Default Description
l1_max_items Optional[int] From CacheConfig Maximum entries in L1. Overrides config.l1_max_items if provided.
l2_max_items Optional[int] From CacheConfig Maximum entries in L2. Overrides config.l2_max_items if provided.
l3_max_items Optional[int] From CacheConfig Maximum entries in L3. Overrides config.l3_max_items if provided.
strategy CacheStrategy LRU Eviction strategy applied to L1 and L2. L3 always uses FIFO with expired-first priority.
config Optional[CacheConfig] CacheConfig() Full configuration object. Provides capacity limits, TTL defaults, L3 directory, and cleanup interval.
persist_l3 bool True If True, L3 disk files survive close() / __exit__. If False, all L3 files are deleted on exit.

Important: Keys are SHA-256 hashed before storage. get(), exists(), and delete() all accept the original raw key; hashing is transparent to the caller.

Core Methods

get

def get(self, key: str) -> Optional[Any]

Retrieves the value for key, searching L1 → L2 → L3 in order. Returns None if not found in any level or if the entry has expired. Expired entries are evicted inline during the lookup. An L2 hit that exceeds the promotion threshold (config.l2_to_l1_hits) is automatically promoted to L1. An L3 hit is always promoted to L2.

set

def set(self, key: str, value: Any, ttl: Optional[float] = None) -> bool

Stores value under key in L1. If the entry already exists at any level it is removed first (update semantics). Returns True on success, False if an exception occurs. Uses config.default_ttl when ttl is None.

delete

def delete(self, key: str) -> bool

Removes key from all cache levels simultaneously. Returns True if the key was found in at least one level.

exists

def exists(self, key: str) -> bool

Returns True if key is present in any level and has not expired. Expired entries encountered during the check are evicted inline. Also supports the in operator: "my_key" in cache.

clear

def clear(self, level: Optional[int] = None) -> None

Clears the specified cache level (1, 2, or 3), or all levels if level is None. Clearing L3 deletes the associated disk files. Clearing all levels also resets the metrics counters.

cleanup_expired

def cleanup_expired(self) -> int

Scans all three levels and removes every expired entry. Returns the total number of entries removed. This is also invoked automatically by the background cleanup timer if config.auto_cleanup_interval is set.

get_stats

def get_stats(self) -> dict

Returns a dictionary with per-level and aggregate statistics.

Key Type Description
l1_items int Current entry count in L1
l1_size_mb float Current memory used by L1 entries (MB)
l1_max_items int Configured L1 capacity
l1_utilization float L1 fill percentage (0–100)
l2_items int Current entry count in L2
l2_size_mb float Current memory used by L2 entries (MB)
l2_utilization float L2 fill percentage (0–100)
l3_items int Current entry count in L3
l3_size_mb float Disk space used by L3 files (MB)
l3_utilization float L3 fill percentage (0–100)
total_items int Sum of entries across all levels
total_size_mb float Total memory + disk footprint (MB)
strategy str Active eviction strategy name
overall_hit_rate float Fraction of get() calls that returned a value
l1_hit_rate float L1-specific hit rate
l2_hit_rate float L2-specific hit rate
l3_hit_rate float L3-specific hit rate
evictions int Total eviction events
promotions int Total promotion events
expirations int Total expiration events

get_keys

def get_keys(self, level: Optional[int] = None) -> List[str]

Returns a list of all hashed keys currently stored in the specified level, or across all levels (deduplicated) if level is None. Keys are the SHA-256 hex strings of the original keys, not the originals.

get_entry_info

def get_entry_info(self, key: str) -> Optional[dict]

Returns detailed metadata for a specific entry, or None if not found. Useful for debugging cache behaviour.

Key Type Description
level int Cache level where the entry resides (1, 2, or 3)
hits int Number of times the entry has been accessed (L1/L2 only)
age float Seconds since the entry was created
idle_time float Seconds since the entry was last accessed (L1/L2 only)
size_bytes int Serialised size in bytes (L1/L2 only)
ttl Optional[float] Configured TTL in seconds (None = no expiry)
expired bool Whether the entry has passed its TTL
path str Disk file path (L3 only)

Batch Operations

For high-throughput scenarios where multiple keys need to be read or written together, MultiLevelCache provides three batch methods that iterate internally without requiring the caller to manage individual calls.

get_many

def get_many(self, keys: List[str]) -> Dict[str, Any]

Retrieves multiple values in a single call. Returns a dictionary containing only the keys that were found and had not expired. Missing or expired keys are absent from the result — they do not map to None.

results = cache.get_many(["key_a", "key_b", "key_c"])
# → {"key_a": ..., "key_c": ...}  (key_b was a miss)

set_many

def set_many(self, items: Dict[str, Any], ttl: Optional[float] = None) -> int

Stores multiple key-value pairs in a single call, applying the same ttl to all entries. Returns the number of entries successfully stored.

stored = cache.set_many({"key_a": val_a, "key_b": val_b}, ttl=600.0)
# → 2

delete_many

def delete_many(self, keys: List[str]) -> int

Deletes multiple keys across all cache levels. Returns the number of keys that were actually found and deleted.

removed = cache.delete_many(["key_a", "key_b", "stale_key"])
# → 2  (stale_key was not present)

Async API

MultiLevelCache provides async wrappers for the three most common operations, implemented via asyncio.to_thread so they are non-blocking in an async context without requiring any changes to the underlying synchronous implementation.

async def aget(self, key: str) -> Optional[Any]
async def aset(self, key: str, value: Any, ttl: Optional[float] = None) -> bool
async def adelete(self, key: str) -> bool

These are suitable for use inside FastAPI route handlers, async tasks, or any async def function. For batch async operations, wrap get_many, set_many, and delete_many with asyncio.to_thread directly.

# FastAPI example
@app.get("/data/{key}")
async def get_data(key: str):
    value = await cache.aget(key)
    if value is None:
        value = await fetch_from_db(key)
        await cache.aset(key, value, ttl=300.0)
    return {"data": value}

Context Manager

MultiLevelCache supports both sync and async context managers.

# Sync context manager
with MultiLevelCache(l1_max_items=100, persist_l3=False) as cache:
    cache.set("session_data", payload, ttl=3600.0)
    result = cache.get("session_data")
# L1 and L2 cleared on exit; L3 deleted because persist_l3=False

# Async context manager
async with MultiLevelCache(persist_l3=True) as cache:
    await cache.aset("key", value)

On __exit__ / __aexit__, the background cleanup timer is cancelled. If persist_l3=True, only L1 and L2 are cleared; L3 disk files remain for the next run. If persist_l3=False, all three levels are cleared and all L3 disk files are deleted.

Quick Start

from fennec_memory.cache import MultiLevelCache, CacheStrategy

# Basic usage with LRU eviction
cache = MultiLevelCache(
    l1_max_items=256,
    l2_max_items=2048,
    l3_max_items=20000,
    strategy=CacheStrategy.LRU,
)

# Store a value
cache.set("user:42:profile", {"name": "Alice", "plan": "pro"}, ttl=1800.0)

# Retrieve it
profile = cache.get("user:42:profile")

# Membership test
if "user:42:profile" in cache:
    print("profile is cached")

# Batch fill on application startup
cache.set_many({
    "config:feature_flags": flags,
    "config:rate_limits": limits,
    "config:pricing": pricing,
}, ttl=3600.0)

# Inspect state
print(cache)
# MultiLevelCache(L1=4/256, L2=0/2048, L3=0/20000, strategy=lru, hit_rate=75.00%)

stats = cache.get_stats()
print(f"Overall hit rate: {stats['overall_hit_rate']:.1%}")
print(f"Total items: {stats['total_items']}")

# Cleanup
cache.clear()

Relationship to CacheManager

See Section 14 — CacheManager vs MultiLevelCache for a full side-by-side comparison, decision guide, and usage examples for each component.


13. Low-Level Data Models

This section documents the data models used internally by MultiLevelCache and related infrastructure. These classes are not part of the CacheManager public API but are exposed for direct use with MultiLevelCache, custom storage integrations, or instrumentation.

CacheEntry

Represents a single cached item inside MultiLevelCache. Each entry tracks the cached value along with access metadata used by eviction strategies.

from fennec_memory.cache import CacheEntry

entry = CacheEntry(
    key="hashed_key_hex",
    value=my_object,
    ttl=600.0,
)

Fields

Field Type Default Description
key str Required SHA-256 hashed key (as stored internally)
value Any Required The cached value
created_at float time.time() Unix timestamp of creation
last_access float time.time() Unix timestamp of most recent access
hits int 0 Number of times the entry has been accessed
ttl Optional[float] None Time-to-live in seconds; None means no expiry
size_bytes int Auto-computed Serialised size estimate via pickle.dumps; falls back to config.size_bytes (default 1 KB) on failure

Methods

Method Returns Description
increment_hits() None Increments the hit counter and updates last_access to the current time
is_expired() bool Returns True if ttl is set and time.time() > created_at + ttl
age() float Seconds elapsed since created_at
idle_time() float Seconds elapsed since last_access
get_stats() dict Returns a snapshot dictionary with key, hits, age_seconds, idle_seconds, size_bytes, is_expired, and ttl
entry = CacheEntry(key="abc123", value={"result": 42}, ttl=300.0)

# After some accesses:
entry.increment_hits()

print(entry.hits)          # 1
print(entry.age())         # seconds since creation
print(entry.idle_time())   # seconds since last access
print(entry.is_expired())  # False (within TTL)
print(entry.get_stats())
# {'key': 'abc123', 'hits': 1, 'age_seconds': 0.002, 'idle_seconds': 0.0,
#  'size_bytes': 32, 'is_expired': False, 'ttl': 300.0}

CacheMetrics

A lightweight dataclass that accumulates per-level hit, miss, eviction, promotion, and expiration counters for a MultiLevelCache instance. Each MultiLevelCache owns one CacheMetrics object at cache.metrics.

Note: CacheMetrics is distinct from CacheMetricsCollector, which is used by CacheManager and tracks LLM-specific signals such as cost savings, latency histograms, and semantic similarity. CacheMetrics is simpler and records only raw cache-level operation counts.

Fields

Field Type Description
l1_hits int L1 exact hits
l2_hits int L2 exact hits
l3_hits int L3 disk hits
l1_misses int L1 misses
l2_misses int L2 misses
l3_misses int L3 misses
sets int Total set() calls
evictions int Total eviction events
promotions int Total promotion events (entry moved up a level)
expirations int Total expiration events (entry removed due to TTL)

Methods

Method Signature Description
record_get (level: int, hit: bool) -> None Records a get operation result for the given level (1, 2, or 3)
record_set () -> None Increments the sets counter
record_eviction () -> None Increments the evictions counter
record_promotion () -> None Increments the promotions counter
record_expiration () -> None Increments the expirations counter
get_hit_rate (level: Optional[int] = None) -> float Returns hit rate for the specified level, or overall if None. Returns 0.0 if no requests have been recorded.
get_stats () -> dict Returns all counters plus computed hit rates as a flat dictionary
reset () -> None Resets all counters to zero
# Access metrics directly from a MultiLevelCache instance
cache = MultiLevelCache(l1_max_items=100)
cache.set("key", "value")
cache.get("key")
cache.get("missing")

m = cache.metrics
print(m.get_hit_rate())       # 0.5 (1 hit out of 2 total gets)
print(m.get_hit_rate(level=1)) # 0.5
print(m.get_stats())
# {'l1_hits': 1, 'l2_hits': 0, 'l3_hits': 0, 'total_hits': 1,
#  'l1_misses': 1, 'l2_misses': 1, 'l3_misses': 1, 'total_misses': 3,
#  'l1_hit_rate': 0.5, 'l2_hit_rate': 0.0, 'l3_hit_rate': 0.0,
#  'overall_hit_rate': 0.25, 'sets': 1, 'evictions': 0,
#  'promotions': 0, 'expirations': 0}

m.reset()
print(m.l1_hits)  # 0

CacheStrategy

An Enum of eviction strategies used by MultiLevelCache. Import it to pass as the strategy constructor argument. See Section 12 — Eviction Strategies for a full description of each strategy's behaviour.

from fennec_memory.cache import CacheStrategy

class CacheStrategy(Enum):
    LRU      = "lru"       # Least Recently Used
    LFU      = "lfu"       # Least Frequently Used
    FIFO     = "fifo"      # First In First Out
    TTL      = "ttl"       # Time To Live based
    ADAPTIVE = "adaptive"  # Adaptive (frequency + recency composite score)
from fennec_memory.cache import MultiLevelCache, CacheStrategy

# Use ADAPTIVE for workloads with mixed hot/cold access patterns
cache = MultiLevelCache(strategy=CacheStrategy.ADAPTIVE)


14. CacheManager vs MultiLevelCache

Both components provide multi-level caching, but they are designed for fundamentally different problems and should not be treated as alternatives to one another. This section explains what each component is, where it differs from the other, and how to decide which one to use.


The Core Difference

CacheManager is an intelligent LLM query router. Its job is to intercept natural-language queries before they reach a language model, find semantically equivalent cached answers, and decide — using a cost-aware utility function and a reinforcement-learning policy — whether a cached answer is good enough to serve or whether a fresh LLM call is warranted. It understands tokens, costs, tenants, and the fuzzy nature of language.

MultiLevelCache is a general-purpose in-process key-value store with a three-level memory hierarchy (RAM → RAM → Disk). Its job is to store arbitrary Python objects under string keys and serve them quickly on repeated access, automatically promoting hot data toward the fastest layer and demoting cold data toward the slowest. It understands nothing about LLMs, language, or costs — only keys, values, and TTLs.

Put differently: CacheManager answers the question "Is this query semantically close enough to something I've seen before?" while MultiLevelCache answers the question "Have I seen this exact key before, and where did I put it?"


Architecture Comparison

Aspect CacheManager MultiLevelCache
Primary purpose LLM response caching with semantic matching General-purpose key-value caching
Lookup strategy Exact key → Storage → Semantic (FAISS) L1 memory → L2 memory → L3 disk
Semantic search Yes — FAISS cosine similarity over dense vectors No — exact key match only (SHA-256 hash)
Embedding model Required (OpenAI / HuggingFace / Ollama / Mock) Not used
Eviction policy Thompson Sampling RL bandit (learns from feedback) LRU / LFU / FIFO / TTL / Adaptive (fixed strategy)
Feedback loop Yes — feedback() updates bandit arms and similarity threshold No
Disk persistence Via pluggable BaseStorage backend (SQLite / Redis / Memory) L3 pickle files in config.cache_dir
Multi-tenancy Yes — namespace isolation, per-tenant quotas, shared cache No — single shared namespace
Security Prompt injection detection, PII scrubbing, HMAC integrity None
Cost tracking Yes — USD saved, ROI multiplier, per-tenant cost accounting No
Async model Native asyncio throughout Sync core with asyncio.to_thread wrappers
Concurrency Thread-safe + async-safe; inflight coalescer for embeddings Thread-safe via threading.RLock
Instantiation await CacheManager.create(config) (async factory, required) MultiLevelCache(...) (sync constructor)
Teardown await manager.aclose() or manager.close() cache.clear() or context manager
Dependencies FAISS, embedding provider, storage backend None beyond stdlib

Lookup Pipeline Comparison

CacheManager.get(query)

Query string
  │
  ▼
SecurityGuard.validate_query()       ← reject injections
  │
  ▼
TenantManager.check_and_charge()     ← RPM quota check
  │
  ▼
QueryNormalizer.normalize()          ← canonical form + SHA-256 key
  │
  ▼
_L1ExactCache.get()                  ← in-process LRU (per tenant)
  │  MISS
  ▼
Storage.get()                        ← Redis / SQLite / Memory
  │  MISS
  ▼
CachedEmbedder.embed_single()        ← dense vector (coalesced)
  │
  ▼
EmbeddingIndex.search()              ← FAISS nearest-neighbour
  │
  ▼
CachePolicyLearner.rank_candidates() ← Thompson Sampling re-rank
  │
  ▼
DecisionEngine.decide()              ← SEMANTIC_HIT or LLM_FALLBACK

MultiLevelCache.get(key)

Raw key string
  │
  ▼
SHA-256 hash                         ← deterministic key normalisation
  │
  ▼
L1 OrderedDict lookup                ← in-process memory (~μs)
  │  MISS
  ▼
L2 OrderedDict lookup                ← in-process memory (~μs)
  │  HIT → promote to L1 if hot enough
  │  MISS
  ▼
L3 disk lookup                       ← pickle file read (~ms)
  │  HIT → promote to L2
  ▼
return None

The key difference is steps 4–8 of CacheManager: the embedding, FAISS search, RL ranking, and cost-aware decision. MultiLevelCache skips all of that — it returns the value or None, with no probabilistic reasoning.


Data Model Comparison

CacheManager MultiLevelCache
Entry type IntelligentCacheEntry CacheEntry
Key type tenant_id::SHA-256(normalized_query) SHA-256(raw_key)
Value type Any serialisable object (typically an LLM response string) Any pickle-serialisable Python object
Metadata Bandit arm (α/β), HMAC hash, cost record, embedding vector, tenant ID, quality score Hit count, creation time, last access time, size bytes
Hit result CacheLookupResult with hit, decision, similarity, latency_ms, cost_saved_usd Raw value or None
Metrics object CacheMetricsCollector — tracks latency histograms, cost, semantic similarity, per-tenant stats CacheMetrics — tracks hit/miss/eviction/promotion counts per level

Choosing the Right Component

Use CacheManager when:

  • You are caching responses from an LLM API (OpenAI, Anthropic, local models, etc.)
  • Queries may be phrased differently but mean the same thing (semantic equivalence matters)
  • You need to track cost savings and ROI from caching
  • You have multiple tenants or user groups that need isolated cache namespaces
  • You want the system to learn over time which cached responses are high quality
  • You need prompt injection protection or PII scrubbing before storing data
  • You are building a RAG pipeline, chatbot, or any system where query latency and LLM cost are concerns

Use MultiLevelCache when:

  • You are caching arbitrary computed results (database query results, API responses, parsed configs, deserialized objects)
  • Keys are exact and deterministic — the same input always produces the same key
  • You do not need semantic matching, tenancy, or security features
  • You want zero external dependencies (no embedding model, no FAISS, no Redis required)
  • You need L3 disk persistence for data that survives process restarts but is expensive to recompute
  • You are caching in a context where CacheManager's LLM-specific pipeline would be unnecessary overhead

Use both together when:

The two components are fully independent and can coexist in the same application. A common pattern is to use MultiLevelCache for application-level data (feature flags, user sessions, rate limit counters, expensive DB queries) while CacheManager handles all LLM query caching in the same process.

from fennec_memory.cache import (
    MultiLevelCache, CacheStrategy,
    CacheManager, CacheConfig, StorageBackend,
    EmbeddingConfig, EmbeddingProvider,
)

# Application startup — both caches initialised independently
config_cache = MultiLevelCache(
    l1_max_items=512,
    strategy=CacheStrategy.LRU,
    persist_l3=True,
)

llm_cache = await CacheManager.create(CacheConfig(
    storage_backend=StorageBackend.SQLITE,
    embedding=EmbeddingConfig(provider=EmbeddingProvider.OPENAI),
))

# Request handler — each cache used for its intended purpose
async def handle_request(user_id: str, query: str):
    # MultiLevelCache: exact key lookup for user profile (cheap, deterministic)
    profile = config_cache.get(f"user:{user_id}:profile")
    if profile is None:
        profile = await db.fetch_user(user_id)
        config_cache.set(f"user:{user_id}:profile", profile, ttl=300.0)

    # CacheManager: semantic lookup for LLM response (expensive, fuzzy)
    result = await llm_cache.get(query, tenant_id=user_id)
    if result.hit:
        answer = result.response
    else:
        answer = await call_llm(query, context=profile)
        await llm_cache.put(query, answer, tenant_id=user_id, response_tokens=400)

    return answer

Summary

Question Answer
Are they interchangeable? No. They solve different problems at different layers.
Can they run in the same process? Yes. They are fully independent and have no shared state.
Does CacheManager use MultiLevelCache internally? No. CacheManager has its own _L1ExactCache and delegates to BaseStorage.
Which is faster for exact key lookups? Both hit in-process memory at ~μs. MultiLevelCache has less overhead per lookup (no quota check, no normalisation pipeline).
Which should I start with? If your use case involves LLM queries, start with CacheManager. For everything else, start with MultiLevelCache.

Simple Real Example

from fennec_community.llm import GeminiInterface
from fennec_community.document_loaders import TextLoader 
from fennec_community.vector_database import FAISSVectorDatabase
from fennec_community.chunks import ArabicTextChunker
from fennec_community.context import ContextManager
from fennec_community.embeddings import OllamaEmbedder
from fennec_community.rag.core import RAGSystem 
from fennec_memory.cache import MultiLevelCache, CacheConfig, CacheStrategy

loader_1 = TextLoader("./data_kn/faq.txt").load()
chunker = ArabicTextChunker(chunk_size=100, overlap=20)
embedder = OllamaEmbedder()
vector_db = FAISSVectorDatabase(embedder=embedder)
llm = GeminiInterface(api_key=llm_api)
context_manager = ContextManager()
rag_system = RAGSystem(llm=llm, vector_db=vector_db,chunker=chunker, context_manager=context_manager)

rag_system.add_documents(loader_1)
cache_config = CacheConfig(
    l1_max_items=100,
    l2_max_items=500,
    default_ttl=600,          
    enable_stats=True,
)
cache = MultiLevelCache(strategy=CacheStrategy.LRU, config=cache_config)

from typing import Tuple
import time
def cached_rag_query(query: str) -> Tuple[str, bool]:
    cache_key = f"rag:query:{query.strip().lower()}"
    cached = cache.get(cache_key)
    if cached:
        return cached, True
    answer = rag_system.generate(query)
    cache.set(cache_key, answer)
    return answer, False
queries = [
  "ماهي طرق الدفع المتاحه",
  "ماهي اوقات العمل ",
  "ماهي طرق الدفع المتاحة",
  "ماهي طرق الدفع المتاحه"
]
for q in queries:
    t0 = time.perf_counter()
    answer, from_cache = cached_rag_query(q)
    elapsed = (time.perf_counter() - t0) * 1000
    source = "⚡ Cache" if from_cache else "🔄 RAG"
    print(f"  {source} ({elapsed:.1f}ms)")
    print(f"  Q: {q}")
    print(f"  A: {answer[:80]}...")
    print()
Source: memory/cache_module_docs.md