Modular Plugin System
Table of Contents
- Overview
- Architecture
- Quick Start
- Core Classes
- Plugin Types
- Metadata System
- Security Layer
- AI & Observability
- Enumerations
- Exception Hierarchy
- Complete Usage Example
Overview
The Plugin System is a production-grade, AI-ready plugin ecosystem designed for LLM applications such as retrieval-augmented generation (RAG) pipelines and AI agents. It provides a unified interface for defining, discovering, registering, securing, and executing plugins of four canonical types: Retrieval, Tool, Action, and Processing.
Key capabilities include:
- Auto-discovery of plugin classes from directories
- Hot-reload support for development and production deployments
- AI-native selection — LLMs and agents can select plugins by natural-language query
- LLM tool descriptors — automatic generation of OpenAI and Anthropic function-calling schemas
- Three-layer security — permission enforcement, input sanitization, and execution sandboxing
- Structured observability — per-plugin execution metrics, health scores, and error journals
- Async-first: all I/O operations are fully asynchronous (asyncio)
Architecture
PluginManager (central façade)
├── PluginRegistry — catalog & index of all plugins
├── PluginLoader — auto-discovery & hot-reload
├── PermissionGuard — per-plugin permission enforcement
├── ExecutionSandbox — hard-timeout async wrapper
├── AIPluginSelector — keyword + semantic routing
└── PluginObservability — metrics store & dashboards
BasePlugin (abstract base)
├── RetrievalPlugin — fetch / search documents
├── ToolPlugin — call external APIs & utilities
├── ActionPlugin — write / mutate external state
└── ProcessingPlugin — transform, rerank, summarize
Quick Start
import asyncio

from fennec_community.plugins import (
    PluginManager, PluginMetadata, SchemaProperty,
    ExecutionContext, RetrievalPlugin, PluginConfig,
)

async def main() -> None:
    # 1. Create the manager
    manager = PluginManager(
        plugin_dirs=["./my_plugins"],
        safe_mode=False,
    )
    # 2. Start up (auto-discovers and initializes all plugins)
    await manager.startup()
    try:
        # 3. Execute a plugin by name
        context = ExecutionContext(query="RAG 2025", session_id="s-001")
        result = await manager.execute("web_search", {"query": "RAG 2025"}, context)
        # 4. Let the AI selector pick the best plugin (select_one may return None)
        plugin = manager.select_one("find recent papers on LLMs")
        if plugin is not None:
            result = await manager.execute(plugin.name, {"query": "LLM papers"}, context)
        # 5. Get LLM tool descriptors
        openai_tools = manager.tool_descriptors()    # OpenAI format
        anthropic_tools = manager.anthropic_tools()  # Anthropic format
    finally:
        # 6. Shut down gracefully, even if a step above raised
        await manager.shutdown()

asyncio.run(main())
Core Classes
PluginManager
from fennec_community.plugins import PluginManager
The central façade of the framework. It wires together the registry, loader, security, AI selector, and observability into a single coherent API. This is the primary entry point for all plugin operations.
Constructor
PluginManager(
    plugin_dirs: Optional[List[str]] = None,
    system_version: str = "1.0.0",
    safe_mode: bool = False,
    auto_discover: bool = True,
    default_config: Optional[PluginConfig] = None,
    embed_fn: Optional[Callable[[str], List[float]]] = None,
    hard_timeout: float = 60.0,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| plugin_dirs | List[str] | None | Filesystem directories to scan for plugin files during auto-discovery. |
| system_version | str | "1.0.0" | Semantic version of the host system, used for plugin compatibility checks. |
| safe_mode | bool | False | When True, enforces strict permission checks before every execution and during registration. |
| auto_discover | bool | True | Automatically scan plugin_dirs during startup(). |
| default_config | PluginConfig | None | Default runtime configuration applied to every discovered plugin. |
| embed_fn | Callable | None | Optional embedding function (str) → List[float] used by the AI selector for semantic re-ranking. |
| hard_timeout | float | 60.0 | Hard execution timeout in seconds enforced by the sandbox. |
Lifecycle Methods
startup
async def startup() -> None
Purpose: Initializes the manager. Triggers auto-discovery of plugins from all configured directories, grants required permissions (in non-safe mode), calls initialize() on every discovered plugin, and fires the system.startup hook.
Returns: None
Raises: Does not raise directly; individual plugin initialization failures are logged and the plugin is marked ERROR.
When to call: Once, before any calls to execute().
await manager.startup()
shutdown
async def shutdown() -> None
Purpose: Gracefully shuts down the manager. Stops hot-reload background tasks, fires the system.shutdown hook, and calls cleanup() on every registered plugin. After shutdown, execute() raises PluginError.
Returns: None
When to call: Once, when the application is stopping (e.g., inside a finally block or lifespan context).
await manager.shutdown()
Registration Methods
register
async def register(plugin: BasePlugin) -> bool
Purpose: Manually registers a plugin instance that was not auto-discovered. Runs a compatibility check against system_version, verifies all declared dependencies are already registered, enforces permission policy in safe mode, then calls initialize().
Parameters:
| Parameter | Type | Description |
|---|---|---|
| plugin | BasePlugin | A fully constructed plugin instance (with a valid METADATA class attribute). |
Returns: bool — True if registration and initialization succeeded; False otherwise.
Fires hook: plugin.registered
my_plugin = MyRetrievalPlugin()
ok = await manager.register(my_plugin)
if not ok:
    print("Registration failed; check logs for details.")
unregister
async def unregister(name: str, force: bool = False) -> bool
Purpose: Unregisters a plugin by name. By default, refuses to unregister if other plugins declare this plugin as a dependency. Calls cleanup() before removal.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| name | str | The registered name of the plugin to remove. |
| force | bool | If True, skips the dependency check and unregisters unconditionally. |
Returns: bool — True on success; False if the plugin was not found or had dependents (and force=False).
Fires hook: plugin.unregistered
await manager.unregister("web_search")
await manager.unregister("core_db", force=True)  # bypass dependency check
Execution Methods
execute
async def execute(
    name: str,
    input_data: Dict[str, Any],
    context: Optional[ExecutionContext] = None,
    use_retry: bool = False,
) -> Any
Purpose: The primary execution entry point. Looks up the plugin by name, enforces permission policy, sanitizes inputs against the plugin's schema, and delegates to either safe_execute or safe_execute_with_retry. Records metrics for every call.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| name | str | Registered name of the plugin to execute. |
| input_data | Dict[str, Any] | Raw input dictionary. Will be sanitized and validated against the plugin's input_schema. |
| context | ExecutionContext | Runtime context carrying query, session, memory, and cache handles. Defaults to an empty context. |
| use_retry | bool | If True, wraps execution in exponential-backoff retry up to config.max_retries. |
Returns: Any, the value returned by the plugin's execute() method (type depends on plugin type).
Raises:
- PluginError: plugin disabled, manager shutting down, or unexpected runtime error.
- PluginTimeoutError: execution exceeded config.timeout.
- PluginValidationError: input failed schema validation or injection check.
- PluginPermissionError: permission check failed (when safe_mode=True).
- KeyError: plugin name not registered.
Fires hooks: plugin.executed on success; plugin.failed on error.
context = ExecutionContext(query="climate change 2025", session_id="s-42")
result = await manager.execute(
    "web_search",
    {"query": "climate change 2025", "top_k": 5},
    context,
    use_retry=True,
)
execute_batch
async def execute_batch(
    calls: List[Tuple[str, Dict[str, Any]]],
    context: Optional[ExecutionContext] = None,
    parallel: bool = True,
) -> Dict[str, Any]
Purpose: Executes multiple plugins in a single call. In parallel mode, all plugins run concurrently via asyncio.gather. In sequential mode, plugins run one after another. Failures are captured per-plugin without propagating to the others.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| calls | List[Tuple[str, Dict]] | List of (plugin_name, input_data) pairs. |
| context | ExecutionContext | Shared runtime context for all calls. |
| parallel | bool | True (default) runs all calls concurrently; False runs them sequentially. |
Returns: Dict[str, Any] — mapping of plugin_name → result. On failure, the value is {"error": "<error message>"}.
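The per-plugin failure capture described above can be sketched with asyncio.gather and return_exceptions=True. This is an illustrative sketch under stated assumptions, not the library's implementation: run_batch, the runner callable, and fake_runner are hypothetical stand-ins for the manager's internals.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

# Hypothetical signature for "execute one plugin by name".
Runner = Callable[[str, Dict[str, Any]], Awaitable[Any]]


async def run_batch(
    runner: Runner,
    calls: List[Tuple[str, Dict[str, Any]]],
    parallel: bool = True,
) -> Dict[str, Any]:
    """Run every (name, input) pair and capture failures per plugin."""
    if parallel:
        # return_exceptions=True keeps one failure from aborting the others.
        outcomes = await asyncio.gather(
            *(runner(name, data) for name, data in calls),
            return_exceptions=True,
        )
    else:
        outcomes = []
        for name, data in calls:
            try:
                outcomes.append(await runner(name, data))
            except Exception as exc:  # captured, not propagated
                outcomes.append(exc)
    return {
        name: {"error": str(out)} if isinstance(out, Exception) else out
        for (name, _), out in zip(calls, outcomes)
    }


async def fake_runner(name: str, data: Dict[str, Any]) -> Any:
    """Hypothetical runner: fails for one plugin name, echoes otherwise."""
    if name == "pdf_reader":
        raise RuntimeError("download failed")
    return {"plugin": name, "input": data}
```

Because the result is keyed by plugin name, two calls to the same plugin in one batch would overwrite each other in this sketch.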
results = await manager.execute_batch(
    calls=[
        ("web_search", {"query": "RAG 2025"}),
        ("pdf_reader", {"url": "https://example.com/paper.pdf"}),
        ("summarizer", {"items": []}),
    ],
    context=context,
    parallel=True,
)
# results["web_search"] → search results
# results["pdf_reader"] → {"error": "..."} if it failed
execute_pipeline
async def execute_pipeline(
    pipeline: List[str],
    initial: Dict[str, Any],
    context: Optional[ExecutionContext] = None,
) -> Any
Purpose: Chains plugins sequentially into a data pipeline. The output of plugin N is passed as the input to plugin N+1. If an intermediate result is not a dict, it is wrapped as {} for the next step.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| pipeline | List[str] | Ordered list of plugin names to execute in sequence. |
| initial | Dict[str, Any] | Input data for the first plugin in the pipeline. |
| context | ExecutionContext | Shared runtime context passed to every plugin in the chain. |
Returns: Any — the final output of the last plugin in the pipeline.
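The chaining rule can be illustrated with a small stand-alone sketch. The names run_pipeline, search, and summarize are hypothetical, and the wrapping key "input" is an assumption: the source does not name the key used when a non-dict result is wrapped.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Step = Callable[[Dict[str, Any]], Awaitable[Any]]


async def run_pipeline(
    steps: List[Step],
    initial: Dict[str, Any],
    wrap_key: str = "input",  # assumed key; the source does not specify one
) -> Any:
    """Feed each step's output into the next step as its input."""
    current: Any = initial
    for index, step in enumerate(steps):
        if index > 0 and not isinstance(current, dict):
            # Wrap non-dict intermediate results so the next step
            # still receives a dictionary.
            current = {wrap_key: current}
        current = await step(current)
    return current


async def search(data: Dict[str, Any]) -> List[str]:
    """Hypothetical retrieval step returning a list (not a dict)."""
    return [f"{data['query']} result {i}" for i in range(3)]


async def summarize(data: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical processing step reading the wrapped list."""
    return {"summary": f"{len(data['input'])} items"}
```

A downstream step therefore has to know which key carries the wrapped value, which is worth checking against the real plugin contracts before building long chains.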
result = await manager.execute_pipeline(
    pipeline=["web_search", "reranker", "summarizer"],
    initial={"query": "quantum computing 2025"},
    context=context,
)
auto_execute
async def auto_execute(
    query: str,
    input_data: Optional[Dict[str, Any]] = None,
    context: Optional[ExecutionContext] = None,
    **selector_kwargs,
) -> Optional[Any]
Purpose: Combines AI selection and execution into a single call. The AI selector picks the best matching plugin for the natural-language query, then executes it. Returns None if no suitable plugin is found.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| query | str | Natural-language description of the task (used for plugin selection). |
| input_data | Dict | Explicit input for the plugin. Defaults to {"query": query} if omitted. |
| context | ExecutionContext | Runtime execution context. |
| **selector_kwargs | | Additional keyword arguments forwarded to AIPluginSelector.select_one() (e.g., plugin_type, max_cost). |
Returns: Any — the execution result, or None if no plugin was selected.
result = await manager.auto_execute(
    query="search for recent papers on vector databases",
    context=context,
    plugin_type="retrieval",
)
AI Selection Methods
select
def select(
    query: str,
    plugin_type: Optional[str] = None,
    max_cost: Optional[CostTier] = None,
    top_k: int = 3,
) -> List[BasePlugin]
Purpose: Returns the top-K plugins best matching the given natural-language query. Filters by type and cost tier before scoring. Uses keyword scoring always; adds semantic cosine re-ranking when an embed_fn was provided at construction.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| query | str | Natural-language description of what the agent needs. |
| plugin_type | str | Restrict candidates to a specific type: "retrieval", "tool", "action", "processing". |
| max_cost | CostTier | Exclude plugins more expensive than this tier. |
| top_k | int | Maximum number of plugins to return. |
Returns: List[BasePlugin] — sorted by relevance score, highest first.
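A rough sketch of keyword scoring followed by optional cosine re-ranking is shown below. It is not the selector's actual scoring function: the word-overlap score, the plugins mapping of name to description, and this stand-alone select() are simplifying assumptions, and the type and cost filters are omitted.

```python
import math
from typing import Callable, Dict, List, Optional, Sequence


def keyword_score(query: str, text: str) -> int:
    """Count distinct query words that also appear in the plugin text."""
    return len(set(query.lower().split()) & set(text.lower().split()))


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 when either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def select(
    query: str,
    plugins: Dict[str, str],  # hypothetical: plugin name -> description
    top_k: int = 3,
    embed_fn: Optional[Callable[[str], List[float]]] = None,
) -> List[str]:
    """Keyword-score all plugins, then re-rank by cosine if embed_fn is set."""
    scored = [(keyword_score(query, desc), name) for name, desc in plugins.items()]
    # Stable sort: plugins with equal scores keep their insertion order.
    ranked = sorted(scored, key=lambda item: -item[0])
    candidates = [name for score, name in ranked if score > 0]
    if embed_fn is not None:
        query_vec = embed_fn(query)
        candidates.sort(
            key=lambda name: cosine(query_vec, embed_fn(plugins[name])),
            reverse=True,
        )
    return candidates[:top_k]
```

Keyword scoring acts as a cheap first pass here, so the embedding function is only called for plugins that already share at least one word with the query.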
candidates = manager.select(
    query="search the web for latest AI news",
    plugin_type="tool",
    max_cost=CostTier.MODERATE,
    top_k=3,
)
select_one
def select_one(query: str, **kwargs) -> Optional[BasePlugin]
Purpose: Convenience wrapper around select() that returns only the single best matching plugin. Returns None if no candidates pass the filters.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| query | str | Natural-language task description. |
| **kwargs | | Forwarded to select(): plugin_type, max_cost, top_k. |
Returns: Optional[BasePlugin] — the best matching plugin instance, or None.
plugin = manager.select_one("find documents about climate change")
if plugin:
    result = await manager.execute(plugin.name, {"query": "climate change"}, context)
LLM Integration Methods
tool_descriptors
def tool_descriptors() -> List[Dict[str, Any]]
Purpose: Returns OpenAI-format function-calling descriptors for all enabled plugins. Use this to feed plugin capabilities directly to an OpenAI-compatible LLM so it can autonomously choose and invoke them.
Returns: List[Dict[str, Any]] — each element is an OpenAI function descriptor:
{
    "type": "function",
    "function": {
        "name": "web_search",
        "description": "...",
        "parameters": { "type": "object", "properties": {...}, "required": [...] }
    }
}
tools = manager.tool_descriptors()
response = openai_client.chat.completions.create(
    model="gpt-4o", messages=[...], tools=tools
)
anthropic_tools
def anthropic_tools() -> List[Dict[str, Any]]
Purpose: Returns Anthropic Claude tool-use descriptors for all enabled plugins. Equivalent to tool_descriptors() but formatted per Anthropic's API specification.
Returns: List[Dict[str, Any]] — each element follows Anthropic's tool schema:
{
    "name": "web_search",
    "description": "...",
    "input_schema": { "type": "object", "properties": {...}, "required": [...] }
}
tools = manager.anthropic_tools()
response = anthropic_client.messages.create(
    model="claude-opus-4-6",
    max_tokens=1024,  # required by the Messages API; the value here is arbitrary
    messages=[...],
    tools=tools,
)
Hot-Reload Methods
enable_hot_reload
async def enable_hot_reload(interval: float = 5.0) -> None
Purpose: Starts a background asyncio.Task that polls registered plugin files every interval seconds. When a changed file is detected, it automatically unregisters the old plugin versions, re-imports the module, registers the new versions, and initializes them. Enables zero-downtime plugin updates during development and in production.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| interval | float | Polling interval in seconds. Defaults to 5.0. |
Returns: None
await manager.enable_hot_reload(interval=3.0)
disable_hot_reload
async def disable_hot_reload() -> None
Purpose: Cancels the hot-reload background task and waits for it to terminate cleanly.
Returns: None
await manager.disable_hot_reload()
Hook System Methods
on
def on(event: str, callback: Callable, priority: int = 50) -> None
Purpose: Registers a hook callback for a named lifecycle event. Callbacks are invoked in descending priority order. Both sync and async callbacks are supported.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| event | str | Event name. Built-in events: system.startup, system.shutdown, plugin.registered, plugin.unregistered, plugin.executed, plugin.failed. |
| callback | Callable | Function or coroutine to call when the event fires. Receives event-specific keyword arguments. |
| priority | int | Higher values are called first. Default 50. |
Returns: None
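The dispatch behavior described for hooks (descending priority, mixed sync and async callbacks, removal by identity) can be sketched as a tiny event bus. HookBus and its fire() method are hypothetical names for illustration; the manager's internal dispatcher may differ.

```python
import asyncio
import inspect
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List, Tuple


class HookBus:
    """Minimal event bus: higher priority runs first; sync or async callbacks."""

    def __init__(self) -> None:
        self._hooks: DefaultDict[str, List[Tuple[int, Callable[..., Any]]]] = (
            defaultdict(list)
        )

    def on(self, event: str, callback: Callable[..., Any], priority: int = 50) -> None:
        self._hooks[event].append((priority, callback))
        # Stable sort: callbacks with equal priority keep registration order.
        self._hooks[event].sort(key=lambda item: item[0], reverse=True)

    def off(self, event: str, callback: Callable[..., Any]) -> None:
        # Removal compares by object identity, not equality.
        self._hooks[event] = [h for h in self._hooks[event] if h[1] is not callback]

    async def fire(self, event: str, **kwargs: Any) -> None:
        # Iterate over a copy so callbacks may call on()/off() safely.
        for _, callback in list(self._hooks[event]):
            outcome = callback(**kwargs)
            if inspect.isawaitable(outcome):
                await outcome
```

Checking inspect.isawaitable on the return value lets one dispatcher serve plain functions and coroutines without the caller declaring which kind it registered.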
async def on_plugin_executed(plugin, result, **kwargs):
    print(f"Plugin {plugin.name} returned: {result}")

manager.on("plugin.executed", on_plugin_executed, priority=100)
off
def off(event: str, callback: Callable) -> None
Purpose: Removes a previously registered hook callback. The callback is identified by object identity.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| event | str | The event name the callback was registered under. |
| callback | Callable | The exact callback object to remove. |
Returns: None
manager.off("plugin.executed", on_plugin_executed)
Observability Methods
dashboard
def dashboard() -> Dict[str, Any]
Purpose: Returns a full health and metrics snapshot of the entire plugin ecosystem. Includes system metadata, per-plugin stats and health scores, aggregate metrics, and recent errors. Useful for monitoring endpoints, admin UIs, and debugging.
Returns: Dict[str, Any] with the following structure:
{
    "system_version": "1.0.0",
    "safe_mode": False,
    "hot_reload": False,
    "total_plugins": 5,
    "enabled_plugins": 4,
    "plugins": [
        {
            "name": "web_search", "version": "1.0.0", "status": "enabled",
            "execution": {"total": 42, "success": 40, "failure": 2, "success_rate": "95.2%"},
            "timing": {"avg_s": 0.42, "min_s": 0.1, "max_s": 2.1, ...},
            "health": {"label": "excellent", "score": 100, ...},
        },
        ...
    ],
    "metrics": {
        "total_executions": 150,
        "total_failures": 5,
        "error_rate_pct": 3.3,
        "top_plugins": [...],
        "recent_errors": [...],
    }
}
health_check_all
def health_check_all() -> Dict[str, Dict[str, Any]]
Purpose: Returns the health status of every registered plugin, keyed by plugin name. A lightweight alternative to dashboard() when only health information is needed.
Returns: Dict[str, Dict] — maps plugin name to its health report:
{
    "web_search": {"label": "excellent", "score": 100, "status": "enabled", "enabled": True},
    "pdf_reader": {"label": "degraded", "score": 60, "status": "enabled", "enabled": True},
}
Health labels and their score thresholds:
| Label | Score | Success Rate |
|---|---|---|
| untested | 0 | No executions yet |
| excellent | 100 | ≥ 95% |
| good | 80 | ≥ 80% |
| degraded | 60 | ≥ 60% |
| critical | 30 | < 60% |
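The threshold table maps directly onto a small function. The sketch below assumes the success rate is computed as success / total; the function name health_from_counts is hypothetical.

```python
from typing import Dict, Union


def health_from_counts(success: int, total: int) -> Dict[str, Union[str, int]]:
    """Map execution counts to the health label and score from the table."""
    if total == 0:
        return {"label": "untested", "score": 0}
    rate = success / total
    if rate >= 0.95:
        return {"label": "excellent", "score": 100}
    if rate >= 0.80:
        return {"label": "good", "score": 80}
    if rate >= 0.60:
        return {"label": "degraded", "score": 60}
    return {"label": "critical", "score": 30}
```

With the dashboard example's counts (40 successes out of 42 executions, about 95.2%), this yields the "excellent" label shown there.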
Convenience Accessors
get
def get(name: str) -> Optional[BasePlugin]
Purpose: Retrieves a registered plugin instance by name. Returns None if not found (non-raising alternative to registry.get_or_raise).
Parameters:
| Parameter | Type | Description |
|---|---|---|
| name | str | The plugin's registered name. |
Returns: Optional[BasePlugin]
plugin = manager.get("web_search")
if plugin:
    print(plugin.get_stats())
list_plugins
def list_plugins(plugin_type: Optional[str] = None) -> List[Dict[str, Any]]
Purpose: Returns a serializable summary of all registered plugins. Optionally filtered by type. Useful for populating UI lists, API responses, or agent tool discovery.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| plugin_type | str | Optional type filter: "retrieval", "tool", "action", "processing". |
Returns: List[Dict[str, Any]] — each dict contains name, version, type, enabled, status, capabilities, tags, cost_tier.
retrieval_plugins = manager.list_plugins(plugin_type="retrieval")
grant_permission
def grant_permission(plugin_name: str, permission: str) -> None
Purpose: Grants a named permission string to a plugin at runtime. Delegates to PermissionGuard.grant().
Parameters:
| Parameter | Type | Description |
|---|---|---|
| plugin_name | str | The plugin's registered name. |
| permission | str | Permission string (e.g., "network_access", "file_write"). |
Returns: None
manager.grant_permission("web_search", "network_access")
revoke_permission
def revoke_permission(plugin_name: str, permission: str) -> None
Purpose: Revokes a named permission from a plugin at runtime. After this call, any check for that permission returns False and future executions in safe_mode will be blocked.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| plugin_name | str | The plugin's registered name. |
| permission | str | Permission string to revoke. |
Returns: None
manager.revoke_permission("web_search", "network_access")
PluginRegistry
from fennec_community.plugins import PluginRegistry
The single source of truth for all registered plugins. Maintains name-indexed, type-indexed, and tag-indexed lookup structures. Used internally by PluginManager but also accessible directly for advanced use cases.
register
def register(plugin: BasePlugin) -> None
Purpose: Registers a plugin instance. Indexes it by name, type, and tags for fast lookup. Raises KeyError if a plugin with the same name is already registered.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| plugin | BasePlugin | The plugin instance to register. |
Returns: None
Raises: KeyError if the plugin name is already registered.
unregister
def unregister(name: str) -> None
Purpose: Removes a plugin by name and cleans up all indexes. Raises KeyError if the plugin is not registered.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| name | str | The plugin's registered name. |
Returns: None
Raises: KeyError if the name is not found.
get
def get(name: str) -> Optional[BasePlugin]
Purpose: Looks up a plugin by name. Returns None if not found (safe, non-raising lookup).
Returns: Optional[BasePlugin]
get_or_raise
def get_or_raise(name: str) -> BasePlugin
Purpose: Looks up a plugin by name and raises KeyError if it does not exist. Used internally by PluginManager.execute() to surface clear errors.
Returns: BasePlugin
Raises: KeyError
exists
def exists(name: str) -> bool
Purpose: Returns True if a plugin with the given name is currently registered.
Returns: bool
all
def all() -> List[BasePlugin]
Purpose: Returns a list of all registered plugin instances, in insertion order.
Returns: List[BasePlugin]
names
def names() -> List[str]
Purpose: Returns a list of all registered plugin names.
Returns: List[str]
list_by_type
def list_by_type(plugin_type: str) -> List[BasePlugin]
Purpose: Returns all plugins of a specific type (e.g., "retrieval", "tool", "action", "processing"). Uses the internal type index for O(1) lookup.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| plugin_type | str | The PLUGIN_TYPE string defined on the plugin class. |
Returns: List[BasePlugin]
retrieval_plugins = registry.list_by_type("retrieval")
list_by_tag
def list_by_tag(tag: str) -> List[BasePlugin]
Purpose: Returns all plugins that declare a specific tag in their metadata. Tag matching is case-insensitive.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| tag | str | The tag string to search for. |
Returns: List[BasePlugin]
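The name, type, and tag indexes described for the registry can be pictured with a minimal sketch. TinyRegistry is a hypothetical stand-in that stores plugin names rather than instances, and lowercasing tags at registration is one assumed way to get case-insensitive matching.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, Iterable, List


class TinyRegistry:
    """Name, type, and tag indexes kept in sync on every registration."""

    def __init__(self) -> None:
        self._by_name: Dict[str, str] = {}  # plugin name -> plugin type
        self._by_type: DefaultDict[str, List[str]] = defaultdict(list)
        self._by_tag: DefaultDict[str, List[str]] = defaultdict(list)

    def register(self, name: str, plugin_type: str, tags: Iterable[str]) -> None:
        if name in self._by_name:
            raise KeyError(f"plugin already registered: {name}")
        self._by_name[name] = plugin_type
        self._by_type[plugin_type].append(name)
        # Lowercase and de-duplicate tags so lookups ignore case.
        for tag in {t.lower() for t in tags}:
            self._by_tag[tag].append(name)

    def list_by_type(self, plugin_type: str) -> List[str]:
        # A dictionary lookup on the type index; no scan over all plugins.
        return list(self._by_type.get(plugin_type, []))

    def list_by_tag(self, tag: str) -> List[str]:
        return list(self._by_tag.get(tag.lower(), []))
```

Returning copies of the index lists keeps callers from mutating the registry's internal state by accident.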
list_enabled
def list_enabled() -> List[BasePlugin]
Purpose: Returns only plugins where config.enabled is True. Used by the AI selector to restrict candidates to active plugins.
Returns: List[BasePlugin]
search
def search(query: str, top_k: int = 5) -> List[BasePlugin]
Purpose: Performs a keyword-based relevance search over plugin name, description, tags, capabilities, and use-cases. Returns up to top_k plugins sorted by relevance score (highest first). This is the lightweight search; for semantic search use AIPluginSelector.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| query | str | Natural-language or keyword search string. |
| top_k | int | Maximum number of results to return. |
Returns: List[BasePlugin]
matches = registry.search("search web documents", top_k=3)
find_by_capability
def find_by_capability(capability: str) -> List[BasePlugin]
Purpose: Returns all plugins whose metadata.capabilities list contains the given string (case-insensitive substring match). Useful for finding plugins that explicitly advertise a specific capability.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| capability | str | Capability string to search for (e.g., "web_search", "pdf_extraction"). |
Returns: List[BasePlugin]
to_tool_descriptors
def to_tool_descriptors(plugin_names: Optional[List[str]] = None) -> List[Dict[str, Any]]
Purpose: Generates OpenAI function-calling descriptors for all enabled plugins (or a named subset). Delegates to each plugin's metadata.to_tool_descriptor().
Parameters:
| Parameter | Type | Description |
|---|---|---|
| plugin_names | List[str] | Optional list of plugin names to include. If None, includes all enabled plugins. |
Returns: List[Dict[str, Any]] of OpenAI-format tool descriptors.
to_anthropic_tools
def to_anthropic_tools(plugin_names: Optional[List[str]] = None) -> List[Dict[str, Any]]
Purpose: Generates Anthropic Claude tool-use descriptors for all enabled plugins (or a named subset). Delegates to each plugin's metadata.to_anthropic_tool().
Parameters:
| Parameter | Type | Description |
|---|---|---|
| plugin_names | List[str] | Optional subset of plugin names. |
Returns: List[Dict[str, Any]] — Anthropic-format tool descriptors.
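Judging by the two descriptor shapes documented for tool_descriptors() and anthropic_tools(), the formats carry the same three fields under different keys. The helper below is a hypothetical illustration of that mapping, not a function exported by the library.

```python
from typing import Any, Dict


def openai_to_anthropic(descriptor: Dict[str, Any]) -> Dict[str, Any]:
    """Convert an OpenAI function descriptor to the Anthropic tool shape."""
    function = descriptor["function"]
    return {
        "name": function["name"],
        "description": function["description"],
        # OpenAI's "parameters" JSON Schema becomes Anthropic's "input_schema".
        "input_schema": function["parameters"],
    }


example = {
    "type": "function",
    "function": {
        "name": "web_search",
        "description": "Search the web.",
        "parameters": {
            "type": "object",
            "properties": {"query": {"type": "string"}},
            "required": ["query"],
        },
    },
}
converted = openai_to_anthropic(example)
```

Since both formats embed the same JSON Schema object, a plugin's input schema only needs to be defined once in its metadata.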
snapshot
def snapshot() -> List[Dict[str, Any]]
Purpose: Returns a serializable (JSON-safe) summary of all registered plugins. Useful for logging, persistence, and debugging.
Returns: List[Dict[str, Any]]: each dict contains name, version, type, status, enabled, tags, capabilities.
PluginLoader
from fennec_community.plugins import PluginLoader
Handles auto-discovery, dynamic importing, and hot-reload of plugin classes from the filesystem. Used internally by PluginManager but can be used standalone for custom loading workflows.
discover_all
async def discover_all() -> List[str]
Purpose: Scans all directories configured in plugin_dirs, imports every .py file (excluding files starting with _), finds all concrete BasePlugin subclasses, instantiates them with default_config, and registers them with the registry.
Returns: List[str] — names of all successfully registered plugins.
loader = PluginLoader(registry, plugin_dirs=["./plugins"])
registered_names = await loader.discover_all()
discover_directory
async def discover_directory(directory: str) -> List[str]
Purpose: Scans a single directory for plugin files, independent of the directories configured at construction. Useful for dynamically loading a new plugin directory at runtime.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| directory | str | Absolute or relative path to the directory to scan. |
Returns: List[str] — names of successfully registered plugins from that directory.
names = await loader.discover_directory("./community_plugins")
register_lazy
def register_lazy(name: str, file_path: str) -> None
Purpose: Records a name-to-file-path mapping without importing the file. The plugin class is only imported when load_lazy(name) is called. Enables on-demand loading for rarely used or expensive-to-import plugins.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| name | str | Logical name to associate with the file. |
| file_path | str | Absolute or relative path to the .py file. |
Returns: None
loader.register_lazy("heavy_ml_plugin", "./plugins/heavy_ml.py")
load_lazy
def load_lazy(name: str) -> Optional[BasePlugin]
Purpose: Imports the file associated with the given lazy-registered name, finds the matching plugin class, instantiates it, registers it with the registry, and returns the instance. Returns None if no lazy entry exists or the plugin was not found in the file.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| name | str | The name previously registered via register_lazy(). |
Returns: Optional[BasePlugin], the instantiated plugin, or None.
plugin = loader.load_lazy("heavy_ml_plugin")
get_changed_files
def get_changed_files() -> List[Path]
Purpose: Returns a list of plugin file paths whose modification time has increased since they were last loaded. Used internally by the hot-reload background task, but also useful for custom reload logic.
Returns: List[Path] — changed file paths.
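Modification-time tracking of this kind can be sketched with pathlib alone. MtimeTracker and its method names are hypothetical; the loader's own bookkeeping is not shown in this document.

```python
from pathlib import Path
from typing import Dict, List


class MtimeTracker:
    """Remember each file's mtime at load time and report later changes."""

    def __init__(self) -> None:
        self._loaded_mtime: Dict[Path, float] = {}

    def mark_loaded(self, path: Path) -> None:
        # Record the modification time observed when the file was imported.
        self._loaded_mtime[path] = path.stat().st_mtime

    def changed_files(self) -> List[Path]:
        # A file counts as changed when its current mtime is newer than
        # the recorded one; files that have been deleted are skipped.
        return [
            path
            for path, seen in self._loaded_mtime.items()
            if path.exists() and path.stat().st_mtime > seen
        ]
```

After a reload, calling mark_loaded() again resets the baseline so the same edit is not reported twice.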
reload_file
async def reload_file(file_path: Path) -> List[str]
Purpose: Reloads plugins from a specific changed file. Unregisters the old plugin versions found in that file, removes the cached module from sys.modules (forcing Python to re-execute the file), re-imports it, and registers the new plugin versions.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| file_path | Path | Path object pointing to the changed plugin file. |
Returns: List[str] — names of plugins successfully reloaded.
changed = loader.get_changed_files()
for path in changed:
    await loader.reload_file(path)
BasePlugin
from fennec_community.plugins import BasePlugin
The abstract base class every plugin must extend. Provides the full execution lifecycle (validate → cache → execute → retry → stats), concurrency safety via asyncio.Lock, result caching, and an error journal.
Subclasses must:
- Define a class-level METADATA: PluginMetadata attribute.
- Implement initialize(), execute(), and cleanup().
validate (optional override)
async def validate(input_data: Dict[str, Any]) -> Dict[str, Any]
Purpose: Validates input_data against the plugin's input_schema. The default implementation checks that all required fields are present. Override to add type checking, cross-field validation, or transformation logic.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| input_data | Dict[str, Any] | The raw input dictionary. |
Returns: Dict[str, Any] — the (possibly transformed) validated input.
Raises: PluginValidationError if validation fails.
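A required-field check like the default one described above might look as follows. This is a stand-alone sketch: the PluginValidationError class here is a local stand-in for the library exception, and validate_required is a hypothetical helper rather than the actual default implementation.

```python
from typing import Any, Dict, Iterable


class PluginValidationError(Exception):
    """Local stand-in for the library exception of the same name."""


def validate_required(
    input_data: Dict[str, Any],
    required: Iterable[str],
) -> Dict[str, Any]:
    """Return a copy of input_data, or raise if a required field is absent."""
    missing = [field for field in required if field not in input_data]
    if missing:
        raise PluginValidationError(
            f"missing required fields: {', '.join(missing)}"
        )
    # Copy so later transformations do not mutate the caller's dict.
    return dict(input_data)
```

An override would typically call a check like this first and then layer type or cross-field checks on the returned copy.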
health_check (optional override)
async def health_check() -> bool
Purpose: Returns True if the plugin is healthy and ready to serve requests. The default implementation checks that status == PluginStatus.ENABLED. Override to add deeper readiness checks (e.g., ping a database connection).
Returns: bool
safe_execute
async def safe_execute(
    input_data: Dict[str, Any],
    context: ExecutionContext,
) -> Any
Purpose: The guarded execution path. Checks that the plugin is enabled and not in error state, calls validate(), performs a cache lookup (if config.cache_results=True), then calls execute() under asyncio.wait_for with config.timeout. Updates execution statistics on every call.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| input_data | Dict[str, Any] | Pre-validated or raw input. |
| context | ExecutionContext | Runtime context. |
Returns: Any, the result of execute(), possibly from cache.
Raises: PluginError, PluginTimeoutError, PluginValidationError
Note: Normally called by PluginManager.execute(), not directly.
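The timeout step of this path can be sketched with asyncio.wait_for. The code below is an illustration only: PluginTimeoutError is redefined locally as a stand-in, and run_with_timeout is a hypothetical helper that covers just the timeout handling, not validation, caching, or statistics.

```python
import asyncio
from typing import Any, Awaitable, Callable


class PluginTimeoutError(Exception):
    """Local stand-in for the library exception of the same name."""


async def run_with_timeout(
    fn: Callable[[], Awaitable[Any]],
    timeout: float,
) -> Any:
    """Await fn() and convert an asyncio timeout into PluginTimeoutError."""
    try:
        # wait_for cancels the inner task once the deadline passes.
        return await asyncio.wait_for(fn(), timeout=timeout)
    except asyncio.TimeoutError as exc:
        raise PluginTimeoutError(f"execution exceeded {timeout}s") from exc
```

Translating the low-level timeout into a plugin-specific exception lets callers handle all plugin failures through one exception hierarchy.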
safe_execute_with_retry
async def safe_execute_with_retry(
    input_data: Dict[str, Any],
    context: ExecutionContext,
) -> Any
Purpose: Wraps safe_execute() in an exponential-backoff retry loop. Retries up to config.max_retries times, waiting 2^attempt seconds between attempts. Logs each retry attempt.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| input_data | Dict[str, Any] | Input data passed to safe_execute. |
| context | ExecutionContext | Runtime context. |
Returns: Any — the first successful result.
Raises: The last PluginError if all retries are exhausted.
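The backoff schedule can be sketched as below. Two details are assumptions, since the source does not pin them down: attempts are counted from 0 (so the delays are 1, 2, 4, ... seconds), and max_retries counts retries after the first attempt. The sleep parameter is a hypothetical injection point that makes the loop testable without real waiting.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def retry_with_backoff(
    fn: Callable[[], Awaitable[Any]],
    max_retries: int = 3,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Any:
    """Call fn until it succeeds, waiting 2**attempt seconds between tries."""
    last_error: Optional[Exception] = None
    for attempt in range(max_retries + 1):  # first try plus max_retries retries
        try:
            return await fn()
        except Exception as exc:
            last_error = exc
            if attempt < max_retries:
                # Assumed schedule: 1s, 2s, 4s, ... (attempt starts at 0).
                await sleep(2 ** attempt)
    assert last_error is not None
    raise last_error  # all attempts failed: surface the last error
```

With the default max_retries=3, a call that always fails would wait 1 + 2 + 4 = 7 seconds in total under this schedule before the last error is raised.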
get_stats
def get_stats() -> Dict[str, Any]
Purpose: Returns a comprehensive statistics snapshot for this plugin instance. Includes execution counts, success rate, timing metrics (total, average, min, max, last), timestamps, and the current error journal size.
Returns: Dict[str, Any]
{
    "name": "web_search", "version": "1.0.0", "status": "enabled",
    "execution": {"total": 100, "success": 97, "failure": 3, "success_rate": "97.0%"},
    "timing": {"total_s": 42.1, "avg_s": 0.421, "min_s": 0.1, "max_s": 2.3, "last_s": 0.38},
    "timestamps": {"initialized": "...", "last_executed": "...", "last_error": "..."},
    "error_journal_size": 3,
    "current_error": None,
}
get_health
def get_health() -> Dict[str, Any]
Purpose: Returns a concise health assessment derived from the execution success rate. Labels range from "untested" (no executions) to "critical" (< 60% success rate).
Returns: Dict[str, Any] of the form {"label": str, "score": int, "status": str, "enabled": bool}
reset_stats
def reset_stats() -> None
Purpose: Resets all execution counters, timing accumulators, and the error journal to zero. Useful for clearing stale metrics after fixing an error condition.
Returns: None
PluginConfig
from fennec_community.plugins import PluginConfig
Runtime configuration knobs for a plugin instance. Passed to the constructor and governs execution behavior.
Constructor
PluginConfig(
    enabled: bool = True,
    timeout: float = 30.0,
    max_retries: int = 3,
    cache_results: bool = False,
    cache_ttl_sec: int = 300,
    custom: Dict[str, Any] = None,
)
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | True | Whether the plugin is active. |
| timeout | float | 30.0 | Per-call execution timeout in seconds. Must be > 0. |
| max_retries | int | 3 | Maximum retry attempts for safe_execute_with_retry. |
| cache_results | bool | False | Enable in-memory result caching. Cache key is SHA-256 of the input dict. |
| cache_ttl_sec | int | 300 | Cache time-to-live in seconds (5 minutes default). |
| custom | Dict | {} | Arbitrary plugin-specific configuration. |
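The cache_results and cache_ttl_sec fields suggest a keyed in-memory cache with expiry. The sketch below shows one way such a cache could work; serializing the input dict as sorted-key JSON before hashing is an assumption (the source only says the key is a SHA-256 of the input dict), and cache_key and TTLCache are hypothetical names.

```python
import hashlib
import json
import time
from typing import Any, Callable, Dict, Optional, Tuple


def cache_key(input_data: Dict[str, Any]) -> str:
    """SHA-256 hex digest of a canonical JSON encoding of the input."""
    # sort_keys makes the key independent of dict insertion order;
    # default=str is a fallback for values JSON cannot encode natively.
    payload = json.dumps(input_data, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class TTLCache:
    """In-memory cache whose entries expire ttl_sec after being stored."""

    def __init__(
        self,
        ttl_sec: float = 300,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl_sec
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def put(self, key: str, value: Any) -> None:
        self._entries[key] = (self._clock() + self._ttl, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # expired: evict and report a miss
            return None
        return value
```

Note that this sketch cannot distinguish a cached None from a miss; a real implementation would likely need a sentinel or a separate hit flag.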
config = PluginConfig(
    enabled=True,
    timeout=10.0,
    max_retries=2,
    cache_results=True,
    cache_ttl_sec=60,
    custom={"api_key": "sk-...", "model": "gpt-4o"},
)
plugin = MyPlugin(config=config)
Plugin Types
RetrievalPlugin
from fennec_community.plugins import RetrievalPlugin
Base class for plugins that fetch or search for information: vector stores, search engines, knowledge bases, databases.
Output contract: execute() must return List[Dict[str, Any]] where each dict contains at minimum:
- "content" (str): the retrieved text passage
- "score" (float): relevance score in the range 0–1
- "source" (str): source identifier or URL
Class attribute: PLUGIN_TYPE = "retrieval"
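A plugin author may want to verify the output contract in a unit test. The checker below is a hypothetical helper written for this purpose, not something the framework is documented to provide; it only encodes the three required keys and the 0–1 score range stated above.

```python
from typing import Any, Dict, List

# Required keys and their accepted Python types, per the output contract.
REQUIRED_FIELDS = {"content": str, "score": (int, float), "source": str}


def check_retrieval_output(results: List[Dict[str, Any]]) -> List[str]:
    """Return a list of human-readable contract violations (empty if none)."""
    problems: List[str] = []
    for i, item in enumerate(results):
        for field, expected in REQUIRED_FIELDS.items():
            if field not in item:
                problems.append(f"result {i}: missing '{field}'")
            elif not isinstance(item[field], expected):
                problems.append(f"result {i}: '{field}' has wrong type")
        score = item.get("score")
        if isinstance(score, (int, float)) and not 0.0 <= score <= 1.0:
            problems.append(f"result {i}: score {score} outside 0-1")
    return problems
```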
retrieve
async def retrieve(
    query: str,
    top_k: int = 5,
    context: ExecutionContext = None,
) -> List[Dict[str, Any]]
Purpose: A convenience wrapper that constructs the standard input dict and calls safe_execute() internally, eliminating boilerplate in calling code.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| query | str | The search query or natural-language question. |
| top_k | int | Maximum number of results to retrieve. |
| context | ExecutionContext | Execution context. Defaults to a new context with query set. |
Returns: List[Dict[str, Any]] — ranked list of retrieved documents.
class VectorStorePlugin(RetrievalPlugin):
    PLUGIN_TYPE = "retrieval"
    METADATA = PluginMetadata(name="vector_store", ...)

    async def initialize(self) -> bool:
        self.client = VectorClient(...)
        return True

    async def execute(self, input_data, context):
        return self.client.search(input_data["query"], input_data["top_k"])

    async def cleanup(self) -> None:
        self.client.close()

# Usage:
results = await vector_plugin.retrieve("climate change impacts", top_k=10)
ToolPlugin
from fennec_community.plugins import ToolPlugin
Base class for plugins that call external tools, APIs, or utilities: web search, calculators, code interpreters, SQL runners, etc. These are the primary target for LLM function-calling.
Output contract: execute() must return Dict[str, Any] with a structured result.
Class attribute: PLUGIN_TYPE = "tool"
No additional public methods beyond the base class. Implement initialize(), execute(), and cleanup().
ActionPlugin
from fennec_community.plugins import ActionPlugin
Base class for plugins that perform side effects or mutations: send email, write to a database, update calendar entries, call a webhook, etc.
Output contract: execute() must return Dict[str, Any] containing at minimum "success": bool and "message": str.
Class attribute: PLUGIN_TYPE = "action"
Class attribute: REQUIRES_CONFIRMATION: bool = True — set to False if the action is safe to auto-execute without a confirmation step.
can_execute
async def can_execute(
    input_data: Dict[str, Any],
    context: ExecutionContext,
) -> bool
Purpose: Pre-flight gate called before every execution. Override to implement authorization checks, quota enforcement, dry-run mode, or any condition that must pass before the action is allowed to run. If this returns False, safe_execute() raises PluginPermissionError.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| input_data | Dict[str, Any] | The input data for the pending action. |
| context | ExecutionContext | Runtime context including user and session identity. |
Returns: bool — True to allow execution; False to block it.
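The gate described above can be illustrated without the framework. In this sketch Ctx, GatedAction, and the local PluginPermissionError are stand-ins defined here for illustration; the real safe_execute() presumably does more (validation, timeouts, metrics) than this minimal version.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict


class PluginPermissionError(Exception):
    """Local stand-in for the framework exception of the same name."""


@dataclass
class Ctx:
    """Hypothetical stand-in for ExecutionContext (only user_id is modeled)."""
    user_id: str = ""


class GatedAction:
    async def can_execute(self, input_data: Dict[str, Any], context: Ctx) -> bool:
        # Pre-flight rule: only identified users may trigger the action.
        return bool(context.user_id)

    async def execute(self, input_data: Dict[str, Any], context: Ctx) -> Dict[str, Any]:
        return {"success": True, "message": f"Sent to {input_data['to']}."}

    async def safe_execute(self, input_data: Dict[str, Any], context: Ctx) -> Dict[str, Any]:
        # The gate runs first; a False result blocks the side effect entirely.
        if not await self.can_execute(input_data, context):
            raise PluginPermissionError("can_execute() returned False")
        return await self.execute(input_data, context)
```

Calling safe_execute() with Ctx(user_id="u1") returns the success dict, while an empty Ctx() raises before execute() is reached.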
class SendEmailPlugin(ActionPlugin):
    PLUGIN_TYPE = "action"
    REQUIRES_CONFIRMATION = True
    METADATA = PluginMetadata(name="send_email", ...)

    async def can_execute(self, input_data, context) -> bool:
        # Only allow for authenticated users
        return bool(context.user_id)

    async def execute(self, input_data, context):
        send_email(input_data["to"], input_data["body"])
        return {"success": True, "message": "Email sent."}
ProcessingPlugin
from fennec_community.plugins import ProcessingPlugin
Base class for plugins that transform, filter, rerank, or summarize data: rerankers, chunkers, text extractors, translators, formatters.
Input contract: input_data must contain "items": List[Any] plus any additional parameters.
Output contract: execute() must return List[Any] — the transformed items.
Class attribute: PLUGIN_TYPE = "processing"
process
async def process(
    items: List[Any],
    context: ExecutionContext = None,
    **kwargs,
) -> List[Any]
Purpose: Convenience wrapper that packages items and any extra keyword arguments into the standard input_data dict and calls safe_execute().
Parameters:
| Parameter | Type | Description |
|---|---|---|
| items | List[Any] | The items to process (e.g., retrieved documents to rerank). |
| context | ExecutionContext | Execution context. Defaults to an empty context. |
| **kwargs | Any | Additional parameters merged into the input dict (e.g., query="..." for a reranker). |
Returns: List[Any] — the processed/transformed items.
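How process() packages its arguments can be sketched as below, assuming extra keyword arguments sit next to "items" at the top level of input_data. Both functions are hypothetical illustrations; rerank_by_overlap is a toy word-overlap reranker, not a library component.

```python
from typing import Any, Dict, List


def build_processing_input(items: List[Any], **kwargs: Any) -> Dict[str, Any]:
    """Package items plus extra parameters into one input_data dict."""
    return {"items": list(items), **kwargs}


def rerank_by_overlap(input_data: Dict[str, Any]) -> List[str]:
    """Toy execute() body: order text items by word overlap with the query."""
    terms = set(input_data.get("query", "").lower().split())
    return sorted(
        input_data["items"],
        key=lambda text: len(terms & set(text.lower().split())),
        reverse=True,
    )
```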
reranked = await reranker_plugin.process(
    items=retrieved_docs,
    context=context,
    query="climate change impacts",
)
Metadata System
PluginMetadata
from fennec_community.plugins import PluginMetadata
Complete, AI-queryable metadata for a plugin. Must be set as a class-level METADATA attribute on every plugin class. Drives auto-discovery scoring, LLM descriptor generation, compatibility checks, and cost estimation.
Constructor Fields
| Field | Type | Required | Description |
|---|---|---|---|
| name | str | ✅ | Unique plugin identifier (used as the registry key). |
| version | str | ✅ | Semantic version string (e.g., "1.2.0"). |
| author | str | ✅ | Author name or organization. |
| description | str | ✅ | Human-readable description. Also used in LLM tool descriptors. |
| capabilities | List[str] | | Explicit capability strings used for search and AI selection scoring. |
| tags | List[str] | | Category tags (e.g., ["search", "web", "rag"]). |
| use_cases | List[str] | | Example use-case strings included in LLM descriptions. |
| cost_tier | CostTier | | Estimated cost tier for filtering in AIPluginSelector. |
| cost_per_call_usd | float | | Estimated per-call cost in USD. |
| input_schema | List[SchemaProperty] | | Defines the expected input fields. Drives validation and LLM descriptors. |
| output_schema | List[SchemaProperty] | | Describes the expected output fields. |
| dependencies | List[str] | | Names of other plugins that must be registered before this one. |
| permissions | List[str] | | Permission strings required (e.g., "network_access"). |
| min_system_version | str | | Minimum compatible system version. |
| max_system_version | str | | Maximum compatible system version. |
| homepage | str | | URL to documentation or repository. |
| license | str | | License identifier (defaults to "MIT"). |
to_tool_descriptor
def to_tool_descriptor() -> Dict[str, Any]
Purpose: Generates an OpenAI-format function-calling descriptor from this metadata. Builds the parameter schema from input_schema and assembles a rich LLM description by joining description, use_cases, and cost_tier.
Returns: Dict[str, Any] — OpenAI {"type": "function", "function": {...}} descriptor.
to_anthropic_tool
def to_anthropic_tool() -> Dict[str, Any]
Purpose: Generates an Anthropic Claude tool-use descriptor from this metadata. Equivalent in structure to to_tool_descriptor(), but in Anthropic's format with "input_schema" instead of "parameters".
Returns: Dict[str, Any] — Anthropic tool descriptor.
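The two descriptor shapes differ mainly in where the JSON Schema lives. The sketch below builds both from plain field dicts; the helper names and the FIELDS sample are illustrative, and the description text the library assembles (use cases, cost tier) is not reproduced here.

```python
from typing import Any, Dict, List


def parameters_schema(fields: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build a JSON Schema object from simple field specs."""
    return {
        "type": "object",
        "properties": {
            f["name"]: {"type": f["type"], "description": f["description"]}
            for f in fields
        },
        # Fields are treated as required unless they say otherwise.
        "required": [f["name"] for f in fields if f.get("required", True)],
    }


def openai_descriptor(name: str, description: str, fields: List[Dict[str, Any]]) -> Dict[str, Any]:
    # OpenAI function-calling shape: schema sits under function.parameters.
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": parameters_schema(fields),
        },
    }


def anthropic_descriptor(name: str, description: str, fields: List[Dict[str, Any]]) -> Dict[str, Any]:
    # Anthropic tool-use shape: flat dict, schema sits under input_schema.
    return {
        "name": name,
        "description": description,
        "input_schema": parameters_schema(fields),
    }


FIELDS = [
    {"name": "query", "type": "string", "description": "Search query"},
    {"name": "top_k", "type": "integer", "description": "Max results", "required": False},
]
```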
is_compatible_with
def is_compatible_with(system_version: str) -> bool
Purpose: Returns True if system_version falls within [min_system_version, max_system_version]. Used by PluginManager.register() to block incompatible plugins.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| system_version | str | The host system's current semantic version. |
Returns: bool
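Version range checks should compare numeric components, not raw strings ("1.10.0" sorts before "1.2.0" as text). A minimal sketch, assuming plain MAJOR.MINOR.PATCH versions with no pre-release suffix and treating a missing bound as unbounded; the function names are hypothetical.

```python
from typing import Optional, Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn "1.10.0" into (1, 10, 0) so tuples compare numerically."""
    return tuple(int(part) for part in version.split("."))


def is_compatible(
    system_version: str,
    min_version: Optional[str] = None,
    max_version: Optional[str] = None,
) -> bool:
    current = parse_version(system_version)
    if min_version and current < parse_version(min_version):
        return False
    if max_version and current > parse_version(max_version):
        return False
    return True
```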
to_dict
def to_dict() -> Dict[str, Any]
Purpose: Returns a fully serializable (JSON-safe) dictionary of all metadata fields. Datetime fields are converted to ISO 8601 strings. Useful for persistence, API responses, and logging.
Returns: Dict[str, Any]
from_dict (classmethod)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> PluginMetadata
Purpose: Reconstructs a PluginMetadata instance from a dictionary (the reverse of to_dict()). Converts ISO 8601 strings back to datetime objects and the cost_tier string back to a CostTier enum member.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| data | Dict[str, Any] | A dictionary previously produced by to_dict(). |
Returns: PluginMetadata
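The round trip between to_dict() and from_dict() can be sketched with a reduced stand-in class. MiniMetadata, its created_at field, and the string values of this local CostTier are assumptions made for illustration; the real class carries many more fields.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict


class CostTier(Enum):
    """Local stand-in; the real enum's underlying values are not documented here."""
    FREE = "free"
    CHEAP = "cheap"


@dataclass
class MiniMetadata:
    name: str
    cost_tier: CostTier = CostTier.FREE
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def to_dict(self) -> Dict[str, Any]:
        # Enum and datetime are flattened to JSON-safe strings.
        return {
            "name": self.name,
            "cost_tier": self.cost_tier.value,
            "created_at": self.created_at.isoformat(),
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MiniMetadata":
        return cls(
            name=data["name"],
            cost_tier=CostTier(data["cost_tier"]),
            created_at=datetime.fromisoformat(data["created_at"]),
        )


original = MiniMetadata("web_search", CostTier.CHEAP)
restored = MiniMetadata.from_dict(json.loads(json.dumps(original.to_dict())))
```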
SchemaProperty
from fennec_community.plugins import SchemaProperty
Describes a single field in a plugin's input or output schema. Compatible with JSON Schema draft-07 and the OpenAI function-calling format.
Fields
| Field | Type | Required | Description |
|---|---|---|---|
| name | str | ✅ | Field name (key in the input/output dict). |
| type | str | ✅ | JSON type: "string", "integer", "number", "boolean", "array", "object". |
| description | str | ✅ | Field description included in LLM tool descriptors. |
| required | bool | | Whether the field must be present. Defaults to True. |
| enum | List[Any] | | Allowed values; validated by InputSanitizer. |
| default | Any | | Default value applied by InputSanitizer when the field is absent. |
| items | Dict | | JSON Schema items descriptor for type="array". |
| properties | Dict | | JSON Schema properties for type="object". |
to_json_schema
def to_json_schema() -> Dict[str, Any]
Purpose: Converts this property into a JSON Schema draft-07 compatible dict, suitable for embedding in parameters.properties in an OpenAI function-calling descriptor.
Returns: Dict[str, Any]
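A rough idea of the conversion, using a stand-in dataclass named Prop (hypothetical; it models only some SchemaProperty fields). Which optional keys the library actually emits is an assumption; here they are included only when set.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Prop:
    """Hypothetical stand-in for SchemaProperty."""
    name: str
    type: str
    description: str
    required: bool = True
    enum: Optional[List[Any]] = None
    default: Any = None
    items: Optional[Dict[str, Any]] = None

    def to_json_schema(self) -> Dict[str, Any]:
        schema: Dict[str, Any] = {"type": self.type, "description": self.description}
        if self.enum is not None:
            schema["enum"] = list(self.enum)
        if self.default is not None:
            schema["default"] = self.default
        if self.type == "array" and self.items is not None:
            schema["items"] = self.items
        # "required" is not emitted here: in JSON Schema it belongs to the
        # parent object's "required" list, not to the property itself.
        return schema
```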
ExecutionContext
from fennec_community.plugins import ExecutionContext
Runtime context passed to every plugin.execute() call. Carries all RAG-relevant state. All fields are optional and default to safe empty values.
Fields
| Field | Type | Default | Description |
|---|---|---|---|
| query | str | "" | The user's original query. |
| session_id | str | "" | Current session identifier (used for session-scoped permission checks). |
| user_id | str | "" | Current user identifier. |
| metadata | Dict[str, Any] | {} | Arbitrary key-value context (e.g., locale, filters). |
| memory | Any | None | Handle to a memory store (e.g., conversation history). |
| cache | Any | None | Handle to a cache backend. |
| router | Any | None | Handle to a router or orchestrator. |
| trace_id | str | "" | Distributed tracing identifier for request correlation. |
| max_tokens | int | 2048 | Token budget hint for LLM-calling plugins. |
context = ExecutionContext(
    query="What is RAG?",
    session_id="session-abc",
    user_id="user-42",
    trace_id="trace-xyz",
    max_tokens=1024,
    metadata={"locale": "en-US"},
)
Security Layer
PermissionGuard
from fennec_community.plugins import PermissionGuard
Central authority for plugin permission management. Maintains per-plugin PermissionPolicy records that can be scoped by session or user.
grant
def grant(
    plugin_name: str,
    permission: PermissionType | str,
    session_id: Optional[str] = None,
    user_id: Optional[str] = None,
) -> None
Purpose: Grants a permission to a plugin. Optionally scopes the grant to a specific session or user so it only applies in that context.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| plugin_name | str | The plugin's registered name. |
| permission | PermissionType or str | The permission to grant. |
| session_id | str | If set, the grant only applies when context.session_id matches. |
| user_id | str | If set, the grant only applies when context.user_id matches. |
Returns: None
guard.grant("web_search", PermissionType.NETWORK_ACCESS)
guard.grant("file_writer", PermissionType.FILE_WRITE, user_id="admin")
revoke
def revoke(plugin_name: str, permission: PermissionType | str) -> None
Purpose: Revokes a permission from a plugin. The permission is added to the explicit deny list, so future check() calls return False for it.
Returns: None
grant_all_required
def grant_all_required(plugin: BasePlugin) -> None
Purpose: Grants every permission declared in the plugin's metadata.permissions list. Called by PluginManager.startup() in non-safe mode.
Returns: None
check
def check(
    plugin_name: str,
    permission: PermissionType | str,
    context: Optional[ExecutionContext] = None,
) -> bool
Purpose: Returns True if the plugin currently holds the specified permission, taking into account explicit denies and optional session/user scope restrictions.
Returns: bool
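The interplay of grants, the explicit deny list, and session/user scopes can be sketched as follows. MiniGuard is a hypothetical reduction of PermissionGuard; in particular, letting a later grant() clear an earlier deny is an assumption, since the source does not say how the two interact.

```python
from typing import Dict, Optional, Set, Tuple

Scope = Tuple[Optional[str], Optional[str]]  # (session_id, user_id)


class MiniGuard:
    def __init__(self) -> None:
        self._grants: Dict[str, Dict[str, Scope]] = {}
        self._denied: Dict[str, Set[str]] = {}

    def grant(self, plugin: str, permission: str,
              session_id: Optional[str] = None, user_id: Optional[str] = None) -> None:
        self._grants.setdefault(plugin, {})[permission] = (session_id, user_id)
        # Assumption: an explicit grant lifts a previous deny.
        self._denied.get(plugin, set()).discard(permission)

    def revoke(self, plugin: str, permission: str) -> None:
        self._grants.get(plugin, {}).pop(permission, None)
        self._denied.setdefault(plugin, set()).add(permission)

    def check(self, plugin: str, permission: str,
              session_id: str = "", user_id: str = "") -> bool:
        if permission in self._denied.get(plugin, set()):
            return False  # explicit denies always win
        scope = self._grants.get(plugin, {}).get(permission)
        if scope is None:
            return False  # never granted
        want_session, want_user = scope
        if want_session is not None and want_session != session_id:
            return False
        if want_user is not None and want_user != user_id:
            return False
        return True
```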
assert_permission
def assert_permission(
    plugin: BasePlugin,
    permission: PermissionType | str,
    context: Optional[ExecutionContext] = None,
) -> None
Purpose: Calls check() and raises PluginPermissionError if it returns False. Use this for strict enforcement within execute() implementations.
Raises: PluginPermissionError
check_all_required
def check_all_required(
    plugin: BasePlugin,
    context: Optional[ExecutionContext] = None,
) -> bool
Purpose: Checks all permissions declared in plugin.metadata.permissions. Returns False on the first missing permission and logs a warning. Used by PluginManager in safe_mode.
Returns: bool — True only if all required permissions pass.
get_grants
def get_grants(plugin_name: str) -> Set[str]
Purpose: Returns the set of permission strings currently granted to a plugin. Returns an empty set if no policy exists.
Returns: Set[str]
InputSanitizer
from fennec_community.plugins import InputSanitizer
A static utility class that validates and sanitizes plugin inputs against a schema. Protects against type mismatches, missing required fields, oversized strings, and prompt/code injection attacks.
sanitize (static)
@staticmethod
def sanitize(
    data: Dict[str, Any],
    schema: List[SchemaProperty],
    strict: bool = True,
) -> Dict[str, Any]
Purpose: Validates data against schema in four steps: (1) checks all required fields are present, (2) coerces values to their declared types, (3) enforces string length limits (32 KB max per field), (4) scans string values for injection patterns. Fills defaults for absent optional fields.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| data | Dict[str, Any] | Raw input dict from the caller. |
| schema | List[SchemaProperty] | The plugin's declared input_schema. |
| strict | bool | If True, raises on unknown keys. If False (lenient), unknown keys are silently dropped. |
Returns: Dict[str, Any] — the cleaned, validated input.
Raises: PluginValidationError on any validation failure.
Injection patterns detected:
- "ignore previous instructions" / "forget instructions" / "act as if"
- "system prompt" / "jailbreak" / "dan mode"
- XSS patterns: <script, javascript:, onerror=
- Shell injection: ; rm, ; wget, ; curl -o, nc -e
- Path traversal: ../, ..\, %2e%2e
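The four steps can be sketched in a few dozen lines. This is an illustration under stated assumptions, not the library's implementation: the schema is given as plain dicts rather than SchemaProperty objects, the 32 KB limit is measured in UTF-8 bytes, only a handful of the listed patterns are included, and PluginValidationError is a local stand-in.

```python
import re
from typing import Any, Dict, List


class PluginValidationError(Exception):
    """Local stand-in for the framework exception of the same name."""


MAX_STRING_BYTES = 32 * 1024
# A small subset of the patterns listed above, matched case-insensitively.
INJECTION = re.compile(
    r"ignore previous instructions|<script|javascript:|;\s*rm\b|\.\./|%2e%2e",
    re.IGNORECASE,
)
CASTS = {"string": str, "integer": int, "number": float}

SCHEMA = [
    {"name": "query", "type": "string"},
    {"name": "top_k", "type": "integer", "required": False, "default": 5},
]


def sanitize(data: Dict[str, Any], schema: List[Dict[str, Any]], strict: bool = True) -> Dict[str, Any]:
    unknown = set(data) - {f["name"] for f in schema}
    if unknown and strict:
        raise PluginValidationError(f"unknown keys: {sorted(unknown)}")

    clean: Dict[str, Any] = {}  # unknown keys are dropped by never copying them
    for f in schema:
        name = f["name"]
        if name not in data:
            if f.get("required", True):                       # step 1: presence
                raise PluginValidationError(f"missing required field: {name}")
            if "default" in f:
                clean[name] = f["default"]
            continue
        cast = CASTS.get(f["type"])
        try:
            value = cast(data[name]) if cast else data[name]  # step 2: coercion
        except (TypeError, ValueError) as exc:
            raise PluginValidationError(f"{name}: expected {f['type']}") from exc
        if isinstance(value, str):
            if len(value.encode("utf-8")) > MAX_STRING_BYTES:  # step 3: size limit
                raise PluginValidationError(f"{name}: exceeds 32 KB")
            if INJECTION.search(value):                        # step 4: injection scan
                raise PluginValidationError(f"{name}: injection pattern detected")
        clean[name] = value
    return clean
```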
clean_data = InputSanitizer.sanitize(
    data={"query": "find papers on LLMs", "top_k": 5},
    schema=my_plugin.metadata.input_schema,
    strict=False,
)
ExecutionSandbox
from fennec_community.plugins import ExecutionSandbox
Wraps any coroutine with a hard timeout and slow-execution logging.
run
async def run(coro, plugin_name: str = "?") -> Any
Purpose: Executes the given coroutine with asyncio.wait_for enforcing hard_timeout. Logs a warning if execution exceeds slow_threshold. Propagates all exceptions (including asyncio.TimeoutError) to the caller.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| coro | Coroutine | The coroutine to execute inside the sandbox. |
| plugin_name | str | Plugin name used in log messages. |
Returns: Any — the coroutine's return value.
Raises: asyncio.TimeoutError if the hard timeout is exceeded; re-raises any other exception from the coroutine.
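The behavior described above maps directly onto asyncio.wait_for. The function below is a standalone sketch; its name and logger are made up for illustration, and the parameter names mirror the constructor arguments used in this section's example.

```python
import asyncio
import logging
import time
from typing import Any, Awaitable

logger = logging.getLogger("sandbox_sketch")


async def run_sandboxed(
    coro: Awaitable[Any],
    hard_timeout_sec: float = 30.0,
    log_slow_threshold: float = 5.0,
    plugin_name: str = "?",
) -> Any:
    started = time.perf_counter()
    try:
        # wait_for cancels the coroutine and raises TimeoutError on expiry;
        # any other exception from the coroutine propagates unchanged.
        return await asyncio.wait_for(coro, timeout=hard_timeout_sec)
    finally:
        elapsed = time.perf_counter() - started
        if elapsed > log_slow_threshold:
            logger.warning("plugin %s took %.2fs", plugin_name, elapsed)
```

Note that a timeout only interrupts code that yields to the event loop; a plugin that blocks the thread with synchronous work is not cut off by this mechanism.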
sandbox = ExecutionSandbox(hard_timeout_sec=30.0, log_slow_threshold=5.0)
result = await sandbox.run(my_plugin.execute(data, ctx), plugin_name="my_plugin")
AI & Observability
AIPluginSelector
from fennec_community.plugins import AIPluginSelector
Rule-based and optionally semantic plugin selector. Used by PluginManager.select() and PluginManager.select_one().
select
def select(
    query: str,
    plugin_type: Optional[str] = None,
    max_cost: Optional[CostTier] = None,
    top_k: int = 3,
) -> List[BasePlugin]
Purpose: Selects the top-K plugins best suited to answer the given natural-language query. Applies type and cost filters, scores candidates with keyword matching (always available), and optionally re-ranks with cosine similarity when embed_fn was provided at construction. Results are sorted by relevance score descending.
Scoring weights (keyword):
| Field | Weight |
|---|---|
| name | ×3.0 |
| tags | ×2.5 |
| description | ×2.0 |
| capabilities | ×2.0 |
| use_cases | ×1.5 |
Parameters:
| Parameter | Type | Description |
|---|---|---|
| query | str | Natural-language description of the task. |
| plugin_type | str | Optional type filter. |
| max_cost | CostTier | Optional maximum cost tier filter. |
| top_k | int | Maximum results to return. |
Returns: List[BasePlugin]
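The weighted keyword scoring can be approximated as below. The weights come from the table above; how the library tokenizes text and counts matches is not documented here, so whole-word overlap per field is an assumption, and plugins are modeled as plain dicts.

```python
from typing import Any, Dict, List

WEIGHTS = {"name": 3.0, "tags": 2.5, "description": 2.0, "capabilities": 2.0, "use_cases": 1.5}


def keyword_score(query: str, meta: Dict[str, Any]) -> float:
    """Sum, per field, weight x number of query words found in that field."""
    terms = set(query.lower().split())
    score = 0.0
    for field, weight in WEIGHTS.items():
        value = meta.get(field, "")
        text = " ".join(value) if isinstance(value, list) else str(value)
        # Underscores become spaces so "web_search" matches "web" and "search".
        words = set(text.lower().replace("_", " ").split())
        score += weight * len(terms & words)
    return score


def select(query: str, plugins: List[Dict[str, Any]], top_k: int = 3) -> List[str]:
    scored = [(keyword_score(query, p), p) for p in plugins]
    ranked = sorted((s for s in scored if s[0] > 0), key=lambda s: s[0], reverse=True)
    return [p["name"] for _, p in ranked[:top_k]]


PLUGINS = [
    {"name": "web_search", "tags": ["search", "web"], "description": "Searches the web."},
    {"name": "pdf_reader", "tags": ["pdf"], "description": "Reads PDF files."},
]
```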
select_one
def select_one(query: str, **kwargs) -> Optional[BasePlugin]
Purpose: Returns only the single best matching plugin, or None if no candidates remain after filtering.
Returns: Optional[BasePlugin]
PluginObservability
from fennec_community.plugins import PluginObservability
Lightweight in-process metrics store. Stores up to 200 execution records per plugin and up to 500 error records globally.
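A bounded store of this kind is commonly built on collections.deque with maxlen, which discards the oldest record once the limit is reached. MiniMetrics below is a hypothetical sketch of that idea; the record layout and report fields are simplified.

```python
from collections import defaultdict, deque
from typing import Any, Dict


class MiniMetrics:
    def __init__(self, per_plugin: int = 200, max_errors: int = 500) -> None:
        # One bounded buffer per plugin, plus one global error buffer.
        self._runs = defaultdict(lambda: deque(maxlen=per_plugin))
        self._errors = deque(maxlen=max_errors)

    def record(self, plugin: str, elapsed_s: float, ok: bool, msg: str = "") -> None:
        self._runs[plugin].append((elapsed_s, ok))
        if not ok:
            self._errors.append({"plugin": plugin, "msg": msg})

    def plugin_report(self, name: str) -> Dict[str, Any]:
        runs = self._runs.get(name)  # .get() avoids creating an empty buffer
        if not runs:
            return {"name": name, "executions": 0}
        times_ms = [elapsed * 1000 for elapsed, _ in runs]
        return {
            "name": name,
            "executions": len(runs),
            "success_rate": round(100 * sum(ok for _, ok in runs) / len(runs), 1),
            "avg_ms": sum(times_ms) / len(times_ms),
            "max_ms": max(times_ms),
            "min_ms": min(times_ms),
        }
```

One consequence of the bound is that reports describe only the most recent window of executions, not the plugin's full history.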
summary
def summary() -> Dict[str, Any]
Purpose: Returns an aggregate metrics summary across all plugins: total executions, total failures, error rate, top 5 most-used plugins, hook call counts, and the 10 most recent errors.
Returns: Dict[str, Any]
{
    "total_executions": 250,
    "total_failures": 12,
    "error_rate_pct": 4.8,
    "top_plugins": [("web_search", 100), ("pdf_reader", 80), ...],
    "hook_call_counts": {"plugin.executed": 238, "plugin.failed": 12, ...},
    "recent_errors": [{"plugin": "pdf_reader", "ts": 1234567890.0, "msg": "..."}, ...],
}
plugin_report
def plugin_report(name: str) -> Dict[str, Any]
Purpose: Returns a detailed performance report for a single plugin: execution count, success rate, and average, maximum, and minimum latency in milliseconds.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| name | str | The plugin's registered name. |
Returns: Dict[str, Any]
report = manager.metrics.plugin_report("web_search")
# {"name": "web_search", "executions": 100, "success_rate": 97.0,
#  "avg_ms": 421.3, "max_ms": 2300.0, "min_ms": 98.7}
Enumerations
from fennec_community.plugins import (
    PluginStatus, PluginPriority, PermissionType, CostTier
)
PluginStatus
Represents the lifecycle state of a plugin instance.
| Value | Meaning |
|---|---|
| DISABLED | Not active; execute() raises PluginError. |
| ENABLED | Ready to serve requests. |
| LOADING | Being loaded by the loader. |
| INITIALIZING | initialize() is in progress. |
| ERROR | Failed to initialize or exceeded error threshold. |
| UPDATING | Being updated/hot-reloaded. |
| DEPRECATED | Still functional but scheduled for removal. |
PluginPriority
Integer enum for ordering hook callbacks and plugin selection tiebreaks.
| Value | Score |
|---|---|
| CRITICAL | 1000 |
| HIGH | 100 |
| NORMAL | 50 |
| LOW | 10 |
| BACKGROUND | 1 |
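Because the priorities are integers, ordering reduces to a numeric sort. The sketch below redefines the enum locally with the scores from the table; that higher scores run first, and that ties keep registration order, are assumptions rather than documented behavior.

```python
from enum import IntEnum
from typing import List, Tuple


class PluginPriority(IntEnum):
    """Local copy of the documented scores, for illustration."""
    CRITICAL = 1000
    HIGH = 100
    NORMAL = 50
    LOW = 10
    BACKGROUND = 1


def order_hooks(hooks: List[Tuple[str, PluginPriority]]) -> List[str]:
    # sorted() is stable, so equal priorities keep their registration order.
    return [name for name, _ in sorted(hooks, key=lambda h: h[1], reverse=True)]
```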
PermissionType
Standard permission strings enforced by PermissionGuard.
| Value | Meaning |
|---|---|
| FILE_READ | Read files from the filesystem. |
| FILE_WRITE | Write or modify files. |
| NETWORK_ACCESS | Make outbound network requests. |
| SYSTEM_CALL | Execute system/shell commands. |
| DATABASE_ACCESS | Query or write to a database. |
| USER_DATA_ACCESS | Access user PII or profile data. |
| LLM_CALL | Make calls to an LLM API. |
| VECTOR_STORE | Read/write a vector database. |
CostTier
Estimated cost classification used for filtering in AIPluginSelector.
| Value | Estimated Cost |
|---|---|
| FREE | No external calls; zero cost. |
| CHEAP | < $0.001 per call. |
| MODERATE | $0.001 – $0.01 per call. |
| EXPENSIVE | > $0.01 per call. |
Exception Hierarchy
Exception
└── PluginError                    # Base for all plugin errors
    ├── PluginTimeoutError         # Execution exceeded timeout
    ├── PluginValidationError      # Input schema validation failed
    └── PluginPermissionError      # Forbidden operation attempted
| Exception | When raised |
|---|---|
| PluginError | Plugin disabled, in error state, or unexpected runtime failure. |
| PluginTimeoutError | asyncio.wait_for timeout exceeded during safe_execute. |
| PluginValidationError | Missing required field, type mismatch, injection pattern detected, or enum violation. |
| PluginPermissionError | PermissionGuard check failed, or ActionPlugin.can_execute() returned False. |
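Since every plugin exception derives from PluginError, callers can handle specific subclasses first and fall back to the base class. The classes below are local stand-ins mirroring the hierarchy, and the recovery actions returned by classify() are hypothetical examples of a caller's policy.

```python
class PluginError(Exception):
    """Base for all plugin errors (local stand-in)."""


class PluginTimeoutError(PluginError):
    pass


class PluginValidationError(PluginError):
    pass


class PluginPermissionError(PluginError):
    pass


def classify(exc: Exception) -> str:
    """Map an exception to a caller-side action, most specific type first."""
    if isinstance(exc, PluginTimeoutError):
        return "retry"           # transient: worth another attempt
    if isinstance(exc, PluginValidationError):
        return "fix_input"       # caller must change the request
    if isinstance(exc, PluginPermissionError):
        return "deny"            # retrying will not help
    if isinstance(exc, PluginError):
        return "plugin_failure"  # disabled, error state, or runtime failure
    return "unexpected"
```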
Complete Usage Example
Below is a minimal but complete example showing how to define a custom plugin and integrate it with the framework.
import asyncio

from fennec_community.plugins import (
    PluginManager,
    PluginMetadata, SchemaProperty,
    ExecutionContext,
    ToolPlugin, PluginConfig,
    CostTier, PermissionType,
)

# ─────────────────────────────────────────────
# 1. Define a custom plugin
# ─────────────────────────────────────────────
class WebSearchPlugin(ToolPlugin):
    PLUGIN_TYPE = "tool"
    METADATA = PluginMetadata(
        name="web_search",
        version="1.0.0",
        author="Acme Corp",
        description="Searches the web and returns ranked results.",
        capabilities=["web search", "information retrieval", "news"],
        tags=["search", "web", "rag"],
        use_cases=[
            "Find recent news articles",
            "Look up factual information",
        ],
        cost_tier=CostTier.CHEAP,
        cost_per_call_usd=0.0005,
        permissions=[PermissionType.NETWORK_ACCESS],
        input_schema=[
            SchemaProperty("query", "string", "Search query", required=True),
            SchemaProperty("top_k", "integer", "Max results to return", required=False, default=5),
        ],
        output_schema=[
            SchemaProperty("results", "array", "List of search results"),
        ],
    )

    async def initialize(self) -> bool:
        # Set up HTTP client, load API keys, etc.
        # Placeholder: replace ... with a real async search client.
        self._client = ...
        return True

    async def execute(self, input_data, context):
        # top_k is optional in the schema, so fall back to its declared default.
        results = await self._client.search(
            input_data["query"], top_k=input_data.get("top_k", 5)
        )
        return {"results": results}

    async def cleanup(self) -> None:
        await self._client.close()

# ─────────────────────────────────────────────
# 2. Wire up the manager
# ─────────────────────────────────────────────
async def main():
    manager = PluginManager(
        system_version="1.0.0",
        safe_mode=True,
        auto_discover=False,  # skip filesystem discovery
        hard_timeout=30.0,
    )

    # Register the plugin manually
    plugin = WebSearchPlugin(config=PluginConfig(timeout=10.0, cache_results=True))
    ok = await manager.register(plugin)
    assert ok, "Registration failed"

    # Grant the required network permission
    manager.grant_permission("web_search", PermissionType.NETWORK_ACCESS)

    # Execute
    ctx = ExecutionContext(query="RAG architectures 2025", session_id="demo")
    result = await manager.execute(
        "web_search",
        {"query": "RAG architectures 2025", "top_k": 3},
        ctx,
    )
    print(result)

    # AI-assisted selection (select_one returns None when nothing matches)
    selected = manager.select_one("search the web for recent papers")
    if selected is not None:
        print(f"Selected: {selected.name}")

    # Health dashboard
    dash = manager.dashboard()
    print(f"Total executions: {dash['metrics']['total_executions']}")

    # Get LLM tool descriptors
    tools = manager.anthropic_tools()

    await manager.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
Another Example
import asyncio
import logging
from typing import Any, Dict, List

from fennec_community.plugins import (
    CostTier, ExecutionContext, PluginMetadata, RetrievalPlugin, SchemaProperty,
)

logger = logging.getLogger(__name__)

class VectorRetrievalPlugin(RetrievalPlugin):
    """
    Retrieves semantically similar documents from a vector store.
    Replace _fake_vector_search() with your actual VDB client
    (Qdrant, Pinecone, Weaviate, pgvector, …).
    """
    METADATA = PluginMetadata(
        name="vector_retrieval",
        version="2.0.1",
        author="RAG Team",
        description="Semantic search over an embedded document corpus via vector similarity.",
        capabilities=[
            "semantic search",
            "dense retrieval",
            "ANN nearest-neighbor lookup",
            "multi-modal embeddings",
        ],
        tags=["retrieval", "vector", "semantic", "rag"],
        use_cases=[
            "Find relevant passages for a user question",
            "Retrieve supporting evidence for claims",
            "Match customer queries to FAQ entries",
        ],
        cost_tier=CostTier.FREE,
        cost_per_call_usd=0.0,
        permissions=["vector_store"],
        input_schema=[
            SchemaProperty("query", "string", "Query text to embed and search", required=True),
            SchemaProperty("top_k", "integer", "Number of passages to retrieve", required=False, default=5),
            SchemaProperty("threshold", "number", "Minimum similarity score (0-1)", required=False, default=0.5),
            SchemaProperty("namespace", "string", "Vector store namespace / index", required=False, default="default"),
        ],
        output_schema=[
            SchemaProperty("results", "array", "Ranked list of {content, score, source}"),
        ],
    )

    async def initialize(self) -> bool:
        self._vdb = None  # attach your VDB client here
        logger.info("[VectorRetrievalPlugin] Connected to vector store.")
        return True

    async def execute(
        self,
        input_data: Dict[str, Any],
        context: ExecutionContext,
    ) -> List[Dict[str, Any]]:
        query = input_data["query"]
        top_k = input_data.get("top_k", 5)
        threshold = input_data.get("threshold", 0.5)
        passages = await self._fake_vector_search(query, top_k, threshold)
        return passages

    async def cleanup(self) -> None:
        pass

    @staticmethod
    async def _fake_vector_search(
        query: str, top_k: int, threshold: float
    ) -> List[Dict[str, Any]]:
        # Simulated latency and synthetic scores; results below the threshold are dropped.
        await asyncio.sleep(0.02)
        return [
            {
                "content": f"Passage {i+1} semantically related to '{query}'.",
                "score": round(0.95 - i * 0.08, 3),
                "source": f"doc_{i+1}.pdf",
                "chunk_id": i,
            }
            for i in range(top_k)
            if 0.95 - i * 0.08 >= threshold
        ]