Multi-Level Intelligent LLM Cache System
Table of Contents
- Overview
- System Architecture
- Core Concepts
- Quick Start Guide
- Configuration Reference
- Public API Reference
- Security Model
- Storage Backends
- Observability & Metrics
- Advanced Usage
- Edge Cases & Failure Handling
- MultiLevelCache — General-Purpose Cache Layer
- Low-Level Data Models
- CacheManager vs MultiLevelCache
1. Overview
The Fennec Cache Module is a production-grade, multi-level intelligent caching system designed specifically for LLM pipelines. Rather than paying the cost—in latency and money—of an LLM call for every request, Fennec intercepts queries before they reach the model, serves cached responses when semantically equivalent answers already exist, and learns over time which responses are most valuable to keep.
The system combines three complementary lookup strategies (exact key matching, persistent storage lookup, and FAISS-powered semantic vector search) with a Reinforcement Learning eviction policy that continuously improves cache quality based on user and system feedback. All of this is wrapped in a multi-tenant security layer with per-tenant quota enforcement, PII scrubbing, and HMAC content integrity verification.
Why It Exists
LLM APIs charge per token and incur hundreds of milliseconds of latency per call. Real-world applications (chatbots, RAG pipelines, APIs) frequently receive identical or semantically near-identical queries that do not need a fresh LLM response. Fennec exploits this by storing and reusing responses: every request served from cache avoids the cost of an LLM call, and exact-match hits return from in-process memory in microseconds.
Real-World Use Cases
| Scenario | Benefit |
|---|---|
| Chatbots / Q&A systems | Serve repeated questions instantly; eliminate redundant LLM spend |
| RAG pipelines | Cache retrieved responses alongside embeddings for repeated document queries |
| Multi-tenant SaaS | Full tenant isolation with per-tenant quotas, shared-cache opt-in |
| High-traffic APIs | Reduce p99 latency from seconds to milliseconds for hot query paths |
| Cost monitoring | Real-time USD savings tracking and ROI reporting per tenant |
2. System Architecture
Pipeline Overview
Every get() call traverses the following ordered pipeline, short-circuiting at the first hit:
Query
│
▼
SecurityGuard ← validate query; reject injections, length violations
│
▼
TenantManager ← check & charge RPM quota
│
▼
QueryNormalizer ← Unicode NFC, lowercase, punctuation strip,
│ synonym expansion → canonical form + SHA-256 key
▼
L1 Exact Cache ← in-process OrderedDict LRU (fastest path, ~μs)
│ MISS
▼
Storage Exact Lookup ← Redis / SQLite / Memory (persistent exact match)
│ MISS → promote hit to L1
▼
EmbeddingIndex ← embed query (coalesced for concurrent requests)
│ → FAISS cosine similarity search
▼
PolicyLearner ← Thompson Sampling re-ranks candidates by expected reward
│
▼
DecisionEngine ← cost-aware utility function; decides SEMANTIC_HIT vs LLM_FALLBACK
│
├── CACHE HIT → return CacheLookupResult (hit=True)
│
└── LLM_FALLBACK → caller invokes LLM → put() → persist + embed + index
Store Pipeline (put())
SecurityGuard ← validate query & response
│
▼
PII Scrubber ← redact sensitive data (if enabled)
│
▼
QuotaCheck ← entry count + memory headroom
│
▼
QueryNormalizer ← normalize + compute exact key
│
▼
CachedEmbedder ← embed normalized query vector
│
▼
CostModel ← compute cost record (embedding + LLM costs)
│
▼
IntelligentCacheEntry ← build entry with RL bandit arm, HMAC hash, metadata
│
▼
L1 put + Storage.set + EmbeddingIndex.add
│
▼
QuotaAccounting ← increment entry count + charge memory
Component Responsibilities
| Component | Responsibility |
|---|---|
| _L1ExactCache | In-process LRU OrderedDict; microsecond-scale exact lookup |
| BaseStorage (+ backends) | Persistent exact-match store; cross-process durability |
| EmbeddingIndex | FAISS vector index; semantic nearest-neighbour search |
| CachedEmbedder | Embedding model wrapper with in-process LRU for vectors |
| _InflightCoalescer | Deduplicates concurrent embedding calls for identical queries |
| QueryNormalizer | Canonical form transformation; deterministic SHA-256 key generation |
| CachePolicyLearner | Thompson Sampling bandit; ranks candidates; drives eviction |
| DecisionEngine | Cost/quality/latency utility function; makes final routing decision |
| TenantManager | Registration, quota enforcement, namespace isolation |
| SecurityGuard | Injection detection, PII scrubbing, HMAC integrity verification |
| CacheMetricsCollector | Thread-safe hit/miss/latency/cost counters |
Design Philosophy
Fennec is async-first: the full pipeline is built around asyncio with thread-safe primitives for background tasks (eviction timer, coalescer). Sync wrappers (get_sync, put_sync, feedback_sync) are provided for non-async callers. The system is pluggable—storage backends, embedding providers, and eviction policies are all swappable without changing application code. All quota and access decisions are fail-closed: a quota breach or security violation returns LLM_FALLBACK rather than raising, so the application always has a safe path forward.
3. Core Concepts
3.1 Multi-Level Caching
Fennec implements three cache layers with different speed/durability tradeoffs:
L1 — In-Process Exact Cache
An OrderedDict-backed LRU cache keyed on the SHA-256 hash of the normalized query, scoped per tenant. Lookups are in-process memory accesses (~microseconds). Capacity is bounded by l1_max_items (default: 512). Entries are automatically promoted from Storage on a hit to warm the L1.
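The L1 behaviour described above can be sketched in a few lines. This is a minimal illustration, not Fennec's actual _L1ExactCache: the names exact_key and TinyLRU are hypothetical, and the normalization here (NFC, lowercase, whitespace collapse) is a simplified stand-in that omits punctuation stripping and synonym expansion.

```python
import hashlib
import unicodedata
from collections import OrderedDict
from typing import Any, Optional


def exact_key(tenant_id: str, query: str) -> str:
    """Hypothetical key derivation: simplified normalization, then SHA-256."""
    canonical = " ".join(unicodedata.normalize("NFC", query).lower().split())
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{tenant_id}::{digest}"  # tenant-scoped key


class TinyLRU:
    """Bounded LRU on top of OrderedDict; the least recently used key goes first."""

    def __init__(self, max_items: int = 512) -> None:
        self.max_items = max_items
        self._data: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key: str, value: Any) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.max_items:
            self._data.popitem(last=False)  # evict the oldest entry
```

Because the tenant ID is part of the key, two tenants asking the same question never collide in this sketch, and surface variations such as casing or extra spaces map to the same key.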
Storage Exact Lookup
Persistent exact-match lookup against the configured backend (Redis, SQLite, or Memory). Used when the L1 is cold or the entry was evicted. A hit here promotes the entry back to L1 for subsequent requests.
L2 — Semantic Search (FAISS)
When exact match fails, the query is embedded into a dense vector and searched against a FAISS index using cosine similarity. Candidates above semantic.similarity_threshold * 0.80 are loaded, filtered for tenant accessibility and integrity, and re-ranked by the RL policy. The DecisionEngine then decides whether the best candidate is good enough to serve or whether an LLM call is required.
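To make the candidate floor concrete, the sketch below computes cosine similarity in pure Python and keeps everything at or above similarity_threshold * 0.80. It is an illustration only: the real system searches a FAISS index, and the function names and the dictionary-based index here are hypothetical.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def candidate_pool(
    query_vec: Sequence[float],
    index: Dict[str, Sequence[float]],
    similarity_threshold: float = 0.85,
) -> List[Tuple[str, float]]:
    """Return (entry_id, similarity) pairs above the relaxed floor, best first."""
    floor = similarity_threshold * 0.80  # 0.68 with the default threshold
    scored = [(eid, cosine_similarity(query_vec, vec)) for eid, vec in index.items()]
    kept = [pair for pair in scored if pair[1] >= floor]
    return sorted(kept, key=lambda pair: pair[1], reverse=True)


index = {"near": [1.0, 0.1], "mid": [1.0, 1.0], "far": [0.0, 1.0]}
pool = candidate_pool([1.0, 0.0], index)
```

In this toy index, "mid" (similarity about 0.71) enters the pool even though it is below the 0.85 threshold. Passing the floor only makes an entry a candidate; whether it is served is still up to the re-ranking and decision steps.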
3.2 Semantic Search & Embeddings
Query embedding is performed by CachedEmbedder, which wraps the configured embedding provider with an in-process LRU (embedding_cache_size, default: 4096 vectors). Concurrent requests for the same query are coalesced by _InflightCoalescer so the embedding model is called exactly once per unique query in flight, regardless of concurrency.
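The coalescing idea can be sketched with a dictionary of in-flight asyncio tasks. This is an assumption-laden illustration rather than the actual _InflightCoalescer: the class name, the run() method, and fake_embed are hypothetical, and the sketch ignores the time-window setting (coalescing_window_ms) described in the configuration reference.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


class InflightCoalescer:
    """Share one in-flight task among concurrent callers that use the same key."""

    def __init__(self) -> None:
        self._inflight: Dict[str, "asyncio.Task[List[float]]"] = {}

    async def run(self, key: str, factory: Callable[[], Awaitable[List[float]]]) -> List[float]:
        task = self._inflight.get(key)
        if task is None:
            # First caller for this key starts the work and registers the task.
            task = asyncio.ensure_future(factory())
            self._inflight[key] = task
            task.add_done_callback(lambda _t: self._inflight.pop(key, None))
        # Later callers simply await the task that is already running.
        return await task


async def demo() -> int:
    calls = 0

    async def fake_embed() -> List[float]:  # stand-in for an embedding provider call
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)
        return [0.1, 0.2]

    coalescer = InflightCoalescer()
    results = await asyncio.gather(
        *(coalescer.run("same query", fake_embed) for _ in range(5))
    )
    assert all(r == [0.1, 0.2] for r in results)
    return calls  # number of times the provider was actually invoked
```

Running asyncio.run(demo()) issues five concurrent requests for the same key while the stand-in provider is invoked only once.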
Supported embedding providers:
| Provider | Value | Notes |
|---|---|---|
| OpenAI | EmbeddingProvider.OPENAI | Requires OPENAI_API_KEY; default model text-embedding-3-small |
| HuggingFace | EmbeddingProvider.HUGGINGFACE | Sentence-transformers; runs locally |
| Ollama | EmbeddingProvider.OLLAMA | Local inference at ollama_url |
| Mock | EmbeddingProvider.MOCK | Deterministic random vectors; for tests and offline use |
3.3 RL-Based Caching (Thompson Sampling)
Every IntelligentCacheEntry carries a BanditArm—a Beta distribution parameterised by (alpha, beta) representing accumulated successes and failures. The system uses Thompson Sampling: each arm is scored by drawing a sample from Beta(alpha, beta), and candidates are ranked by this sampled reward. This provides principled exploration (low-confidence entries can still win) while favouring entries with a strong positive feedback history.
Feedback signals update the bandit arm:
- positive=True → alpha += magnitude
- positive=False → beta += magnitude
The adaptive_threshold feature adjusts the cosine similarity threshold automatically: positive feedback relaxes it (finds more hits), negative feedback tightens it (reduces false positives).
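A minimal sketch of the sampling and update rule follows, using only the standard library. The BanditArm class here is a simplified stand-in for the one Fennec attaches to each entry, and rank() is a hypothetical helper; the real CachePolicyLearner may add bonuses and decay that are not shown.

```python
import random
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class BanditArm:
    alpha: float = 1.0  # accumulated successes (prior included)
    beta: float = 1.0   # accumulated failures (prior included)

    def update(self, positive: bool, magnitude: float = 1.0) -> None:
        if positive:
            self.alpha += magnitude
        else:
            self.beta += magnitude

    @property
    def expected_reward(self) -> float:
        return self.alpha / (self.alpha + self.beta)  # mean of Beta(alpha, beta)

    def sample(self, rng: random.Random) -> float:
        return rng.betavariate(self.alpha, self.beta)


def rank(arms: Dict[str, BanditArm], rng: Optional[random.Random] = None) -> List[str]:
    """Thompson Sampling: draw once per arm, order candidates by the draw."""
    rng = rng or random.Random()
    draws = {name: arm.sample(rng) for name, arm in arms.items()}
    return sorted(draws, key=lambda name: draws[name], reverse=True)
```

An arm with little feedback has a wide Beta distribution, so its draw occasionally beats a well-established arm. That is where the exploration comes from, without a separate exploration schedule.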
3.4 Multi-Tenancy & Isolation
Every entry is tagged with a tenant_id. The TenantManager enforces three independent quota dimensions per tenant: memory (MB), entry count, and requests per minute (RPM). Namespace keys are prefixed as tenant_id::key, providing hard storage-level separation. A "default" tenant is always registered for single-tenant deployments.
Cross-tenant shared cache access is opt-in: a tenant must have allow_shared_read=True to read entries marked is_shared=True, and allow_shared_write=True to publish shared entries. The is_accessible() method on TenantManager enforces these rules uniformly across all lookup paths.
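The access rules above reduce to a small predicate. The sketch below is illustrative only: Tenant and Entry are hypothetical minimal types carrying just the flags named in this section, and the real TenantManager.is_accessible() may check more than this.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Tenant:
    tenant_id: str
    allow_shared_read: bool = False
    allow_shared_write: bool = False


@dataclass(frozen=True)
class Entry:
    tenant_id: str
    is_shared: bool = False


def namespaced_key(tenant_id: str, key: str) -> str:
    """Storage key with the tenant prefix described above."""
    return f"{tenant_id}::{key}"


def is_accessible(reader: Tenant, entry: Entry) -> bool:
    if entry.tenant_id == reader.tenant_id:
        return True  # a tenant can always read its own entries
    # Foreign entries need both the entry flag and the reader's opt-in.
    return entry.is_shared and reader.allow_shared_read


def can_publish_shared(writer: Tenant) -> bool:
    return writer.allow_shared_write
```

Both conditions must hold for a cross-tenant read: the owner marked the entry as shared and the reader opted in. Either flag alone is not enough.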
3.5 Quota Enforcement
Three quota types are enforced:
| Quota | Unit | Enforcement Point |
|---|---|---|
| request_quota_rpm | Requests per minute | get() entry (before any work) |
| max_entries | Entry count | put() before storing |
| memory_quota_mb | Megabytes | put() before storing |
When any quota is exceeded, QuotaExceeded is raised internally and the operation returns LLM_FALLBACK (for get()) or None (for put()). An optional quota_event_hook callback fires on each violation for integration with external alerting.
Unknown tenant IDs are auto-registered with default quotas and a WARNING log entry. For production multi-tenant deployments, register tenants explicitly before first use.
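One way to picture the RPM check and the fail-closed conversion to LLM_FALLBACK is a sliding 60-second window of timestamps. This is a sketch under assumptions: whether Fennec uses a sliding or a fixed window is not stated here, RpmLimiter and guarded_lookup are hypothetical names, and the exception class is a local stand-in with the same two attributes this document lists for QuotaExceeded.

```python
import time
from collections import deque
from typing import Callable, Deque


class QuotaExceeded(Exception):  # local stand-in, not the library class
    def __init__(self, tenant_id: str, reason: str) -> None:
        super().__init__(reason)
        self.tenant_id = tenant_id
        self.reason = reason


class RpmLimiter:
    def __init__(self, tenant_id: str, rpm_quota: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.tenant_id = tenant_id
        self.rpm_quota = rpm_quota
        self._clock = clock
        self._stamps: Deque[float] = deque()

    def check_and_charge(self) -> None:
        now = self._clock()
        # Drop timestamps that have left the 60-second window.
        while self._stamps and now - self._stamps[0] >= 60.0:
            self._stamps.popleft()
        if len(self._stamps) >= self.rpm_quota:
            raise QuotaExceeded(self.tenant_id, f"RPM quota {self.rpm_quota} exceeded")
        self._stamps.append(now)


def guarded_lookup(limiter: RpmLimiter) -> str:
    """Convert a quota breach into a routing decision instead of an error."""
    try:
        limiter.check_and_charge()
    except QuotaExceeded:
        return "LLM_FALLBACK"
    return "PROCEED"
```

The injectable clock is only there so the window can be exercised without waiting a real minute.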
QuotaExceeded Exception
The QuotaExceeded exception exposes structured context for alerting and logging:
class QuotaExceeded(Exception):
    tenant_id: str  # identifier of the tenant that exceeded quota
    reason: str     # human-readable reason (e.g., "RPM quota 1000 exceeded")

from fennec_memory.cache import QuotaExceeded
try:
    tenant_manager.check_and_charge_request("acme")
except QuotaExceeded as e:
    print(f"Tenant {e.tenant_id} exceeded quota: {e.reason}")
3.6 Eviction
A background threading.Timer runs the eviction cycle every eviction.check_interval_s seconds (default: 300). Each cycle:
- Removes expired entries (TTL exceeded).
- Removes entries exceeding eviction.max_age_s (if configured).
- Evicts the bottom 5% of entries by composite eviction_score (or more if storage exceeds the soft cap).
- Applies RL reward decay (usage_decay) to all surviving entries.
The composite eviction score combines bandit expected reward, normalized usage count, and recency, weighted by reward_weight, usage_weight, and recency_weight.
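As a rough illustration of the weighted score, the sketch below assumes that usage and recency have already been normalized to [0, 1]; how Fennec normalizes them is not specified here. The function names are hypothetical, and the default weights are the EvictionConfig defaults.

```python
from typing import Dict, List


def eviction_score(
    expected_reward: float,
    usage_norm: float,
    recency_norm: float,
    reward_weight: float = 0.50,
    usage_weight: float = 0.30,
    recency_weight: float = 0.20,
) -> float:
    """Weighted sum of three signals; higher means more worth keeping."""
    return (
        reward_weight * expected_reward
        + usage_weight * usage_norm
        + recency_weight * recency_norm
    )


def pick_victims(scores: Dict[str, float], fraction: float = 0.05) -> List[str]:
    """Return the lowest-scoring fraction of entry IDs (assumed rounding: floor)."""
    count = int(len(scores) * fraction)
    return sorted(scores, key=lambda eid: scores[eid])[:count]
```

For example, an entry with expected reward 0.8, normalized usage 0.5, and normalized recency 0.25 scores 0.5 * 0.8 + 0.3 * 0.5 + 0.2 * 0.25 = 0.6 under the default weights.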
4. Quick Start Guide
Installation
pip install fennec-memory
Minimal Working Example
import asyncio
from fennec_memory.cache import CacheManager, CacheConfig, StorageBackend
from fennec_memory.cache import EmbeddingConfig, EmbeddingProvider
async def main():
    # 1. Create manager (async factory; warms up embedding model)
    config = CacheConfig(
        storage_backend=StorageBackend.SQLITE,
        embedding=EmbeddingConfig(provider=EmbeddingProvider.OPENAI),
        default_ttl_s=3600.0,
    )
    manager = await CacheManager.create(config)
    query = "What is retrieval-augmented generation?"
    # 2. Attempt cache lookup
    result = await manager.get(query)
    if result.hit:
        answer = result.response
        print(f"Cache hit ({result.decision.value}), saved ${result.cost_saved_usd:.4f}")
    else:
        # 3. LLM fallback
        answer = await your_llm_call(query)
        # 4. Store for future requests
        await manager.put(query, answer, response_tokens=350)
    # 5. Teardown
    await manager.aclose()
asyncio.run(main())
Sync Usage (Flask, scripts)
# No async context needed
result = manager.get_sync("What is RAG?")
if not result.hit:
    answer = your_llm_call_sync("What is RAG?")
    manager.put_sync("What is RAG?", answer)
5. Configuration Reference
All configuration is handled through a single root CacheConfig dataclass. Pass one instance to CacheManager.create().
CacheConfig — Root Configuration
from fennec_memory.cache import CacheConfig, StorageBackend
config = CacheConfig(
    storage_backend=StorageBackend.SQLITE,  # storage backend selection
    l1_max_items=512,                       # L1 in-process LRU capacity
    default_ttl_s=3600.0,                   # default entry TTL in seconds
    log_level="INFO",                       # logging level
)
| Field | Type | Default | Description |
|---|---|---|---|
| storage_backend | StorageBackend | SQLITE | Persistent backend: REDIS, SQLITE, or MEMORY |
| l1_max_items | int | 512 | Maximum entries in the in-process L1 LRU |
| l2_max_items | int | 4096 | Maximum entries in the L2 in-process cache |
| default_ttl_s | Optional[float] | 3600.0 | Default TTL for stored entries (seconds). None = no expiry |
| log_level | str | "INFO" | Python logging level |
| embedding | EmbeddingConfig | see below | Embedding provider settings |
| semantic | SemanticConfig | see below | Similarity thresholds and index settings |
| cost | CostConfig | see below | Token pricing and utility weights |
| rl | RLConfig | see below | Thompson Sampling hyperparameters |
| tenant | TenantConfig | see below | Default tenant quotas |
| security | SecurityConfig | see below | Security and PII settings |
| eviction | EvictionConfig | see below | Eviction policy and scheduling |
| redis | RedisConfig | see below | Redis connection settings |
| sqlite | SQLiteConfig | see below | SQLite file and performance settings |
from_env() — Environment Variable Loading
config = CacheConfig.from_env()
| Environment Variable | Config Field | Default |
|---|---|---|
| CACHE_L1_MAX_ITEMS | l1_max_items | 100 |
| CACHE_L2_MAX_ITEMS | l2_max_items | 1000 |
| CACHE_L3_MAX_ITEMS | l3_max_items | 10000 |
| CACHE_DIR | cache_dir | ./cache_storage |
| CACHE_DEFAULT_TTL | default_ttl_s | None |
| OPENAI_API_KEY | embedding.openai_api_key | (unset) |
| REDIS_HOST | redis.host | localhost |
| REDIS_PORT | redis.port | 6379 |
| REDIS_DB | redis.db | 0 |
| REDIS_PASSWORD | redis.password | (unset) |
| REDIS_SSL | redis.ssl | false |
| SQLITE_PATH | sqlite.db_path | ./fennec_cache.db |
| FENNEC_HMAC_SECRET | security.hmac_secret | (unset) |
| LOG_LEVEL | log_level | INFO |
to_dict() — Serialisation
d = config.to_dict()
Converts the full CacheConfig to a plain dictionary. Useful for logging, auditing, or persisting configuration state.
EmbeddingConfig
Controls the embedding model used for semantic search.
from fennec_memory.cache import EmbeddingConfig, EmbeddingProvider
EmbeddingConfig(
    provider=EmbeddingProvider.OPENAI,
    model_name="text-embedding-3-small",
    dimension=1536,
    batch_size=64,
    cache_embeddings=True,
    embedding_cache_size=4096,
    request_timeout=10.0,
)
| Field | Type | Default | Description |
|---|---|---|---|
| provider | EmbeddingProvider | MOCK | Embedding backend |
| model_name | str | "text-embedding-3-small" | Model identifier (for OpenAI) |
| dimension | int | 1536 | Vector dimension; must match the chosen model |
| batch_size | int | 64 | Embedding batch size |
| cache_embeddings | bool | True | Enable in-process LRU for computed vectors |
| embedding_cache_size | int | 4096 | LRU capacity for cached vectors |
| openai_api_key | Optional[str] | $OPENAI_API_KEY | API key (auto-read from env) |
| hf_model_name | str | "sentence-transformers/all-MiniLM-L6-v2" | HuggingFace model name |
| ollama_url | str | "http://localhost:11434" | Ollama server URL |
| request_timeout | float | 10.0 | HTTP timeout for embedding requests (seconds) |
SemanticConfig
Controls the FAISS vector index and similarity threshold behaviour.
| Field | Type | Default | Description |
|---|---|---|---|
| similarity_threshold | float | 0.85 | Minimum cosine similarity to accept a semantic hit |
| adaptive_threshold | bool | True | Auto-tune threshold based on feedback signals |
| threshold_min | float | 0.70 | Minimum adaptive threshold floor |
| threshold_max | float | 0.97 | Maximum adaptive threshold ceiling |
| threshold_step | float | 0.01 | Step size for each threshold adjustment |
| index_type | str | "flat" | FAISS index type: "flat", "ivf", or "hnsw" |
| max_index_size | int | 100_000 | Maximum vectors in the FAISS index |
SecurityConfig
| Field | Type | Default | Description |
|---|---|---|---|
| max_query_length | int | 8192 | Maximum allowed query character length |
| max_response_length | int | 65536 | Maximum allowed response character length |
| enable_injection_check | bool | True | Enable prompt injection detection |
| injection_patterns_path | Optional[str] | None | Path to a file of custom regex patterns (one per line) |
| enable_pii_scrub | bool | False | Enable PII redaction before storage |
| hmac_secret | Optional[str] | $FENNEC_HMAC_SECRET | HMAC signing key for entry integrity verification |
TenantConfig
Defines default quotas applied to auto-registered or unspecified tenants.
| Field | Type | Default | Description |
|---|---|---|---|
| default_memory_quota_mb | float | 512.0 | Default memory quota per tenant (MB) |
| default_request_quota_rpm | int | 1000 | Default requests-per-minute limit |
| default_max_entries | int | 10_000 | Default maximum entry count per tenant |
| enable_shared_cache | bool | False | Grant default tenant shared read/write access |
| isolation_strict | bool | True | Enforce hard namespace separation |
RLConfig
Thompson Sampling hyperparameters for the bandit policy.
| Field | Type | Default | Description |
|---|---|---|---|
| prior_alpha | float | 1.0 | Beta distribution prior for successes (uniform prior = no prior knowledge) |
| prior_beta | float | 1.0 | Beta distribution prior for failures |
| exploration_bonus | float | 0.05 | Additional reward bonus for under-explored entries |
| positive_feedback_reward | float | 1.0 | Reward magnitude added on positive feedback |
| negative_feedback_penalty | float | 1.0 | Penalty magnitude added on negative feedback |
| similarity_bonus_scale | float | 0.3 | Reward multiplier for high-similarity hits |
| usage_decay | float | 0.995 | Multiplicative reward decay applied each eviction cycle |
| min_reward_to_keep | float | 0.05 | Entries with expected reward below this are eligible for eviction |
EvictionConfig
| Field | Type | Default | Description |
|---|---|---|---|
| policy | EvictionPolicy | REWARD_LRU | Eviction strategy: REWARD_LRU, LRU, LFU, or TTL |
| check_interval_s | int | 300 | Seconds between eviction cycle runs |
| max_age_s | Optional[int] | None | Hard maximum age for any entry; None = unlimited |
| reward_weight | float | 0.50 | Weight of bandit reward in composite eviction score |
| usage_weight | float | 0.30 | Weight of usage count in composite eviction score |
| recency_weight | float | 0.20 | Weight of recency (time since last access) in composite score |
RedisConfig
| Field | Type | Default / Env |
|---|---|---|
| host | str | $REDIS_HOST → localhost |
| port | int | $REDIS_PORT → 6379 |
| db | int | $REDIS_DB → 0 |
| password | Optional[str] | $REDIS_PASSWORD |
| ssl | bool | $REDIS_SSL → false |
| socket_timeout | float | 2.0 |
| max_connections | int | 50 |
| key_prefix | str | "fennec:" |
SQLiteConfig
| Field | Type | Default / Env |
|---|---|---|
| db_path | str | $SQLITE_PATH → ./fennec_cache.db |
| wal_mode | bool | True (Write-Ahead Logging; better concurrency) |
| cache_size_kb | int | 65536 (64 MB page cache) |
PerformanceConfig
Controls low-level async and concurrency behaviour. These settings tune how the system handles inflight requests, parallel embedding calls, and I/O threading. In most cases the defaults are appropriate; adjust only when profiling indicates a bottleneck.
from fennec_memory.cache import PerformanceConfig
PerformanceConfig(
    enable_async=True,
    coalescing_window_ms=10,
    embedding_batch_size=32,
    io_threads=4,
    max_concurrent_llm_calls=20,
)
| Field | Type | Default | Description |
|---|---|---|---|
| enable_async | bool | True | Enable async execution mode. Set to False only for purely synchronous deployments where no event loop is ever present. |
| coalescing_window_ms | int | 10 | Time window in milliseconds during which concurrent identical embedding requests are deduplicated by _InflightCoalescer. Higher values increase deduplication efficiency at the cost of added latency. |
| embedding_batch_size | int | 32 | Number of texts to embed in a single provider call. Larger batches reduce HTTP overhead; smaller batches reduce per-request latency variance. |
| io_threads | int | 4 | Size of the thread pool used for storage I/O operations executed via asyncio.to_thread. Increase for high-concurrency deployments with slow storage. |
| max_concurrent_llm_calls | int | 20 | Maximum number of in-flight LLM calls the system will allow simultaneously. Requests above this limit queue until a slot is free. |
Note: PerformanceConfig is nested inside CacheConfig as the performance field and is automatically constructed with defaults. Pass an explicit instance only when you need non-default values.
from fennec_memory.cache import CacheConfig, PerformanceConfig
config = CacheConfig(
    performance=PerformanceConfig(
        coalescing_window_ms=20,  # longer window for very high concurrency
        io_threads=8,
        max_concurrent_llm_calls=50,
    )
)
6. Public API Reference
CacheManager.create
Async factory method. The preferred way to instantiate CacheManager. Builds all subsystems and warms up the embedding model with a no-op call.
@classmethod
async def create(cls, config: Optional[CacheConfig] = None) -> "CacheManager"
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
| config | CacheConfig | No | Full configuration object. Uses defaults if None. |
Returns: A fully initialised CacheManager instance, ready for use.
Behaviour: Constructs all subsystems (normalizer, security guard, embedder, storage, policy learner, cost model, tenant manager, metrics collector, coalescer, L1 cache, vector index). Starts the background eviction timer. Sends a "warmup" string to the embedding model to pre-load it. If warmup fails, a WARNING is logged and initialisation continues normally.
Important: Always use CacheManager.create() rather than calling __init__ directly. The factory guarantees embedding model warmup and proper subsystem wiring.
from fennec_memory.cache import CacheManager, CacheConfig, StorageBackend
from fennec_memory.cache import EmbeddingConfig, EmbeddingProvider
config = CacheConfig(
    storage_backend=StorageBackend.SQLITE,
    embedding=EmbeddingConfig(
        provider=EmbeddingProvider.OPENAI,
        model_name="text-embedding-3-small",
    ),
    default_ttl_s=3600.0,
)
manager = await CacheManager.create(config)
CacheManager.get
Primary cache lookup. Traverses all cache layers in order, returning a routing decision on every call.
async def get(
    self,
    query: str,
    tenant_id: str = "default",
    top_k: int = 1,
) -> CacheLookupResult
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
| query | str | Yes | Raw query string from the user or application |
| tenant_id | str | No | Tenant namespace. Defaults to "default". |
| top_k | int | No | Number of semantic candidates to retrieve from FAISS (default: 1) |
Returns: CacheLookupResult
| Field | Type | Description |
|---|---|---|
| hit | bool | True if a cached response was found |
| decision | RoutingDecision | EXACT_HIT, SEMANTIC_HIT, or LLM_FALLBACK |
| entry | Optional[IntelligentCacheEntry] | The matched entry, or None on a miss |
| similarity | float | Cosine similarity score (1.0 for exact hits) |
| latency_ms | float | Total lookup time in milliseconds |
| cost_saved_usd | float | Estimated USD saved by avoiding an LLM call |
| response | Any | Shortcut property: entry.response if entry is not None |
Internal Lookup Sequence
- SecurityGuard.validate_query(): rejects injections; returns LLM_FALLBACK on violation without raising.
- TenantManager.check_and_charge_request(): deducts one RPM token; returns LLM_FALLBACK if quota exceeded.
- QueryNormalizer.normalize() → exact_cache_key(): produce canonical form and SHA-256 key.
- _L1ExactCache.get(): in-process LRU lookup; verifies HMAC integrity; evicts corrupted entries.
- Storage.get(): persistent exact lookup; promotes hit to L1.
- CachedEmbedder.embed_single() (coalesced): embed the normalized query.
- EmbeddingIndex.search(): FAISS nearest-neighbour search; filters by similarity_threshold * 0.80.
- Load and filter candidates: expired, inaccessible, and integrity-failed entries are discarded.
- CachePolicyLearner.rank_candidates(): Thompson Sampling re-ranks surviving candidates.
- DecisionEngine.decide(): applies cost/quality/latency utility function; returns SEMANTIC_HIT or LLM_FALLBACK.
- Promote hit to L1; record metrics.
query = "What is retrieval-augmented generation?"
result = await manager.get(query, tenant_id="acme")
if result.hit:
    print(f"[{result.decision.value}] similarity={result.similarity:.2f}")
    print(f"Saved: ${result.cost_saved_usd:.4f} | latency: {result.latency_ms:.1f}ms")
    answer = result.response
else:
    # Call your LLM here (your_llm is a placeholder for your own client)
    answer = await your_llm(query)
    await manager.put(query, answer, tenant_id="acme")
CacheManager.put
Stores a query-response pair. Call this immediately after a successful LLM call when get() returns hit=False.
async def put(
    self,
    query: str,
    response: Any,
    tenant_id: str = "default",
    ttl_s: Optional[float] = None,
    quality_score: float = 1.0,
    response_tokens: int = 0,
    is_shared: bool = False,
    input_tokens: int = 0,
) -> Optional[IntelligentCacheEntry]
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
| query | str | Yes | Original raw query string |
| response | Any | Yes | LLM response to cache |
| tenant_id | str | No | Tenant namespace. Defaults to "default". |
| ttl_s | Optional[float] | No | Entry TTL in seconds. Falls back to config.default_ttl_s if None. |
| quality_score | float | No | External quality signal in [0, 1]. Used to seed the bandit arm. |
| response_tokens | int | No | Approximate output token count (for cost tracking) |
| is_shared | bool | No | Mark entry as readable by other tenants with allow_shared_read=True |
| input_tokens | int | No | Approximate input token count (for cost tracking) |
Returns: IntelligentCacheEntry on success, or None if rejected by security or quota.
Internal Store Sequence
- SecurityGuard.validate_query() and validate_response().
- SecurityGuard.scrub_pii() on query and response (if enable_pii_scrub=True).
- TenantManager.check_entry_quota() and check_memory_quota().
- Normalize query → compute exact key.
- Embed normalized query (no coalescing; each put() computes its own vector).
- Build CostRecord from token counts and configured pricing.
- Construct IntelligentCacheEntry with bandit arm, content hash, and metadata.
- Write to L1, Storage, and FAISS index.
- Update tenant memory and entry count quotas.
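The content hash built during the store sequence can be pictured as an HMAC over the stored payload, checked again on lookup. The sketch below is an assumption throughout: which fields Fennec signs, how it serializes them, and which digest it uses are not stated here, and content_hash and verify are hypothetical helper names.

```python
import hashlib
import hmac
import json
from typing import Any


def content_hash(secret: str, query: str, response: Any) -> str:
    """HMAC-SHA256 over a canonical JSON encoding of the entry payload."""
    payload = json.dumps(
        {"query": query, "response": response}, sort_keys=True, default=str
    )
    return hmac.new(
        secret.encode("utf-8"), payload.encode("utf-8"), hashlib.sha256
    ).hexdigest()


def verify(secret: str, query: str, response: Any, expected: str) -> bool:
    """Recompute the tag and compare in constant time."""
    return hmac.compare_digest(content_hash(secret, query, response), expected)


tag = content_hash("example-secret", "what is rag", "RAG combines retrieval with generation.")
```

A stored entry whose response was modified after signing no longer verifies, which is the condition under which a lookup would discard it as corrupted.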
entry = await manager.put(
    query="Explain transformer attention mechanism",
    response=llm_answer,
    tenant_id="acme",
    ttl_s=7200.0,
    quality_score=0.95,
    response_tokens=350,
    input_tokens=12,
)
if entry:
    print(f"Stored: {entry.entry_id[:8]}... cost={entry.cost_record.total_usd:.6f} USD")
CacheManager.feedback
Records a quality signal for a cached entry. Updates the Thompson Sampling bandit arm and adjusts the adaptive similarity threshold. Use this whenever you have a signal about response quality—user ratings, LLM-as-judge scores, or implicit engagement metrics.
async def feedback(
    self,
    entry_id: str,
    positive: bool,
    magnitude: float = 1.0,
    tenant_id: str = "default",
    source: str = "user",
) -> None
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
| entry_id | str | Yes | entry_id from CacheLookupResult.entry.entry_id |
| positive | bool | Yes | True = response was good; False = response was wrong or unhelpful |
| magnitude | float | No | Signal strength in [0, ∞). Default 1.0. Use higher values for high-confidence signals. |
| tenant_id | str | No | Tenant namespace |
| source | str | No | Signal origin: "user", "llm_eval", or "auto" |
Returns: None. Fire-and-forget; does not raise on unknown entry_id.
Internal Behaviour
- Loads the entry from Storage or L1.
- Calls CachePolicyLearner.record_feedback() → updates bandit_arm.alpha (positive) or bandit_arm.beta (negative).
- Updates confidence_score on the entry.
- Adjusts the adaptive similarity threshold: positive feedback relaxes it by threshold_step * 0.5; negative feedback tightens it by threshold_step.
- Persists the updated entry back to Storage.
If entry_id is not found (e.g., TTL expired), logs a WARNING and returns silently.
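The threshold adjustment described above is asymmetric: a negative signal moves the threshold twice as far as a positive one. A minimal sketch follows, assuming that "relaxing" means lowering the threshold and that the result is clamped to the threshold_min and threshold_max values from SemanticConfig; adjust_threshold is a hypothetical name.

```python
def adjust_threshold(
    current: float,
    positive: bool,
    step: float = 0.01,
    threshold_min: float = 0.70,
    threshold_max: float = 0.97,
) -> float:
    """Lower the threshold slightly on positive feedback, raise it on negative."""
    delta = -step * 0.5 if positive else step
    # Clamp so repeated feedback cannot push the threshold out of bounds.
    return min(threshold_max, max(threshold_min, current + delta))
```

Starting from the default 0.85, one positive signal gives 0.845 and one negative signal gives 0.86, so a wrong semantic hit is corrected faster than a good one loosens matching.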
result = await manager.get(query, tenant_id="acme")
if result.hit:
    answer = result.response
    # After user interaction...
    user_satisfied = True  # e.g., from thumbs-up button
    await manager.feedback(
        entry_id=result.entry.entry_id,
        positive=user_satisfied,
        magnitude=1.0,
        tenant_id="acme",
        source="user",
    )
CacheManager.get_sync / put_sync / feedback_sync
Synchronous wrappers for non-async callers. Suitable for use in Flask views, Django handlers, Celery tasks, scripts, and Jupyter notebooks.
def get_sync(self, query: str, tenant_id: str = "default") -> CacheLookupResult
def put_sync(
    self,
    query: str,
    response: Any,
    tenant_id: str = "default",
    **kwargs,  # same keyword arguments as put()
) -> Optional[IntelligentCacheEntry]
def feedback_sync(
    self,
    entry_id: str,
    positive: bool,
    tenant_id: str = "default",
) -> None
Behaviour: Each method calls _run_sync(), which detects the calling context:
- If a running event loop exists in the current thread (FastAPI, Jupyter): submits via asyncio.run_coroutine_threadsafe() and blocks on the returned Future.
- Otherwise (plain script, thread pool, Celery worker): uses asyncio.run() to create an isolated loop for the duration of the call.
Warning: Do not call sync wrappers from inside an async def function. If you are already in an async context, use the async methods directly.
# Flask route
@app.route("/ask")
def ask():
    query = request.args["q"]
    result = manager.get_sync(query, tenant_id="webapp")
    if result.hit:
        return jsonify({"answer": result.response, "cached": True})
    answer = call_llm_sync(query)
    manager.put_sync(query, answer, tenant_id="webapp", response_tokens=300)
    return jsonify({"answer": answer, "cached": False})
CacheManager.register_tenant
Registers a new tenant with custom quotas and permissions.
def register_tenant(self, reg: TenantRegistration) -> None
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
| reg | TenantRegistration | Yes | Tenant registration data |
TenantRegistration Fields
| Field | Type | Default | Description |
|---|---|---|---|
| tenant_id | str | Required | Unique tenant identifier |
| display_name | str | "" | Human-readable tenant name |
| memory_quota_mb | float | 512.0 | Memory quota in MB |
| max_entries | int | 10_000 | Maximum entry count |
| request_quota_rpm | int | 1_000 | Requests per minute limit |
| allow_shared_read | bool | False | Can this tenant read is_shared=True entries from other tenants? |
| allow_shared_write | bool | False | Can this tenant publish is_shared=True entries? |
| custom_ttl_s | Optional[float] | None | Override default TTL for this tenant's entries |
| metadata | Dict[str, str] | {} | Arbitrary metadata for billing or routing |
If a tenant_id already exists, the registration is updated and a warning is logged.
from fennec_memory.cache import TenantRegistration
manager.register_tenant(TenantRegistration(
    tenant_id="enterprise_client_a",
    display_name="ACME Corp",
    memory_quota_mb=2048.0,
    max_entries=50_000,
    request_quota_rpm=5_000,
    allow_shared_read=True,
))
CacheManager.flush_tenant
Evicts all cache entries belonging to a tenant. Removes from Storage, L1, and the FAISS vector index. Executes synchronously.
def flush_tenant(self, tenant_id: str) -> int
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
| tenant_id | str | Yes | Tenant whose entries should be removed |
Returns: int, the number of entries deleted.
removed = manager.flush_tenant("enterprise_client_a")
print(f"Flushed {removed} entries")
CacheManager.get_metrics
Returns a full system-wide metrics snapshot. All counters are cumulative since the CacheManager was created.
def get_metrics(self) -> Dict[str, object]
Returned Keys
| Key | Type | Description |
|---|---|---|
| total_requests | int | Total get() calls |
| overall_hit_rate | float | Fraction of requests served from cache (0–1) |
| exact_hit_rate | float | Fraction served by exact match |
| semantic_hit_rate | float | Fraction served by semantic match |
| llm_fallback_rate | float | Fraction routed to LLM |
| total_saved_usd | float | Cumulative USD saved by cache hits |
| roi_multiplier | float | total_saved_usd / total_spent_usd |
| latency_overall | dict | Histogram with p50_ms, p90_ms, p99_ms |
| latency_exact | dict | Latency histogram for exact-hit requests |
| latency_semantic | dict | Latency histogram for semantic-hit requests |
| vector_index_size | int | Number of vectors in the FAISS index |
| l1_size | int | Current entry count in L1 |
| sim_threshold | float | Current adaptive similarity threshold |
| tenants | list | Per-tenant stats (see get_tenant_metrics) |
| errors | dict | Error counts keyed by type (e.g., "security_violation", "quota_exceeded") |
| decision_engine | dict | Decision engine internal stats |
| policy_learner | dict | RL policy stats (reward mean, p10, p90, feedback rate) |
metrics = manager.get_metrics()
print(f"Hit rate: {metrics['overall_hit_rate']:.1%}")
print(f"ROI: {metrics['roi_multiplier']}x")
print(f"p99 latency: {metrics['latency_overall']['p99_ms']:.1f}ms")
print(f"Saved: ${metrics['total_saved_usd']:.2f}")
if metrics["errors"].get("security_violation", 0) > 100:
    alert("High rate of injection attempts detected")
CacheManager.get_tenant_metrics
Per-tenant metrics snapshot.
def get_tenant_metrics(self, tenant_id: str) -> Dict[str, object]
Returned Keys
| Key | Type | Description |
|---|---|---|
| tenant_id | str | Tenant identifier |
| rpm | float | Requests in the last 60 seconds |
| cost_saved | float | USD saved for this tenant |
| memory_used_mb | float | Current memory usage |
| memory_quota_mb | float | Configured memory limit |
| memory_pct | float | Memory utilisation percentage (0–100) |
| entry_count | int | Number of entries owned by this tenant |
| max_entries | int | Configured entry limit |
| requests_this_min | int | Requests in the current minute window |
| rpm_quota | int | Configured RPM limit |
CacheManager.close / aclose
Releases all resources. Stops the background eviction timer and closes the storage connection.
def close(self) -> None
async def aclose(self) -> None
Supports use as a context manager:
# Sync context manager
with manager:
    result = manager.get_sync("question")
# Async context (manual)
await manager.aclose()
QueryNormalizer
Transforms raw query strings into a canonical form used as the cache key and embedding input. Ensures that minor surface variations (casing, punctuation, synonyms) map to the same cache entry. Input is Unicode-normalized first, and stop-word removal supports both English and Arabic.
class QueryNormalizer:
    def __init__(self, config: Optional[NormalizationConfig] = None) -> None
    def normalize(self, query: str) -> str
    def exact_cache_key(self, tenant_id: str, normalized_query: str) -> str
Normalization Pipeline (applied in order)
- Unicode NFC normalization
- Remove control and zero-width characters
- Lowercase
- Remove punctuation (default: enabled)
- Collapse whitespace
- Synonym expansion (e.g., "llm" → "large language model")
- Stop-word removal (default: disabled; supports English and Arabic)
- Token deduplication (default: disabled)
- Length cap at 2048 characters
exact_cache_key() returns a SHA-256 hex digest of f"{tenant_id}:{normalized_query}", providing globally unique, tenant-scoped keys.
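The first few pipeline steps and the key derivation can be approximated with the standard library alone. The following is a minimal sketch, not the library's implementation: the function names `sketch_normalize` and `sketch_exact_cache_key` are hypothetical, punctuation removal is assumed to cover ASCII punctuation only, UTF-8 is assumed as the encoding before hashing, and synonym expansion, stop-word removal, and token deduplication are left out.

```python
import hashlib
import re
import string
import unicodedata

# ASCII punctuation only; the real normalizer may also strip Unicode punctuation.
_PUNCT_TABLE = str.maketrans("", "", string.punctuation)


def sketch_normalize(query: str, max_len: int = 2048) -> str:
    """Approximate NFC -> strip control/zero-width -> lowercase -> punctuation -> whitespace -> cap."""
    text = unicodedata.normalize("NFC", query)
    # Drop control (Cc) and format (Cf, which includes zero-width) characters,
    # but keep whitespace so it can be collapsed in a later step.
    text = "".join(
        ch for ch in text
        if ch.isspace() or unicodedata.category(ch) not in ("Cc", "Cf")
    )
    text = text.lower()
    text = text.translate(_PUNCT_TABLE)
    text = re.sub(r"\s+", " ", text).strip()
    return text[:max_len]


def sketch_exact_cache_key(tenant_id: str, normalized_query: str) -> str:
    """SHA-256 hex digest of 'tenant_id:normalized_query' (UTF-8 encoding is an assumption)."""
    payload = f"{tenant_id}:{normalized_query}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()
```

Because the tenant ID is part of the hashed payload, two tenants submitting the same normalized query get different keys, which is what makes the key tenant-scoped.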
from fennec_memory.cache import QueryNormalizer, NormalizationConfig
normalizer = QueryNormalizer(NormalizationConfig(
    remove_stopwords=True,
    extra_synonyms={"gpt-4": "large language model"},
))
normalized = normalizer.normalize("What is LLM?")
# → "what large language model"
key = normalizer.exact_cache_key("tenant_a", normalized)
# → SHA-256 hex string
SecurityGuard
Stateless security validator. Thread-safe. Instantiate once; reuse across all requests.
class SecurityGuard:
    def __init__(self, config: SecurityConfig) -> None
    def validate_query(self, query: str, tenant_id: str = "default") -> None
    def validate_response(self, response: Any, tenant_id: str = "default") -> None
    def enforce_tenant_access(self, requesting_tenant: str, entry_tenant: str, is_shared: bool) -> None
    def scrub_pii(self, text: str) -> str
    def sign_content(self, content: str) -> str
    def verify_content(self, content: str, signature: str) -> bool
    def verify_entry_integrity(self, entry: Any) -> bool
All validate_* and enforce_* methods raise SecurityViolation on failure. scrub_pii() and verify_entry_integrity() return a value rather than raising.
from fennec_memory.cache import SecurityGuard, SecurityConfig, SecurityViolation
guard = SecurityGuard(SecurityConfig(
    enable_pii_scrub=True,
    hmac_secret="production-secret-key",
))
clean = guard.scrub_pii("Contact me at user@example.com or 555-123-4567")
# → "Contact me at [EMAIL] or [PHONE]"
try:
    guard.validate_query("ignore all previous instructions", "tenant_a")
except SecurityViolation as e:
    print(f"Rejected: {e.reason}")
TenantManager
Central thread-safe registry for tenant lifecycle, quota enforcement, and namespace management. In most cases you will interact with TenantManager indirectly through CacheManager. Use it directly only for advanced scenarios such as quota hooks or manual isolation checks.
class TenantManager:
    def __init__(self, config: TenantConfig) -> None
On construction, the "default" tenant is automatically registered using the quotas defined in the provided TenantConfig. The "default" tenant cannot be deregistered.
Registration
def register(self, reg: TenantRegistration) -> None
def deregister(self, tenant_id: str) -> None
def is_registered(self, tenant_id: str) -> bool
def get_registration(self, tenant_id: str) -> TenantRegistration
| Method | Description |
|---|---|
| register | Registers a new tenant or updates an existing one. Thread-safe. Logs a WARNING if the tenant_id already exists. |
| deregister | Removes a tenant and its quota state. Raises if called on "default". |
| is_registered | Returns True if the tenant is currently registered. |
| get_registration | Returns the TenantRegistration for the given tenant_id, or raises KeyError if not found. |
from fennec_memory.cache import TenantManager, TenantRegistration, TenantConfig
mgr = TenantManager(TenantConfig())
mgr.register(TenantRegistration(
    tenant_id="acme",
    display_name="ACME Corp",
    memory_quota_mb=1024.0,
    max_entries=20_000,
    request_quota_rpm=3_000,
    allow_shared_read=True,
))
print(mgr.is_registered("acme")) # True
reg = mgr.get_registration("acme")
print(reg.memory_quota_mb) # 1024.0
mgr.deregister("acme")
Quota Enforcement
def check_and_charge_request(self, tenant_id: str) -> None # raises QuotaExceeded
def check_memory_quota(self, tenant_id: str) -> None # raises QuotaExceeded
def check_entry_quota(self, tenant_id: str) -> None # raises QuotaExceeded
def charge_memory(self, tenant_id: str, size_bytes: int) -> None
def release_memory(self, tenant_id: str, size_bytes: int) -> None
def increment_entries(self, tenant_id: str) -> None
def decrement_entries(self, tenant_id: str) -> None
| Method | Description |
|---|---|
| check_and_charge_request | Verifies RPM quota and deducts one request. Raises QuotaExceeded if the limit is reached. |
| check_memory_quota | Raises QuotaExceeded if the tenant's memory usage has reached memory_quota_mb. |
| check_entry_quota | Raises QuotaExceeded if the tenant's entry count has reached max_entries. |
| charge_memory | Increments the tenant's tracked memory usage by size_bytes. |
| release_memory | Decrements tracked memory usage; floors at 0. |
| increment_entries | Increments the entry counter by 1. |
| decrement_entries | Decrements the entry counter by 1; floors at 0. |
Namespace / Key Management
def namespace_key(self, tenant_id: str, key: str) -> str
def extract_tenant(self, namespaced_key: str) -> str
| Method | Description |
|---|---|
| namespace_key | Returns a globally unique key in the form tenant_id::key. |
| extract_tenant | Parses the tenant_id from a namespaced key. Returns "default" if the key contains no :: separator. |
ns_key = mgr.namespace_key("acme", "query_abc123")
# → "acme::query_abc123"
tenant = mgr.extract_tenant("acme::query_abc123")
# → "acme"
tenant = mgr.extract_tenant("orphan_key")
# → "default"
Cross-Tenant Shared Cache
def can_read_shared(self, tenant_id: str) -> bool
def can_write_shared(self, tenant_id: str) -> bool
def is_accessible(self, requesting_tenant: str, entry: IntelligentCacheEntry) -> bool
| Method | Description |
|---|---|
| can_read_shared | Returns True if the tenant has allow_shared_read=True. |
| can_write_shared | Returns True if the tenant has allow_shared_write=True. |
| is_accessible | Enforces the full isolation ruleset against a specific entry. |
is_accessible() Rules
- The owning tenant always has access to its own entries.
- Entries marked is_shared=True are accessible to any tenant with allow_shared_read=True.
- All other combinations are denied.
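The three rules above reduce to a short predicate. This is an illustrative sketch only: `EntryView` is a hypothetical stand-in for IntelligentCacheEntry carrying just the two fields the rules mention, and `shared_readers` is an assumed set of tenant IDs registered with allow_shared_read=True.

```python
from dataclasses import dataclass
from typing import Set


@dataclass
class EntryView:
    """Hypothetical stand-in for IntelligentCacheEntry (only the fields the rules use)."""
    tenant_id: str
    is_shared: bool = False


def sketch_is_accessible(requesting_tenant: str, entry: EntryView, shared_readers: Set[str]) -> bool:
    # Rule 1: the owner always has access to its own entries.
    if entry.tenant_id == requesting_tenant:
        return True
    # Rule 2: shared entries are readable by tenants that opted in to shared reads.
    if entry.is_shared and requesting_tenant in shared_readers:
        return True
    # Rule 3: everything else is denied.
    return False
```

Denying by default in the final branch means a new entry flag or tenant option cannot accidentally widen access unless a rule is added for it.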
entry = storage.get("acme::some_key")
if mgr.is_accessible(requesting_tenant="beta_corp", entry=entry):
    return entry
else:
    raise PermissionError("Cross-tenant access denied")
Monitoring & Stats
def set_quota_event_hook(self, hook: Callable[[str, str], None]) -> None
def get_tenant_stats(self, tenant_id: str) -> Dict[str, object]
def get_all_tenant_stats(self) -> List[Dict[str, object]]
def list_tenant_ids(self) -> List[str]
| Method | Description |
|---|---|
| set_quota_event_hook | Registers a callback invoked on each quota violation. Arguments: (tenant_id: str, event_type: str) where event_type is e.g. "rpm_exceeded". |
| get_tenant_stats | Returns a snapshot of a single tenant's resource usage. |
| get_all_tenant_stats | Returns stats for all registered tenants. |
| list_tenant_ids | Returns a list of all currently registered tenant IDs. |
get_tenant_stats Fields
| Key | Type | Description |
|---|---|---|
| tenant_id | str | Tenant identifier |
| memory_used_mb | float | Current memory usage |
| memory_quota_mb | float | Configured memory limit |
| memory_pct | float | Memory utilisation percentage (0–100) |
| entry_count | int | Current number of entries |
| max_entries | int | Configured entry limit |
| requests_this_min | int | Requests in the current minute window |
| rpm_quota | int | Configured RPM limit |
def on_quota_event(tenant_id: str, event: str) -> None:
    alert_system.send(f"[QUOTA] tenant={tenant_id} event={event}")
manager._tenant_mgr.set_quota_event_hook(on_quota_event)
# Inspect a single tenant
stats = manager._tenant_mgr.get_tenant_stats("acme")
print(f"Memory: {stats['memory_pct']:.1f}%")
print(f"RPM: {stats['requests_this_min']} / {stats['rpm_quota']}")
# Enumerate all tenants
for tid in manager._tenant_mgr.list_tenant_ids():
    print(tid)
Storage Backends
All backends implement BaseStorage. Use build_storage(config) as the factory; direct instantiation is also supported.
from fennec_memory.cache import build_storage
storage = build_storage(config) # preferred
# or directly:
from fennec_memory.cache import MemoryStorage, SQLiteStorage, RedisStorage, RedisConfig
mem = MemoryStorage()
sqlite = SQLiteStorage(db_path="./cache.db", wal=True, cache_size_kb=65536)
redis = RedisStorage(RedisConfig(host="redis-host", port=6379))
BaseStorage Interface
def get(self, key: str) -> Optional[IntelligentCacheEntry]
def set(self, key: str, entry: IntelligentCacheEntry, ttl_s: Optional[float] = None) -> None
def delete(self, key: str) -> bool
def exists(self, key: str) -> bool
def keys_by_tenant(self, tenant_id: str) -> List[str]
def all_keys(self) -> List[str]
def total_size_bytes(self) -> int
def flush_tenant(self, tenant_id: str) -> int
def close(self) -> None
7. Security Model
Prompt Injection Detection
SecurityGuard compiles a set of regex patterns to detect cache poisoning and prompt override attempts. Detection runs on every get() and put() call before any data is stored or returned.
Built-in patterns detect:
- Prompt override phrases: "ignore all previous instructions", jailbreak persona requests
- System prompt exfiltration: "print your system prompt", "reveal hidden instructions"
- Classic LLM delimiters: [INST], [/INST], <|im_start|>, <system> tags
- SQL/code injection: DROP TABLE, exec(, eval(, __import__(
- Cross-tenant data hints: tenant_id=..., namespace=...
Custom patterns can be loaded at startup from a regex file (one pattern per line, # for comments) by setting SecurityConfig.injection_patterns_path.
On detection, SecurityViolation is raised, the error counter is incremented, and LLM_FALLBACK is returned. No partial data is stored.
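To illustrate the custom pattern file format (one regex per line, # for comments), here is a hedged sketch of how such a file could be parsed and applied. The helper names `compile_patterns` and `first_match` are hypothetical, and two details are assumptions rather than documented behaviour: only whole-line comments are skipped, and patterns are compiled case-insensitively.

```python
import re
from typing import Iterable, List, Optional


def compile_patterns(lines: Iterable[str]) -> List[re.Pattern]:
    """Compile one regex per non-empty, non-comment line."""
    patterns = []
    for raw in lines:
        line = raw.strip()
        # Skip blank lines and whole-line comments (assumption: no inline comments).
        if not line or line.startswith("#"):
            continue
        patterns.append(re.compile(line, re.IGNORECASE))
    return patterns


def first_match(query: str, patterns: List[re.Pattern]) -> Optional[str]:
    """Return the source of the first pattern found in the query, or None."""
    for pattern in patterns:
        if pattern.search(query):
            return pattern.pattern
    return None
```

Since the function takes any iterable of lines, an open file handle works directly: `with open(path, encoding="utf-8") as fh: patterns = compile_patterns(fh)`.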
PII Scrubbing
When SecurityConfig.enable_pii_scrub=True, the following patterns are redacted before storage:
| Pattern | Replacement |
|---|---|
| Credit card numbers (16 digits, various separators) | [CARD_NUMBER] |
| US Social Security Numbers (NNN-NN-NNNN) | [SSN] |
| Email addresses | [EMAIL] |
| US phone numbers | [PHONE] |
PII scrubbing uses simple regex matching and is suitable for basic compliance requirements. For production environments handling sensitive data, integrate a dedicated library such as Microsoft Presidio by processing text before passing it to put().
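For a sense of what regex-based scrubbing looks like, the sketch below redacts the four categories in the table. The patterns are illustrative assumptions, not the ones shipped in SecurityGuard, and `sketch_scrub_pii` is a hypothetical name. Rules run in order, so the longer card pattern is applied before the SSN and phone patterns.

```python
import re

# Illustrative patterns only; real-world PII detection needs far broader coverage.
_PII_RULES = [
    # 16 digits in groups of four, optionally separated by spaces or hyphens
    (re.compile(r"\b(?:\d{4}[ -]?){3}\d{4}\b"), "[CARD_NUMBER]"),
    # US SSN in NNN-NN-NNNN form
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN]"),
    # Simple email shape: local part, @, dotted domain
    (re.compile(r"\b[\w.+-]+@[\w-]+(?:\.[\w-]+)+\b"), "[EMAIL]"),
    # US phone: (555) 123-4567, 555-123-4567, 555.123.4567, 555 123 4567
    (re.compile(r"(?<!\d)(?:\(\d{3}\)\s?|\d{3}[-. ])\d{3}[-. ]\d{4}(?!\d)"), "[PHONE]"),
]


def sketch_scrub_pii(text: str) -> str:
    """Apply each redaction rule in order and return the scrubbed text."""
    for pattern, replacement in _PII_RULES:
        text = pattern.sub(replacement, text)
    return text


print(sketch_scrub_pii("Contact me at user@example.com or 555-123-4567"))
# → Contact me at [EMAIL] or [PHONE]
```

Patterns this simple miss international phone formats and unusual separators, which is the gap a dedicated library is meant to close.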
HMAC Content Integrity
Every stored entry carries a SHA-256 hash of f"{normalized_query}:{response}" in content_hash. Before returning any entry from L1 or Storage, verify_entry_integrity() recomputes this hash and compares it.
If SecurityConfig.hmac_secret is set (via FENNEC_HMAC_SECRET environment variable), sign_content() and verify_content() use Python's hmac module with SHA-256 for cryptographic signing, providing tamper detection even against an adversary with write access to the storage backend. Without it, integrity verification falls back to SHA-256 hash comparison, which detects accidental corruption but not adversarial modification.
Behaviour on integrity failure: The entry is evicted from L1 and discarded from the result; the "integrity_fail_l1" or "integrity_fail_semantic" error counter is incremented; lookup continues to the next layer.
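The difference between the keyed and unkeyed modes can be shown with Python's hmac and hashlib modules. This is a minimal sketch under stated assumptions: `sketch_sign` and `sketch_verify` are hypothetical names, content is encoded as UTF-8, and the hex digest is used as the signature format.

```python
import hashlib
import hmac
from typing import Optional


def sketch_sign(content: str, secret: Optional[str] = None) -> str:
    """Keyed HMAC-SHA256 when a secret is set; plain SHA-256 otherwise."""
    data = content.encode("utf-8")
    if secret:
        return hmac.new(secret.encode("utf-8"), data, hashlib.sha256).hexdigest()
    # Unkeyed fallback: detects accidental corruption, but anyone who can
    # rewrite the content can also recompute this hash.
    return hashlib.sha256(data).hexdigest()


def sketch_verify(content: str, signature: str, secret: Optional[str] = None) -> bool:
    """Constant-time comparison of the recomputed and stored signatures."""
    return hmac.compare_digest(sketch_sign(content, secret), signature)
```

With a secret, an attacker who can write to the storage backend cannot forge a matching signature without also knowing the key. `hmac.compare_digest` is used so the comparison time does not leak how many leading characters matched.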
Tenant Isolation
Every entry is stored with a tenant_id tag. Namespace keys follow the format tenant_id::key, preventing key collisions across tenants at the storage level. The TenantManager.is_accessible() check enforces read permissions on every entry returned from semantic search, ensuring a tenant can never receive another tenant's private entries regardless of vector similarity.
Shared entries (is_shared=True) are opt-in at both the writer side (allow_shared_write=True) and reader side (allow_shared_read=True). The "default" tenant cannot be deregistered.
8. Storage Backends
MemoryStorage
Pure in-process dictionary. Data is lost when the process exits. No external dependencies.
Use when: Running tests, ephemeral workloads, development environments, or single-process applications where persistence is not required.
Tradeoffs: Fastest possible access; zero serialisation overhead; no durability; not shareable across processes.
SQLiteStorage
SQLite file-backed storage with WAL mode enabled by default for improved write concurrency. The 64 MB page cache reduces I/O on repeated access patterns.
Use when: Single-node production deployments, applications that need persistence across restarts, or when Redis is unavailable. Default backend.
Tradeoffs: Durable; no external service dependency; limited horizontal scalability; single-writer concurrency (WAL allows concurrent readers).
SQLiteStorage.purge_expired
SQLiteStorage exposes one additional method not present in the BaseStorage interface: a direct SQL-level purge of expired rows. Unlike the eviction timer, which scores and removes entries gradually, purge_expired deletes all rows whose expires_at timestamp has passed in a single DELETE statement and returns the count of removed rows immediately.
def purge_expired(self) -> int
Returns int: number of rows deleted.
When to use: Call this manually after a bulk put() operation, at application startup to clear stale data from a previous run, or from a maintenance script to reclaim disk space without waiting for the next eviction cycle.
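The single-statement purge can be pictured with the standard sqlite3 module. The sketch below is not the SQLiteStorage implementation: the table name `cache_entries`, its two columns, and the convention that a NULL expires_at means "never expires" are all assumptions made for illustration.

```python
import sqlite3
import time
from typing import Optional


def sketch_purge_expired(conn: sqlite3.Connection, now: Optional[float] = None) -> int:
    """Delete every row whose expires_at has passed, in one DELETE statement."""
    now = time.time() if now is None else now
    cur = conn.execute(
        "DELETE FROM cache_entries WHERE expires_at IS NOT NULL AND expires_at <= ?",
        (now,),
    )
    conn.commit()
    return cur.rowcount  # number of rows removed by the DELETE


# Demo against an in-memory database with a hypothetical schema
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cache_entries (key TEXT PRIMARY KEY, expires_at REAL)")
conn.executemany(
    "INSERT INTO cache_entries VALUES (?, ?)",
    [("a", 10.0), ("b", 99.0), ("c", None)],
)
removed = sketch_purge_expired(conn, now=50.0)
print(removed)  # 1: only "a" had expired at t=50
```

An index on the expiry column would keep this statement cheap on large tables; whether the real schema has one is not stated here.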
from fennec_memory.cache import SQLiteStorage
storage = SQLiteStorage(db_path="./fennec_cache.db")
removed = storage.purge_expired()
print(f"Purged {removed} expired entries from SQLite")
Note: purge_expired is only available on SQLiteStorage. It is not part of the BaseStorage interface and is not available on MemoryStorage or RedisStorage (Redis handles TTL expiry natively through key expiry at the server level).
RedisStorage
Redis-backed storage with configurable connection pooling, SSL, and key prefixing. Supports TTL natively through Redis key expiry.
Use when: Distributed deployments with multiple application nodes sharing a cache, high-availability requirements, or when you need Redis's rich operational tooling (monitoring, replication, clustering).
Tradeoffs: External service dependency; network round-trip latency per operation (~1–5ms); highest horizontal scalability; supports shared state across multiple CacheManager instances.
Comparison
| | MemoryStorage | SQLiteStorage | RedisStorage |
|---|---|---|---|
| Persistence | None | Yes | Yes |
| Cross-process sharing | No | No | Yes |
| External dependency | None | None | Redis server |
| Latency | ~μs | ~100μs | ~1–5ms |
| Horizontal scale | Single process | Single node | Multi-node |
| Best for | Tests / ephemeral | Single-node production | Distributed production |
9. Observability & Metrics
All metrics are accessible through manager.get_metrics() and manager.get_tenant_metrics(tenant_id). Counters are cumulative from startup; no time-windowing is applied at the SDK level.
Hit Rate Metrics
metrics = manager.get_metrics()
# System-wide
overall = metrics["overall_hit_rate"] # fraction served from cache
exact_r = metrics["exact_hit_rate"] # fraction from exact match
semantic_r = metrics["semantic_hit_rate"] # fraction from semantic search
fallback_r = metrics["llm_fallback_rate"] # fraction requiring LLM call
Latency Percentiles
p99 = metrics["latency_overall"]["p99_ms"] # 99th percentile overall
p50 = metrics["latency_exact"]["p50_ms"] # median for exact hits
p90 = metrics["latency_semantic"]["p90_ms"] # 90th percentile semantic hits
Cost & ROI
saved = metrics["total_saved_usd"] # cumulative USD saved
roi = metrics["roi_multiplier"] # saved / spent (e.g., 45.3 → 45x ROI)
RL Policy Stats
rl = metrics["policy_learner"]
System Health
print(metrics["vector_index_size"]) # entries in FAISS
print(metrics["l1_size"]) # entries in L1 LRU
print(metrics["sim_threshold"]) # current adaptive threshold
print(metrics["errors"]) # dict of error type → count
Per-Tenant Monitoring
for tenant in metrics["tenants"]:
    print(
        f"{tenant['tenant_id']}: "
        f"memory {tenant['memory_pct']:.1f}% | "
        f"rpm {tenant['requests_this_min']}/{tenant['rpm_quota']} | "
        f"entries {tenant['entry_count']}/{tenant['max_entries']}"
    )
Alerting Integration
# Wire quota violations to your alerting system
def quota_alert(tenant_id: str, event: str) -> None:
    pagerduty.trigger(f"Cache quota: tenant={tenant_id}, event={event}")
manager._tenant_mgr.set_quota_event_hook(quota_alert)
# Check for security anomalies
metrics = manager.get_metrics()
if metrics["errors"].get("security_violation", 0) > 50:
    security_team.alert("Elevated injection attempt rate")
10. Advanced Usage
Multi-Tenant Setup
import asyncio
from fennec_memory.cache import (
    CacheManager, CacheConfig, TenantRegistration,
    StorageBackend, EmbeddingProvider, EmbeddingConfig,
    SecurityConfig,
)
async def setup():
    config = CacheConfig(
        storage_backend=StorageBackend.REDIS,
        embedding=EmbeddingConfig(provider=EmbeddingProvider.OPENAI),
        security=SecurityConfig(enable_pii_scrub=True),
    )
    manager = await CacheManager.create(config)
    # Register tenants with differentiated quotas
    manager.register_tenant(TenantRegistration(
        tenant_id="free_tier",
        memory_quota_mb=128.0,
        max_entries=1_000,
        request_quota_rpm=100,
    ))
    manager.register_tenant(TenantRegistration(
        tenant_id="enterprise",
        memory_quota_mb=8192.0,
        max_entries=500_000,
        request_quota_rpm=10_000,
        allow_shared_read=True,
        allow_shared_write=True,
    ))
    return manager
RL Feedback Loop
The feedback loop is the primary mechanism for improving cache quality over time. Positive signals lower the similarity threshold (allowing more hits), while negative signals raise it (demanding higher confidence before reuse).
# In your request handler
result = await manager.get(query, tenant_id=tenant)
if result.hit:
    response = result.response
    # After user engagement (e.g., session end, explicit rating)
    async def record_feedback(entry_id, liked):
        await manager.feedback(
            entry_id=entry_id,
            positive=liked,
            magnitude=1.0,
            tenant_id=tenant,
            source="user",
        )
    # Schedule async feedback recording without blocking the response
    asyncio.create_task(record_feedback(result.entry.entry_id, user_clicked_helpful))
else:
    response = await call_llm(query)
    entry = await manager.put(
        query, response, tenant_id=tenant,
        quality_score=0.9,
        response_tokens=350,
        input_tokens=15,
    )
Shared Cache Configuration
Shared entries allow common knowledge to be stored once and served to multiple tenants, reducing duplication and cost for universal content (e.g., product FAQs, legal boilerplate).
# Publisher tenant writes a shared entry
await manager.put(
    query="What are your refund terms?",
    response="Standard refund policy...",
    tenant_id="content_team",
    is_shared=True, # marks entry as cross-tenant readable
    quality_score=1.0,
)
# Consumer tenant reads it (must have allow_shared_read=True)
result = await manager.get("What is your return policy?", tenant_id="customer_facing")
# Semantic similarity can match "refund terms" ↔ "return policy"
End-to-End Example
The following example illustrates the complete lifecycle: manager creation, tenant registration, cache lookup, LLM fallback with store, feedback recording, and metrics inspection.
import asyncio
from fennec_memory.cache import (
    CacheManager, CacheConfig, TenantRegistration,
    StorageBackend, EmbeddingProvider, EmbeddingConfig, SecurityConfig,
)
async def main():
    config = CacheConfig(
        storage_backend=StorageBackend.SQLITE,
        embedding=EmbeddingConfig(
            provider=EmbeddingProvider.OPENAI,
            model_name="text-embedding-3-small",
        ),
        security=SecurityConfig(enable_pii_scrub=True),
        default_ttl_s=7200.0,
    )
    manager = await CacheManager.create(config)
    manager.register_tenant(TenantRegistration(
        tenant_id="my_app",
        memory_quota_mb=1024.0,
        request_quota_rpm=2000,
    ))
    query = "Explain transformer attention mechanism"
    result = await manager.get(query, tenant_id="my_app")
    if result.hit:
        print(f"[CACHE HIT] {result.decision.value}")
        print(f"Similarity: {result.similarity:.2f} | Saved: ${result.cost_saved_usd:.4f}")
        answer = result.response
    else:
        print("[CACHE MISS] calling LLM...")
        answer = await call_llm(query)
        entry = await manager.put(
            query=query,
            response=answer,
            tenant_id="my_app",
            quality_score=0.9,
            response_tokens=420,
            input_tokens=10,
        )
    if result.hit and result.entry:
        await manager.feedback(
            entry_id=result.entry.entry_id,
            positive=True,
            tenant_id="my_app",
            source="user",
        )
    metrics = manager.get_metrics()
    print(f"Hit rate: {metrics['overall_hit_rate']:.1%}")
    print(f"ROI: {metrics['roi_multiplier']}x")
    await manager.aclose()
asyncio.run(main())
Production Deployment Notes
Environment variables over code: Use CacheConfig.from_env() combined with a secrets manager to keep API keys and HMAC secrets out of source code.
Pre-register all tenants: Do not rely on auto-registration in production. Auto-registered tenants receive default quotas and generate WARNING log entries. Register all tenants explicitly at startup with appropriate limits.
Eviction tuning: Reduce eviction.check_interval_s (e.g., to 60) for high-churn workloads. Set eviction.max_age_s to enforce a hard upper bound on entry age independent of TTL.
Redis in production: Set REDIS_PASSWORD and REDIS_SSL=true. Set socket_timeout and socket_connect_timeout conservatively (2 seconds is the default) to prevent cache failures from blocking the application thread.
Embedding costs: With OpenAI embeddings, every cache miss and every put() call incurs an embedding API cost. Monitor metrics["policy_learner"]["reward_mean"] to confirm the cache is returning quality responses and the embedding spend is justified.
HMAC integrity: Set FENNEC_HMAC_SECRET in production to enable cryptographic tamper detection. Without it, integrity verification falls back to SHA-256 hash comparison, which detects accidental corruption but not adversarial modification.
Graceful shutdown: Call manager.close() or await manager.aclose() at application shutdown to stop the eviction timer and close storage connections cleanly.
11. Edge Cases & Failure Handling
Embedding Service Failure
If the embedding provider is unreachable or returns an error during get(), the exception propagates through _InflightCoalescer and is surfaced to the caller. The L1 and storage exact-match layers complete before embedding is attempted, so an exact-match hit is still served even when the embedding service is down. For put(), an embedding failure prevents the entry from being indexed in FAISS; the entry is still written to L1 and Storage for exact-match retrieval.
Mitigation: Use EmbeddingProvider.MOCK in testing. For production, configure request_timeout and implement retry logic at the embedding provider level.
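One way to add retry logic around an embedding call is a small wrapper with a per-attempt timeout and exponential backoff. The sketch below is generic asyncio code, not part of the Fennec API: `embed_with_retry` and its parameters are hypothetical, and `embed` stands for whatever async function your provider exposes.

```python
import asyncio
import random
from typing import Awaitable, Callable, List


async def embed_with_retry(
    embed: Callable[[str], Awaitable[List[float]]],
    text: str,
    attempts: int = 3,
    base_delay_s: float = 0.2,
    timeout_s: float = 5.0,
) -> List[float]:
    """Call embed(text), retrying failures with exponential backoff plus jitter."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            # Bound each attempt so a hung provider cannot stall the request.
            return await asyncio.wait_for(embed(text), timeout=timeout_s)
        except Exception:
            if attempt + 1 == attempts:
                raise  # out of attempts: surface the last error to the caller
            delay = base_delay_s * (2 ** attempt) + random.uniform(0, base_delay_s)
            await asyncio.sleep(delay)
    raise RuntimeError("unreachable")
```

Keeping the attempt count low matters here, because every retry adds latency to a request that could otherwise fall through to the LLM.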
Redis / SQLite Failure
Storage failures during get() cause the affected layer to return None, and the lookup continues to the next layer (semantic search). Storage failures during put() are logged as errors and the method returns None. The L1 cache remains unaffected and continues to serve exact hits.
Mitigation: For Redis, configure connection pooling and socket_timeout. For SQLite, ensure the database file is on a local, low-latency filesystem.
Quota Exceeded
When any quota is breached (RPM, memory_quota_mb, or max_entries):
- get() returns CacheLookupResult(hit=False, decision=LLM_FALLBACK) without raising.
- put() returns None without raising.
- The quota_event_hook fires (if registered) with the tenant ID and event type.
- Error counters are incremented in metrics.
The application should treat quota-exceeded responses the same as a cache miss and proceed with an LLM call.
Corrupted Entries
If verify_entry_integrity() detects a hash mismatch on an L1 entry, the entry is invalidated from L1 and the "integrity_fail_l1" counter is incremented. If the mismatch occurs on a semantic candidate, that candidate is skipped. In both cases, lookup continues normally. Corrupted entries are never returned to the caller.
Missing or Expired Entry in feedback()
If the entry_id passed to feedback() no longer exists in Storage or L1 (e.g., it was evicted or its TTL expired), the method logs a WARNING and returns silently without raising an exception. The feedback signal is lost; this is by design for fire-and-forget usage.
Unknown Tenant
An unregistered tenant_id in get(), put(), or feedback() causes TenantManager to auto-register the tenant with the system default quotas (TenantConfig.default_*) and log a WARNING. While this allows simple deployments to work without explicit registration, it is not recommended in production because auto-registered tenants receive default quotas regardless of their actual entitlement.
Async / Sync Mismatch
Calling get_sync(), put_sync(), or feedback_sync() from inside an async def coroutine that is itself running on an event loop is not supported and will produce a deadlock or RuntimeError. Always use the async variants (get(), put(), feedback()) inside async contexts.
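A cheap guard against this mistake is to check for a running event loop before choosing the sync path. The helper below uses only the standard library; `in_running_loop` is a hypothetical name, and wiring it into your own wrappers is left to the application.

```python
import asyncio


def in_running_loop() -> bool:
    """Return True when called from code executing on a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises RuntimeError when no loop is running
        # in the current thread, i.e. we are in plain synchronous code.
        return False
    return True
```

A wrapper could assert `not in_running_loop()` before calling a *_sync method, turning a potential deadlock into an immediate, readable failure.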
Eviction Timer
The background eviction timer runs on a daemon thread. If close() is not called before the process exits, the timer will be terminated abruptly. On a clean shutdown, always call manager.close() or use the context manager protocol to ensure the timer is cancelled and the storage connection is flushed.
Warmup Failure
If the embedding model fails to warm up during CacheManager.create(), a WARNING is logged and the manager is returned in a functional state. Subsequent embedding calls will attempt to initialise the model on demand. This means the first real get() or put() call may experience higher latency.
12. MultiLevelCache — General-Purpose Cache Layer
MultiLevelCache is the general-purpose, LLM-agnostic cache layer that sits beneath the intelligent pipeline. While CacheManager is the recommended interface for LLM workloads (adding semantic search, RL eviction, tenancy, and security), MultiLevelCache can be used standalone for any key-value caching need — for example, caching computed results, API responses, or deserialized configuration objects — without any dependency on embedding models or FAISS.
Architecture
MultiLevelCache implements a three-level memory hierarchy:
get(key)
│
▼
L1 — In-process OrderedDict LRU/LFU (smallest, fastest: ~μs)
│ MISS + auto-promote on threshold hit
▼
L2 — In-process OrderedDict LRU/LFU (medium, fast: ~μs)
│ MISS + demote on eviction
▼
L3 — Disk-backed pickle files (largest, slower: ~ms)
│ MISS
▼
return None
On a cache hit at L2 or L3, the entry is automatically promoted toward L1 based on its hit count. On eviction from L1, entries are demoted to L2; from L2, they cascade to L3. L3 files survive process restarts when persist_l3=True (the default).
Eviction Strategies — CacheStrategy
MultiLevelCache supports five eviction strategies, selected at construction time via the strategy parameter.
| Strategy | Value | Behaviour |
|---|---|---|
| LRU | "lru" | Least Recently Used: evicts the entry accessed least recently. Default. |
| LFU | "lfu" | Least Frequently Used: evicts the entry with the fewest total accesses. |
| FIFO | "fifo" | First In First Out: evicts the oldest-created entry regardless of access. |
| TTL | "ttl" | Evicts the first expired entry found; falls back to oldest if none are expired. |
| ADAPTIVE | "adaptive" | Scores entries by hits / (idle_time + 1); evicts the lowest-scoring entry. Balances frequency and recency. |
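The ADAPTIVE score is simple enough to work through by hand. The sketch below applies hits / (idle_time + 1) to three entries and picks the lowest scorer; `EntryStats`, `adaptive_score`, and `pick_victim` are hypothetical names, and measuring idle_time in seconds is an assumption.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class EntryStats:
    """Hypothetical per-entry bookkeeping: access count and last-access timestamp."""
    hits: int
    last_access: float


def adaptive_score(stats: EntryStats, now: float) -> float:
    """hits / (idle_time + 1): frequent and recently used entries score highest."""
    idle_time = max(0.0, now - stats.last_access)
    return stats.hits / (idle_time + 1.0)


def pick_victim(entries: Dict[str, EntryStats], now: float) -> str:
    """Return the key with the lowest adaptive score (the eviction candidate)."""
    return min(entries, key=lambda k: adaptive_score(entries[k], now))


entries = {
    "hot": EntryStats(hits=10, last_access=99.0),    # 10 / (1 + 1)   = 5.0
    "cold": EntryStats(hits=1, last_access=99.0),    # 1 / (1 + 1)    = 0.5
    "stale": EntryStats(hits=10, last_access=0.0),   # 10 / (100 + 1) ≈ 0.099
}
print(pick_victim(entries, now=100.0))  # stale
```

In this example "stale" has as many hits as "hot" but is evicted first, because its long idle time dominates the denominator. Pure LFU would have evicted "cold" instead.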
from fennec_memory.cache import MultiLevelCache, CacheStrategy
cache = MultiLevelCache(strategy=CacheStrategy.ADAPTIVE)
Constructor
MultiLevelCache(
    l1_max_items: Optional[int] = None,
    l2_max_items: Optional[int] = None,
    l3_max_items: Optional[int] = None,
    strategy: CacheStrategy = CacheStrategy.LRU,
    config: Optional[CacheConfig] = None,
    persist_l3: bool = True,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| l1_max_items | Optional[int] | From CacheConfig | Maximum entries in L1. Overrides config.l1_max_items if provided. |
| l2_max_items | Optional[int] | From CacheConfig | Maximum entries in L2. Overrides config.l2_max_items if provided. |
| l3_max_items | Optional[int] | From CacheConfig | Maximum entries in L3. Overrides config.l3_max_items if provided. |
| strategy | CacheStrategy | LRU | Eviction strategy applied to L1 and L2. L3 always uses FIFO with expired-first priority. |
| config | Optional[CacheConfig] | CacheConfig() | Full configuration object. Provides capacity limits, TTL defaults, L3 directory, and cleanup interval. |
| persist_l3 | bool | True | If True, L3 disk files survive close() / __exit__. If False, all L3 files are deleted on exit. |
Important: Keys are SHA-256 hashed before storage. get(), exists(), and delete() all accept the original raw key; hashing is transparent to the caller.
Core Methods
get
def get(self, key: str) -> Optional[Any]
Retrieves the value for key, searching L1 → L2 → L3 in order. Returns None if not found in any level or if the entry has expired. Expired entries are evicted inline during the lookup. An L2 hit that exceeds the promotion threshold (config.l2_to_l1_hits) is automatically promoted to L1. An L3 hit is always promoted to L2.
set
def set(self, key: str, value: Any, ttl: Optional[float] = None) -> bool
Stores value under key in L1. If the entry already exists at any level it is removed first (update semantics). Returns True on success, False if an exception occurs. Uses config.default_ttl when ttl is None.
delete
def delete(self, key: str) -> bool
Removes key from all cache levels simultaneously. Returns True if the key was found in at least one level.
exists
def exists(self, key: str) -> bool
Returns True if key is present in any level and has not expired. Expired entries encountered during the check are evicted inline. Also supports the in operator: "my_key" in cache.
clear
def clear(self, level: Optional[int] = None) -> None
Clears the specified cache level (1, 2, or 3), or all levels if level is None. Clearing L3 deletes the associated disk files. Clearing all levels also resets the metrics counters.
cleanup_expired
def cleanup_expired(self) -> int
Scans all three levels and removes every expired entry. Returns the total number of entries removed. This is also invoked automatically by the background cleanup timer if config.auto_cleanup_interval is set.
get_stats
```python
def get_stats(self) -> dict
```
Returns a dictionary with per-level and aggregate statistics.
| Key | Type | Description |
|---|---|---|
| l1_items | int | Current entry count in L1 |
| l1_size_mb | float | Current memory used by L1 entries (MB) |
| l1_max_items | int | Configured L1 capacity |
| l1_utilization | float | L1 fill percentage (0–100) |
| l2_items | int | Current entry count in L2 |
| l2_size_mb | float | Current memory used by L2 entries (MB) |
| l2_utilization | float | L2 fill percentage (0–100) |
| l3_items | int | Current entry count in L3 |
| l3_size_mb | float | Disk space used by L3 files (MB) |
| l3_utilization | float | L3 fill percentage (0–100) |
| total_items | int | Sum of entries across all levels |
| total_size_mb | float | Total memory + disk footprint (MB) |
| strategy | str | Active eviction strategy name |
| overall_hit_rate | float | Fraction of get() calls that returned a value |
| l1_hit_rate | float | L1-specific hit rate |
| l2_hit_rate | float | L2-specific hit rate |
| l3_hit_rate | float | L3-specific hit rate |
| evictions | int | Total eviction events |
| promotions | int | Total promotion events |
| expirations | int | Total expiration events |
get_keys
```python
def get_keys(self, level: Optional[int] = None) -> List[str]
```
Returns a list of all hashed keys currently stored in the specified level, or across all levels (deduplicated) if level is None. Keys are the SHA-256 hex strings of the original keys, not the originals.
get_entry_info
```python
def get_entry_info(self, key: str) -> Optional[dict]
```
Returns detailed metadata for a specific entry, or None if not found. Useful for debugging cache behaviour.
| Key | Type | Description |
|---|---|---|
| level | int | Cache level where the entry resides (1, 2, or 3) |
| hits | int | Number of times the entry has been accessed (L1/L2 only) |
| age | float | Seconds since the entry was created |
| idle_time | float | Seconds since the entry was last accessed (L1/L2 only) |
| size_bytes | int | Serialised size in bytes (L1/L2 only) |
| ttl | Optional[float] | Configured TTL in seconds (None = no expiry) |
| expired | bool | Whether the entry has passed its TTL |
| path | str | Disk file path (L3 only) |
Batch Operations
For high-throughput scenarios where multiple keys need to be read or written together, MultiLevelCache provides three batch methods that iterate internally without requiring the caller to manage individual calls.
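Conceptually, each batch method is a loop over the corresponding single-key operation. The sketch below illustrates that idea with a hypothetical DictCache stand-in (a plain dict replaces the real tiers, and TTLs are omitted); it is not the library's code.

```python
from typing import Any, Dict, List, Optional


class DictCache:
    """Hypothetical single-key cache used only for this illustration."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def get(self, key: str) -> Optional[Any]:
        return self._data.get(key)

    def set(self, key: str, value: Any) -> bool:
        self._data[key] = value
        return True

    def delete(self, key: str) -> bool:
        if key in self._data:
            del self._data[key]
            return True
        return False


def get_many(cache: DictCache, keys: List[str]) -> Dict[str, Any]:
    found: Dict[str, Any] = {}
    for key in keys:
        value = cache.get(key)
        if value is not None:      # misses are left out rather than mapped to None
            found[key] = value
    return found


def set_many(cache: DictCache, items: Dict[str, Any]) -> int:
    return sum(1 for key, value in items.items() if cache.set(key, value))


def delete_many(cache: DictCache, keys: List[str]) -> int:
    return sum(1 for key in keys if cache.delete(key))


demo = DictCache()
stored = set_many(demo, {"key_a": 1, "key_c": 3})
found = get_many(demo, ["key_a", "key_b", "key_c"])
removed = delete_many(demo, ["key_a", "stale_key"])
```

Because missing keys are simply absent from the result, a caller can compute the misses with a set difference between the requested keys and the returned dictionary.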
get_many
```python
def get_many(self, keys: List[str]) -> Dict[str, Any]
```
Retrieves multiple values in a single call. Returns a dictionary containing only the keys that were found and had not expired. Missing or expired keys are absent from the result; they do not map to None.
```python
results = cache.get_many(["key_a", "key_b", "key_c"])
# → {"key_a": ..., "key_c": ...}  (key_b was a miss)
```
set_many
```python
def set_many(self, items: Dict[str, Any], ttl: Optional[float] = None) -> int
```
Stores multiple key-value pairs in a single call, applying the same ttl to all entries. Returns the number of entries successfully stored.
```python
stored = cache.set_many({"key_a": val_a, "key_b": val_b}, ttl=600.0)
# → 2
```
delete_many
```python
def delete_many(self, keys: List[str]) -> int
```
Deletes multiple keys across all cache levels. Returns the number of keys that were actually found and deleted.
```python
removed = cache.delete_many(["key_a", "key_b", "stale_key"])
# → 2  (stale_key was not present)
```
Async API
MultiLevelCache provides async wrappers for the three most common operations, implemented via asyncio.to_thread so they are non-blocking in an async context without requiring any changes to the underlying synchronous implementation.
```python
async def aget(self, key: str) -> Optional[Any]
async def aset(self, key: str, value: Any, ttl: Optional[float] = None) -> bool
async def adelete(self, key: str) -> bool
```
These are suitable for use inside FastAPI route handlers, async tasks, or any async def function. For batch async operations, wrap get_many, set_many, and delete_many with asyncio.to_thread directly.
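Wrapping a batch call with asyncio.to_thread might look like the sketch below. The module-level get_many function is a stand-in for cache.get_many so that the example runs on its own, and the wrapper name aget_many is hypothetical.

```python
import asyncio
from typing import Any, Dict, List


def get_many(keys: List[str]) -> Dict[str, Any]:
    """Stand-in for cache.get_many: a blocking, synchronous batch read."""
    store = {"key_a": 1, "key_c": 3}
    return {key: store[key] for key in keys if key in store}


async def aget_many(keys: List[str]) -> Dict[str, Any]:
    # Run the blocking call in a worker thread so the event loop stays responsive.
    return await asyncio.to_thread(get_many, keys)


results = asyncio.run(aget_many(["key_a", "key_b", "key_c"]))
print(results)  # {'key_a': 1, 'key_c': 3}
```

With a real cache instance the same pattern would read `await asyncio.to_thread(cache.get_many, keys)`. Note that asyncio.to_thread requires Python 3.9 or later.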
```python
# FastAPI example (app, cache, and fetch_from_db are assumed to be defined elsewhere)
@app.get("/data/{key}")
async def get_data(key: str):
    value = await cache.aget(key)
    if value is None:
        value = await fetch_from_db(key)
        await cache.aset(key, value, ttl=300.0)
    return {"data": value}
```
Context Manager
MultiLevelCache supports both sync and async context managers.
```python
# Sync context manager
with MultiLevelCache(l1_max_items=100, persist_l3=False) as cache:
    cache.set("session_data", payload, ttl=3600.0)
    result = cache.get("session_data")
# L1 and L2 cleared on exit; L3 deleted because persist_l3=False

# Async context manager
async with MultiLevelCache(persist_l3=True) as cache:
    await cache.aset("key", value)
```
On __exit__ / __aexit__, the background cleanup timer is cancelled. If persist_l3=True, only L1 and L2 are cleared; L3 disk files remain for the next run. If persist_l3=False, all three levels are cleared and all L3 disk files are deleted.
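The exit behaviour can be illustrated with a small self-contained sketch. It assumes one file per key in a directory standing in for L3 and a single in-memory dict standing in for L1 and L2; the class TinyDiskCache is hypothetical and much simpler than the real cache.

```python
import os
import tempfile
from typing import Dict


class TinyDiskCache:
    """Illustrative only: an in-memory dict plus one file per key standing in for L3."""

    def __init__(self, directory: str, persist_l3: bool = True) -> None:
        self.directory = directory
        self.persist_l3 = persist_l3
        self.memory: Dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        self.memory[key] = value
        with open(os.path.join(self.directory, key), "w", encoding="utf-8") as fh:
            fh.write(value)

    def __enter__(self) -> "TinyDiskCache":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.memory.clear()                      # memory levels are always cleared
        if not self.persist_l3:                  # disk files are removed only on request
            for name in os.listdir(self.directory):
                os.remove(os.path.join(self.directory, name))


kept_dir = tempfile.mkdtemp()
with TinyDiskCache(kept_dir, persist_l3=True) as c:
    c.set("k", "v")
kept_files = os.listdir(kept_dir)

wiped_dir = tempfile.mkdtemp()
with TinyDiskCache(wiped_dir, persist_l3=False) as c:
    c.set("k", "v")
wiped_files = os.listdir(wiped_dir)

print(kept_files, wiped_files)  # ['k'] []
```

Keeping the disk level across runs suits warm-start scenarios, while wiping it suits tests and short-lived jobs.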
Quick Start
```python
from fennec_memory.cache import MultiLevelCache, CacheStrategy

# Basic usage with LRU eviction
cache = MultiLevelCache(
    l1_max_items=256,
    l2_max_items=2048,
    l3_max_items=20000,
    strategy=CacheStrategy.LRU,
)

# Store a value
cache.set("user:42:profile", {"name": "Alice", "plan": "pro"}, ttl=1800.0)

# Retrieve it
profile = cache.get("user:42:profile")

# Membership test
if "user:42:profile" in cache:
    print("profile is cached")

# Batch fill on application startup
# (flags, limits, and pricing are application-defined objects)
cache.set_many({
    "config:feature_flags": flags,
    "config:rate_limits": limits,
    "config:pricing": pricing,
}, ttl=3600.0)

# Inspect state
print(cache)
# e.g. MultiLevelCache(L1=4/256, L2=0/2048, L3=0/20000, strategy=lru, hit_rate=75.00%)
stats = cache.get_stats()
print(f"Overall hit rate: {stats['overall_hit_rate']:.1%}")
print(f"Total items: {stats['total_items']}")

# Cleanup
cache.clear()
```
Relationship to CacheManager
See Section 14 — CacheManager vs MultiLevelCache for a full side-by-side comparison, decision guide, and usage examples for each component.
13. Low-Level Data Models
This section documents the data models used internally by MultiLevelCache and related infrastructure. These classes are not part of the CacheManager public API but are exposed for direct use with MultiLevelCache, custom storage integrations, or instrumentation.
CacheEntry
Represents a single cached item inside MultiLevelCache. Each entry tracks the cached value along with access metadata used by eviction strategies.
```python
from fennec_memory.cache import CacheEntry

entry = CacheEntry(
    key="hashed_key_hex",
    value=my_object,
    ttl=600.0,
)
```
Fields
| Field | Type | Default | Description |
|---|---|---|---|
| key | str | Required | SHA-256 hashed key (as stored internally) |
| value | Any | Required | The cached value |
| created_at | float | time.time() | Unix timestamp of creation |
| last_access | float | time.time() | Unix timestamp of most recent access |
| hits | int | 0 | Number of times the entry has been accessed |
| ttl | Optional[float] | None | Time-to-live in seconds; None means no expiry |
| size_bytes | int | Auto-computed | Serialised size estimate via pickle.dumps; falls back to config.size_bytes (default 1 KB) on failure |
Methods
| Method | Returns | Description |
|---|---|---|
| increment_hits() | None | Increments the hit counter and updates last_access to the current time |
| is_expired() | bool | Returns True if ttl is set and time.time() > created_at + ttl |
| age() | float | Seconds elapsed since created_at |
| idle_time() | float | Seconds elapsed since last_access |
| get_stats() | dict | Returns a snapshot dictionary with key, hits, age_seconds, idle_seconds, size_bytes, is_expired, and ttl |
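To make the field defaults and the expiry rule concrete, here is a rough dataclass sketch that mirrors the two tables above. The class name EntrySketch and the constant FALLBACK_SIZE_BYTES are hypothetical; the real CacheEntry may compute sizes and timestamps differently.

```python
import pickle
import time
from dataclasses import dataclass, field
from typing import Any, Optional

FALLBACK_SIZE_BYTES = 1024  # assumption: mirrors the documented 1 KB fallback


@dataclass
class EntrySketch:
    key: str
    value: Any
    created_at: float = field(default_factory=time.time)
    last_access: float = field(default_factory=time.time)
    hits: int = 0
    ttl: Optional[float] = None
    size_bytes: int = field(default=0, init=False)

    def __post_init__(self) -> None:
        # Estimate the size from the pickled value; fall back if it cannot be pickled.
        try:
            self.size_bytes = len(pickle.dumps(self.value))
        except Exception:
            self.size_bytes = FALLBACK_SIZE_BYTES

    def increment_hits(self) -> None:
        self.hits += 1
        self.last_access = time.time()

    def is_expired(self) -> bool:
        return self.ttl is not None and time.time() > self.created_at + self.ttl


fresh = EntrySketch(key="abc123", value={"result": 42}, ttl=300.0)
stale = EntrySketch(key="old", value=1, created_at=time.time() - 10.0, ttl=5.0)
unpicklable = EntrySketch(key="fn", value=lambda: None)
```

An entry with ttl=None never expires, so is_expired() short-circuits before touching the clock.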
```python
entry = CacheEntry(key="abc123", value={"result": 42}, ttl=300.0)

# After some accesses:
entry.increment_hits()
print(entry.hits)          # 1
print(entry.age())         # seconds since creation
print(entry.idle_time())   # seconds since last access
print(entry.is_expired())  # False (within TTL)
print(entry.get_stats())
# Example output (timing and size values are illustrative):
# {'key': 'abc123', 'hits': 1, 'age_seconds': 0.002, 'idle_seconds': 0.0,
#  'size_bytes': 32, 'is_expired': False, 'ttl': 300.0}
```
CacheMetrics
A lightweight dataclass that accumulates per-level hit, miss, eviction, promotion, and expiration counters for a MultiLevelCache instance. Each MultiLevelCache owns one CacheMetrics object at cache.metrics.
Note: CacheMetrics is distinct from CacheMetricsCollector, which is used by CacheManager and tracks LLM-specific signals such as cost savings, latency histograms, and semantic similarity. CacheMetrics is simpler and records only raw cache-level operation counts.
Fields
| Field | Type | Description |
|---|---|---|
| l1_hits | int | L1 exact hits |
| l2_hits | int | L2 exact hits |
| l3_hits | int | L3 disk hits |
| l1_misses | int | L1 misses |
| l2_misses | int | L2 misses |
| l3_misses | int | L3 misses |
| sets | int | Total set() calls |
| evictions | int | Total eviction events |
| promotions | int | Total promotion events (entry moved up a level) |
| expirations | int | Total expiration events (entry removed due to TTL) |
Methods
| Method | Signature | Description |
|---|---|---|
| record_get | (level: int, hit: bool) -> None | Records a get operation result for the given level (1, 2, or 3) |
| record_set | () -> None | Increments the sets counter |
| record_eviction | () -> None | Increments the evictions counter |
| record_promotion | () -> None | Increments the promotions counter |
| record_expiration | () -> None | Increments the expirations counter |
| get_hit_rate | (level: Optional[int] = None) -> float | Returns hit rate for the specified level, or overall if None. Returns 0.0 if no requests have been recorded. |
| get_stats | () -> dict | Returns all counters plus computed hit rates as a flat dictionary |
| reset | () -> None | Resets all counters to zero |
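One possible accounting scheme behind record_get and get_hit_rate is sketched below. It assumes every level probed during a lookup records its own hit or miss, and that the overall rate is total hits divided by total probes. That assumption reproduces per-level counters such as l1_misses, l2_misses, and l3_misses, but whether the library computes the overall rate per probe or per get() call is not confirmed here. The class name MetricsSketch is hypothetical.

```python
from typing import Dict, Optional


class MetricsSketch:
    """Illustrative per-level hit/miss counters."""

    def __init__(self) -> None:
        self.hits: Dict[int, int] = {1: 0, 2: 0, 3: 0}
        self.misses: Dict[int, int] = {1: 0, 2: 0, 3: 0}

    def record_get(self, level: int, hit: bool) -> None:
        if hit:
            self.hits[level] += 1
        else:
            self.misses[level] += 1

    def get_hit_rate(self, level: Optional[int] = None) -> float:
        if level is None:
            hits, misses = sum(self.hits.values()), sum(self.misses.values())
        else:
            hits, misses = self.hits[level], self.misses[level]
        total = hits + misses
        return hits / total if total else 0.0


sketch = MetricsSketch()
sketch.record_get(1, True)        # a lookup served from L1
for level in (1, 2, 3):           # a lookup that misses at every level
    sketch.record_get(level, False)

print(sketch.get_hit_rate(level=1))  # 0.5
print(sketch.get_hit_rate())         # 0.25 under the per-probe assumption
```

Under this scheme a single fully missed lookup adds three misses, so the overall rate is lower than the fraction of get() calls that returned a value.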
```python
# Access metrics directly from a MultiLevelCache instance
cache = MultiLevelCache(l1_max_items=100)
cache.set("key", "value")
cache.get("key")      # hit in L1
cache.get("missing")  # miss at L1, L2, and L3

m = cache.metrics
print(m.get_hit_rate())         # overall rate; same value as 'overall_hit_rate' below
print(m.get_hit_rate(level=1))  # 0.5 (1 hit and 1 miss recorded at L1)
print(m.get_stats())
# {'l1_hits': 1, 'l2_hits': 0, 'l3_hits': 0, 'total_hits': 1,
#  'l1_misses': 1, 'l2_misses': 1, 'l3_misses': 1, 'total_misses': 3,
#  'l1_hit_rate': 0.5, 'l2_hit_rate': 0.0, 'l3_hit_rate': 0.0,
#  'overall_hit_rate': 0.25, 'sets': 1, 'evictions': 0,
#  'promotions': 0, 'expirations': 0}

m.reset()
print(m.l1_hits)  # 0
```
CacheStrategy
An Enum of eviction strategies used by MultiLevelCache. Import it to pass as the strategy constructor argument. See Section 12 — Eviction Strategies for a full description of each strategy's behaviour.
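As a rough illustration of how the five strategy names used in this section (lru, lfu, fifo, ttl, adaptive) could translate into victim selection, consider the sketch below. The TTL rule (closest to expiry first) and the adaptive score (hits divided by one plus idle seconds) are assumptions made for the example, not the library's formulas; Meta and pick_victim are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Meta:
    created_at: float
    last_access: float
    hits: int
    ttl: Optional[float] = None


def pick_victim(entries: Dict[str, Meta], strategy: str, now: float) -> str:
    """Return the key that the given strategy would evict first."""
    if strategy == "lru":
        return min(entries, key=lambda k: entries[k].last_access)
    if strategy == "lfu":
        return min(entries, key=lambda k: entries[k].hits)
    if strategy == "fifo":
        return min(entries, key=lambda k: entries[k].created_at)
    if strategy == "ttl":
        # Assumed rule: the entry closest to expiry goes first; no TTL means last.
        def remaining(k: str) -> float:
            e = entries[k]
            return float("inf") if e.ttl is None else e.created_at + e.ttl - now
        return min(entries, key=remaining)
    if strategy == "adaptive":
        # Assumed composite: frequent and recently used entries score higher.
        def score(k: str) -> float:
            e = entries[k]
            return e.hits / (1.0 + (now - e.last_access))
        return min(entries, key=score)
    raise ValueError(f"unknown strategy: {strategy}")


entries = {
    "a": Meta(created_at=0.0, last_access=90.0, hits=9, ttl=200.0),
    "b": Meta(created_at=10.0, last_access=20.0, hits=50, ttl=95.0),
    "c": Meta(created_at=20.0, last_access=95.0, hits=1, ttl=None),
}
victims = {s: pick_victim(entries, s, now=100.0) for s in ("lru", "lfu", "fifo", "ttl", "adaptive")}
print(victims)
# {'lru': 'b', 'lfu': 'c', 'fifo': 'a', 'ttl': 'b', 'adaptive': 'c'}
```

The same three entries yield different victims per strategy, which is why the choice should follow the workload's access pattern.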
```python
from fennec_memory.cache import CacheStrategy

# Definition, shown for reference:
class CacheStrategy(Enum):
    LRU = "lru"            # Least Recently Used
    LFU = "lfu"            # Least Frequently Used
    FIFO = "fifo"          # First In First Out
    TTL = "ttl"            # Time To Live based
    ADAPTIVE = "adaptive"  # Adaptive (frequency + recency composite score)
```
```python
from fennec_memory.cache import MultiLevelCache, CacheStrategy

# Use ADAPTIVE for workloads with mixed hot/cold access patterns
cache = MultiLevelCache(strategy=CacheStrategy.ADAPTIVE)
```
14. CacheManager vs MultiLevelCache
Both components provide multi-level caching, but they are designed for fundamentally different problems and should not be treated as alternatives to one another. This section explains what each component is, where it differs from the other, and how to decide which one to use.
The Core Difference
CacheManager is an intelligent LLM query router. Its job is to intercept natural-language queries before they reach a language model, find semantically equivalent cached answers, and decide — using a cost-aware utility function and a reinforcement-learning policy — whether a cached answer is good enough to serve or whether a fresh LLM call is warranted. It understands tokens, costs, tenants, and the fuzzy nature of language.
MultiLevelCache is a general-purpose in-process key-value store with a three-level memory hierarchy (RAM → RAM → Disk). Its job is to store arbitrary Python objects under string keys and serve them quickly on repeated access, automatically promoting hot data toward the fastest layer and demoting cold data toward the slowest. It understands nothing about LLMs, language, or costs — only keys, values, and TTLs.
Put differently: CacheManager answers the question "Is this query semantically close enough to something I've seen before?" while MultiLevelCache answers the question "Have I seen this exact key before, and where did I put it?"
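The two questions can be contrasted in a few lines. The sketch below uses toy three-dimensional vectors in place of real embeddings and a hypothetical similarity threshold of 0.9; neither value comes from the library.

```python
import hashlib
import math
from typing import Sequence


def exact_match(a: str, b: str) -> bool:
    """Exact-key question: do both strings hash to the same SHA-256 digest?"""
    digest_a = hashlib.sha256(a.encode("utf-8")).hexdigest()
    digest_b = hashlib.sha256(b.encode("utf-8")).hexdigest()
    return digest_a == digest_b


def cosine_similarity(u: Sequence[float], v: Sequence[float]) -> float:
    """Semantic question: how closely do two embedding vectors point the same way?"""
    dot = sum(x * y for x, y in zip(u, v))
    norm = math.sqrt(sum(x * x for x in u)) * math.sqrt(sum(y * y for y in v))
    return dot / norm


# Toy "embeddings"; real vectors would come from an embedding model.
vec_reset_password = [0.9, 0.1, 0.0]
vec_forgot_password = [0.8, 0.2, 0.1]
THRESHOLD = 0.9  # hypothetical similarity threshold

same_key = exact_match("How do I reset my password?", "I forgot my password, what now?")
similarity = cosine_similarity(vec_reset_password, vec_forgot_password)
print(same_key, similarity > THRESHOLD)  # False True
```

Two differently worded questions never share an exact key, yet their vectors can be close enough to count as a semantic hit.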
Architecture Comparison
| Aspect | CacheManager | MultiLevelCache |
|---|---|---|
| Primary purpose | LLM response caching with semantic matching | General-purpose key-value caching |
| Lookup strategy | Exact key → Storage → Semantic (FAISS) | L1 memory → L2 memory → L3 disk |
| Semantic search | Yes: FAISS cosine similarity over dense vectors | No: exact key match only (SHA-256 hash) |
| Embedding model | Required (OpenAI / HuggingFace / Ollama / Mock) | Not used |
| Eviction policy | Thompson Sampling RL bandit (learns from feedback) | LRU / LFU / FIFO / TTL / Adaptive (fixed strategy) |
| Feedback loop | Yes: feedback() updates bandit arms and similarity threshold | No |
| Disk persistence | Via pluggable BaseStorage backend (SQLite / Redis / Memory) | L3 pickle files in config.cache_dir |
| Multi-tenancy | Yes: namespace isolation, per-tenant quotas, shared cache | No: single shared namespace |
| Security | Prompt injection detection, PII scrubbing, HMAC integrity | None |
| Cost tracking | Yes: USD saved, ROI multiplier, per-tenant cost accounting | No |
| Async model | Native asyncio throughout | Sync core with asyncio.to_thread wrappers |
| Concurrency | Thread-safe + async-safe; inflight coalescer for embeddings | Thread-safe via threading.RLock |
| Instantiation | await CacheManager.create(config) (async factory, required) | MultiLevelCache(...) (sync constructor) |
| Teardown | await manager.aclose() or manager.close() | cache.clear() or context manager |
| Dependencies | FAISS, embedding provider, storage backend | None beyond stdlib |
Lookup Pipeline Comparison
CacheManager.get(query)
```
Query string
    │
    ▼
SecurityGuard.validate_query()        ← reject injections
    │
    ▼
TenantManager.check_and_charge()      ← RPM quota check
    │
    ▼
QueryNormalizer.normalize()           ← canonical form + SHA-256 key
    │
    ▼
_L1ExactCache.get()                   ← in-process LRU (per tenant)
    │ MISS
    ▼
Storage.get()                         ← Redis / SQLite / Memory
    │ MISS
    ▼
CachedEmbedder.embed_single()         ← dense vector (coalesced)
    │
    ▼
EmbeddingIndex.search()               ← FAISS nearest-neighbour
    │
    ▼
CachePolicyLearner.rank_candidates()  ← Thompson Sampling re-rank
    │
    ▼
DecisionEngine.decide()               ← SEMANTIC_HIT or LLM_FALLBACK
```
MultiLevelCache.get(key)
```
Raw key string
    │
    ▼
SHA-256 hash                 ← deterministic key normalisation
    │
    ▼
L1 OrderedDict lookup        ← in-process memory (~μs)
    │ MISS
    ▼
L2 OrderedDict lookup        ← in-process memory (~μs)
    │ HIT → promote to L1 if hot enough
    │ MISS
    ▼
L3 disk lookup               ← pickle file read (~ms)
    │ HIT → promote to L2
    │ MISS
    ▼
return None
```
The key difference is the last four stages of the CacheManager pipeline: embedding, FAISS search, RL ranking, and the cost-aware decision. MultiLevelCache skips all of that; it returns the value or None, with no probabilistic reasoning.
Data Model Comparison
| Aspect | CacheManager | MultiLevelCache |
|---|---|---|
| Entry type | IntelligentCacheEntry | CacheEntry |
| Key type | tenant_id::SHA-256(normalized_query) | SHA-256(raw_key) |
| Value type | Any serialisable object (typically an LLM response string) | Any pickle-serialisable Python object |
| Metadata | Bandit arm (α/β), HMAC hash, cost record, embedding vector, tenant ID, quality score | Hit count, creation time, last access time, size bytes |
| Hit result | CacheLookupResult with hit, decision, similarity, latency_ms, cost_saved_usd | Raw value or None |
| Metrics object | CacheMetricsCollector: tracks latency histograms, cost, semantic similarity, per-tenant stats | CacheMetrics: tracks hit/miss/eviction/promotion counts per level |
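The bandit arm (α/β) listed in the metadata row refers to a Beta distribution per cached entry. A generic Thompson Sampling sketch, independent of the library's actual CachePolicyLearner, could look like the following; BetaArm and rank are hypothetical names, and the uniform Beta(1, 1) prior is an assumption.

```python
import random
from typing import Dict, List


class BetaArm:
    """Beta(alpha, beta) belief about how often a cached response is judged good."""

    def __init__(self) -> None:
        self.alpha = 1.0  # assumed uniform prior
        self.beta = 1.0

    def update(self, positive: bool) -> None:
        if positive:
            self.alpha += 1.0
        else:
            self.beta += 1.0

    def sample(self, rng: random.Random) -> float:
        return rng.betavariate(self.alpha, self.beta)


def rank(arms: Dict[str, BetaArm], rng: random.Random) -> List[str]:
    """Draw one sample per arm and order the arms by that draw, best first."""
    draws = {name: arm.sample(rng) for name, arm in arms.items()}
    return sorted(draws, key=draws.get, reverse=True)


rng = random.Random(0)
arms = {"helpful_answer": BetaArm(), "poor_answer": BetaArm()}
for _ in range(50):
    arms["helpful_answer"].update(positive=True)
    arms["poor_answer"].update(positive=False)

wins = sum(rank(arms, rng)[0] == "helpful_answer" for _ in range(200))
print(wins)  # the well-rated entry ranks first in nearly every draw
```

Sampling rather than comparing means keeps a small amount of exploration alive, so an entry with little feedback still gets an occasional chance to be ranked first.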
Choosing the Right Component
Use CacheManager when:
- You are caching responses from an LLM API (OpenAI, Anthropic, local models, etc.)
- Queries may be phrased differently but mean the same thing (semantic equivalence matters)
- You need to track cost savings and ROI from caching
- You have multiple tenants or user groups that need isolated cache namespaces
- You want the system to learn over time which cached responses are high quality
- You need prompt injection protection or PII scrubbing before storing data
- You are building a RAG pipeline, chatbot, or any system where query latency and LLM cost are concerns
Use MultiLevelCache when:
- You are caching arbitrary computed results (database query results, API responses, parsed configs, deserialized objects)
- Keys are exact and deterministic — the same input always produces the same key
- You do not need semantic matching, tenancy, or security features
- You want zero external dependencies (no embedding model, no FAISS, no Redis required)
- You need L3 disk persistence for data that survives process restarts but is expensive to recompute
- You are caching in a context where CacheManager's LLM-specific pipeline would be unnecessary overhead
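For the exact, deterministic keys mentioned in the list above, one common approach is to derive the key from the function name and its arguments. The sketch below is an assumption-laden illustration: make_key and cached are hypothetical helpers, a plain dict stands in for the cache, and arguments are assumed to be JSON-serialisable (anything else is converted with str).

```python
import functools
import hashlib
import json
from typing import Any, Callable, Dict, List


def make_key(prefix: str, *args: Any, **kwargs: Any) -> str:
    """Build a deterministic key: same arguments always give the same key."""
    payload = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True, default=str)
    return f"{prefix}:{hashlib.sha256(payload.encode('utf-8')).hexdigest()}"


def cached(store: Dict[str, Any]) -> Callable:
    """Decorator that memoises results in `store` (a stand-in for a real cache)."""
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            key = make_key(fn.__name__, *args, **kwargs)
            if key in store:
                return store[key]
            result = fn(*args, **kwargs)
            store[key] = result
            return result
        return wrapper
    return decorator


store: Dict[str, Any] = {}
calls: List[int] = []


@cached(store)
def expensive_query(user_id: int, *, region: str = "eu") -> dict:
    calls.append(user_id)  # track how often the real work runs
    return {"user_id": user_id, "region": region}


first = expensive_query(42, region="eu")
second = expensive_query(42, region="eu")
print(len(calls))  # 1: the second call was served from the store
```

With a MultiLevelCache instance, the dictionary lookups in wrapper would become cache.get(key) and cache.set(key, result), as documented in Section 12.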
Use both together when:
The two components are fully independent and can coexist in the same application. A common pattern is to use MultiLevelCache for application-level data (feature flags, user sessions, rate limit counters, expensive DB queries) while CacheManager handles all LLM query caching in the same process.
```python
from fennec_memory.cache import (
    MultiLevelCache, CacheStrategy,
    CacheManager, CacheConfig, StorageBackend,
    EmbeddingConfig, EmbeddingProvider,
)

# Application startup: both caches initialised independently.
# The await below must run inside an async context (e.g. an async startup hook).
config_cache = MultiLevelCache(
    l1_max_items=512,
    strategy=CacheStrategy.LRU,
    persist_l3=True,
)
llm_cache = await CacheManager.create(CacheConfig(
    storage_backend=StorageBackend.SQLITE,
    embedding=EmbeddingConfig(provider=EmbeddingProvider.OPENAI),
))

# Request handler: each cache used for its intended purpose.
# db and call_llm stand for application-defined helpers.
async def handle_request(user_id: str, query: str):
    # MultiLevelCache: exact key lookup for user profile (cheap, deterministic)
    profile = config_cache.get(f"user:{user_id}:profile")
    if profile is None:
        profile = await db.fetch_user(user_id)
        config_cache.set(f"user:{user_id}:profile", profile, ttl=300.0)

    # CacheManager: semantic lookup for LLM response (expensive, fuzzy)
    result = await llm_cache.get(query, tenant_id=user_id)
    if result.hit:
        answer = result.response
    else:
        answer = await call_llm(query, context=profile)
        await llm_cache.put(query, answer, tenant_id=user_id, response_tokens=400)
    return answer
```
Summary
| Question | Answer |
|---|---|
| Are they interchangeable? | No. They solve different problems at different layers. |
| Can they run in the same process? | Yes. They are fully independent and have no shared state. |
| Does CacheManager use MultiLevelCache internally? | No. CacheManager has its own _L1ExactCache and delegates to BaseStorage. |
| Which is faster for exact key lookups? | Both hit in-process memory at ~μs. MultiLevelCache has less overhead per lookup (no quota check, no normalisation pipeline). |
| Which should I start with? | If your use case involves LLM queries, start with CacheManager. For everything else, start with MultiLevelCache. |
Simple Real Example
```python
import os
import time
from typing import Tuple

from fennec_community.llm import GeminiInterface
from fennec_community.document_loaders import TextLoader
from fennec_community.vector_database import FAISSVectorDatabase
from fennec_community.chunks import ArabicTextChunker
from fennec_community.context import ContextManager
from fennec_community.embeddings import OllamaEmbedder
from fennec_community.rag.core import RAGSystem
from fennec_memory.cache import MultiLevelCache, CacheConfig, CacheStrategy

# Assumption: the Gemini API key is supplied via this environment variable.
llm_api = os.environ["GEMINI_API_KEY"]

# Build the RAG system over the FAQ document
loader_1 = TextLoader("./data_kn/faq.txt").load()
chunker = ArabicTextChunker(chunk_size=100, overlap=20)
embedder = OllamaEmbedder()
vector_db = FAISSVectorDatabase(embedder=embedder)
llm = GeminiInterface(api_key=llm_api)
context_manager = ContextManager()
rag_system = RAGSystem(llm=llm, vector_db=vector_db, chunker=chunker, context_manager=context_manager)
rag_system.add_documents(loader_1)

# Exact-key cache in front of the RAG system
cache_config = CacheConfig(
    l1_max_items=100,
    l2_max_items=500,
    default_ttl=600,
    enable_stats=True,
)
cache = MultiLevelCache(strategy=CacheStrategy.LRU, config=cache_config)


def cached_rag_query(query: str) -> Tuple[str, bool]:
    """Return (answer, served_from_cache)."""
    cache_key = f"rag:query:{query.strip().lower()}"
    cached = cache.get(cache_key)
    if cached is not None:  # an empty answer still counts as a cache hit
        return cached, True
    answer = rag_system.generate(query)
    cache.set(cache_key, answer)
    return answer, False


queries = [
    # "What payment methods are available?"
    "ماهي طرق الدفع المتاحه",
    # "What are the working hours?"
    "ماهي اوقات العمل ",
    # Same payment question with a different final letter: a different exact key
    "ماهي طرق الدفع المتاحة",
    # Repeat of the first query: should be served from the cache
    "ماهي طرق الدفع المتاحه",
]

for q in queries:
    t0 = time.perf_counter()
    answer, from_cache = cached_rag_query(q)
    elapsed = (time.perf_counter() - t0) * 1000
    source = "⚡ Cache" if from_cache else "🔄 RAG"
    print(f" {source} ({elapsed:.1f}ms)")
    print(f" Q: {q}")
    print(f" A: {answer[:80]}...")
    print()
```
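Because MultiLevelCache matches keys exactly, two spellings of the same Arabic question produce two separate entries. One way to raise the hit rate, sketched here as an assumption rather than a feature of the library, is to normalise common orthographic variants before building the key. The helper normalize_query and the specific letter mappings are illustrative choices.

```python
import re
import unicodedata

# Common Arabic orthographic variants folded to a single form (illustrative choice).
_ARABIC_VARIANTS = str.maketrans({
    "\u0623": "\u0627",  # alef with hamza above -> bare alef
    "\u0625": "\u0627",  # alef with hamza below -> bare alef
    "\u0622": "\u0627",  # alef with madda -> bare alef
    "\u0629": "\u0647",  # teh marbuta -> heh
    "\u0649": "\u064A",  # alef maksura -> yeh
})


def normalize_query(query: str) -> str:
    """Hypothetical key normaliser: Unicode-normalise, fold variants, collapse spaces."""
    text = unicodedata.normalize("NFKC", query).strip().lower()
    text = text.translate(_ARABIC_VARIANTS)
    return re.sub(r"\s+", " ", text)


base = "ماهي طرق الدفع المتاح"
with_heh = base + "\u0647"                 # spelling ending in heh
with_teh_marbuta = base + "\u0629" + "  "  # spelling ending in teh marbuta, trailing spaces

key_a = normalize_query(with_heh)
key_b = normalize_query(with_teh_marbuta)
print(key_a == key_b)  # True: both spellings now map to one cache key
```

Normalisation of this kind only covers spelling variants; paraphrases with different wording still need the semantic matching that CacheManager provides.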