Files
isparkclaw-webui/api/workspace.py
Frank Song 62c56175b7 feat(workspaces): autocomplete trusted workspace paths — v0.50.162 (PR #880 by @franksong2702, closes #616)
Adds GET /api/workspaces/suggest endpoint and autocomplete dropdown in the Spaces panel. Suggestions limited to trusted roots (home, saved workspaces, boot default). Keyboard nav, Tab completion, hidden dir support. Symlink-escape and dotdot-escape invariants locked by regression tests.
2026-04-23 02:35:58 +00:00

508 lines
18 KiB
Python

"""
Hermes Web UI -- Workspace and file system helpers.
Workspace lists and last-used workspace are stored per-profile so each
profile has its own workspace configuration. State files live at
``{profile_home}/webui_state/workspaces.json`` and
``{profile_home}/webui_state/last_workspace.txt``. The global STATE_DIR
paths are used as fallback when no profile module is available.
"""
import json
import logging
import os
import subprocess
from pathlib import Path
logger = logging.getLogger(__name__)
from api.config import (
WORKSPACES_FILE as _GLOBAL_WS_FILE,
LAST_WORKSPACE_FILE as _GLOBAL_LW_FILE,
DEFAULT_WORKSPACE as _BOOT_DEFAULT_WORKSPACE,
MAX_FILE_BYTES, IMAGE_EXTS, MD_EXTS
)
# ── Profile-aware path resolution ───────────────────────────────────────────
def _profile_state_dir() -> Path:
    """Return the directory holding webui state for the active profile.

    A named (non-'default') profile gets ``{profile_home}/webui_state``,
    which is created on demand. The default profile, an unset profile name,
    or a missing ``api.profiles`` module all fall back to the directory that
    contains the global workspaces file.
    """
    try:
        from api.profiles import get_active_hermes_home, get_active_profile_name

        profile = get_active_profile_name()
        is_named_profile = bool(profile) and profile != 'default'
        if is_named_profile:
            state_dir = get_active_hermes_home() / 'webui_state'
            state_dir.mkdir(parents=True, exist_ok=True)
            return state_dir
    except ImportError:
        logger.debug("Failed to import profiles module, using global state dir")
    return _GLOBAL_WS_FILE.parent
def _workspaces_file() -> Path:
    """Return the ``workspaces.json`` path inside the active profile's state directory."""
    state_dir = _profile_state_dir()
    return state_dir / 'workspaces.json'
def _last_workspace_file() -> Path:
    """Return the ``last_workspace.txt`` path inside the active profile's state directory."""
    state_dir = _profile_state_dir()
    return state_dir / 'last_workspace.txt'
def _profile_default_workspace() -> str:
    """Return the default workspace directory configured for the profile.

    Configuration keys are consulted in this order, and the first one that
    resolves to an existing directory wins:
      1. 'workspace'
      2. 'default_workspace'
      3. 'terminal' -> 'cwd' (ignored when empty or '.')

    If none qualifies, or reading the configuration fails for any reason,
    the boot-time DEFAULT_WORKSPACE constant is returned as a string.
    """

    def _configured_candidates(cfg):
        # Lazily yield raw config values in priority order.
        for key in ('workspace', 'default_workspace'):
            value = cfg.get(key)
            if value:
                yield value
        terminal_cfg = cfg.get('terminal', {})
        if isinstance(terminal_cfg, dict):
            cwd = terminal_cfg.get('cwd', '')
            if cwd and str(cwd) not in ('.', ''):
                yield cwd

    try:
        from api.config import get_config

        for raw_value in _configured_candidates(get_config()):
            resolved = Path(str(raw_value)).expanduser().resolve()
            if resolved.is_dir():
                return str(resolved)
    except Exception:
        logger.debug("Failed to load profile default workspace config")
    return str(_BOOT_DEFAULT_WORKSPACE)
# ── Public API ──────────────────────────────────────────────────────────────
def _clean_workspace_list(workspaces: list) -> list:
    """Sanitize a workspace list.

    An entry is dropped when it:
      - is not a dict, or has a missing, empty, or non-path ``path`` value;
      - has a path that cannot be resolved or is not an existing directory;
      - lives under ``~/.hermes/profiles/`` but outside the active profile's
        own home directory (cross-profile leak). Paths inside the current
        profile's own directory are kept.

    An entry whose name is 'default' (case-insensitive) is renamed to 'Home'
    to avoid confusion with the 'default' profile name. A ``None`` name
    becomes an empty string; other non-string names are stringified.

    Returns a new list of ``{'path': <resolved path>, 'name': <str>}`` dicts
    (may be empty).
    """
    profiles_root = (Path.home() / '.hermes' / 'profiles').resolve()
    # The active profile's home is looked up lazily and at most once per call.
    own_profile_dir = None
    own_profile_looked_up = False
    result = []
    for entry in workspaces:
        if not isinstance(entry, dict):
            continue
        raw_path = entry.get('path')
        # A missing/empty path must be skipped: falling back to '/' would turn
        # a broken entry into a workspace rooted at the filesystem root.
        if not raw_path or not isinstance(raw_path, (str, os.PathLike)):
            continue
        try:
            p = Path(raw_path).resolve()
        except (OSError, RuntimeError, ValueError):
            continue
        # Skip paths that no longer exist
        if not p.is_dir():
            continue
        if p.is_relative_to(profiles_root):
            if not own_profile_looked_up:
                own_profile_looked_up = True
                try:
                    from api.profiles import get_active_hermes_home
                    own_profile_dir = get_active_hermes_home().resolve()
                except Exception:
                    # Cannot tell which profile is ours: treat as foreign.
                    own_profile_dir = None
            if own_profile_dir is None or not p.is_relative_to(own_profile_dir):
                continue  # under profiles/ but not our own: cross-profile leak
        raw_name = entry.get('name', '')
        name = '' if raw_name is None else str(raw_name)
        # Rename confusing 'default' label to 'Home'
        if name.lower() == 'default':
            name = 'Home'
        result.append({'path': str(p), 'name': name})
    return result
def _migrate_global_workspaces() -> list:
    """Read the legacy global workspaces.json, clean it, and return the result.

    This is the migration path for users upgrading from a pre-profile version:
    their global file may contain cross-profile entries, test artifacts, and
    stale paths accumulated over time. When cleaning removed anything, the
    cleaned list is written back so future reads are already clean.

    Returns an empty list when the global file does not exist or cannot be
    read/parsed. A failure to write the cleaned file back is logged but does
    not discard the cleaned list.
    """
    if not _GLOBAL_WS_FILE.exists():
        return []
    try:
        raw = json.loads(_GLOBAL_WS_FILE.read_text(encoding='utf-8'))
        cleaned = _clean_workspace_list(raw)
        needs_rewrite = len(cleaned) != len(raw)
    except Exception:
        logger.debug("Failed to migrate global workspaces from %s", _GLOBAL_WS_FILE)
        return []
    if needs_rewrite:
        # Best-effort persistence: a read-only state dir must not cost the
        # user their successfully migrated workspace list.
        try:
            _GLOBAL_WS_FILE.write_text(
                json.dumps(cleaned, ensure_ascii=False, indent=2), encoding='utf-8'
            )
        except Exception:
            logger.debug("Failed to persist cleaned global workspace list")
    return cleaned
def load_workspaces() -> list:
    """Return the workspace list for the active profile.

    Resolution order:
      1. The profile's own workspaces file, cleaned. If cleaning dropped
         entries the file is rewritten (best effort). An empty result is
         replaced by a single 'Home' entry.
      2. For the default profile only (or when the profiles module is
         missing): the cleaned legacy global file, if it yields anything.
      3. A single 'Home' entry pointing at the profile's default workspace.
    """

    def _home_only() -> list:
        return [{'path': _profile_default_workspace(), 'name': 'Home'}]

    ws_file = _workspaces_file()
    if ws_file.exists():
        try:
            stored = json.loads(ws_file.read_text(encoding='utf-8'))
            usable = _clean_workspace_list(stored)
            if len(usable) != len(stored):
                # Persist the cleaned version so stale entries don't keep reappearing
                try:
                    serialized = json.dumps(usable, ensure_ascii=False, indent=2)
                    ws_file.write_text(serialized, encoding='utf-8')
                except Exception:
                    logger.debug("Failed to persist cleaned workspace list")
            return usable if usable else _home_only()
        except Exception:
            logger.debug("Failed to load workspaces from %s", ws_file)

    # No usable profile-local file.
    # DEFAULT profile: migrate from the legacy global file.
    # NAMED profiles: always start clean with just their own workspace.
    try:
        from api.profiles import get_active_profile_name
        on_default_profile = get_active_profile_name() in ('default', None)
    except ImportError:
        on_default_profile = True

    migrated = _migrate_global_workspaces() if on_default_profile else []
    return migrated if migrated else _home_only()
def save_workspaces(workspaces: list) -> None:
    """Write *workspaces* as indented JSON to the active profile's workspaces file.

    The parent directory is created first if it does not exist.
    """
    destination = _workspaces_file()
    destination.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(workspaces, ensure_ascii=False, indent=2)
    destination.write_text(payload, encoding='utf-8')
def get_last_workspace() -> str:
    """Return the most recently used workspace path.

    The profile's last-workspace file is consulted first, then the global
    one. A stored value is only returned when it is non-empty and names an
    existing directory. If neither file yields one, the profile's default
    workspace is returned.
    """
    profile_file = _last_workspace_file()
    sources = (
        (profile_file, "Failed to read last workspace from %s", (profile_file,)),
        # Fallback: the global file
        (_GLOBAL_LW_FILE, "Failed to read global last workspace", ()),
    )
    for source, failure_msg, failure_args in sources:
        if not source.exists():
            continue
        try:
            remembered = source.read_text(encoding='utf-8').strip()
            if remembered and Path(remembered).is_dir():
                return remembered
        except Exception:
            logger.debug(failure_msg, *failure_args)
    return _profile_default_workspace()
def set_last_workspace(path: str) -> None:
    """Record *path* as the last-used workspace for the active profile.

    Best effort: any failure is logged at debug level and otherwise ignored.
    """
    try:
        destination = _last_workspace_file()
        destination.parent.mkdir(parents=True, exist_ok=True)
        text = str(path)
        destination.write_text(text, encoding='utf-8')
    except Exception:
        logger.debug("Failed to set last workspace")
def _workspace_blocked_roots() -> tuple[Path, ...]:
    """Return the directory roots that may never be used as a workspace.

    The paths are returned exactly as written here; they are not resolved.
    """
    # Linux / macOS
    blocked = (
        '/etc',
        '/usr',
        '/var',
        '/bin',
        '/sbin',
        '/boot',
        '/proc',
        '/sys',
        '/dev',
        '/lib',
        '/lib64',
        '/opt/homebrew',
    )
    return tuple(map(Path, blocked))
def _is_within(path: Path, root: Path) -> bool:
    """Return True when *path* equals *root* or lies lexically beneath it.

    This is a pure path comparison; neither argument is resolved here.
    """
    return path.is_relative_to(root)
def _trusted_workspace_roots() -> list[Path]:
    """Return the resolved directories under which workspaces are trusted.

    Candidates are the user's home directory, the boot default workspace and
    every saved workspace path. A candidate is kept only if it resolves to an
    existing directory that is not inside a blocked root. Duplicates are
    removed and the result is ordered by path-string length, shortest first.
    """

    def _normalize(candidate: str | Path | None) -> Path | None:
        # Resolve one candidate, or return None when it does not qualify.
        if candidate in (None, ""):
            return None
        try:
            resolved = Path(candidate).expanduser().resolve()
        except Exception:
            return None
        if not (resolved.exists() and resolved.is_dir()):
            return None
        for blocked in _workspace_blocked_roots():
            if _is_within(resolved, blocked):
                return None
        return resolved

    candidates = [Path.home(), _BOOT_DEFAULT_WORKSPACE]
    candidates.extend(w.get("path") for w in load_workspaces())

    unique_roots: list[Path] = []
    for candidate in candidates:
        resolved = _normalize(candidate)
        if resolved is not None and resolved not in unique_roots:
            unique_roots.append(resolved)
    return sorted(unique_roots, key=lambda p: len(str(p)))
def list_workspace_suggestions(prefix: str = "", limit: int = 12) -> list[str]:
    """Return workspace path suggestions under trusted roots only.

    Suggestions are limited to directories under one of:
      - Path.home()
      - the boot default workspace
      - already-saved workspace roots

    Arbitrary system prefixes, and prefixes that cannot be expanded (for
    example ``~unknownuser``), return an empty list rather than an error so
    the UI can safely autocomplete while the user types.

    Args:
        prefix: Text typed so far. ``~`` is expanded; a relative prefix is
            interpreted relative to the home directory. An empty prefix
            returns the trusted roots themselves.
        limit: Maximum number of suggestions. Non-positive values yield [].

    Returns:
        Up to *limit* directory paths as strings: first any trusted roots the
        prefix is a case-insensitive prefix of, then matching child
        directories of the typed parent, sorted by lower-cased name. Hidden
        directories are only offered when the typed leaf starts with '.'.
    """
    if limit <= 0:
        return []
    roots = _trusted_workspace_roots()
    if not roots:
        return []
    raw = (prefix or "").strip()
    if not raw:
        return [str(p) for p in roots[:limit]]
    try:
        if raw.startswith("~"):
            # expanduser() raises RuntimeError for an unknown "~user".
            target = Path(raw).expanduser()
        elif Path(raw).is_absolute():
            target = Path(raw)
        else:
            target = Path.home() / raw
    except RuntimeError:
        return []
    normalized = str(target)
    normalized_lower = normalized.lower()
    suggestions: list[str] = []

    def add(path: Path) -> None:
        value = str(path)
        if value not in suggestions:
            suggestions.append(value)

    # If the user is typing a partial trusted root like /Users/xuef..., suggest
    # the matching trusted roots without scanning arbitrary system parents.
    for root in roots:
        if str(root).lower().startswith(normalized_lower):
            add(root)
    in_root = [
        root
        for root in roots
        if normalized == str(root) or normalized.startswith(str(root) + os.sep)
    ]
    if not in_root:
        return suggestions[:limit]
    # The most specific (longest) trusted root containing the typed path.
    anchor_root = max(in_root, key=lambda p: len(str(p)))
    ends_with_sep = raw.endswith((os.sep, '/'))
    parent = target if ends_with_sep else target.parent
    leaf = '' if ends_with_sep else target.name
    show_hidden = leaf.startswith('.')
    try:
        parent_resolved = parent.expanduser().resolve()
    except Exception:
        return suggestions[:limit]
    if not parent_resolved.exists() or not parent_resolved.is_dir():
        return suggestions[:limit]
    # Re-check after resolving: '..' segments or a symlinked parent must not
    # let the listing escape the anchor root.
    if not _is_within(parent_resolved, anchor_root):
        return suggestions[:limit]
    leaf_lower = leaf.lower()
    try:
        children = sorted(parent_resolved.iterdir(), key=lambda p: p.name.lower())
    except OSError:
        return suggestions[:limit]
    for child in children:
        if not child.is_dir():
            continue
        if child.name.startswith('.') and not show_hidden:
            continue
        if leaf_lower and not child.name.lower().startswith(leaf_lower):
            continue
        try:
            resolved_child = child.resolve()
        except (OSError, RuntimeError):
            continue
        # A symlinked child may resolve outside every trusted root; such a
        # target must not be offered as a suggestion.
        if not any(_is_within(resolved_child, root) for root in roots):
            continue
        add(resolved_child)
        if len(suggestions) >= limit:
            break
    return suggestions[:limit]
def resolve_trusted_workspace(path: str | Path | None = None) -> Path:
    """Resolve and validate a workspace path.

    A ``None`` or empty path returns the resolved boot-time DEFAULT_WORKSPACE
    without further checks.

    Otherwise the path is expanded and resolved, and must pass all of:
      1. It exists.
      2. It is a directory.
      3. It is not equal to or under any root returned by
         ``_workspace_blocked_roots()``.

    It is then trusted if at least one of the following holds:
      (A) It is equal to or under the user's home directory (Path.home()).
      (B) It exactly matches a path in the profile's saved workspace list.
          This covers deployments where workspaces live outside home.
      (C) It is equal to or under the boot-time DEFAULT_WORKSPACE, e.g. a
          volume mount outside the user's home in a container deployment.

    Returns:
        The resolved workspace path.

    Raises:
        ValueError: if the path does not exist, is not a directory, points
            into a blocked system directory, or satisfies none of (A)-(C).
    """
    if path in (None, ""):
        return Path(_BOOT_DEFAULT_WORKSPACE).expanduser().resolve()
    candidate = Path(path).expanduser().resolve()
    if not candidate.exists():
        raise ValueError(f"Path does not exist: {candidate}")
    if not candidate.is_dir():
        raise ValueError(f"Path is not a directory: {candidate}")
    # Block known system roots and their children. This uses an explicit
    # containment test rather than inspecting exception text, so a directory
    # whose own name contains "system directory" is not rejected by accident.
    if any(_is_within(candidate, blocked) for blocked in _workspace_blocked_roots()):
        raise ValueError(f"Path points to a system directory: {candidate}")
    # (A) Trusted if under the user's home directory
    if _is_within(candidate, Path.home().resolve()):
        return candidate
    # (B) Trusted if already in the saved workspace list. Best effort: a
    # broken saved list must not turn validation into an unrelated error.
    try:
        saved_paths = {
            Path(w["path"]).resolve() for w in load_workspaces() if w.get("path")
        }
    except Exception:
        logger.debug("Failed to load saved workspaces while validating %s", candidate)
        saved_paths = set()
    if candidate in saved_paths:
        return candidate
    # (C) Trusted if it is equal to or under the boot-time DEFAULT_WORKSPACE
    boot_default = Path(_BOOT_DEFAULT_WORKSPACE).expanduser().resolve()
    if _is_within(candidate, boot_default):
        return candidate
    raise ValueError(
        f"Path is outside the user home directory, not in the saved workspace "
        f"list, and not under the default workspace: {candidate}. "
        f"Add it via Settings → Workspaces first."
    )
def safe_resolve_ws(root: Path, requested: str) -> Path:
    """Resolve *requested* against the workspace *root* and return the result.

    Raises ValueError when the resolved path is not equal to or beneath the
    resolved root (path traversal).
    """
    candidate = root.joinpath(requested).resolve()
    workspace_root = root.resolve()
    # relative_to() raises ValueError for anything outside the workspace.
    candidate.relative_to(workspace_root)
    return candidate
def list_dir(workspace: Path, rel: str='.') -> list[dict]:
    """List up to 200 entries of a directory inside *workspace*.

    Args:
        workspace: Workspace root. It does not need to be resolved already;
            a symlinked or relative root is handled.
        rel: Directory to list, relative to the workspace root.

    Returns:
        A list of dicts with keys 'name', 'path' (relative to the workspace
        root), 'type' ('dir' or 'file') and 'size' (bytes for regular files,
        otherwise None). Directories sort before files, each group ordered by
        lower-cased name.

    Raises:
        ValueError: if *rel* escapes the workspace.
        FileNotFoundError: if the target is not a directory.
    """
    target = safe_resolve_ws(workspace, rel)
    if not target.is_dir():
        raise FileNotFoundError(f"Not a directory: {rel}")
    # Children hang off the resolved target, so relative paths must be
    # computed against the resolved root as well.
    workspace_root = workspace.resolve()
    ordered = sorted(target.iterdir(), key=lambda p: (p.is_file(), p.name.lower()))
    entries = []
    for item in ordered[:200]:
        entries.append({
            'name': item.name,
            'path': str(item.relative_to(workspace_root)),
            'type': 'dir' if item.is_dir() else 'file',
            'size': item.stat().st_size if item.is_file() else None,
        })
    return entries
def read_file_content(workspace: Path, rel: str) -> dict:
    """Read a text file inside *workspace* and return its content with metadata.

    The file is decoded as UTF-8 with undecodable bytes replaced.

    Returns:
        A dict with 'path' (the *rel* argument as given), 'content', 'size'
        (bytes on disk) and 'lines' (newline count plus one).

    Raises:
        ValueError: if *rel* escapes the workspace, or the file is larger
            than MAX_FILE_BYTES.
        FileNotFoundError: if the target is not a regular file.
    """
    file_path = safe_resolve_ws(workspace, rel)
    if not file_path.is_file():
        raise FileNotFoundError(f"Not a file: {rel}")
    size = file_path.stat().st_size
    if size > MAX_FILE_BYTES:
        raise ValueError(f"File too large ({size} bytes, max {MAX_FILE_BYTES})")
    content = file_path.read_text(encoding='utf-8', errors='replace')
    line_count = content.count('\n') + 1
    return {
        'path': rel,
        'content': content,
        'size': size,
        'lines': line_count,
    }
# ── Git detection ──────────────────────────────────────────────────────────
def _run_git(args, cwd, timeout=3):
    """Run ``git`` with *args* in *cwd* and return its stripped stdout.

    Returns None when git exits non-zero, times out, or cannot be started.
    """
    command = ['git'] + args
    try:
        completed = subprocess.run(
            command,
            cwd=str(cwd),
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
        return None
    if completed.returncode != 0:
        return None
    return completed.stdout.strip()
def git_info_for_workspace(workspace: Path) -> dict:
"""Return git info for a workspace directory, or None if not a git repo."""
if not (workspace / '.git').exists():
return None
branch = _run_git(['rev-parse', '--abbrev-ref', 'HEAD'], workspace)
if branch is None:
return None
# Status counts
status_out = _run_git(['status', '--porcelain'], workspace) or ''
lines = [l for l in status_out.splitlines() if l]
# git status --porcelain: XY format where X=index, Y=worktree
modified = sum(1 for l in lines if len(l) >= 2 and (l[0] in 'MAR' or l[1] in 'MAR'))
untracked = sum(1 for l in lines if l.startswith('??'))
dirty = len(lines)
# Ahead/behind
ahead = _run_git(['rev-list', '--count', '@{u}..HEAD'], workspace)
behind = _run_git(['rev-list', '--count', 'HEAD..@{u}'], workspace)
return {
'branch': branch,
'dirty': dirty,
'modified': modified,
'untracked': untracked,
'ahead': int(ahead) if ahead and ahead.isdigit() else 0,
'behind': int(behind) if behind and behind.isdigit() else 0,
'is_git': True,
}