fix: periodic session checkpoint during streaming — v0.50.132 (#810)
Closes #765. Supersedes #809 (@bergeouss). Co-authored-by: bergeouss <bergeouss@users.noreply.github.com>
This commit is contained in:
@@ -1,5 +1,10 @@
|
||||
# Hermes Web UI -- Changelog
|
||||
|
||||
## [v0.50.132] — 2026-04-21
|
||||
|
||||
### Fixed
|
||||
- **Periodic session checkpoint during long-running agent tasks** — messages accumulated during multi-step research or coding tasks were silently lost if the server restarted mid-run. The root cause: `Session.save()` was only called after `agent.run_conversation()` completed. The fix adds a daemon thread that wakes every 15 seconds and saves the session if the `on_tool` callback has signalled at least one completed tool call since the previous checkpoint — the first reliable mid-run signal that real progress has been made (the agent works on an internal copy of `s.messages`, so watching message-count would never trigger). `Session.save()` gains a `skip_index=True` flag so checkpoints skip the expensive index rebuild; the final `s.save()` at task completion still rebuilds it. On a server restart the user's message and turn bookkeeping remain on disk — worst case: up to 15 seconds of tool-call progress lost rather than the entire conversation turn. Closes #765. Absorbed and corrected from PR #809 by @bergeouss. (#810)
|
||||
|
||||
## [v0.50.131] — 2026-04-21
|
||||
|
||||
### Fixed
|
||||
|
||||
@@ -121,14 +121,15 @@ class Session:
|
||||
def path(self):
|
||||
return SESSION_DIR / f'{self.session_id}.json'
|
||||
|
||||
def save(self, touch_updated_at: bool = True) -> None:
|
||||
def save(self, touch_updated_at: bool = True, skip_index: bool = False) -> None:
|
||||
if touch_updated_at:
|
||||
self.updated_at = time.time()
|
||||
self.path.write_text(
|
||||
json.dumps(self.__dict__, ensure_ascii=False, indent=2),
|
||||
encoding='utf-8',
|
||||
)
|
||||
_write_session_index(updates=[self])
|
||||
if not skip_index:
|
||||
_write_session_index(updates=[self])
|
||||
|
||||
@classmethod
|
||||
def load(cls, sid):
|
||||
|
||||
@@ -822,6 +822,10 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
except Exception:
|
||||
logger.debug("Failed to put event to queue")
|
||||
|
||||
# Initialised here (before any code that may raise) so the outer `finally`
|
||||
# block can safely check `if _checkpoint_stop is not None` even when an
|
||||
# exception fires before the checkpoint thread is created (Issue #765).
|
||||
_checkpoint_stop = None
|
||||
try:
|
||||
s = get_session(session_id)
|
||||
s.workspace = str(Path(workspace).expanduser().resolve())
|
||||
@@ -1025,6 +1029,9 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
live_tc['duration'] = cb_kwargs.get('duration')
|
||||
live_tc['is_error'] = bool(cb_kwargs.get('is_error', False))
|
||||
break
|
||||
# Signal the checkpoint thread that new work has completed (Issue #765).
|
||||
# Each completed tool call is a meaningful unit of progress worth persisting.
|
||||
_checkpoint_activity[0] += 1
|
||||
put('tool_complete', {
|
||||
'event_type': event_type,
|
||||
'name': name,
|
||||
@@ -1174,6 +1181,40 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
if _personality_prompt:
|
||||
agent.ephemeral_system_prompt = _personality_prompt
|
||||
_previous_messages = list(s.messages or [])
|
||||
|
||||
# ── Periodic checkpoint during streaming (Issue #765) ──
|
||||
# The agent works on an internal copy of s.messages during run_conversation()
|
||||
# so we cannot watch s.messages for growth. Instead, on_tool() increments
|
||||
# _checkpoint_activity[0] each time a tool call completes — that is the real
|
||||
# signal that progress has been made worth persisting.
|
||||
#
|
||||
# What gets saved on each checkpoint:
|
||||
# - s.pending_user_message (already written before run starts)
|
||||
# - s.pending_started_at / s.active_stream_id (turn bookkeeping)
|
||||
# On a server restart the UI will see a session with a pending message and no
|
||||
# response — better than a silent loss of the entire conversation turn.
|
||||
# The final s.save() at task completion handles the full session update + index.
|
||||
# (_checkpoint_stop is pre-initialised to None immediately before the outer try.)
|
||||
_checkpoint_activity = [0]
|
||||
|
||||
def _periodic_checkpoint():
|
||||
last_saved_activity = 0
|
||||
while not _checkpoint_stop.wait(15):
|
||||
try:
|
||||
cur = _checkpoint_activity[0]
|
||||
if cur > last_saved_activity:
|
||||
s.save(skip_index=True)
|
||||
last_saved_activity = cur
|
||||
except Exception as e:
|
||||
logger.debug("Periodic checkpoint save failed: %s", e)
|
||||
|
||||
_checkpoint_stop = threading.Event()
|
||||
_ckpt_thread = threading.Thread(
|
||||
target=_periodic_checkpoint, daemon=True,
|
||||
name=f"ckpt-{session_id[:8]}",
|
||||
)
|
||||
_ckpt_thread.start()
|
||||
|
||||
result = agent.run_conversation(
|
||||
user_message=workspace_ctx + msg_text,
|
||||
system_message=workspace_system_msg,
|
||||
@@ -1495,6 +1536,9 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
_apperror_payload['hint'] = _exc_hint
|
||||
put('apperror', _apperror_payload)
|
||||
finally:
|
||||
# Stop periodic checkpoint thread if it was started (Issue #765)
|
||||
if _checkpoint_stop is not None:
|
||||
_checkpoint_stop.set()
|
||||
_clear_thread_env() # TD1: always clear thread-local context
|
||||
with STREAMS_LOCK:
|
||||
STREAMS.pop(stream_id, None)
|
||||
|
||||
304
tests/test_issue765_streaming_persistence.py
Normal file
304
tests/test_issue765_streaming_persistence.py
Normal file
@@ -0,0 +1,304 @@
|
||||
"""
|
||||
Tests for periodic session persistence during streaming (Issue #765).
|
||||
|
||||
Validates:
|
||||
- Session.save(skip_index=True) writes the JSON file but skips the index rebuild
|
||||
- The periodic checkpoint fires when _checkpoint_activity is incremented
|
||||
(as it would be by on_tool() during real agent execution)
|
||||
- Messages stored via pending_user_message survive a simulated server restart
|
||||
"""
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
import api.models as models
|
||||
from api.models import Session
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _isolate_session_dir(tmp_path, monkeypatch):
    """Point the models module at a throwaway sessions directory.

    SESSION_DIR and SESSION_INDEX_FILE are monkeypatched onto paths under
    pytest's ``tmp_path``, and the in-memory ``models.SESSIONS`` registry is
    emptied both before and after the test body runs.

    Yields:
        A ``(session_dir, index_file)`` tuple of the patched paths.
    """
    sandbox = tmp_path / "sessions"
    sandbox.mkdir()
    index_path = sandbox / "_index.json"

    patched = (
        ("SESSION_DIR", sandbox),
        ("SESSION_INDEX_FILE", index_path),
    )
    for attr_name, replacement in patched:
        monkeypatch.setattr(models, attr_name, replacement)

    # Start from an empty registry so earlier tests cannot leak sessions in.
    models.SESSIONS.clear()
    yield sandbox, index_path
    # Leave an empty registry behind as well.
    models.SESSIONS.clear()
|
||||
|
||||
|
||||
def _make_session(session_id="abc123", messages=None):
    """Build a Session titled "Test Session" with a known ID.

    When *messages* is falsy (``None`` or empty) the session starts with a
    single user message, ``"hello"``.
    """
    if messages:
        history = messages
    else:
        history = [{"role": "user", "content": "hello"}]
    return Session(session_id=session_id, title="Test Session", messages=history)
|
||||
|
||||
|
||||
class TestSaveSkipIndex:
    """Tests for the skip_index parameter on Session.save()."""

    @staticmethod
    def _read_json(path):
        """Parse *path* as UTF-8 JSON.

        Session.save() writes with ``encoding='utf-8'``; reading with an
        explicit encoding keeps the tests independent of the platform locale.
        """
        return json.loads(path.read_text(encoding="utf-8"))

    def test_save_writes_json_file(self):
        """save() always writes the session JSON file, regardless of skip_index."""
        s = _make_session("s1")
        s.save()
        assert s.path.exists()
        data = self._read_json(s.path)
        assert data["session_id"] == "s1"
        assert len(data["messages"]) == 1

    def test_save_with_skip_index_writes_json(self):
        """save(skip_index=True) still writes the session JSON file."""
        s = _make_session("s2")
        s.save(skip_index=True)
        assert s.path.exists()
        data = self._read_json(s.path)
        assert data["session_id"] == "s2"

    def test_save_with_skip_index_skips_index_rebuild(self):
        """save(skip_index=True) does NOT create or update the session index."""
        s = _make_session("s3")
        s.save(skip_index=True)
        index = models.SESSION_INDEX_FILE
        assert not index.exists(), "Index file should not be created with skip_index=True"

    def test_save_without_skip_index_creates_index(self):
        """save() (default) DOES create the session index."""
        s = _make_session("s4")
        s.save()
        index = models.SESSION_INDEX_FILE
        assert index.exists(), "Index file should be created by default save()"
        data = self._read_json(index)
        sids = [e["session_id"] for e in data]
        assert "s4" in sids

    def test_skip_index_then_full_save_updates_index(self):
        """After skip_index saves, a full save() correctly builds the index."""
        s = _make_session("s5")
        s.messages.append({"role": "assistant", "content": "hi there"})
        s.save(skip_index=True)
        assert not models.SESSION_INDEX_FILE.exists()

        s.messages.append({"role": "user", "content": "thanks"})
        s.save()
        assert models.SESSION_INDEX_FILE.exists()
        # Existence alone does not prove the index was built for this session;
        # check that the entry is actually present.
        index_entries = self._read_json(models.SESSION_INDEX_FILE)
        assert "s5" in [e["session_id"] for e in index_entries]
        data = self._read_json(s.path)
        assert len(data["messages"]) == 3

    def test_skip_index_save_with_touch_updated_at_false(self):
        """save(skip_index=True, touch_updated_at=False) preserves updated_at."""
        s = _make_session("touch1")
        original_updated_at = s.updated_at
        # Let the clock advance so an unwanted touch would be observable.
        time.sleep(0.05)
        s.save(skip_index=True, touch_updated_at=False)
        data = self._read_json(s.path)
        assert data["updated_at"] == original_updated_at
        assert not models.SESSION_INDEX_FILE.exists()
|
||||
|
||||
|
||||
class TestPeriodicCheckpoint:
    """Tests for the periodic checkpoint mechanism during streaming.

    The checkpoint is keyed off an activity counter (_checkpoint_activity[0]),
    incremented by on_tool() on each tool.completed event — NOT off s.messages
    which is never mutated during agent.run_conversation() (the agent copies it).

    Every test drives the same loop via ``_start_checkpoint_loop`` so the
    behaviour under test is defined in exactly one place.
    """

    @staticmethod
    def _start_checkpoint_loop(session, activity, stop_event, interval):
        """Start a daemon thread running a checkpoint loop like the streaming one.

        Args:
            session: object whose ``save(skip_index=True)`` is called on each
                checkpoint (may be ``None`` if *activity* never changes).
            activity: one-element list acting as the shared activity counter.
            stop_event: ``threading.Event`` that terminates the loop when set.
            interval: seconds between wake-ups.

        Returns:
            ``(thread, save_count, errors)`` where ``save_count`` is a
            one-element list of completed saves and ``errors`` collects any
            exception raised inside the loop, so tests can fail loudly instead
            of silently swallowing it.
        """
        save_count = [0]
        errors = []

        def periodic_checkpoint():
            last_saved = 0
            while not stop_event.wait(interval):
                try:
                    current = activity[0]
                    if current > last_saved:
                        session.save(skip_index=True)
                        last_saved = current
                        save_count[0] += 1
                except Exception as exc:
                    errors.append(exc)

        thread = threading.Thread(target=periodic_checkpoint, daemon=True)
        thread.start()
        return thread, save_count, errors

    @staticmethod
    def _wait_for(predicate, timeout=2.0, poll=0.01):
        """Poll *predicate* until it is true or *timeout* seconds have elapsed.

        Returns the final truth value of the predicate. Polling against a
        deadline avoids fixed sleeps that are either slow or flaky under load.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if predicate():
                return True
            time.sleep(poll)
        return bool(predicate())

    @staticmethod
    def _stop(thread, stop_event):
        """Signal the loop to stop and assert that the thread really exits."""
        stop_event.set()
        thread.join(timeout=2)
        assert not thread.is_alive(), "Checkpoint thread should have stopped"

    def test_checkpoint_fires_on_activity_counter_increment(self):
        """Checkpoint saves when _checkpoint_activity counter grows."""
        s = _make_session("ckpt1")
        s.pending_user_message = "do a long task"
        s.save()  # initial save (like routes.py does before streaming starts)

        stop_event = threading.Event()
        _checkpoint_activity = [0]
        thread, save_count, errors = self._start_checkpoint_loop(
            s, _checkpoint_activity, stop_event, 0.05
        )
        try:
            # Simulate on_tool() completing twice, waiting for each checkpoint
            # so the two increments cannot be coalesced into a single save.
            _checkpoint_activity[0] += 1  # first tool completes
            first_saved = self._wait_for(lambda: save_count[0] >= 1)
            _checkpoint_activity[0] += 1  # second tool completes
            second_saved = self._wait_for(lambda: save_count[0] >= 2)
        finally:
            self._stop(thread, stop_event)

        assert errors == [], f"Checkpoint loop raised: {errors!r}"
        assert first_saved and second_saved and save_count[0] >= 2, (
            "Expected at least 2 checkpoint saves (one per activity increment); "
            f"got {save_count[0]}"
        )
        # Verify the JSON is on disk and readable
        data = json.loads(s.path.read_text(encoding="utf-8"))
        assert data["pending_user_message"] == "do a long task"

    def test_checkpoint_does_not_fire_without_activity(self):
        """Checkpoint skips save when activity counter has not changed."""
        s = _make_session("ckpt2")
        s.save()

        stop_event = threading.Event()
        _checkpoint_activity = [0]
        thread, save_count, errors = self._start_checkpoint_loop(
            s, _checkpoint_activity, stop_event, 0.05
        )
        # No increments — checkpoint should stay quiet. A negative check needs
        # real elapsed time, so let several wake-ups happen.
        time.sleep(0.4)
        self._stop(thread, stop_event)

        assert errors == [], f"Checkpoint loop raised: {errors!r}"
        assert save_count[0] == 0, (
            f"Expected 0 saves when activity is unchanged; got {save_count[0]}"
        )

    def test_checkpoint_stops_on_signal(self):
        """Checkpoint thread exits cleanly when stop event is set."""
        stop_event = threading.Event()
        # The counter never changes, so save() is never called and no session
        # object is needed.
        thread, save_count, errors = self._start_checkpoint_loop(
            None, [0], stop_event, 0.02
        )
        time.sleep(0.15)
        stop_event.set()
        thread.join(timeout=1)
        assert not thread.is_alive(), "Checkpoint thread should have stopped"
        assert errors == [], f"Checkpoint loop raised: {errors!r}"
        assert save_count[0] == 0

    def test_pending_message_survives_simulated_restart(self):
        """pending_user_message written before run_conversation survives a restart.

        This is the minimal guarantee for Issue #765: even if the agent produces
        no tool calls before a crash, the user's message is not silently lost.
        """
        s = _make_session("survive1", messages=[{"role": "user", "content": "first turn"}])
        s.save()  # initial full save

        # Simulate what routes.py does before _run_agent_streaming:
        s.pending_user_message = "do a long research task"
        s.pending_started_at = time.time()
        s.active_stream_id = "stream-abc123"
        s.save(skip_index=True)  # checkpoint-style save

        # Simulate restart: clear in-memory state, reload from disk
        del s
        models.SESSIONS.clear()

        reloaded = Session.load("survive1")
        assert reloaded is not None
        assert reloaded.pending_user_message == "do a long research task"
        assert reloaded.active_stream_id == "stream-abc123"
        # Original messages still intact
        assert len(reloaded.messages) == 1

    def test_activity_checkpoint_persists_updated_at(self):
        """Each checkpoint save updates updated_at, keeping session fresh in sidebar."""
        s = _make_session("ts1")
        s.save()
        ts_before = s.updated_at

        # Let the clock advance so the checkpoint's timestamp is strictly later.
        time.sleep(0.05)
        _checkpoint_activity = [1]  # simulate one tool completion

        stop_event = threading.Event()
        thread, save_count, errors = self._start_checkpoint_loop(
            s, _checkpoint_activity, stop_event, 0.05
        )
        try:
            saved = self._wait_for(lambda: save_count[0] >= 1)
        finally:
            self._stop(thread, stop_event)

        assert errors == [], f"Checkpoint loop raised: {errors!r}"
        assert saved, "Checkpoint never fired for the pre-existing activity"
        data = json.loads(s.path.read_text(encoding="utf-8"))
        assert data["updated_at"] > ts_before, "Checkpoint should update updated_at"
|
||||
|
||||
|
||||
class TestCheckpointVariableLifecycle:
    """Regression guard: the outer `finally` must not UnboundLocalError when an
    exception fires before the checkpoint thread is created. _checkpoint_stop
    is initialised to None immediately before the outer try block so the
    finally's `if _checkpoint_stop is not None` branch is always safe.
    """

    @staticmethod
    def _leading_ws(line):
        """Return the leading whitespace of *line* (its indentation)."""
        return line[: len(line) - len(line.lstrip())]

    def test_checkpoint_stop_initialised_before_any_raiseable_code(self):
        """Static check: `_checkpoint_stop = None` must appear before any code
        that could raise inside _run_agent_streaming's outer try."""
        src = (Path(__file__).parent.parent / "api" / "streaming.py").read_text(
            encoding="utf-8"
        )
        lines = src.splitlines()

        # Locate the `try:` whose preceding line is the _checkpoint_stop guard.
        # Start at line 2 so `lines[i - 2]` can never wrap around to the last
        # line of the file.
        try_line = next(
            (
                i for i, ln in enumerate(lines, 1)
                if i >= 2
                and ln.strip() == "try:"
                and lines[i - 2].strip().startswith("_checkpoint_stop")
            ),
            None,
        )
        assert try_line is not None, (
            "Could not find a `try:` directly preceded by a _checkpoint_stop "
            "assignment in api/streaming.py"
        )

        init_line = next(
            (i for i, ln in enumerate(lines, 1) if "_checkpoint_stop = None" in ln),
            None,
        )
        assert init_line is not None, (
            "`_checkpoint_stop = None` not found in api/streaming.py"
        )

        # The assignment must precede the `try:` — not sit inside the nested
        # block where an earlier line could raise before it runs.
        assert init_line < try_line, (
            f"_checkpoint_stop = None (line {init_line}) must precede the outer "
            f"try block (line {try_line}) so the finally can safely check it."
        )

        # Line order alone does not prove the assignment is outside the try:
        # the guard directly above the `try:` must be the None pre-init and
        # must share the `try:` line's indentation (i.e. the same scope).
        guard = lines[try_line - 2]
        assert guard.strip().startswith("_checkpoint_stop = None"), (
            f"Line {try_line - 1} should be `_checkpoint_stop = None`, "
            f"got: {guard.strip()!r}"
        )
        assert self._leading_ws(guard) == self._leading_ws(lines[try_line - 1]), (
            "_checkpoint_stop = None must be at the same indentation as the "
            "outer `try:` it guards."
        )

    def test_finally_path_when_early_exception_does_not_unbound_error(self):
        """Mirror the _run_agent_streaming try/finally structure — proves that
        pre-initialising _checkpoint_stop = None outside any raiseable code
        keeps the finally safe."""

        def mimic_run_agent_streaming():
            _checkpoint_stop = None  # pre-init (the fix)
            try:
                # Anything here could raise — simulate early failure
                raise ValueError("early failure, e.g. get_session KeyError")
                # Deliberately unreachable: mirrors the later assignment in the
                # real function, which is what makes the name a local there.
                _checkpoint_stop = threading.Event()  # never reached
            finally:
                # The guard the PR added — must not itself raise
                if _checkpoint_stop is not None:
                    _checkpoint_stop.set()

        # The original ValueError must propagate; an UnboundLocalError from the
        # finally block would replace it and fail this expectation.
        with pytest.raises(ValueError, match="early failure"):
            mimic_run_agent_streaming()
|
||||
Reference in New Issue
Block a user