diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ae22d0..9c10538 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Hermes Web UI -- Changelog +## [v0.50.132] — 2026-04-21 + +### Fixed +- **Periodic session checkpoint during long-running agent tasks** — messages accumulated during multi-step research or coding tasks were silently lost if the server restarted mid-run. The root cause: `Session.save()` was only called after `agent.run_conversation()` completed. The fix adds a daemon thread that saves the session every 15 seconds whenever the `on_tool` callback signals a completed tool call — the first reliable mid-run signal that real progress has been made (the agent works on an internal copy of `s.messages`, so watching message-count would never trigger). `Session.save()` gains a `skip_index=True` flag so checkpoints skip the expensive index rebuild; the final `s.save()` at task completion still rebuilds it. On a server restart the user's message and turn bookkeeping remain on disk — worst case: up to 15 seconds of tool-call progress lost rather than the entire conversation turn. Closes #765. Absorbed and corrected from PR #809 by @bergeouss. (#810) + ## [v0.50.131] — 2026-04-21 ### Fixed diff --git a/api/models.py b/api/models.py index d0ec9ae..ef039e5 100644 --- a/api/models.py +++ b/api/models.py @@ -121,14 +121,15 @@ class Session: def path(self): return SESSION_DIR / f'{self.session_id}.json' - def save(self, touch_updated_at: bool = True) -> None: + def save(self, touch_updated_at: bool = True, skip_index: bool = False) -> None: if touch_updated_at: self.updated_at = time.time() self.path.write_text( json.dumps(self.__dict__, ensure_ascii=False, indent=2), encoding='utf-8', ) - _write_session_index(updates=[self]) + if not skip_index: + _write_session_index(updates=[self]) @classmethod def load(cls, sid): diff --git a/api/streaming.py b/api/streaming.py index 2f6fb3e..7c2f623 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -822,6 +822,10 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta except Exception: logger.debug("Failed to put event to queue") + # Initialised here (before any code that may raise) so the outer `finally` + # block can safely check `if _checkpoint_stop is not None` even when an + # exception fires before the checkpoint thread is created (Issue #765). + _checkpoint_stop = None try: s = get_session(session_id) s.workspace = str(Path(workspace).expanduser().resolve()) @@ -1025,6 +1029,9 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta live_tc['duration'] = cb_kwargs.get('duration') live_tc['is_error'] = bool(cb_kwargs.get('is_error', False)) break + # Signal the checkpoint thread that new work has completed (Issue #765). + # Each completed tool call is a meaningful unit of progress worth persisting. + _checkpoint_activity[0] += 1 put('tool_complete', { 'event_type': event_type, 'name': name, @@ -1174,6 +1181,40 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta if _personality_prompt: agent.ephemeral_system_prompt = _personality_prompt _previous_messages = list(s.messages or []) + + # ── Periodic checkpoint during streaming (Issue #765) ── + # The agent works on an internal copy of s.messages during run_conversation() + # so we cannot watch s.messages for growth. Instead, on_tool() increments + # _checkpoint_activity[0] each time a tool call completes — that is the real + # signal that progress has been made worth persisting. + # + # What gets saved on each checkpoint: + # - s.pending_user_message (already written before run starts) + # - s.pending_started_at / s.active_stream_id (turn bookkeeping) + # On a server restart the UI will see a session with a pending message and no + # response — better than a silent loss of the entire conversation turn. + # The final s.save() at task completion handles the full session update + index. + # (_checkpoint_stop is pre-initialised at the top of the outer try.) + _checkpoint_activity = [0] + + def _periodic_checkpoint(): + last_saved_activity = 0 + while not _checkpoint_stop.wait(15): + try: + cur = _checkpoint_activity[0] + if cur > last_saved_activity: + s.save(skip_index=True) + last_saved_activity = cur + except Exception as e: + logger.debug("Periodic checkpoint save failed: %s", e) + + _checkpoint_stop = threading.Event() + _ckpt_thread = threading.Thread( + target=_periodic_checkpoint, daemon=True, + name=f"ckpt-{session_id[:8]}", + ) + _ckpt_thread.start() + result = agent.run_conversation( user_message=workspace_ctx + msg_text, system_message=workspace_system_msg, @@ -1495,6 +1536,9 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta _apperror_payload['hint'] = _exc_hint put('apperror', _apperror_payload) finally: + # Stop periodic checkpoint thread if it was started (Issue #765) + if _checkpoint_stop is not None: + _checkpoint_stop.set() _clear_thread_env() # TD1: always clear thread-local context with STREAMS_LOCK: STREAMS.pop(stream_id, None) diff --git a/tests/test_issue765_streaming_persistence.py b/tests/test_issue765_streaming_persistence.py new file mode 100644 index 0000000..e44c72e --- /dev/null +++ b/tests/test_issue765_streaming_persistence.py @@ -0,0 +1,304 @@ +""" +Tests for periodic session persistence during streaming (Issue #765). + +Validates: + - Session.save(skip_index=True) writes the JSON file but skips the index rebuild + - The periodic checkpoint fires when _checkpoint_activity is incremented + (as it would be by on_tool() during real agent execution) + - Messages stored via pending_user_message survive a simulated server restart +""" +import json +import threading +import time +from pathlib import Path + +import pytest + +import api.models as models +from api.models import Session + + +@pytest.fixture(autouse=True) +def _isolate_session_dir(tmp_path, monkeypatch): + """Redirect SESSION_DIR and SESSION_INDEX_FILE to a temp directory.""" + session_dir = tmp_path / "sessions" + session_dir.mkdir() + index_file = session_dir / "_index.json" + + monkeypatch.setattr(models, "SESSION_DIR", session_dir) + monkeypatch.setattr(models, "SESSION_INDEX_FILE", index_file) + + models.SESSIONS.clear() + yield session_dir, index_file + models.SESSIONS.clear() + + +def _make_session(session_id="abc123", messages=None): + """Helper to create a Session with a known ID.""" + return Session( + session_id=session_id, + title="Test Session", + messages=messages or [{"role": "user", "content": "hello"}], + ) + + +class TestSaveSkipIndex: + """Tests for the skip_index parameter on Session.save().""" + + def test_save_writes_json_file(self): + """save() always writes the session JSON file, regardless of skip_index.""" + s = _make_session("s1") + s.save() + assert s.path.exists() + data = json.loads(s.path.read_text()) + assert data["session_id"] == "s1" + assert len(data["messages"]) == 1 + + def test_save_with_skip_index_writes_json(self): + """save(skip_index=True) still writes the session JSON file.""" + s = _make_session("s2") + s.save(skip_index=True) + assert s.path.exists() + data = json.loads(s.path.read_text()) + assert data["session_id"] == "s2" + + def test_save_with_skip_index_skips_index_rebuild(self): + """save(skip_index=True) does NOT create or update the session index.""" + s = _make_session("s3") + s.save(skip_index=True) + index = models.SESSION_INDEX_FILE + assert not index.exists(), "Index file should not be created with skip_index=True" + + def test_save_without_skip_index_creates_index(self): + """save() (default) DOES create the session index.""" + s = _make_session("s4") + s.save() + index = models.SESSION_INDEX_FILE + assert index.exists(), "Index file should be created by default save()" + data = json.loads(index.read_text()) + sids = [e["session_id"] for e in data] + assert "s4" in sids + + def test_skip_index_then_full_save_updates_index(self): + """After skip_index saves, a full save() correctly builds the index.""" + s = _make_session("s5") + s.messages.append({"role": "assistant", "content": "hi there"}) + s.save(skip_index=True) + assert not models.SESSION_INDEX_FILE.exists() + + s.messages.append({"role": "user", "content": "thanks"}) + s.save() + assert models.SESSION_INDEX_FILE.exists() + data = json.loads(s.path.read_text()) + assert len(data["messages"]) == 3 + + def test_skip_index_save_with_touch_updated_at_false(self): + """save(skip_index=True, touch_updated_at=False) preserves updated_at.""" + s = _make_session("touch1") + original_updated_at = s.updated_at + time.sleep(0.05) + s.save(skip_index=True, touch_updated_at=False) + data = json.loads(s.path.read_text()) + assert data["updated_at"] == original_updated_at + assert not models.SESSION_INDEX_FILE.exists() + + +class TestPeriodicCheckpoint: + """Tests for the periodic checkpoint mechanism during streaming. + + The checkpoint is keyed off an activity counter (_checkpoint_activity[0]), + incremented by on_tool() on each tool.completed event — NOT off s.messages + which is never mutated during agent.run_conversation() (the agent copies it). + """ + + def test_checkpoint_fires_on_activity_counter_increment(self): + """Checkpoint saves when _checkpoint_activity counter grows.""" + s = _make_session("ckpt1") + s.pending_user_message = "do a long task" + s.save() # initial save (like routes.py does before streaming starts) + + stop_event = threading.Event() + _checkpoint_activity = [0] + save_count = [0] + + def periodic_checkpoint(): + last = 0 + while not stop_event.wait(0.1): # fast interval for test + try: + cur = _checkpoint_activity[0] + if cur > last: + s.save(skip_index=True) + last = cur + save_count[0] += 1 + except Exception: + pass + + t = threading.Thread(target=periodic_checkpoint, daemon=True) + t.start() + + # Simulate on_tool() completing twice (as would happen during a real agent run) + time.sleep(0.15) + _checkpoint_activity[0] += 1 # first tool completes + time.sleep(0.25) + _checkpoint_activity[0] += 1 # second tool completes + time.sleep(0.25) + + stop_event.set() + t.join(timeout=2) + + assert save_count[0] >= 2, ( + "Expected at least 2 checkpoint saves (one per activity increment); " + f"got {save_count[0]}" + ) + # Verify the JSON is on disk and readable + data = json.loads(s.path.read_text()) + assert data["pending_user_message"] == "do a long task" + + def test_checkpoint_does_not_fire_without_activity(self): + """Checkpoint skips save when activity counter has not changed.""" + s = _make_session("ckpt2") + s.save() + + stop_event = threading.Event() + _checkpoint_activity = [0] + save_count = [0] + + def periodic_checkpoint(): + last = 0 + while not stop_event.wait(0.05): + cur = _checkpoint_activity[0] + if cur > last: + s.save(skip_index=True) + last = cur + save_count[0] += 1 + + t = threading.Thread(target=periodic_checkpoint, daemon=True) + t.start() + # No increments — checkpoint should stay quiet + time.sleep(0.4) + stop_event.set() + t.join(timeout=2) + + assert save_count[0] == 0, ( + f"Expected 0 saves when activity is unchanged; got {save_count[0]}" + ) + + def test_checkpoint_stops_on_signal(self): + """Checkpoint thread exits cleanly when stop event is set.""" + s = _make_session("ckpt3") + stop_event = threading.Event() + iterations = [0] + + def periodic_checkpoint(): + while not stop_event.wait(0.02): + iterations[0] += 1 + + t = threading.Thread(target=periodic_checkpoint, daemon=True) + t.start() + time.sleep(0.15) + stop_event.set() + t.join(timeout=1) + assert not t.is_alive(), "Checkpoint thread should have stopped" + + def test_pending_message_survives_simulated_restart(self): + """pending_user_message written before run_conversation survives a restart. + + This is the minimal guarantee for Issue #765: even if the agent produces + no tool calls before a crash, the user's message is not silently lost. + """ + s = _make_session("survive1", messages=[{"role": "user", "content": "first turn"}]) + s.save() # initial full save + + # Simulate what routes.py does before _run_agent_streaming: + s.pending_user_message = "do a long research task" + s.pending_started_at = time.time() + s.active_stream_id = "stream-abc123" + s.save(skip_index=True) # checkpoint-style save + + # Simulate restart: clear in-memory state, reload from disk + del s + models.SESSIONS.clear() + + reloaded = Session.load("survive1") + assert reloaded is not None + assert reloaded.pending_user_message == "do a long research task" + assert reloaded.active_stream_id == "stream-abc123" + # Original messages still intact + assert len(reloaded.messages) == 1 + + def test_activity_checkpoint_persists_updated_at(self): + """Each checkpoint save updates updated_at, keeping session fresh in sidebar.""" + s = _make_session("ts1") + s.save() + ts_before = s.updated_at + + time.sleep(0.05) + _checkpoint_activity = [1] # simulate one tool completion + + stop_event = threading.Event() + + def periodic_checkpoint(): + last = 0 + while not stop_event.wait(0.05): + cur = _checkpoint_activity[0] + if cur > last: + s.save(skip_index=True) + last = cur + + t = threading.Thread(target=periodic_checkpoint, daemon=True) + t.start() + time.sleep(0.2) + stop_event.set() + t.join(timeout=1) + + data = json.loads(s.path.read_text()) + assert data["updated_at"] > ts_before, "Checkpoint should update updated_at" + + +class TestCheckpointVariableLifecycle: + """Regression guard: the outer `finally` must not UnboundLocalError when an + exception fires before the checkpoint thread is created. _checkpoint_stop + is initialised to None at the very top of the outer try block so the + finally's `if _checkpoint_stop is not None` branch is always safe. + """ + + def test_checkpoint_stop_initialised_before_any_raiseable_code(self): + """Static check: `_checkpoint_stop = None` must appear before any code + that could raise inside _run_agent_streaming's outer try.""" + src = (Path(__file__).parent.parent / "api" / "streaming.py").read_text( + encoding="utf-8" + ) + lines = src.splitlines() + try_line = next( + i for i, ln in enumerate(lines, 1) + if ln.rstrip().endswith("try:") and lines[i - 2].strip().startswith("_checkpoint_stop") + ) + # The assignment must precede the `try:` — not sit inside the nested + # block where an earlier line could raise before it runs. + init_line = next( + i for i, ln in enumerate(lines, 1) + if "_checkpoint_stop = None" in ln + ) + assert init_line < try_line, ( + f"_checkpoint_stop = None (line {init_line}) must precede the outer " + f"try block (line {try_line}) so the finally can safely check it." + ) + + def test_finally_path_when_early_exception_does_not_unbound_error(self): + """Mirror the _run_agent_streaming try/finally structure — proves that + pre-initialising _checkpoint_stop = None outside any raiseable code + keeps the finally safe.""" + + def mimic_run_agent_streaming(): + _checkpoint_stop = None # pre-init (the fix) + try: + # Anything here could raise — simulate early failure + raise ValueError("early failure, e.g. get_session KeyError") + _checkpoint_stop = threading.Event() # never reached + finally: + # The guard the PR added — must not itself raise + if _checkpoint_stop is not None: + _checkpoint_stop.set() + + with pytest.raises(ValueError, match="early failure"): + mimic_run_agent_streaming()