From d4a3adb7b1204621f307b428498bc0614f46fa93 Mon Sep 17 00:00:00 2001 From: nesquena-hermes Date: Tue, 21 Apr 2026 21:18:55 -0700 Subject: [PATCH] fix(sessions): surface gateway SSE failures and add polling fallback (#828) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(sessions): surface gateway SSE failures and add polling fallback - add a JSON probe mode for the gateway SSE endpoint - detect watcher-unavailable 503s from the browser - fall back to periodic session refresh with a toast - add probe payload tests and endpoint coverage Fixes #635 * fix(sessions): surface gateway SSE failures and add polling fallback (#826) Absorbed from PR #826 by @cloudyun888 (fixes #635). When the gateway watcher thread is not running, the browser now shows a toast notification and falls back to 30-second periodic polling for session sync. Previously the SSE failure was completely silent with no user feedback. Changes from original PR: - Deleted misplaced test_gateway_sse_probe_unit.py (was at repo root, not discovered by `pytest tests/`); unit tests moved into tests/test_gateway_sync.py - _gateway_sse_probe_payload now checks watcher._thread.is_alive() rather than just watcher is not None — a watcher instance with a dead poll thread now correctly reports unavailable and activates the polling fallback - probeGatewaySSEStatus catch(e) now starts the polling fallback on network error rather than silently swallowing the failure - Added 5 unit tests covering all watcher-alive/dead/missing/disabled branches Co-authored-by: cloudyun888 <269269188+86cloudyun-afk@users.noreply.github.com> * cleanup(gateway): public is_alive() + dedup probe/live watcher-alive check + changelog Three small cleanups on top of @cloudyun888's PR #826 absorption: 1. Add GatewayWatcher.is_alive() public accessor so routes.py doesn't reach into the private _thread attribute. The existing private- attribute check stays as a defensive fallback for any older in- memory instance or test double that doesn't implement the full API. 2. Dedupe the watcher_alive computation in _handle_gateway_sse_stream: the live-SSE path now calls _gateway_sse_probe_payload(...) and reads its watcher_running field instead of re-deriving the same logic inline. Keeps probe and SSE in sync automatically. 3. CHANGELOG trailer was (#826, fixes #635, @cloudyun888) — this PR is #828, so updated to (#828, absorbs PR #826 by @cloudyun888, fixes #635) matching the repo convention for absorbed PRs (see #805). Added two regression tests: - test_gateway_watcher_is_alive_public_method — covers the three lifecycle states (before start, while running, after stop). - test_probe_payload_prefers_public_is_alive — asserts the probe uses watcher.is_alive() rather than poking _thread when the public method exists. Full suite: 1735 passed, 0 new failures. Co-Authored-By: Claude Opus 4.7 (1M context) --------- Co-authored-by: cloudyun888 <269269188+86cloudyun-afk@users.noreply.github.com> Co-authored-by: nesquena-hermes Co-authored-by: Nathan Esquenazi Co-authored-by: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 13 ++++ api/gateway_watcher.py | 13 ++++ api/routes.py | 50 ++++++++++++-- static/sessions.js | 58 +++++++++++++++- tests/test_gateway_sync.py | 138 +++++++++++++++++++++++++++++++++++++ 5 files changed, 264 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e36b1a5..a801f83 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,18 @@ # Hermes Web UI -- Changelog +## [v0.50.140] — 2026-04-22 + +### Fixed +- **Gateway SSE sync failures now surface to the user** — when the gateway watcher + thread is not running, the browser now shows a toast notification and automatically + falls back to 30-second polling for session sync. Previously this failed silently + with no feedback. (#828, absorbs PR #826 by @cloudyun888, fixes #635) +- `_gateway_sse_probe_payload` now checks `watcher._thread.is_alive()` rather than + just `watcher is not None`, so a watcher instance with a dead poll thread correctly + reports unavailable and triggers the polling fallback. +- Probe fetch network errors now also activate the polling fallback as a safe default + rather than silently swallowing the failure. + ## [v0.50.139] — 2026-04-22 ### Fixed diff --git a/api/gateway_watcher.py b/api/gateway_watcher.py index 409e4d5..594d18f 100644 --- a/api/gateway_watcher.py +++ b/api/gateway_watcher.py @@ -119,6 +119,19 @@ class GatewayWatcher: self._thread = threading.Thread(target=self._poll_loop, daemon=True, name='gateway-watcher') self._thread.start() + def is_alive(self) -> bool: + """Return True when the poll thread is running. + + Public accessor used by ``/api/sessions/gateway/stream`` probe mode and + the live SSE handler to detect a watcher instance whose poll thread + died silently (e.g. uncaught exception in ``_poll_loop``). Callers + use this to decide whether to return 503 and trigger the client-side + polling fallback, instead of handing out an SSE connection that would + never emit events. + """ + t = self._thread + return t is not None and t.is_alive() + def stop(self): """Stop the watcher thread.""" self._stop_event.set() diff --git a/api/routes.py b/api/routes.py index 8db9b9b..38647ca 100644 --- a/api/routes.py +++ b/api/routes.py @@ -762,7 +762,7 @@ def handle_get(handler, parsed) -> bool: return _handle_sse_stream(handler, parsed) if parsed.path == '/api/sessions/gateway/stream': - return _handle_gateway_sse_stream(handler) + return _handle_gateway_sse_stream(handler, parsed) if parsed.path == "/api/media": return _handle_media(handler, parsed) @@ -1704,19 +1704,57 @@ def _handle_sse_stream(handler, parsed): return True -def _handle_gateway_sse_stream(handler): +def _gateway_sse_probe_payload(settings, watcher): + enabled = bool(settings.get('show_cli_sessions')) + # Use the public is_alive() accessor where available (current GatewayWatcher); + # fall back to the private _thread check for any older in-memory instance + # that might still be hanging around mid-upgrade, and for test doubles that + # don't implement the full public API. + if watcher is None: + watcher_alive = False + elif hasattr(watcher, 'is_alive') and callable(getattr(watcher, 'is_alive')): + watcher_alive = bool(watcher.is_alive()) + else: + _t = getattr(watcher, '_thread', None) + watcher_alive = _t is not None and _t.is_alive() + payload = { + 'enabled': enabled, + 'fallback_poll_ms': 30000, + 'ok': enabled and watcher_alive, + 'watcher_running': watcher_alive, + } + if not enabled: + payload['error'] = 'agent sessions not enabled' + return payload, 404 + if not watcher_alive: + payload['error'] = 'watcher not started' + return payload, 503 + return payload, 200 + + +def _handle_gateway_sse_stream(handler, parsed): """SSE endpoint for real-time gateway session updates. Streams change events from the gateway watcher background thread. Only active when show_cli_sessions (show_agent_sessions) setting is enabled. """ - # Check if the feature is enabled settings = load_settings() - if not settings.get('show_cli_sessions'): - return j(handler, {'error': 'agent sessions not enabled'}, status=404) from api.gateway_watcher import get_watcher watcher = get_watcher() - if watcher is None: + + probe = parse_qs(parsed.query).get('probe', [''])[0].lower() in {'1', 'true', 'yes'} + if probe: + payload, status = _gateway_sse_probe_payload(settings, watcher) + return j(handler, payload, status=status) + + # Check if the feature is enabled + if not settings.get('show_cli_sessions'): + return j(handler, {'error': 'agent sessions not enabled'}, status=404) + + # Same watcher_alive semantics as the probe path — centralised via + # the helper so both branches stay in sync. + _probe_body, _probe_status = _gateway_sse_probe_payload(settings, watcher) + if not _probe_body['watcher_running']: return j(handler, {'error': 'watcher not started'}, status=503) handler.send_response(200) diff --git a/static/sessions.js b/static/sessions.js index ef0dbdc..4d1cf10 100644 --- a/static/sessions.js +++ b/static/sessions.js @@ -361,6 +361,53 @@ async function renderSessionList(){ // ── Gateway session SSE (real-time sync for agent sessions) ── let _gatewaySSE = null; +let _gatewayPollTimer = null; +let _gatewayProbeInFlight = false; +let _gatewaySSEWarningShown = false; +const _gatewayFallbackPollMs = 30000; + +function startGatewayPollFallback(ms){ + const intervalMs = Math.max(5000, Number(ms) || _gatewayFallbackPollMs); + if(_gatewayPollTimer) clearInterval(_gatewayPollTimer); + _gatewayPollTimer = setInterval(() => { renderSessionList(); }, intervalMs); +} + +function stopGatewayPollFallback(){ + if(_gatewayPollTimer){ + clearInterval(_gatewayPollTimer); + _gatewayPollTimer = null; + } +} + +async function probeGatewaySSEStatus(){ + if(_gatewayProbeInFlight || !window._showCliSessions) return; + _gatewayProbeInFlight = true; + try{ + const resp = await fetch('/api/sessions/gateway/stream?probe=1', { credentials:'same-origin' }); + const data = await resp.json().catch(() => ({})); + if(resp.ok && data.watcher_running){ + stopGatewayPollFallback(); + _gatewaySSEWarningShown = false; + return; + } + if(resp.status === 503 || data.watcher_running === false){ + startGatewayPollFallback(data.fallback_poll_ms || _gatewayFallbackPollMs); + renderSessionList(); + if(!_gatewaySSEWarningShown && typeof showToast === 'function'){ + showToast('Gateway sync unavailable — falling back to periodic refresh.', 5000); + _gatewaySSEWarningShown = true; + } + } + }catch(e){ + // Network error during probe — server may be unreachable. + // Start fallback polling as a safe default; it will self-cancel + // when the SSE connection recovers and sessions_changed fires. + startGatewayPollFallback(_gatewayFallbackPollMs); + renderSessionList(); + }finally{ + _gatewayProbeInFlight = false; + } +} function startGatewaySSE(){ stopGatewaySSE(); @@ -371,6 +418,8 @@ function startGatewaySSE(){ try{ const data = JSON.parse(ev.data); if(data.sessions){ + stopGatewayPollFallback(); + _gatewaySSEWarningShown = false; renderSessionList(); // re-fetch and re-render // If the active session received new gateway messages, refresh the conversation view. // S.busy check prevents stomping on an in-progress WebUI response. @@ -400,9 +449,11 @@ function startGatewaySSE(){ }catch(e){ /* ignore parse errors */ } }); _gatewaySSE.onerror = () => { - // EventSource auto-reconnects; no action needed + void probeGatewaySSEStatus(); }; - }catch(e){ /* SSE not available */ } + }catch(e){ + void probeGatewaySSEStatus(); + } } function stopGatewaySSE(){ @@ -410,6 +461,9 @@ function stopGatewaySSE(){ _gatewaySSE.close(); _gatewaySSE = null; } + stopGatewayPollFallback(); + _gatewayProbeInFlight = false; + _gatewaySSEWarningShown = false; } let _searchDebounceTimer = null; diff --git a/tests/test_gateway_sync.py b/tests/test_gateway_sync.py index 39adc5c..01e6234 100644 --- a/tests/test_gateway_sync.py +++ b/tests/test_gateway_sync.py @@ -352,6 +352,23 @@ def test_gateway_sse_stream_endpoint_exists(): post('/api/settings', {'show_cli_sessions': False}) +def test_gateway_sse_stream_probe_reports_status(): + """Probe mode returns JSON watcher status instead of holding open an SSE stream.""" + post('/api/settings', {'show_cli_sessions': True}) + try: + req = urllib.request.Request(BASE + '/api/sessions/gateway/stream?probe=1') + with urllib.request.urlopen(req, timeout=5) as r: + assert r.status == 200, f"Expected 200, got {r.status}" + ctype = r.headers.get('Content-Type', '') + assert 'application/json' in ctype, f"Expected application/json, got {ctype}" + data = json.loads(r.read().decode('utf-8')) + assert data['enabled'] is True + assert 'watcher_running' in data + assert data['fallback_poll_ms'] == 30000 + finally: + post('/api/settings', {'show_cli_sessions': False}) + + def test_gateway_webui_sessions_not_duplicated(): """If a session_id exists both in WebUI store and state.db, it's not duplicated.""" # Create a WebUI session with a known ID @@ -418,3 +435,124 @@ def test_cli_sessions_still_work(): except Exception: pass post('/api/settings', {'show_cli_sessions': False}) + + +# ── Unit tests for _gateway_sse_probe_payload ──────────────────────────────── +# These replace the deleted repo-root test_gateway_sse_probe_unit.py and account +# for the watcher_alive check (thread existence + is_alive()). + +import sys +import threading +sys.path.insert(0, str(REPO_ROOT)) +from api.routes import _gateway_sse_probe_payload + + +def test_probe_payload_when_disabled(): + """Probe returns 404 when show_cli_sessions is False.""" + body, status = _gateway_sse_probe_payload({'show_cli_sessions': False}, watcher=None) + assert status == 404 + assert body['ok'] is False + assert body['enabled'] is False + assert body['watcher_running'] is False + assert body['error'] == 'agent sessions not enabled' + assert body['fallback_poll_ms'] == 30000 + + +def test_probe_payload_when_watcher_missing(): + """Probe returns 503 when enabled but no watcher instance.""" + body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=None) + assert status == 503 + assert body['ok'] is False + assert body['enabled'] is True + assert body['watcher_running'] is False + assert body['error'] == 'watcher not started' + assert body['fallback_poll_ms'] == 30000 + + +def test_probe_payload_when_watcher_instance_no_thread(): + """Probe returns 503 when watcher exists but _thread attribute is missing/None.""" + class _FakeWatcher: + _thread = None + body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=_FakeWatcher()) + assert status == 503 + assert body['watcher_running'] is False + + +def test_probe_payload_when_watcher_thread_alive(): + """Probe returns 200 when enabled and watcher thread is alive.""" + class _FakeWatcher: + pass + w = _FakeWatcher() + t = threading.Thread(target=lambda: None) + t.daemon = True + t.start() + w._thread = t + # Thread may finish fast — loop-start a live daemon thread for reliability + import time as _time + done = threading.Event() + live = threading.Thread(target=done.wait, daemon=True) + live.start() + w._thread = live + try: + body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=w) + assert status == 200 + assert body['ok'] is True + assert body['watcher_running'] is True + assert body['fallback_poll_ms'] == 30000 + finally: + done.set() + live.join(timeout=1) + + +def test_probe_payload_when_watcher_thread_dead(): + """Probe returns 503 when watcher instance exists but thread has exited.""" + class _FakeWatcher: + pass + w = _FakeWatcher() + t = threading.Thread(target=lambda: None) + t.start() + t.join() # wait for it to finish + w._thread = t + body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=w) + assert status == 503 + assert body['watcher_running'] is False + assert body['ok'] is False + + +def test_gateway_watcher_is_alive_public_method(): + """GatewayWatcher.is_alive() is the public API the probe uses. Cover all + three states: before start(), while running, after stop().""" + from api.gateway_watcher import GatewayWatcher + w = GatewayWatcher() + # Before start(): no thread + assert w.is_alive() is False, "is_alive() must be False before start()" + # After start(): thread running + w.start() + try: + assert w.is_alive() is True, "is_alive() must be True while running" + finally: + w.stop() + # After stop(): thread cleared + assert w.is_alive() is False, "is_alive() must be False after stop()" + + +def test_probe_payload_prefers_public_is_alive(): + """Regression guard: _gateway_sse_probe_payload must call watcher.is_alive() + rather than poking at _thread directly when the public method exists.""" + calls = [] + + class _WatcherWithPublicApi: + def is_alive(self): + calls.append('is_alive') + return True + # _thread is deliberately absent — must not be accessed. + + body, status = _gateway_sse_probe_payload( + {'show_cli_sessions': True}, + watcher=_WatcherWithPublicApi(), + ) + assert status == 200 + assert body['watcher_running'] is True + assert calls == ['is_alive'], ( + "probe must prefer the public is_alive() method over poking _thread" + )