Files
isparkclaw-webui/api/providers.py
nesquena-hermes 0f1b232c12 fix(ci): eliminate test_set_key flakiness — v0.50.161
Root cause: test_profile_env_isolation.py and test_profile_path_security.py called sys.modules.pop() without restoring, poisoning subsequent tests. Fix: monkeypatch.delitem so pytest auto-restores. Also holds _ENV_LOCK for full I/O cycle in _write_env_file and creates .env at 0600 via os.open. Reviewed by Opus (no independent review needed — test/providers fix only).
2026-04-23 02:09:37 +00:00

332 lines
12 KiB
Python

"""Hermes Web UI -- provider management endpoints.
Provides CRUD operations for configuring provider API keys post-onboarding.
Closes #586 (allow provider key update) and part of #604 (model picker
multi-provider support).
"""
from __future__ import annotations
import logging
import os
from pathlib import Path
from typing import Any
from api.config import (
_PROVIDER_DISPLAY,
_PROVIDER_MODELS,
get_config,
invalidate_models_cache,
)
logger = logging.getLogger(__name__)
# SECTION: Provider ↔ env var mapping
# Maps canonical provider slug → env var name for API key.
# Providers not listed here (OAuth/token-flow providers like copilot, nous,
# openai-codex) cannot have their keys managed from the WebUI.
_PROVIDER_ENV_VAR: dict[str, str] = {
"openrouter": "OPENROUTER_API_KEY",
"anthropic": "ANTHROPIC_API_KEY",
"openai": "OPENAI_API_KEY",
"google": "GOOGLE_API_KEY",
"gemini": "GEMINI_API_KEY",
"zai": "GLM_API_KEY",
"kimi-coding": "KIMI_API_KEY",
"deepseek": "DEEPSEEK_API_KEY",
"minimax": "MINIMAX_API_KEY",
"mistralai": "MISTRAL_API_KEY",
"x-ai": "XAI_API_KEY",
"opencode-zen": "OPENCODE_ZEN_API_KEY",
"opencode-go": "OPENCODE_GO_API_KEY",
"ollama": "OLLAMA_API_KEY",
"ollama-cloud": "OLLAMA_API_KEY",
}
# Providers that use OAuth or token flows — their credentials are managed
# through the Hermes CLI, not via API keys. The WebUI cannot set these.
_OAUTH_PROVIDERS = frozenset({
"copilot",
"openai-codex",
"nous",
})
# SECTION: Helper functions
def _get_hermes_home() -> Path:
"""Return the active Hermes home directory."""
try:
from api.profiles import get_active_hermes_home
return get_active_hermes_home()
except ImportError:
return Path.home() / ".hermes"
def _load_env_file(env_path: Path) -> dict[str, str]:
"""Read key=value pairs from a .env file."""
values: dict[str, str] = {}
if not env_path.exists():
return values
try:
for raw in env_path.read_text(encoding="utf-8").splitlines():
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
values[key.strip()] = value.strip().strip('"').strip("'")
except Exception:
return {}
return values
def _write_env_file(env_path: Path, updates: dict[str, str | None]) -> None:
"""Write key=value pairs to the .env file.
Values of ``None`` cause the key to be removed.
Holds ``_ENV_LOCK`` from ``api.streaming`` for the entire load → modify →
write cycle to prevent TOCTOU races between concurrent POST /api/providers
calls (each reading the same file baseline and overwriting the other's key).
Also serialises os.environ mutations with streaming sessions.
"""
from api.streaming import _ENV_LOCK
import stat as _stat
with _ENV_LOCK:
current = _load_env_file(env_path)
for key, value in updates.items():
if value is None:
current.pop(key, None)
os.environ.pop(key, None)
continue
clean = str(value).strip()
if not clean:
continue
# Reject embedded newlines/carriage returns to prevent .env injection
if "\n" in clean or "\r" in clean:
raise ValueError("API key must not contain newline characters.")
current[key] = clean
os.environ[key] = clean
env_path.parent.mkdir(parents=True, exist_ok=True)
lines = [f"{key}={current[key]}" for key in sorted(current)]
# Create at owner-only mode from the first byte (O_CREAT honours the mode
# argument subject to umask). A trailing chmod guards pre-existing files.
_mode = _stat.S_IRUSR | _stat.S_IWUSR # 0o600
_fd = os.open(str(env_path), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, _mode)
with os.fdopen(_fd, "w", encoding="utf-8") as _f:
_f.write("\n".join(lines) + ("\n" if lines else ""))
try:
env_path.chmod(_mode)
except OSError:
pass
def _provider_has_key(provider_id: str) -> bool:
"""Check whether a provider has a configured API key.
Checks (in order):
1. ``~/.hermes/.env`` for the known env var
2. ``os.environ`` for the known env var
3. ``config.yaml → model.api_key``
4. ``config.yaml → providers.<id>.api_key``
5. ``config.yaml → custom_providers[].api_key`` (for custom providers)
"""
env_var = _PROVIDER_ENV_VAR.get(provider_id)
if env_var:
env_path = _get_hermes_home() / ".env"
env_values = _load_env_file(env_path)
if env_values.get(env_var):
return True
if os.getenv(env_var):
return True
cfg = get_config()
# Check model.api_key
model_cfg = cfg.get("model", {})
if isinstance(model_cfg, dict) and str(model_cfg.get("api_key") or "").strip():
return True
# Check providers.<id>.api_key
providers_cfg = cfg.get("providers", {})
if isinstance(providers_cfg, dict):
provider_cfg = providers_cfg.get(provider_id, {})
if isinstance(provider_cfg, dict) and str(provider_cfg.get("api_key") or "").strip():
return True
# Check custom_providers
custom_providers = cfg.get("custom_providers", [])
if isinstance(custom_providers, list):
for cp in custom_providers:
if isinstance(cp, dict):
cp_name = (cp.get("name") or "").strip().lower().replace(" ", "-")
if f"custom:{cp_name}" == provider_id or cp.get("name", "").strip().lower() == provider_id:
if str(cp.get("api_key") or "").strip():
return True
return False
def _provider_is_oauth(provider_id: str) -> bool:
"""Check whether a provider uses OAuth/token flows (managed by CLI)."""
return provider_id in _OAUTH_PROVIDERS
# SECTION: Public API
def get_providers() -> dict[str, Any]:
"""Return a list of all known providers with their configuration status.
Each entry contains:
- ``id``: canonical provider slug
- ``display_name``: human-readable name
- ``has_key``: whether an API key is configured
- ``configurable``: whether the key can be set from the WebUI
- ``key_source``: where the key was found (``env_file``, ``env_var``,
``config_yaml``, ``oauth``, ``none``)
- ``models``: list of known model IDs for this provider
"""
providers = []
# Collect all known provider IDs from multiple sources
known_ids = set(_PROVIDER_DISPLAY.keys()) | set(_PROVIDER_MODELS.keys())
# Also detect providers from config.yaml providers section
cfg = get_config()
providers_cfg = cfg.get("providers", {})
if isinstance(providers_cfg, dict):
known_ids.update(providers_cfg.keys())
# Add OAuth providers even if not in _PROVIDER_DISPLAY
known_ids.update(_OAUTH_PROVIDERS)
for pid in sorted(known_ids):
display_name = _PROVIDER_DISPLAY.get(pid, pid.replace("-", " ").title())
is_oauth = _provider_is_oauth(pid)
has_key = _provider_has_key(pid)
# Determine key source
key_source = "none"
if is_oauth:
key_source = "oauth"
# Check if actually authenticated via hermes_cli
try:
from hermes_cli.auth import get_auth_status as _gas
status = _gas(pid)
if isinstance(status, dict) and status.get("logged_in"):
has_key = True
key_source = status.get("key_source", "oauth")
else:
has_key = False
except Exception:
has_key = False
elif has_key:
env_var = _PROVIDER_ENV_VAR.get(pid)
if env_var:
env_path = _get_hermes_home() / ".env"
env_values = _load_env_file(env_path)
if env_values.get(env_var):
key_source = "env_file"
elif os.getenv(env_var):
key_source = "env_var"
else:
key_source = "config_yaml"
else:
key_source = "config_yaml"
models = _PROVIDER_MODELS.get(pid, [])
# Also include models from config.yaml providers section
if isinstance(providers_cfg, dict):
provider_cfg = providers_cfg.get(pid, {})
if isinstance(provider_cfg, dict) and "models" in provider_cfg:
cfg_models = provider_cfg["models"]
if isinstance(cfg_models, dict):
models = models + [{"id": k, "label": k} for k in cfg_models.keys()]
elif isinstance(cfg_models, list):
models = models + [{"id": k, "label": k} for k in cfg_models]
providers.append({
"id": pid,
"display_name": display_name,
"has_key": has_key,
"configurable": not is_oauth and pid in _PROVIDER_ENV_VAR,
"key_source": key_source,
"models": models,
})
# Determine active provider
active_provider = None
model_cfg = cfg.get("model", {})
if isinstance(model_cfg, dict):
active_provider = model_cfg.get("provider")
return {
"providers": providers,
"active_provider": active_provider,
}
def set_provider_key(provider_id: str, api_key: str | None) -> dict[str, Any]:
"""Set or update the API key for a provider.
Writes the key to ``~/.hermes/.env`` using the standard env var name.
If ``api_key`` is None or empty, the key is removed.
Returns a status dict with the operation result.
"""
provider_id = provider_id.strip().lower()
if not provider_id:
return {"ok": False, "error": "Provider ID is required."}
if _provider_is_oauth(provider_id):
return {
"ok": False,
"error": f"'{_PROVIDER_DISPLAY.get(provider_id, provider_id)}' uses OAuth authentication. "
f"Use `hermes model` in the terminal to configure it.",
}
env_var = _PROVIDER_ENV_VAR.get(provider_id)
if not env_var:
return {
"ok": False,
"error": f"Cannot configure API key for '{_PROVIDER_DISPLAY.get(provider_id, provider_id)}'. "
f"This provider does not have a known env var mapping.",
}
# Validate API key format (basic sanity check)
if api_key:
api_key = api_key.strip()
if "\n" in api_key or "\r" in api_key:
return {"ok": False, "error": "API key must not contain newline characters."}
if len(api_key) < 8:
return {"ok": False, "error": "API key appears too short."}
env_path = _get_hermes_home() / ".env"
try:
_write_env_file(env_path, {env_var: api_key})
except ValueError as exc:
return {"ok": False, "error": str(exc)}
except Exception as exc:
logger.exception("Failed to write env file for provider %s", provider_id)
return {"ok": False, "error": f"Failed to save API key: {exc}"}
# Invalidate the model cache so the dropdown refreshes on next request.
# Using invalidate_models_cache() instead of reload_config() to avoid
# disrupting active streaming sessions that may be reading config.cfg.
invalidate_models_cache()
return {
"ok": True,
"provider": provider_id,
"display_name": _PROVIDER_DISPLAY.get(provider_id, provider_id),
"action": "updated" if api_key else "removed",
}
def remove_provider_key(provider_id: str) -> dict[str, Any]:
"""Remove the API key for a provider.
Convenience wrapper around ``set_provider_key(id, None)``.
"""
return set_provider_key(provider_id, None)