Files
isparkclaw-webui/tests/test_session_index.py
Frank Song 13dbd818c9 fix: prune stale session index entries after session-id rotation — v0.50.148 (PR #847 by @franksong2702)
Prune ghost _index.json rows whose backing session file no longer exists, on both incremental index writes and all_sessions() reads. Fixes duplicate session entries after session-id rotation (e.g. context compression). Also pre-snapshots in_memory_ids under a single LOCK acquisition in all_sessions() rather than one per row.

Closes #846.

Review additions: optimised lock pattern in all_sessions() (one LOCK acquisition instead of N). Tests: 1856 passing.
2026-04-22 16:26:38 +00:00

413 lines
14 KiB
Python

"""
Tests for the incremental session index in api/models.py.
Validates:
- Incremental patch correctness (existing entries preserved, updated)
- New session appended to existing index
- Stale index entries (no backing session file) pruned on incremental writes
- First call (no index file) triggers full rebuild
- Corrupt index triggers fallback to full rebuild
- Concurrent saves don't lose data
- Atomic write leaves no .tmp file behind
- Deadlock guard on fallback path
- all_sessions() does not surface stale index entries
"""
import json
import os
import threading
import time
from pathlib import Path
from unittest.mock import patch
import pytest
import api.models as models
from api.models import Session, _write_session_index
@pytest.fixture(autouse=True)
def _isolate_session_dir(tmp_path, monkeypatch):
    """Point the session store at a throwaway directory for every test.

    ``models.SESSION_DIR`` and ``models.SESSION_INDEX_FILE`` are redirected
    into ``tmp_path`` so tests don't touch the real session store, and the
    in-memory ``models.SESSIONS`` cache is emptied before and after the test
    to avoid bleed between tests.

    Yields:
        A ``(session_dir, index_file)`` tuple of the temporary paths.
    """
    sandbox_dir = tmp_path / "sessions"
    sandbox_dir.mkdir()
    sandbox_index = sandbox_dir / "_index.json"

    redirects = (
        ("SESSION_DIR", sandbox_dir),
        ("SESSION_INDEX_FILE", sandbox_index),
    )
    for attr_name, replacement in redirects:
        monkeypatch.setattr(models, attr_name, replacement)

    # Intended to patch the module-level references that Session uses.
    # NOTE(review): this assigns Session.__module__ the name of the module
    # it is accessed through, which looks like a no-op — confirm it is needed.
    monkeypatch.setattr(models.Session, "__module__", models.__name__)

    # Start from an empty in-memory cache, and leave it empty afterwards.
    models.SESSIONS.clear()
    yield sandbox_dir, sandbox_index
    models.SESSIONS.clear()
def _make_session(session_id, title="Untitled", updated_at=None):
    """Build a Session with a known id and title and a single user message.

    ``updated_at`` is assigned onto the session only when a value is given;
    otherwise whatever the Session constructor set is left alone.
    """
    session = Session(
        session_id=session_id,
        title=title,
        messages=[{"role": "user", "content": "hi"}],
    )
    if updated_at is None:
        return session
    session.updated_at = updated_at
    return session
def _write_index_file(index_file, entries):
"""Write entries list to the index file atomically."""
tmp = index_file.with_suffix(".tmp")
tmp.write_text(json.dumps(entries, ensure_ascii=False, indent=2), encoding="utf-8")
os.replace(str(tmp), str(index_file))
def _read_index(index_file):
"""Read and parse the session index file."""
return json.loads(index_file.read_text(encoding="utf-8"))
# ── 6. test_incremental_patch_correctness ─────────────────────────────────
def test_incremental_patch_correctness():
    """Patch one entry of an existing three-session index.

    An index for sessions A, B and C is built first. Then
    ``_write_session_index(updates=[B'])`` is called, where B' has the same
    id as B but a new title and timestamp. A and C must be unchanged, B must
    carry the new title, and the index must stay ordered by ``updated_at``
    descending.
    """
    # The autouse fixture has already applied its monkeypatches, so the
    # redirected index path can be read straight off the module.
    index_file = models.SESSION_INDEX_FILE

    # Three sessions with different timestamps.
    sA = _make_session("sess_a", "Alpha", updated_at=100.0)
    sB = _make_session("sess_b", "Bravo", updated_at=200.0)
    sC = _make_session("sess_c", "Charlie", updated_at=300.0)

    # Write session files to disk so the full rebuild can find them.
    for s in (sA, sB, sC):
        s.path.write_text(json.dumps(s.__dict__, ensure_ascii=False, indent=2), encoding="utf-8")

    # Build the initial index.
    _write_session_index(updates=None)
    index = _read_index(index_file)
    assert len(index) == 3

    # Replace B with a retitled, newer version.
    sB_updated = _make_session("sess_b", "Bravo Updated", updated_at=250.0)
    sB_updated.path.write_text(
        json.dumps(sB_updated.__dict__, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    # Incremental update.
    _write_session_index(updates=[sB_updated])

    index = _read_index(index_file)
    index_map = {e["session_id"]: e for e in index}
    assert index_map["sess_a"]["title"] == "Alpha", "A should be unchanged"
    assert index_map["sess_c"]["title"] == "Charlie", "C should be unchanged"
    assert index_map["sess_b"]["title"] == "Bravo Updated", "B should have new title"

    # Sort order: Charlie (300) > Bravo Updated (250) > Alpha (100)
    assert index[0]["session_id"] == "sess_c"
    assert index[1]["session_id"] == "sess_b"
    assert index[2]["session_id"] == "sess_a"
# ── 7. test_new_session_appended_to_index ─────────────────────────────────
def test_new_session_appended_to_index():
    """Add a session that the existing index does not know about.

    An index for sessions A and B is built first. Then
    ``_write_session_index(updates=[C])`` is called for a session C that is
    not in it. C must appear in the index alongside A and B.
    """
    index_file = models.SESSION_INDEX_FILE

    sA = _make_session("sess_a", "Alpha", updated_at=100.0)
    sB = _make_session("sess_b", "Bravo", updated_at=200.0)
    for s in (sA, sB):
        s.path.write_text(json.dumps(s.__dict__, ensure_ascii=False, indent=2), encoding="utf-8")
    _write_session_index(updates=None)

    # Create a new session C that is not in the index yet.
    sC = _make_session("sess_c", "Charlie", updated_at=300.0)
    sC.path.write_text(json.dumps(sC.__dict__, ensure_ascii=False, indent=2), encoding="utf-8")
    _write_session_index(updates=[sC])

    index = _read_index(index_file)
    ids = {e["session_id"] for e in index}
    assert "sess_c" in ids, "New session C should appear in the index"
    assert "sess_a" in ids
    assert "sess_b" in ids
def test_incremental_update_prunes_stale_entries():
    """An incremental write must drop index rows that have no session file.

    The index is seeded with one "ghost" row for which no JSON file exists.
    This mirrors session-id rotation paths (e.g. compression) where the old
    id can linger in `_index.json` after the file has been renamed. After an
    incremental update for a real session, the ghost row must be gone.
    """
    index_path = models.SESSION_INDEX_FILE

    ghost_row = {
        "session_id": "ghost_sid",
        "title": "Ghost",
        "updated_at": 150.0,
        "workspace": "/tmp",
        "model": "test",
        "message_count": 1,
        "created_at": 100.0,
        "pinned": False,
        "archived": False,
    }
    _write_index_file(index_path, [ghost_row])

    # A real session, backed by a file on disk.
    live = _make_session("sess_a", "Alpha", updated_at=200.0)
    serialized = json.dumps(vars(live), ensure_ascii=False, indent=2)
    live.path.write_text(serialized, encoding="utf-8")

    _write_session_index(updates=[live])

    indexed_ids = set()
    for row in _read_index(index_path):
        indexed_ids.add(row["session_id"])
    assert "sess_a" in indexed_ids
    assert "ghost_sid" not in indexed_ids, "stale entry with no backing file must be pruned"
# ── 8. test_first_call_full_rebuild ──────────────────────────────────────
def test_first_call_full_rebuild():
    """With no index file present, an incremental call must create the index.

    ``_write_session_index(updates=[session])`` is expected to fall back to a
    full rebuild when there is no index file, and the resulting index must
    contain the session.
    """
    index_file = models.SESSION_INDEX_FILE

    # Precondition: no index file yet.
    assert not index_file.exists()

    sA = _make_session("sess_a", "Alpha", updated_at=100.0)
    sA.path.write_text(json.dumps(sA.__dict__, ensure_ascii=False, indent=2), encoding="utf-8")

    # Call with updates — should trigger a full rebuild since no index exists.
    _write_session_index(updates=[sA])

    assert index_file.exists(), "Index file should be created"
    index = _read_index(index_file)
    ids = {e["session_id"] for e in index}
    assert "sess_a" in ids, "Session A should appear in the rebuilt index"
# ── 9. test_corrupt_index_fallback ────────────────────────────────────────
def test_corrupt_index_fallback():
    """An index file holding invalid JSON must be replaced by a valid one.

    Garbage is written to ``SESSION_INDEX_FILE`` and then
    ``_write_session_index(updates=[session])`` is called. The call must not
    raise, and the index must afterwards be a valid JSON list containing
    the session.
    """
    index_file = models.SESSION_INDEX_FILE

    # Write corrupt data.
    index_file.write_text("THIS IS NOT JSON {{{", encoding="utf-8")

    sA = _make_session("sess_a", "Alpha", updated_at=100.0)
    sA.path.write_text(json.dumps(sA.__dict__, ensure_ascii=False, indent=2), encoding="utf-8")

    # Should not raise; should fall back to a full rebuild.
    _write_session_index(updates=[sA])

    assert index_file.exists()
    index = _read_index(index_file)
    assert isinstance(index, list), "Index should be a list"
    ids = {e["session_id"] for e in index}
    assert "sess_a" in ids, "Session A should appear after fallback rebuild"
# ── 10. test_concurrent_saves_dont_lose_data ────────────────────────────
def test_concurrent_saves_dont_lose_data():
    """Two concurrent ``Session.save()`` calls must both land in the index.

    An index containing sessions A and B is built first. Two worker threads
    then wait on a shared ``threading.Event``, are released together, and
    each retitles and saves its own session. Both new titles must be present
    in the final index.
    """
    index_file = models.SESSION_INDEX_FILE

    sA = _make_session("sess_a", "Alpha", updated_at=100.0)
    sB = _make_session("sess_b", "Bravo", updated_at=200.0)
    for s in (sA, sB):
        s.path.write_text(json.dumps(s.__dict__, ensure_ascii=False, indent=2), encoding="utf-8")

    # Build the initial index.
    _write_session_index(updates=None)

    start_gate = threading.Event()
    errors = []

    def _update_session(session, new_title, new_updated_at):
        """Wait for the start gate, then mutate and save one session."""
        try:
            start_gate.wait(timeout=5)
            session.title = new_title
            session.updated_at = new_updated_at
            session.save()
        except Exception as e:
            # Collected here and asserted on in the main thread.
            errors.append(e)

    # daemon=True: a worker stuck in save() must not keep the interpreter
    # alive after this test has already reported the failure.
    workers = [
        threading.Thread(target=_update_session, args=(sA, "Alpha V2", 150.0), daemon=True),
        threading.Thread(target=_update_session, args=(sB, "Bravo V2", 250.0), daemon=True),
    ]
    for worker in workers:
        worker.start()

    # Release both threads simultaneously.
    start_gate.set()
    for worker in workers:
        worker.join(timeout=10)

    # join() returns None whether or not it timed out, so check liveness
    # explicitly to tell a hang apart from a lost update.
    assert not any(worker.is_alive() for worker in workers), (
        "Concurrent save did not finish within the timeout — possible deadlock"
    )
    assert not errors, f"Errors during concurrent saves: {errors}"

    # Both updates must be in the final index.
    index = _read_index(index_file)
    index_map = {e["session_id"]: e for e in index}
    assert "sess_a" in index_map, "Session A should be in index"
    assert "sess_b" in index_map, "Session B should be in index"
    assert index_map["sess_a"]["title"] == "Alpha V2", "Session A title should be updated"
    assert index_map["sess_b"]["title"] == "Bravo V2", "Session B title should be updated"
# ── 11. test_atomic_write_no_tmp_remains ─────────────────────────────────
def test_atomic_write_no_tmp_remains():
    """No ``*.tmp`` file may remain in SESSION_DIR after an index write.

    Checked twice: after the first write (no index file exists yet at that
    point) and after a second write for the same session with changed
    fields, once the index exists.
    """
    session_dir = models.SESSION_DIR

    sA = _make_session("sess_a", "Alpha", updated_at=100.0)
    sA.path.write_text(json.dumps(sA.__dict__, ensure_ascii=False, indent=2), encoding="utf-8")

    _write_session_index(updates=[sA])
    tmp_files = list(session_dir.glob("*.tmp"))
    assert not tmp_files, f"Unexpected .tmp files remain: {tmp_files}"

    # Also test the incremental path.
    sA.title = "Alpha V2"
    sA.updated_at = 200.0
    _write_session_index(updates=[sA])
    tmp_files = list(session_dir.glob("*.tmp"))
    assert not tmp_files, f"Unexpected .tmp files after incremental write: {tmp_files}"
# ── 12. test_deadlock_guard_on_fallback ──────────────────────────────────
def test_deadlock_guard_on_fallback():
    """A failing index read must fall back to a rebuild without hanging.

    ``Path.read_text`` is patched so that the first read of the index file
    raises ``OSError``. ``_write_session_index(updates=[session])`` is then
    run on a worker thread and must finish within the timeout without
    raising. The intent is to verify that the fallback path (unreadable
    index -> full rebuild) runs outside the LOCK and so cannot deadlock.
    """
    index_file = models.SESSION_INDEX_FILE

    # A valid index file must exist so the incremental path is attempted.
    _write_index_file(index_file, [
        {"session_id": "sess_a", "title": "Alpha", "updated_at": 100.0,
         "workspace": "/tmp", "model": "test", "message_count": 0,
         "created_at": 100.0, "pinned": False, "archived": False},
    ])

    sB = _make_session("sess_b", "Bravo", updated_at=200.0)
    sB.path.write_text(json.dumps(sB.__dict__, ensure_ascii=False, indent=2), encoding="utf-8")

    original_read_text = Path.read_text
    call_count = 0

    def _broken_read_text(self, *args, **kwargs):
        """Fail the first read of the index file; pass everything else through."""
        nonlocal call_count
        # Only break the index file read, not the session file reads.
        if str(self) == str(index_file) and call_count == 0:
            call_count += 1
            raise OSError("Simulated corrupt index read")
        return original_read_text(self, *args, **kwargs)

    done = threading.Event()
    exc = [None]

    def _run():
        """Run the index write and record any exception for the main thread."""
        try:
            _write_session_index(updates=[sB])
        except Exception as e:
            exc[0] = e
        finally:
            done.set()

    with patch.object(Path, "read_text", _broken_read_text):
        # Run on a worker thread and wait with a timeout so a deadlock shows
        # up as a test failure instead of a hung test. daemon=True keeps a
        # deadlocked worker from blocking interpreter exit afterwards.
        t = threading.Thread(target=_run, daemon=True)
        t.start()
        finished = done.wait(timeout=10)

    assert finished, "_write_session_index hung — likely deadlock in fallback path"
    assert exc[0] is None, f"Unexpected exception: {exc[0]}"

    # The index should still be valid after the fallback.
    index = _read_index(index_file)
    assert isinstance(index, list)
def test_all_sessions_ignores_stale_index_entries():
    """``all_sessions()`` must not return index rows that lack a session file.

    The index is written by hand with one ghost row (no backing file) and one
    row for a session whose file exists. Only the latter may be returned.
    """
    index_path = models.SESSION_INDEX_FILE

    # A real session with a file on disk, plus its compact index row.
    live = _make_session("sess_a", "Alpha", updated_at=200.0)
    serialized = json.dumps(vars(live), ensure_ascii=False, indent=2)
    live.path.write_text(serialized, encoding="utf-8")
    live_row = live.compact()

    ghost_row = {
        "session_id": "ghost_sid",
        "title": "Ghost",
        "updated_at": 150.0,
        "workspace": "/tmp",
        "model": "test",
        "message_count": 1,
        "created_at": 100.0,
        "pinned": False,
        "archived": False,
    }
    _write_index_file(index_path, [ghost_row, live_row])

    returned_ids = {row["session_id"] for row in models.all_sessions()}
    assert "sess_a" in returned_ids
    assert "ghost_sid" not in returned_ids