Root cause: test_profile_env_isolation.py and test_profile_path_security.py called sys.modules.pop() without restoring, poisoning subsequent tests. Fix: monkeypatch.delitem so pytest auto-restores. Also holds _ENV_LOCK for full I/O cycle in _write_env_file and creates .env at 0600 via os.open. Reviewed by Opus (no independent review needed — test/providers fix only).
332 lines
12 KiB
Python
332 lines
12 KiB
Python
"""Hermes Web UI -- provider management endpoints.
|
|
|
|
Provides CRUD operations for configuring provider API keys post-onboarding.
|
|
Closes #586 (allow provider key update) and part of #604 (model picker
|
|
multi-provider support).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from api.config import (
|
|
_PROVIDER_DISPLAY,
|
|
_PROVIDER_MODELS,
|
|
get_config,
|
|
invalidate_models_cache,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# SECTION: Provider ↔ env var mapping
|
|
|
|
# Maps canonical provider slug → env var name for API key.
|
|
# Providers not listed here (OAuth/token-flow providers like copilot, nous,
|
|
# openai-codex) cannot have their keys managed from the WebUI.
|
|
_PROVIDER_ENV_VAR: dict[str, str] = {
|
|
"openrouter": "OPENROUTER_API_KEY",
|
|
"anthropic": "ANTHROPIC_API_KEY",
|
|
"openai": "OPENAI_API_KEY",
|
|
"google": "GOOGLE_API_KEY",
|
|
"gemini": "GEMINI_API_KEY",
|
|
"zai": "GLM_API_KEY",
|
|
"kimi-coding": "KIMI_API_KEY",
|
|
"deepseek": "DEEPSEEK_API_KEY",
|
|
"minimax": "MINIMAX_API_KEY",
|
|
"mistralai": "MISTRAL_API_KEY",
|
|
"x-ai": "XAI_API_KEY",
|
|
"opencode-zen": "OPENCODE_ZEN_API_KEY",
|
|
"opencode-go": "OPENCODE_GO_API_KEY",
|
|
"ollama": "OLLAMA_API_KEY",
|
|
"ollama-cloud": "OLLAMA_API_KEY",
|
|
}
|
|
|
|
# Providers that use OAuth or token flows — their credentials are managed
|
|
# through the Hermes CLI, not via API keys. The WebUI cannot set these.
|
|
_OAUTH_PROVIDERS = frozenset({
|
|
"copilot",
|
|
"openai-codex",
|
|
"nous",
|
|
})
|
|
|
|
# SECTION: Helper functions
|
|
|
|
|
|
def _get_hermes_home() -> Path:
|
|
"""Return the active Hermes home directory."""
|
|
try:
|
|
from api.profiles import get_active_hermes_home
|
|
return get_active_hermes_home()
|
|
except ImportError:
|
|
return Path.home() / ".hermes"
|
|
|
|
|
|
def _load_env_file(env_path: Path) -> dict[str, str]:
|
|
"""Read key=value pairs from a .env file."""
|
|
values: dict[str, str] = {}
|
|
if not env_path.exists():
|
|
return values
|
|
try:
|
|
for raw in env_path.read_text(encoding="utf-8").splitlines():
|
|
line = raw.strip()
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
continue
|
|
key, value = line.split("=", 1)
|
|
values[key.strip()] = value.strip().strip('"').strip("'")
|
|
except Exception:
|
|
return {}
|
|
return values
|
|
|
|
|
|
def _write_env_file(env_path: Path, updates: dict[str, str | None]) -> None:
|
|
"""Write key=value pairs to the .env file.
|
|
|
|
Values of ``None`` cause the key to be removed.
|
|
Holds ``_ENV_LOCK`` from ``api.streaming`` for the entire load → modify →
|
|
write cycle to prevent TOCTOU races between concurrent POST /api/providers
|
|
calls (each reading the same file baseline and overwriting the other's key).
|
|
Also serialises os.environ mutations with streaming sessions.
|
|
"""
|
|
from api.streaming import _ENV_LOCK
|
|
import stat as _stat
|
|
|
|
with _ENV_LOCK:
|
|
current = _load_env_file(env_path)
|
|
for key, value in updates.items():
|
|
if value is None:
|
|
current.pop(key, None)
|
|
os.environ.pop(key, None)
|
|
continue
|
|
clean = str(value).strip()
|
|
if not clean:
|
|
continue
|
|
# Reject embedded newlines/carriage returns to prevent .env injection
|
|
if "\n" in clean or "\r" in clean:
|
|
raise ValueError("API key must not contain newline characters.")
|
|
current[key] = clean
|
|
os.environ[key] = clean
|
|
|
|
env_path.parent.mkdir(parents=True, exist_ok=True)
|
|
lines = [f"{key}={current[key]}" for key in sorted(current)]
|
|
# Create at owner-only mode from the first byte (O_CREAT honours the mode
|
|
# argument subject to umask). A trailing chmod guards pre-existing files.
|
|
_mode = _stat.S_IRUSR | _stat.S_IWUSR # 0o600
|
|
_fd = os.open(str(env_path), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, _mode)
|
|
with os.fdopen(_fd, "w", encoding="utf-8") as _f:
|
|
_f.write("\n".join(lines) + ("\n" if lines else ""))
|
|
try:
|
|
env_path.chmod(_mode)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def _provider_has_key(provider_id: str) -> bool:
|
|
"""Check whether a provider has a configured API key.
|
|
|
|
Checks (in order):
|
|
1. ``~/.hermes/.env`` for the known env var
|
|
2. ``os.environ`` for the known env var
|
|
3. ``config.yaml → model.api_key``
|
|
4. ``config.yaml → providers.<id>.api_key``
|
|
5. ``config.yaml → custom_providers[].api_key`` (for custom providers)
|
|
"""
|
|
env_var = _PROVIDER_ENV_VAR.get(provider_id)
|
|
if env_var:
|
|
env_path = _get_hermes_home() / ".env"
|
|
env_values = _load_env_file(env_path)
|
|
if env_values.get(env_var):
|
|
return True
|
|
if os.getenv(env_var):
|
|
return True
|
|
|
|
cfg = get_config()
|
|
# Check model.api_key
|
|
model_cfg = cfg.get("model", {})
|
|
if isinstance(model_cfg, dict) and str(model_cfg.get("api_key") or "").strip():
|
|
return True
|
|
# Check providers.<id>.api_key
|
|
providers_cfg = cfg.get("providers", {})
|
|
if isinstance(providers_cfg, dict):
|
|
provider_cfg = providers_cfg.get(provider_id, {})
|
|
if isinstance(provider_cfg, dict) and str(provider_cfg.get("api_key") or "").strip():
|
|
return True
|
|
# Check custom_providers
|
|
custom_providers = cfg.get("custom_providers", [])
|
|
if isinstance(custom_providers, list):
|
|
for cp in custom_providers:
|
|
if isinstance(cp, dict):
|
|
cp_name = (cp.get("name") or "").strip().lower().replace(" ", "-")
|
|
if f"custom:{cp_name}" == provider_id or cp.get("name", "").strip().lower() == provider_id:
|
|
if str(cp.get("api_key") or "").strip():
|
|
return True
|
|
return False
|
|
|
|
|
|
def _provider_is_oauth(provider_id: str) -> bool:
|
|
"""Check whether a provider uses OAuth/token flows (managed by CLI)."""
|
|
return provider_id in _OAUTH_PROVIDERS
|
|
|
|
|
|
# SECTION: Public API
|
|
|
|
|
|
def get_providers() -> dict[str, Any]:
|
|
"""Return a list of all known providers with their configuration status.
|
|
|
|
Each entry contains:
|
|
- ``id``: canonical provider slug
|
|
- ``display_name``: human-readable name
|
|
- ``has_key``: whether an API key is configured
|
|
- ``configurable``: whether the key can be set from the WebUI
|
|
- ``key_source``: where the key was found (``env_file``, ``env_var``,
|
|
``config_yaml``, ``oauth``, ``none``)
|
|
- ``models``: list of known model IDs for this provider
|
|
"""
|
|
providers = []
|
|
|
|
# Collect all known provider IDs from multiple sources
|
|
known_ids = set(_PROVIDER_DISPLAY.keys()) | set(_PROVIDER_MODELS.keys())
|
|
|
|
# Also detect providers from config.yaml providers section
|
|
cfg = get_config()
|
|
providers_cfg = cfg.get("providers", {})
|
|
if isinstance(providers_cfg, dict):
|
|
known_ids.update(providers_cfg.keys())
|
|
|
|
# Add OAuth providers even if not in _PROVIDER_DISPLAY
|
|
known_ids.update(_OAUTH_PROVIDERS)
|
|
|
|
for pid in sorted(known_ids):
|
|
display_name = _PROVIDER_DISPLAY.get(pid, pid.replace("-", " ").title())
|
|
is_oauth = _provider_is_oauth(pid)
|
|
has_key = _provider_has_key(pid)
|
|
|
|
# Determine key source
|
|
key_source = "none"
|
|
if is_oauth:
|
|
key_source = "oauth"
|
|
# Check if actually authenticated via hermes_cli
|
|
try:
|
|
from hermes_cli.auth import get_auth_status as _gas
|
|
status = _gas(pid)
|
|
if isinstance(status, dict) and status.get("logged_in"):
|
|
has_key = True
|
|
key_source = status.get("key_source", "oauth")
|
|
else:
|
|
has_key = False
|
|
except Exception:
|
|
has_key = False
|
|
elif has_key:
|
|
env_var = _PROVIDER_ENV_VAR.get(pid)
|
|
if env_var:
|
|
env_path = _get_hermes_home() / ".env"
|
|
env_values = _load_env_file(env_path)
|
|
if env_values.get(env_var):
|
|
key_source = "env_file"
|
|
elif os.getenv(env_var):
|
|
key_source = "env_var"
|
|
else:
|
|
key_source = "config_yaml"
|
|
else:
|
|
key_source = "config_yaml"
|
|
|
|
models = _PROVIDER_MODELS.get(pid, [])
|
|
# Also include models from config.yaml providers section
|
|
if isinstance(providers_cfg, dict):
|
|
provider_cfg = providers_cfg.get(pid, {})
|
|
if isinstance(provider_cfg, dict) and "models" in provider_cfg:
|
|
cfg_models = provider_cfg["models"]
|
|
if isinstance(cfg_models, dict):
|
|
models = models + [{"id": k, "label": k} for k in cfg_models.keys()]
|
|
elif isinstance(cfg_models, list):
|
|
models = models + [{"id": k, "label": k} for k in cfg_models]
|
|
|
|
providers.append({
|
|
"id": pid,
|
|
"display_name": display_name,
|
|
"has_key": has_key,
|
|
"configurable": not is_oauth and pid in _PROVIDER_ENV_VAR,
|
|
"key_source": key_source,
|
|
"models": models,
|
|
})
|
|
|
|
# Determine active provider
|
|
active_provider = None
|
|
model_cfg = cfg.get("model", {})
|
|
if isinstance(model_cfg, dict):
|
|
active_provider = model_cfg.get("provider")
|
|
|
|
return {
|
|
"providers": providers,
|
|
"active_provider": active_provider,
|
|
}
|
|
|
|
|
|
def set_provider_key(provider_id: str, api_key: str | None) -> dict[str, Any]:
|
|
"""Set or update the API key for a provider.
|
|
|
|
Writes the key to ``~/.hermes/.env`` using the standard env var name.
|
|
If ``api_key`` is None or empty, the key is removed.
|
|
|
|
Returns a status dict with the operation result.
|
|
"""
|
|
provider_id = provider_id.strip().lower()
|
|
|
|
if not provider_id:
|
|
return {"ok": False, "error": "Provider ID is required."}
|
|
|
|
if _provider_is_oauth(provider_id):
|
|
return {
|
|
"ok": False,
|
|
"error": f"'{_PROVIDER_DISPLAY.get(provider_id, provider_id)}' uses OAuth authentication. "
|
|
f"Use `hermes model` in the terminal to configure it.",
|
|
}
|
|
|
|
env_var = _PROVIDER_ENV_VAR.get(provider_id)
|
|
if not env_var:
|
|
return {
|
|
"ok": False,
|
|
"error": f"Cannot configure API key for '{_PROVIDER_DISPLAY.get(provider_id, provider_id)}'. "
|
|
f"This provider does not have a known env var mapping.",
|
|
}
|
|
|
|
# Validate API key format (basic sanity check)
|
|
if api_key:
|
|
api_key = api_key.strip()
|
|
if "\n" in api_key or "\r" in api_key:
|
|
return {"ok": False, "error": "API key must not contain newline characters."}
|
|
if len(api_key) < 8:
|
|
return {"ok": False, "error": "API key appears too short."}
|
|
|
|
env_path = _get_hermes_home() / ".env"
|
|
try:
|
|
_write_env_file(env_path, {env_var: api_key})
|
|
except ValueError as exc:
|
|
return {"ok": False, "error": str(exc)}
|
|
except Exception as exc:
|
|
logger.exception("Failed to write env file for provider %s", provider_id)
|
|
return {"ok": False, "error": f"Failed to save API key: {exc}"}
|
|
|
|
# Invalidate the model cache so the dropdown refreshes on next request.
|
|
# Using invalidate_models_cache() instead of reload_config() to avoid
|
|
# disrupting active streaming sessions that may be reading config.cfg.
|
|
invalidate_models_cache()
|
|
|
|
return {
|
|
"ok": True,
|
|
"provider": provider_id,
|
|
"display_name": _PROVIDER_DISPLAY.get(provider_id, provider_id),
|
|
"action": "updated" if api_key else "removed",
|
|
}
|
|
|
|
|
|
def remove_provider_key(provider_id: str) -> dict[str, Any]:
|
|
"""Remove the API key for a provider.
|
|
|
|
Convenience wrapper around ``set_provider_key(id, None)``.
|
|
"""
|
|
return set_provider_key(provider_id, None)
|