fix(sessions): surface gateway SSE failures and add polling fallback (#828)
* fix(sessions): surface gateway SSE failures and add polling fallback - add a JSON probe mode for the gateway SSE endpoint - detect watcher-unavailable 503s from the browser - fall back to periodic session refresh with a toast - add probe payload tests and endpoint coverage Fixes #635 * fix(sessions): surface gateway SSE failures and add polling fallback (#826) Absorbed from PR #826 by @cloudyun888 (fixes #635). When the gateway watcher thread is not running, the browser now shows a toast notification and falls back to 30-second periodic polling for session sync. Previously the SSE failure was completely silent with no user feedback. Changes from original PR: - Deleted misplaced test_gateway_sse_probe_unit.py (was at repo root, not discovered by `pytest tests/`); unit tests moved into tests/test_gateway_sync.py - _gateway_sse_probe_payload now checks watcher._thread.is_alive() rather than just watcher is not None — a watcher instance with a dead poll thread now correctly reports unavailable and activates the polling fallback - probeGatewaySSEStatus catch(e) now starts the polling fallback on network error rather than silently swallowing the failure - Added 5 unit tests covering all watcher-alive/dead/missing/disabled branches Co-authored-by: cloudyun888 <269269188+86cloudyun-afk@users.noreply.github.com> * cleanup(gateway): public is_alive() + dedup probe/live watcher-alive check + changelog Three small cleanups on top of @cloudyun888's PR #826 absorption: 1. Add GatewayWatcher.is_alive() public accessor so routes.py doesn't reach into the private _thread attribute. The existing private- attribute check stays as a defensive fallback for any older in- memory instance or test double that doesn't implement the full API. 2. Dedupe the watcher_alive computation in _handle_gateway_sse_stream: the live-SSE path now calls _gateway_sse_probe_payload(...) and reads its watcher_running field instead of re-deriving the same logic inline. Keeps probe and SSE in sync automatically. 3. CHANGELOG trailer was (#826, fixes #635, @cloudyun888) — this PR is #828, so updated to (#828, absorbs PR #826 by @cloudyun888, fixes #635) matching the repo convention for absorbed PRs (see #805). Added two regression tests: - test_gateway_watcher_is_alive_public_method — covers the three lifecycle states (before start, while running, after stop). - test_probe_payload_prefers_public_is_alive — asserts the probe uses watcher.is_alive() rather than poking _thread when the public method exists. Full suite: 1735 passed, 0 new failures. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: cloudyun888 <269269188+86cloudyun-afk@users.noreply.github.com> Co-authored-by: nesquena-hermes <nesquena-hermes@users.noreply.github.com> Co-authored-by: Nathan Esquenazi <nesquena@gmail.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
13
CHANGELOG.md
13
CHANGELOG.md
@@ -1,5 +1,18 @@
|
||||
# Hermes Web UI -- Changelog
|
||||
|
||||
## [v0.50.140] — 2026-04-22
|
||||
|
||||
### Fixed
|
||||
- **Gateway SSE sync failures now surface to the user** — when the gateway watcher
|
||||
thread is not running, the browser now shows a toast notification and automatically
|
||||
falls back to 30-second polling for session sync. Previously this failed silently
|
||||
with no feedback. (#828, absorbs PR #826 by @cloudyun888, fixes #635)
|
||||
- `_gateway_sse_probe_payload` now checks `watcher._thread.is_alive()` rather than
|
||||
just `watcher is not None`, so a watcher instance with a dead poll thread correctly
|
||||
reports unavailable and triggers the polling fallback.
|
||||
- Probe fetch network errors now also activate the polling fallback as a safe default
|
||||
rather than silently swallowing the failure.
|
||||
|
||||
## [v0.50.139] — 2026-04-22
|
||||
|
||||
### Fixed
|
||||
|
||||
@@ -119,6 +119,19 @@ class GatewayWatcher:
|
||||
self._thread = threading.Thread(target=self._poll_loop, daemon=True, name='gateway-watcher')
|
||||
self._thread.start()
|
||||
|
||||
def is_alive(self) -> bool:
|
||||
"""Return True when the poll thread is running.
|
||||
|
||||
Public accessor used by ``/api/sessions/gateway/stream`` probe mode and
|
||||
the live SSE handler to detect a watcher instance whose poll thread
|
||||
died silently (e.g. uncaught exception in ``_poll_loop``). Callers
|
||||
use this to decide whether to return 503 and trigger the client-side
|
||||
polling fallback, instead of handing out an SSE connection that would
|
||||
never emit events.
|
||||
"""
|
||||
t = self._thread
|
||||
return t is not None and t.is_alive()
|
||||
|
||||
def stop(self):
|
||||
"""Stop the watcher thread."""
|
||||
self._stop_event.set()
|
||||
|
||||
@@ -762,7 +762,7 @@ def handle_get(handler, parsed) -> bool:
|
||||
return _handle_sse_stream(handler, parsed)
|
||||
|
||||
if parsed.path == '/api/sessions/gateway/stream':
|
||||
return _handle_gateway_sse_stream(handler)
|
||||
return _handle_gateway_sse_stream(handler, parsed)
|
||||
|
||||
if parsed.path == "/api/media":
|
||||
return _handle_media(handler, parsed)
|
||||
@@ -1704,19 +1704,57 @@ def _handle_sse_stream(handler, parsed):
|
||||
return True
|
||||
|
||||
|
||||
def _handle_gateway_sse_stream(handler):
|
||||
def _gateway_sse_probe_payload(settings, watcher):
|
||||
enabled = bool(settings.get('show_cli_sessions'))
|
||||
# Use the public is_alive() accessor where available (current GatewayWatcher);
|
||||
# fall back to the private _thread check for any older in-memory instance
|
||||
# that might still be hanging around mid-upgrade, and for test doubles that
|
||||
# don't implement the full public API.
|
||||
if watcher is None:
|
||||
watcher_alive = False
|
||||
elif hasattr(watcher, 'is_alive') and callable(getattr(watcher, 'is_alive')):
|
||||
watcher_alive = bool(watcher.is_alive())
|
||||
else:
|
||||
_t = getattr(watcher, '_thread', None)
|
||||
watcher_alive = _t is not None and _t.is_alive()
|
||||
payload = {
|
||||
'enabled': enabled,
|
||||
'fallback_poll_ms': 30000,
|
||||
'ok': enabled and watcher_alive,
|
||||
'watcher_running': watcher_alive,
|
||||
}
|
||||
if not enabled:
|
||||
payload['error'] = 'agent sessions not enabled'
|
||||
return payload, 404
|
||||
if not watcher_alive:
|
||||
payload['error'] = 'watcher not started'
|
||||
return payload, 503
|
||||
return payload, 200
|
||||
|
||||
|
||||
def _handle_gateway_sse_stream(handler, parsed):
|
||||
"""SSE endpoint for real-time gateway session updates.
|
||||
Streams change events from the gateway watcher background thread.
|
||||
Only active when show_cli_sessions (show_agent_sessions) setting is enabled.
|
||||
"""
|
||||
# Check if the feature is enabled
|
||||
settings = load_settings()
|
||||
if not settings.get('show_cli_sessions'):
|
||||
return j(handler, {'error': 'agent sessions not enabled'}, status=404)
|
||||
|
||||
from api.gateway_watcher import get_watcher
|
||||
watcher = get_watcher()
|
||||
if watcher is None:
|
||||
|
||||
probe = parse_qs(parsed.query).get('probe', [''])[0].lower() in {'1', 'true', 'yes'}
|
||||
if probe:
|
||||
payload, status = _gateway_sse_probe_payload(settings, watcher)
|
||||
return j(handler, payload, status=status)
|
||||
|
||||
# Check if the feature is enabled
|
||||
if not settings.get('show_cli_sessions'):
|
||||
return j(handler, {'error': 'agent sessions not enabled'}, status=404)
|
||||
|
||||
# Same watcher_alive semantics as the probe path — centralised via
|
||||
# the helper so both branches stay in sync.
|
||||
_probe_body, _probe_status = _gateway_sse_probe_payload(settings, watcher)
|
||||
if not _probe_body['watcher_running']:
|
||||
return j(handler, {'error': 'watcher not started'}, status=503)
|
||||
|
||||
handler.send_response(200)
|
||||
|
||||
@@ -361,6 +361,53 @@ async function renderSessionList(){
|
||||
|
||||
// ── Gateway session SSE (real-time sync for agent sessions) ──
|
||||
let _gatewaySSE = null;
|
||||
let _gatewayPollTimer = null;
|
||||
let _gatewayProbeInFlight = false;
|
||||
let _gatewaySSEWarningShown = false;
|
||||
const _gatewayFallbackPollMs = 30000;
|
||||
|
||||
function startGatewayPollFallback(ms){
|
||||
const intervalMs = Math.max(5000, Number(ms) || _gatewayFallbackPollMs);
|
||||
if(_gatewayPollTimer) clearInterval(_gatewayPollTimer);
|
||||
_gatewayPollTimer = setInterval(() => { renderSessionList(); }, intervalMs);
|
||||
}
|
||||
|
||||
function stopGatewayPollFallback(){
|
||||
if(_gatewayPollTimer){
|
||||
clearInterval(_gatewayPollTimer);
|
||||
_gatewayPollTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
async function probeGatewaySSEStatus(){
|
||||
if(_gatewayProbeInFlight || !window._showCliSessions) return;
|
||||
_gatewayProbeInFlight = true;
|
||||
try{
|
||||
const resp = await fetch('/api/sessions/gateway/stream?probe=1', { credentials:'same-origin' });
|
||||
const data = await resp.json().catch(() => ({}));
|
||||
if(resp.ok && data.watcher_running){
|
||||
stopGatewayPollFallback();
|
||||
_gatewaySSEWarningShown = false;
|
||||
return;
|
||||
}
|
||||
if(resp.status === 503 || data.watcher_running === false){
|
||||
startGatewayPollFallback(data.fallback_poll_ms || _gatewayFallbackPollMs);
|
||||
renderSessionList();
|
||||
if(!_gatewaySSEWarningShown && typeof showToast === 'function'){
|
||||
showToast('Gateway sync unavailable — falling back to periodic refresh.', 5000);
|
||||
_gatewaySSEWarningShown = true;
|
||||
}
|
||||
}
|
||||
}catch(e){
|
||||
// Network error during probe — server may be unreachable.
|
||||
// Start fallback polling as a safe default; it will self-cancel
|
||||
// when the SSE connection recovers and sessions_changed fires.
|
||||
startGatewayPollFallback(_gatewayFallbackPollMs);
|
||||
renderSessionList();
|
||||
}finally{
|
||||
_gatewayProbeInFlight = false;
|
||||
}
|
||||
}
|
||||
|
||||
function startGatewaySSE(){
|
||||
stopGatewaySSE();
|
||||
@@ -371,6 +418,8 @@ function startGatewaySSE(){
|
||||
try{
|
||||
const data = JSON.parse(ev.data);
|
||||
if(data.sessions){
|
||||
stopGatewayPollFallback();
|
||||
_gatewaySSEWarningShown = false;
|
||||
renderSessionList(); // re-fetch and re-render
|
||||
// If the active session received new gateway messages, refresh the conversation view.
|
||||
// S.busy check prevents stomping on an in-progress WebUI response.
|
||||
@@ -400,9 +449,11 @@ function startGatewaySSE(){
|
||||
}catch(e){ /* ignore parse errors */ }
|
||||
});
|
||||
_gatewaySSE.onerror = () => {
|
||||
// EventSource auto-reconnects; no action needed
|
||||
void probeGatewaySSEStatus();
|
||||
};
|
||||
}catch(e){ /* SSE not available */ }
|
||||
}catch(e){
|
||||
void probeGatewaySSEStatus();
|
||||
}
|
||||
}
|
||||
|
||||
function stopGatewaySSE(){
|
||||
@@ -410,6 +461,9 @@ function stopGatewaySSE(){
|
||||
_gatewaySSE.close();
|
||||
_gatewaySSE = null;
|
||||
}
|
||||
stopGatewayPollFallback();
|
||||
_gatewayProbeInFlight = false;
|
||||
_gatewaySSEWarningShown = false;
|
||||
}
|
||||
|
||||
let _searchDebounceTimer = null;
|
||||
|
||||
@@ -352,6 +352,23 @@ def test_gateway_sse_stream_endpoint_exists():
|
||||
post('/api/settings', {'show_cli_sessions': False})
|
||||
|
||||
|
||||
def test_gateway_sse_stream_probe_reports_status():
|
||||
"""Probe mode returns JSON watcher status instead of holding open an SSE stream."""
|
||||
post('/api/settings', {'show_cli_sessions': True})
|
||||
try:
|
||||
req = urllib.request.Request(BASE + '/api/sessions/gateway/stream?probe=1')
|
||||
with urllib.request.urlopen(req, timeout=5) as r:
|
||||
assert r.status == 200, f"Expected 200, got {r.status}"
|
||||
ctype = r.headers.get('Content-Type', '')
|
||||
assert 'application/json' in ctype, f"Expected application/json, got {ctype}"
|
||||
data = json.loads(r.read().decode('utf-8'))
|
||||
assert data['enabled'] is True
|
||||
assert 'watcher_running' in data
|
||||
assert data['fallback_poll_ms'] == 30000
|
||||
finally:
|
||||
post('/api/settings', {'show_cli_sessions': False})
|
||||
|
||||
|
||||
def test_gateway_webui_sessions_not_duplicated():
|
||||
"""If a session_id exists both in WebUI store and state.db, it's not duplicated."""
|
||||
# Create a WebUI session with a known ID
|
||||
@@ -418,3 +435,124 @@ def test_cli_sessions_still_work():
|
||||
except Exception:
|
||||
pass
|
||||
post('/api/settings', {'show_cli_sessions': False})
|
||||
|
||||
|
||||
# ── Unit tests for _gateway_sse_probe_payload ────────────────────────────────
|
||||
# These replace the deleted repo-root test_gateway_sse_probe_unit.py and account
|
||||
# for the watcher_alive check (thread existence + is_alive()).
|
||||
|
||||
import sys
|
||||
import threading
|
||||
sys.path.insert(0, str(REPO_ROOT))
|
||||
from api.routes import _gateway_sse_probe_payload
|
||||
|
||||
|
||||
def test_probe_payload_when_disabled():
|
||||
"""Probe returns 404 when show_cli_sessions is False."""
|
||||
body, status = _gateway_sse_probe_payload({'show_cli_sessions': False}, watcher=None)
|
||||
assert status == 404
|
||||
assert body['ok'] is False
|
||||
assert body['enabled'] is False
|
||||
assert body['watcher_running'] is False
|
||||
assert body['error'] == 'agent sessions not enabled'
|
||||
assert body['fallback_poll_ms'] == 30000
|
||||
|
||||
|
||||
def test_probe_payload_when_watcher_missing():
|
||||
"""Probe returns 503 when enabled but no watcher instance."""
|
||||
body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=None)
|
||||
assert status == 503
|
||||
assert body['ok'] is False
|
||||
assert body['enabled'] is True
|
||||
assert body['watcher_running'] is False
|
||||
assert body['error'] == 'watcher not started'
|
||||
assert body['fallback_poll_ms'] == 30000
|
||||
|
||||
|
||||
def test_probe_payload_when_watcher_instance_no_thread():
|
||||
"""Probe returns 503 when watcher exists but _thread attribute is missing/None."""
|
||||
class _FakeWatcher:
|
||||
_thread = None
|
||||
body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=_FakeWatcher())
|
||||
assert status == 503
|
||||
assert body['watcher_running'] is False
|
||||
|
||||
|
||||
def test_probe_payload_when_watcher_thread_alive():
|
||||
"""Probe returns 200 when enabled and watcher thread is alive."""
|
||||
class _FakeWatcher:
|
||||
pass
|
||||
w = _FakeWatcher()
|
||||
t = threading.Thread(target=lambda: None)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
w._thread = t
|
||||
# Thread may finish fast — loop-start a live daemon thread for reliability
|
||||
import time as _time
|
||||
done = threading.Event()
|
||||
live = threading.Thread(target=done.wait, daemon=True)
|
||||
live.start()
|
||||
w._thread = live
|
||||
try:
|
||||
body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=w)
|
||||
assert status == 200
|
||||
assert body['ok'] is True
|
||||
assert body['watcher_running'] is True
|
||||
assert body['fallback_poll_ms'] == 30000
|
||||
finally:
|
||||
done.set()
|
||||
live.join(timeout=1)
|
||||
|
||||
|
||||
def test_probe_payload_when_watcher_thread_dead():
|
||||
"""Probe returns 503 when watcher instance exists but thread has exited."""
|
||||
class _FakeWatcher:
|
||||
pass
|
||||
w = _FakeWatcher()
|
||||
t = threading.Thread(target=lambda: None)
|
||||
t.start()
|
||||
t.join() # wait for it to finish
|
||||
w._thread = t
|
||||
body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=w)
|
||||
assert status == 503
|
||||
assert body['watcher_running'] is False
|
||||
assert body['ok'] is False
|
||||
|
||||
|
||||
def test_gateway_watcher_is_alive_public_method():
|
||||
"""GatewayWatcher.is_alive() is the public API the probe uses. Cover all
|
||||
three states: before start(), while running, after stop()."""
|
||||
from api.gateway_watcher import GatewayWatcher
|
||||
w = GatewayWatcher()
|
||||
# Before start(): no thread
|
||||
assert w.is_alive() is False, "is_alive() must be False before start()"
|
||||
# After start(): thread running
|
||||
w.start()
|
||||
try:
|
||||
assert w.is_alive() is True, "is_alive() must be True while running"
|
||||
finally:
|
||||
w.stop()
|
||||
# After stop(): thread cleared
|
||||
assert w.is_alive() is False, "is_alive() must be False after stop()"
|
||||
|
||||
|
||||
def test_probe_payload_prefers_public_is_alive():
|
||||
"""Regression guard: _gateway_sse_probe_payload must call watcher.is_alive()
|
||||
rather than poking at _thread directly when the public method exists."""
|
||||
calls = []
|
||||
|
||||
class _WatcherWithPublicApi:
|
||||
def is_alive(self):
|
||||
calls.append('is_alive')
|
||||
return True
|
||||
# _thread is deliberately absent — must not be accessed.
|
||||
|
||||
body, status = _gateway_sse_probe_payload(
|
||||
{'show_cli_sessions': True},
|
||||
watcher=_WatcherWithPublicApi(),
|
||||
)
|
||||
assert status == 200
|
||||
assert body['watcher_running'] is True
|
||||
assert calls == ['is_alive'], (
|
||||
"probe must prefer the public is_alive() method over poking _thread"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user