From b6d335feaa8367651fe6697ea78da44f277ee332 Mon Sep 17 00:00:00 2001 From: nesquena-hermes Date: Mon, 20 Apr 2026 17:33:03 -0700 Subject: [PATCH] perf: TTL cache for model list + incremental session index (#780) Fixes AWS IMDS timeout on model dropdown. Incremental index writes. Co-authored-by: starship-s --- CHANGELOG.md | 6 + api/config.py | 52 +++++- api/models.py | 76 ++++++-- tests/conftest.py | 27 +++ tests/test_session_index.py | 350 ++++++++++++++++++++++++++++++++++++ tests/test_ttl_cache.py | 226 +++++++++++++++++++++++ 6 files changed, 713 insertions(+), 24 deletions(-) create mode 100644 tests/test_session_index.py create mode 100644 tests/test_ttl_cache.py diff --git a/CHANGELOG.md b/CHANGELOG.md index de7815e..95cf6f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Hermes Web UI -- Changelog +## [v0.50.121] — 2026-04-20 + +### Performance +- **Model list no longer re-scans on every session load** — `get_available_models()` now caches its result for 60 seconds (configurable via `_AVAILABLE_MODELS_CACHE_TTL`). Config file changes (mtime) invalidate the cache immediately. This eliminates the ~4s AWS IMDS timeout that blocked the model dropdown on every page load for users on EC2 without an IAM role. Thread-safe via a dedicated lock; callers receive a `copy.deepcopy()` so mutations don't pollute the cache. (credit: @starship-s) +- **Session saves no longer trigger a full O(n) index rebuild** — `_write_session_index()` now does an incremental read-patch-write of the existing index JSON when called from `Session.save()`, rather than re-scanning every session file on disk. Falls back to a full rebuild when the index is missing or corrupt. Atomic write via `.tmp` + `os.replace()`. At 100+ sessions this is a meaningful speedup. (credit: @starship-s) + ## [v0.50.120] — 2026-04-20 ### Fixed diff --git a/api/config.py b/api/config.py index 2a6d292..87d9349 100644 --- a/api/config.py +++ b/api/config.py @@ -10,6 +10,7 @@ Discovery order for all paths: """ import collections +import copy import json import logging import os @@ -802,6 +803,26 @@ def set_hermes_default_model(model_id: str) -> dict: return get_available_models() +# ── TTL cache for get_available_models() ───────────────────────────────────── +_available_models_cache: dict | None = None +_available_models_cache_ts: float = 0.0 +_AVAILABLE_MODELS_CACHE_TTL: float = 60.0 # seconds — refresh at most once per minute +_available_models_cache_lock = threading.Lock() + + +def invalidate_models_cache(): + """Force the TTL cache for get_available_models() to be cleared. + + Call this after modifying config.cfg in-memory (e.g. in tests) so + the next call to get_available_models() picks up the changes rather + than returning a stale cached result. + """ + global _available_models_cache, _available_models_cache_ts + with _available_models_cache_lock: + _available_models_cache = None + _available_models_cache_ts = 0.0 + + def get_available_models() -> dict: """ Return available models grouped by provider. @@ -821,12 +842,24 @@ def get_available_models() -> dict: # Reload config from disk if config.yaml has changed since last load. # This ensures CLI model changes are picked up on page refresh without # a server restart, while avoiding clearing in-memory mocks during tests. (#585) - try: - _current_mtime = Path(_get_config_path()).stat().st_mtime - except OSError: - _current_mtime = 0.0 - if _current_mtime != _cfg_mtime: - reload_config() + # Must run BEFORE the TTL check so config edits within the 60s window are visible. + global _available_models_cache, _available_models_cache_ts + with _available_models_cache_lock: + try: + _current_mtime = Path(_get_config_path()).stat().st_mtime + except OSError: + _current_mtime = 0.0 + # Note: env-var changes (e.g. API key rotation) are not detected by mtime; + # cache will be stale for up to TTL seconds in that case. + if _current_mtime != _cfg_mtime: + reload_config() + # Config changed — force cache invalidation + _available_models_cache = None + _available_models_cache_ts = 0.0 + # Serve from TTL cache if fresh. + now = time.monotonic() + if _available_models_cache is not None and (now - _available_models_cache_ts) < _AVAILABLE_MODELS_CACHE_TTL: + return copy.deepcopy(_available_models_cache) active_provider = None default_model = get_effective_default_model(cfg) groups = [] @@ -1277,11 +1310,16 @@ def get_available_models() -> dict: } ) - return { + result = { "active_provider": active_provider, "default_model": default_model, "groups": groups, } + # Cache the result for TTL seconds + with _available_models_cache_lock: + _available_models_cache = result + _available_models_cache_ts = time.monotonic() + return copy.deepcopy(result) # ── Static file path ───────────────────────────────────────────────────────── diff --git a/api/models.py b/api/models.py index 87d9c41..ec1634f 100644 --- a/api/models.py +++ b/api/models.py @@ -4,6 +4,7 @@ Hermes Web UI -- Session model and in-memory session store. import collections import json import logging +import os import time import uuid from pathlib import Path @@ -19,22 +20,63 @@ from api.workspace import get_last_workspace logger = logging.getLogger(__name__) -def _write_session_index(): - """Rebuild the session index file for O(1) future reads.""" - entries = [] - for p in SESSION_DIR.glob('*.json'): - if p.name.startswith('_'): continue - try: - s = Session.load(p.stem) - if s: entries.append(s.compact()) - except Exception: - logger.debug("Failed to load session from %s", p) - with LOCK: - for s in SESSIONS.values(): - if not any(e['session_id'] == s.session_id for e in entries): - entries.append(s.compact()) - entries.sort(key=lambda s: s['updated_at'], reverse=True) - SESSION_INDEX_FILE.write_text(json.dumps(entries, ensure_ascii=False, indent=2), encoding='utf-8') +def _write_session_index(updates=None): + """Update the session index file. + + When *updates* is provided (a list of Session objects whose compact + entries should be refreshed), this does a targeted in-place update of + the existing index — O(1) for single-session changes. When *updates* + is None, a full rebuild is performed (used on startup / first call). + """ + # Lazy full-rebuild path — used when index doesn't exist yet. + if updates is None or not SESSION_INDEX_FILE.exists(): + entries = [] + for p in SESSION_DIR.glob('*.json'): + if p.name.startswith('_'): continue + try: + s = Session.load(p.stem) + if s: entries.append(s.compact()) + except Exception: + logger.debug("Failed to load session from %s", p) + with LOCK: + for s in SESSIONS.values(): + if not any(e['session_id'] == s.session_id for e in entries): + entries.append(s.compact()) + entries.sort(key=lambda s: s['updated_at'], reverse=True) + _tmp = SESSION_INDEX_FILE.with_suffix('.tmp') + _tmp.write_text(json.dumps(entries, ensure_ascii=False, indent=2), encoding='utf-8') + os.replace(_tmp, SESSION_INDEX_FILE) + return + + # Fast path: patch existing index with updated sessions. + # This avoids loading every session file on every single save(). + # LOCK covers the entire read-patch-write to prevent concurrent save() calls + # from both reading the same baseline and one losing its update. + _fallback = False + try: + with LOCK: + existing = json.loads(SESSION_INDEX_FILE.read_text(encoding='utf-8')) + # Build lookup of updated entries + updated_map = {s.session_id: s.compact() for s in updates} + existing_ids = {e.get('session_id') for e in existing} + # Add any updated entries not yet in the index + for sid, entry in updated_map.items(): + if sid not in existing_ids: + existing.append(entry) + # Replace matching entries in-place + for i, e in enumerate(existing): + sid = e.get('session_id') + if sid in updated_map: + existing[i] = updated_map[sid] + existing.sort(key=lambda s: s.get('updated_at', 0), reverse=True) + _tmp = SESSION_INDEX_FILE.with_suffix('.tmp') + _tmp.write_text(json.dumps(existing, ensure_ascii=False, indent=2), encoding='utf-8') + os.replace(_tmp, SESSION_INDEX_FILE) + except Exception: + _fallback = True + if _fallback: + # Corrupt or missing index — fall back to full rebuild (called outside LOCK to avoid deadlock) + _write_session_index(updates=None) class Session: @@ -86,7 +128,7 @@ class Session: json.dumps(self.__dict__, ensure_ascii=False, indent=2), encoding='utf-8', ) - _write_session_index() + _write_session_index(updates=[self]) @classmethod def load(cls, sid): diff --git a/tests/conftest.py b/tests/conftest.py index e50eb3c..f80d66c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -342,6 +342,33 @@ def base_url(): return TEST_BASE +# ── Per-test model cache invalidation ──────────────────────────────────────── +# The TTL cache for get_available_models() persists across tests within the +# same process. Tests that modify cfg in-memory won't trigger the mtime path, +# so the cache must be explicitly invalidated after each test that exercises +# provider/model detection. + +@pytest.fixture(autouse=True) +def _invalidate_models_cache_after_test(): + """Force the TTL cache to be cleared before and after every test. + + This prevents state bleed where a test that calls get_available_models() + populates the cache with a particular config, and the next test sees stale + results even though it has mutated _cfg_cache in-memory. + """ + try: + from api.config import invalidate_models_cache + invalidate_models_cache() + except Exception: + pass + yield + try: + from api.config import invalidate_models_cache + invalidate_models_cache() + except Exception: + pass + + # ── Per-test session cleanup ────────────────────────────────────────────────── @pytest.fixture(autouse=True) diff --git a/tests/test_session_index.py b/tests/test_session_index.py new file mode 100644 index 0000000..92846b8 --- /dev/null +++ b/tests/test_session_index.py @@ -0,0 +1,350 @@ +""" +Tests for the incremental session index in api/models.py. + +Validates: + - Incremental patch correctness (existing entries preserved, updated) + - New session appended to existing index + - First call (no index file) triggers full rebuild + - Corrupt index triggers fallback to full rebuild + - Concurrent saves don't lose data + - Atomic write leaves no .tmp file behind + - Deadlock guard on fallback path +""" +import json +import os +import threading +import time +from pathlib import Path +from unittest.mock import patch + +import pytest + +import api.models as models +from api.models import Session, _write_session_index + + +@pytest.fixture(autouse=True) +def _isolate_session_dir(tmp_path, monkeypatch): + """Redirect SESSION_DIR and SESSION_INDEX_FILE to a temp directory + so tests don't touch the real session store. + """ + session_dir = tmp_path / "sessions" + session_dir.mkdir() + index_file = session_dir / "_index.json" + + monkeypatch.setattr(models, "SESSION_DIR", session_dir) + monkeypatch.setattr(models, "SESSION_INDEX_FILE", index_file) + # Also patch the module-level references that Session uses + monkeypatch.setattr(models.Session, "__module__", models.__name__) + + # Clear the in-memory SESSIONS cache to avoid bleed + models.SESSIONS.clear() + + yield session_dir, index_file + + models.SESSIONS.clear() + + +def _make_session(session_id, title="Untitled", updated_at=None): + """Helper to create a Session with a known ID and title.""" + s = Session(session_id=session_id, title=title, messages=[{"role": "user", "content": "hi"}]) + if updated_at is not None: + s.updated_at = updated_at + return s + + +def _write_index_file(index_file, entries): + """Write entries list to the index file atomically.""" + tmp = index_file.with_suffix(".tmp") + tmp.write_text(json.dumps(entries, ensure_ascii=False, indent=2), encoding="utf-8") + os.replace(str(tmp), str(index_file)) + + +def _read_index(index_file): + """Read and parse the session index file.""" + return json.loads(index_file.read_text(encoding="utf-8")) + + +# ── 6. test_incremental_patch_correctness ───────────────────────────────── + +def test_incremental_patch_correctness(): + """Pre-write an index with 3 sessions (A, B, C). Create an updated + Session for B with a new title. Call _write_session_index(updates=[B]). + Verify A and C are unchanged, B has the new title, sort order preserved. + """ + + + # We need to get the fixture values — but since it's autouse, the monkeypatch + # has already been applied. Access the patched values directly. + session_dir = models.SESSION_DIR + index_file = models.SESSION_INDEX_FILE + + # Create 3 sessions with different timestamps + sA = _make_session("sess_a", "Alpha", updated_at=100.0) + sB = _make_session("sess_b", "Bravo", updated_at=200.0) + sC = _make_session("sess_c", "Charlie", updated_at=300.0) + + # Write session files to disk (so full rebuild can find them) + for s in (sA, sB, sC): + s.path.write_text(json.dumps(s.__dict__, ensure_ascii=False, indent=2), encoding="utf-8") + + # Build initial index + _write_session_index(updates=None) + index = _read_index(index_file) + assert len(index) == 3 + + # Now update B with a new title + sB_updated = _make_session("sess_b", "Bravo Updated", updated_at=250.0) + sB_updated.path.write_text( + json.dumps(sB_updated.__dict__, ensure_ascii=False, indent=2), encoding="utf-8" + ) + + # Incremental update + _write_session_index(updates=[sB_updated]) + + # Verify + index = _read_index(index_file) + index_map = {e["session_id"]: e for e in index} + + assert index_map["sess_a"]["title"] == "Alpha", "A should be unchanged" + assert index_map["sess_c"]["title"] == "Charlie", "C should be unchanged" + assert index_map["sess_b"]["title"] == "Bravo Updated", "B should have new title" + + # Sort order: Charlie (300) > Bravo Updated (250) > Alpha (100) + assert index[0]["session_id"] == "sess_c" + assert index[1]["session_id"] == "sess_b" + assert index[2]["session_id"] == "sess_a" + + +# ── 7. test_new_session_appended_to_index ───────────────────────────────── + +def test_new_session_appended_to_index(): + """Pre-write index with sessions A, B. Call _write_session_index(updates=[C]) + where C is not in the existing index. Verify C appears in the index. + """ + session_dir = models.SESSION_DIR + index_file = models.SESSION_INDEX_FILE + + sA = _make_session("sess_a", "Alpha", updated_at=100.0) + sB = _make_session("sess_b", "Bravo", updated_at=200.0) + + for s in (sA, sB): + s.path.write_text(json.dumps(s.__dict__, ensure_ascii=False, indent=2), encoding="utf-8") + + _write_session_index(updates=None) + + # Create a new session C not in the index + sC = _make_session("sess_c", "Charlie", updated_at=300.0) + sC.path.write_text(json.dumps(sC.__dict__, ensure_ascii=False, indent=2), encoding="utf-8") + + _write_session_index(updates=[sC]) + + index = _read_index(index_file) + ids = {e["session_id"] for e in index} + assert "sess_c" in ids, "New session C should appear in the index" + assert "sess_a" in ids + assert "sess_b" in ids + + +# ── 8. test_first_call_full_rebuild ────────────────────────────────────── + +def test_first_call_full_rebuild(): + """When no index file exists, calling _write_session_index(updates=[session]) + should fall back to full rebuild and create the index. + """ + session_dir = models.SESSION_DIR + index_file = models.SESSION_INDEX_FILE + + # No index file yet + assert not index_file.exists() + + sA = _make_session("sess_a", "Alpha", updated_at=100.0) + sA.path.write_text(json.dumps(sA.__dict__, ensure_ascii=False, indent=2), encoding="utf-8") + + # Call with updates — should trigger full rebuild since index doesn't exist + _write_session_index(updates=[sA]) + + # Index should now exist + assert index_file.exists(), "Index file should be created" + + index = _read_index(index_file) + ids = {e["session_id"] for e in index} + assert "sess_a" in ids, "Session A should appear in the rebuilt index" + + +# ── 9. test_corrupt_index_fallback ──────────────────────────────────────── + +def test_corrupt_index_fallback(): + """Write garbage/invalid JSON to SESSION_INDEX_FILE. Call + _write_session_index(updates=[session]). Verify it falls back to + full rebuild and the result is valid JSON with correct entries. + """ + session_dir = models.SESSION_DIR + index_file = models.SESSION_INDEX_FILE + + # Write corrupt data + index_file.write_text("THIS IS NOT JSON {{{", encoding="utf-8") + + sA = _make_session("sess_a", "Alpha", updated_at=100.0) + sA.path.write_text(json.dumps(sA.__dict__, ensure_ascii=False, indent=2), encoding="utf-8") + + # Should not raise; should fall back to full rebuild + _write_session_index(updates=[sA]) + + # Index should now be valid JSON + assert index_file.exists() + index = _read_index(index_file) + assert isinstance(index, list), "Index should be a list" + + ids = {e["session_id"] for e in index} + assert "sess_a" in ids, "Session A should appear after fallback rebuild" + + +# ── 10. test_concurrent_saves_dont_lose_data ──────────────────────────── + +def test_concurrent_saves_dont_lose_data(): + """Create 2 threads, each calling Session.save() on different sessions + with a pre-existing index. Use a threading.Event barrier to force them + to run concurrently. Assert both updates are present in the final index. + """ + session_dir = models.SESSION_DIR + index_file = models.SESSION_INDEX_FILE + + sA = _make_session("sess_a", "Alpha", updated_at=100.0) + sB = _make_session("sess_b", "Bravo", updated_at=200.0) + + for s in (sA, sB): + s.path.write_text(json.dumps(s.__dict__, ensure_ascii=False, indent=2), encoding="utf-8") + + # Build initial index + _write_session_index(updates=None) + + # Now update both sessions concurrently + barrier = threading.Event() + errors = [] + + def _update_session(session, new_title, new_updated_at): + try: + barrier.wait(timeout=5) + session.title = new_title + session.updated_at = new_updated_at + session.save() + except Exception as e: + errors.append(e) + + sA.title = "Alpha V2" + sA.updated_at = 150.0 + sB.title = "Bravo V2" + sB.updated_at = 250.0 + + t1 = threading.Thread(target=_update_session, args=(sA, "Alpha V2", 150.0)) + t2 = threading.Thread(target=_update_session, args=(sB, "Bravo V2", 250.0)) + + t1.start() + t2.start() + + # Release both threads simultaneously + barrier.set() + + t1.join(timeout=10) + t2.join(timeout=10) + + assert not errors, f"Errors during concurrent saves: {errors}" + + # Verify both updates are in the final index + index = _read_index(index_file) + index_map = {e["session_id"]: e for e in index} + + assert "sess_a" in index_map, "Session A should be in index" + assert "sess_b" in index_map, "Session B should be in index" + assert index_map["sess_a"]["title"] == "Alpha V2", "Session A title should be updated" + assert index_map["sess_b"]["title"] == "Bravo V2", "Session B title should be updated" + + +# ── 11. test_atomic_write_no_tmp_remains ───────────────────────────────── + +def test_atomic_write_no_tmp_remains(): + """After _write_session_index completes, no .tmp file should remain + in SESSION_DIR. + """ + session_dir = models.SESSION_DIR + index_file = models.SESSION_INDEX_FILE + + sA = _make_session("sess_a", "Alpha", updated_at=100.0) + sA.path.write_text(json.dumps(sA.__dict__, ensure_ascii=False, indent=2), encoding="utf-8") + + _write_session_index(updates=[sA]) + + # Check for any .tmp files in SESSION_DIR + tmp_files = list(session_dir.glob("*.tmp")) + assert len(tmp_files) == 0, f"Unexpected .tmp files remain: {tmp_files}" + + # Also test incremental path + sA.title = "Alpha V2" + sA.updated_at = 200.0 + _write_session_index(updates=[sA]) + + tmp_files = list(session_dir.glob("*.tmp")) + assert len(tmp_files) == 0, f"Unexpected .tmp files after incremental write: {tmp_files}" + + +# ── 12. test_deadlock_guard_on_fallback ────────────────────────────────── + +def test_deadlock_guard_on_fallback(): + """Mock the index file read to raise an exception, then verify + _write_session_index(updates=[session]) completes without hanging. + + This tests that the fallback path (corrupt index -> full rebuild) + is called outside the LOCK, so it doesn't deadlock. + """ + session_dir = models.SESSION_DIR + index_file = models.SESSION_INDEX_FILE + + # Create a valid index file so the incremental path is attempted + _write_index_file(index_file, [ + {"session_id": "sess_a", "title": "Alpha", "updated_at": 100.0, + "workspace": "/tmp", "model": "test", "message_count": 0, + "created_at": 100.0, "pinned": False, "archived": False}, + ]) + + sB = _make_session("sess_b", "Bravo", updated_at=200.0) + sB.path.write_text(json.dumps(sB.__dict__, ensure_ascii=False, indent=2), encoding="utf-8") + + # Make the index file read raise an exception to trigger fallback + original_read_text = Path.read_text + call_count = 0 + + def _broken_read_text(self, *args, **kwargs): + nonlocal call_count + # Only break the index file read, not the session file reads + if str(self) == str(index_file) and call_count == 0: + call_count += 1 + raise OSError("Simulated corrupt index read") + return original_read_text(self, *args, **kwargs) + + with patch.object(Path, "read_text", _broken_read_text): + # This should complete without hanging (deadlock guard) + # Use a timeout to detect deadlock + done = threading.Event() + result = [None] + exc = [None] + + def _run(): + try: + _write_session_index(updates=[sB]) + result[0] = "done" + except Exception as e: + exc[0] = e + finally: + done.set() + + t = threading.Thread(target=_run) + t.start() + finished = done.wait(timeout=10) + + assert finished, "_write_session_index hung — likely deadlock in fallback path" + assert exc[0] is None, f"Unexpected exception: {exc[0]}" + + # The index should still be valid after fallback + index = _read_index(index_file) + assert isinstance(index, list) diff --git a/tests/test_ttl_cache.py b/tests/test_ttl_cache.py new file mode 100644 index 0000000..42a7d99 --- /dev/null +++ b/tests/test_ttl_cache.py @@ -0,0 +1,226 @@ +""" +Tests for the TTL cache in api/config.py — get_available_models(). + +Validates: + - Cache hit within TTL window + - TTL expiry triggers re-scan + - Config mtime change invalidates cache before TTL check + - copy.deepcopy() isolation (mutating returned dict doesn't pollute cache) + - invalidate_models_cache() direct invalidation +""" +import time +from unittest.mock import patch + +import api.config as config + + +def _reset_cache(): + """Reset TTL cache globals to a clean state.""" + config._available_models_cache = None + config._available_models_cache_ts = 0.0 + + +# ── 1. test_cache_hit_within_ttl ────────────────────────────────────────── + +def test_cache_hit_within_ttl(): + """Call get_available_models() twice within the TTL window. + The second call should return cached data without re-scanning providers. + We verify this by patching reload_config (called when cache is cold) + and asserting it is only invoked once. + """ + _reset_cache() + original_reload = config.reload_config + + call_count = 0 + + def _counting_reload(): + nonlocal call_count + call_count += 1 + return original_reload() + + with patch.object(config, "reload_config", wraps=original_reload, side_effect=_counting_reload): + saved_mtime = config._cfg_mtime + try: + # Force mtime mismatch so the first call triggers reload_config + cache fill + config._cfg_mtime = 0.0 + result1 = config.get_available_models() + first_call_count = call_count + + # Sync _cfg_mtime to the actual file so the second call doesn't + # re-trigger reload_config via mtime mismatch — we want it to hit the TTL cache. + try: + config._cfg_mtime = config.Path(config._get_config_path()).stat().st_mtime + except OSError: + config._cfg_mtime = 0.0 + + result2 = config.get_available_models() + + # Both results should have the same structure + assert "groups" in result1 + assert "groups" in result2 + + # reload_config should not have been called again for the second invocation + # (the TTL cache served it) + assert call_count == first_call_count, ( + f"Expected no extra reload_config calls, but got " + f"{call_count - first_call_count} extra" + ) + finally: + config._cfg_mtime = saved_mtime + _reset_cache() + + +# ── 2. test_ttl_expiry ─────────────────────────────────────────────────── + +def test_ttl_expiry(): + """Populate the cache, then advance time.monotonic() past 60s. + The next call should re-scan (not serve from cache). + """ + _reset_cache() + + # Ensure _cfg_mtime matches file so mtime check doesn't invalidate + try: + config._cfg_mtime = config.Path(config._get_config_path()).stat().st_mtime + except OSError: + config._cfg_mtime = 0.0 + + # First call populates cache + result1 = config.get_available_models() + assert config._available_models_cache is not None, "Cache should be populated" + + # Record the cache timestamp + cache_ts = config._available_models_cache_ts + + # Advance time.monotonic() by more than the TTL + original_monotonic = time.monotonic + offset = config._AVAILABLE_MODELS_CACHE_TTL + 10.0 # 70s past the real monotonic + + with patch.object(time, "monotonic", side_effect=lambda: original_monotonic() + offset): + result2 = config.get_available_models() + + # The cache should have been refreshed — the timestamp must be newer + assert config._available_models_cache_ts > cache_ts, ( + "Cache should have been refreshed after TTL expiry" + ) + + _reset_cache() + + +# ── 3. test_mtime_invalidation ─────────────────────────────────────────── + +def test_mtime_invalidation(): + """Populate the cache, then change _cfg_mtime to simulate a config file + change on disk. The next call should invalidate the cache and re-scan. + """ + _reset_cache() + + # Ensure _cfg_mtime matches file so first call doesn't re-scan due to mtime + try: + real_mtime = config.Path(config._get_config_path()).stat().st_mtime + except OSError: + real_mtime = 0.0 + config._cfg_mtime = real_mtime + + # First call populates cache + result1 = config.get_available_models() + assert config._available_models_cache is not None + + # Simulate config.yaml changed on disk by setting _cfg_mtime to 0 + # (which won't match the actual file mtime) + config._cfg_mtime = 0.0 + + # The next call should detect mtime mismatch, reload, and invalidate cache + old_cache = config._available_models_cache + old_ts = config._available_models_cache_ts + + result2 = config.get_available_models() + + # Cache must have been refreshed — timestamp advanced since we reset it + # to 0.0 on invalidation. + assert config._available_models_cache_ts > 0.0, ( + "Cache timestamp should be updated after invalidation + rebuild" + ) + + # Restore + config._cfg_mtime = real_mtime + _reset_cache() + + +# ── 4. test_deepcopy_isolation ──────────────────────────────────────────── + +def test_deepcopy_isolation(): + """Mutating the returned dict from get_available_models() must not + affect the cache or subsequent return values. + """ + _reset_cache() + + # Ensure _cfg_mtime matches file so mtime check doesn't invalidate + try: + config._cfg_mtime = config.Path(config._get_config_path()).stat().st_mtime + except OSError: + config._cfg_mtime = 0.0 + + # First call populates cache + result1 = config.get_available_models() + + # Mutate the returned dict + if result1["groups"]: + result1["groups"][0]["models"].clear() + result1["groups"].append({"provider": "FAKE", "models": [{"id": "fake-model"}]}) + result1["active_provider"] = "HACKED" + + # Second call should return an unmutated copy + result2 = config.get_available_models() + + # The mutated keys must not appear in the second result + assert result2["active_provider"] != "HACKED", "Mutation leaked into cache" + assert not any( + g.get("provider") == "FAKE" for g in result2["groups"] + ), "Fake provider leaked into cache" + + # If there were groups originally, the first group's models should not be empty + # (unless it genuinely had no models, which is unlikely) + if result1["groups"] and result2["groups"]: + # result1["groups"][0]["models"] was cleared, but result2 should be intact + assert len(result2["groups"][0].get("models", [])) > 0, ( + "Mutation of result1 cleared models in result2 — deepcopy failed" + ) + + _reset_cache() + + +# ── 5. test_invalidate_models_cache_direct ─────────────────────────────── + +def test_invalidate_models_cache_direct(): + """Call invalidate_models_cache() after populating the cache. + _AVAILABLE_MODELS_CACHE should be None and the next call should re-scan. + """ + _reset_cache() + + # Ensure _cfg_mtime matches file so mtime check doesn't invalidate + try: + config._cfg_mtime = config.Path(config._get_config_path()).stat().st_mtime + except OSError: + config._cfg_mtime = 0.0 + + # First call populates cache + result1 = config.get_available_models() + assert config._available_models_cache is not None, "Cache should be populated" + first_ts = config._available_models_cache_ts + + # Directly invalidate + config.invalidate_models_cache() + + # Cache must be cleared + assert config._available_models_cache is None, ( + "invalidate_models_cache() should set _AVAILABLE_MODELS_CACHE to None" + ) + + # Next call should re-scan and produce a fresh cache + result2 = config.get_available_models() + assert config._available_models_cache is not None, "Cache should be re-populated" + assert config._available_models_cache_ts >= first_ts, ( + "Cache timestamp should be updated after re-scan" + ) + + _reset_cache()