Files
isparkclaw-webui/tests/test_gateway_sync.py
nesquena-hermes d4a3adb7b1 fix(sessions): surface gateway SSE failures and add polling fallback (#828)
* fix(sessions): surface gateway SSE failures and add polling fallback

- add a JSON probe mode for the gateway SSE endpoint
- detect watcher-unavailable 503s from the browser
- fall back to periodic session refresh with a toast
- add probe payload tests and endpoint coverage

Fixes #635

* fix(sessions): surface gateway SSE failures and add polling fallback (#826)

Absorbed from PR #826 by @cloudyun888 (fixes #635).

When the gateway watcher thread is not running, the browser now shows a
toast notification and falls back to 30-second periodic polling for session
sync. Previously the SSE failure was completely silent with no user feedback.

Changes from original PR:
- Deleted misplaced test_gateway_sse_probe_unit.py (was at repo root, not
  discovered by `pytest tests/`); unit tests moved into tests/test_gateway_sync.py
- _gateway_sse_probe_payload now checks watcher._thread.is_alive() rather
  than just watcher is not None — a watcher instance with a dead poll thread
  now correctly reports unavailable and activates the polling fallback
- probeGatewaySSEStatus catch(e) now starts the polling fallback on network
  error rather than silently swallowing the failure
- Added 5 unit tests covering all watcher-alive/dead/missing/disabled branches

Co-authored-by: cloudyun888 <269269188+86cloudyun-afk@users.noreply.github.com>

* cleanup(gateway): public is_alive() + dedup probe/live watcher-alive check + changelog

Three small cleanups on top of @cloudyun888's PR #826 absorption:

1. Add GatewayWatcher.is_alive() public accessor so routes.py doesn't
   reach into the private _thread attribute.  The existing private-
   attribute check stays as a defensive fallback for any older in-
   memory instance or test double that doesn't implement the full API.

2. Dedupe the watcher_alive computation in _handle_gateway_sse_stream:
   the live-SSE path now calls _gateway_sse_probe_payload(...) and reads
   its watcher_running field instead of re-deriving the same logic
   inline.  Keeps probe and SSE in sync automatically.

3. CHANGELOG trailer was (#826, fixes #635, @cloudyun888) — this PR is
   #828, so updated to (#828, absorbs PR #826 by @cloudyun888, fixes
   #635) matching the repo convention for absorbed PRs (see #805).

Added two regression tests:
- test_gateway_watcher_is_alive_public_method — covers the three
  lifecycle states (before start, while running, after stop).
- test_probe_payload_prefers_public_is_alive — asserts the probe
  uses watcher.is_alive() rather than poking _thread when the
  public method exists.

Full suite: 1735 passed, 0 new failures.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: cloudyun888 <269269188+86cloudyun-afk@users.noreply.github.com>
Co-authored-by: nesquena-hermes <nesquena-hermes@users.noreply.github.com>
Co-authored-by: Nathan Esquenazi <nesquena@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 21:18:55 -07:00

559 lines
21 KiB
Python

"""
Tests for Phase 1: Real-time Gateway Session Sync.
Tests are ordered TDD-style:
1. Gateway sessions appear in /api/sessions when setting enabled
2. Gateway sessions excluded when setting disabled
3. Gateway sessions have correct metadata (source_tag, is_cli_session)
4. SSE stream endpoint opens and receives events
5. Watcher detects new sessions inserted into state.db
6. Settings UI has renamed label
"""
import json
import os
import pathlib
import sqlite3
import time
import urllib.error
import urllib.request
REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve()
from tests._pytest_port import BASE
def get(path):
with urllib.request.urlopen(BASE + path, timeout=10) as r:
return json.loads(r.read()), r.status
def post(path, body=None):
data = json.dumps(body or {}).encode()
req = urllib.request.Request(BASE + path, data=data,
headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=10) as r:
return json.loads(r.read()), r.status
except urllib.error.HTTPError as e:
try:
return json.loads(e.read()), e.code
except Exception:
return {}, e.code
def _get_test_state_dir():
"""Return the test state directory (matches conftest.py TEST_STATE_DIR).
conftest.py sets HERMES_WEBUI_TEST_STATE_DIR in the test-process environment
(via os.environ.setdefault) so that tests writing directly to state.db always
use the same path the test server was started with. If the env var is not
set (e.g. when running this file standalone), fall back to the conftest
formula: HERMES_HOME/webui-mvp-test.
"""
# Use _pytest_port which applies the same auto-derivation as conftest.py
from tests._pytest_port import TEST_STATE_DIR as _ptsd
return _ptsd
def _get_state_db_path():
"""Return path to the test state.db."""
return _get_test_state_dir() / 'state.db'
def _ensure_state_db():
"""Create state.db with sessions and messages tables if it doesn't exist.
Returns a connection. Does NOT delete existing data (safe for parallel tests).
"""
db_path = _get_state_db_path()
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.executescript("""
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
source TEXT NOT NULL,
user_id TEXT,
model TEXT,
started_at REAL NOT NULL,
message_count INTEGER DEFAULT 0,
title TEXT
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT,
timestamp REAL NOT NULL
);
""")
conn.commit()
return conn
def _insert_gateway_session(conn, session_id='20260401_120000_abcdefgh', source='telegram',
title='Telegram Chat', model='anthropic/claude-sonnet-4-5',
started_at=None, message_count=2):
"""Insert a gateway session into state.db."""
conn.execute(
"INSERT OR REPLACE INTO sessions (id, source, title, model, started_at, message_count) "
"VALUES (?, ?, ?, ?, ?, ?)",
(session_id, source, title, model, started_at or time.time(), message_count)
)
# Delete any existing messages for this session (idempotent re-insert)
conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
# Insert some messages
conn.execute(
"INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, 'user', ?, ?)",
(session_id, 'Hello from Telegram', started_at or time.time())
)
conn.execute(
"INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, 'assistant', ?, ?)",
(session_id, 'Hi there!', (started_at or time.time()) + 1)
)
conn.commit()
def _remove_test_sessions(conn, *session_ids):
"""Remove specific test sessions from state.db (parallel-safe cleanup)."""
for sid in session_ids:
conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
conn.commit()
def _cleanup_state_db():
"""Remove state.db if it exists (only used for tests that need a blank slate)."""
db_path = _get_state_db_path()
for p in [db_path, db_path.parent / 'state.db-wal', db_path.parent / 'state.db-shm']:
try:
p.unlink(missing_ok=True)
except Exception:
pass
# ── Tests ──────────────────────────────────────────────────────────────────
def test_gateway_sessions_appear_when_enabled():
"""Gateway sessions from state.db appear in /api/sessions when show_cli_sessions is on."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='gw_test_tg_001', source='telegram', title='TG Test Chat')
# Enable the setting
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
gw_ids = [s['session_id'] for s in sessions if s.get('session_id') == 'gw_test_tg_001']
assert len(gw_ids) == 1, f"Expected gateway session gw_test_tg_001, got {[s['session_id'] for s in sessions]}"
finally:
try:
_remove_test_sessions(conn, 'gw_test_tg_001')
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_sessions_excluded_when_disabled():
"""Gateway sessions are NOT returned when show_cli_sessions is off."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='gw_test_dc_001', source='discord', title='DC Test Chat')
# Ensure setting is off
post('/api/settings', {'show_cli_sessions': False})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
gw_ids = [s['session_id'] for s in sessions if s.get('session_id') == 'gw_test_dc_001']
assert len(gw_ids) == 0, "Gateway session should not appear when setting is off"
finally:
try:
_remove_test_sessions(conn, 'gw_test_dc_001')
conn.close()
except Exception:
pass
def test_gateway_session_has_correct_metadata():
"""Gateway sessions include source_tag and is_cli_session fields."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='gw_meta_001', source='telegram', title='Meta Test')
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
gw = next((s for s in sessions if s['session_id'] == 'gw_meta_001'), None)
assert gw is not None, "Gateway session not found"
assert gw.get('source_tag') == 'telegram', f"Expected source_tag=telegram, got {gw.get('source_tag')}"
assert gw.get('is_cli_session') is True, "is_cli_session should be True for agent sessions"
assert gw.get('title') == 'Meta Test'
finally:
try:
_remove_test_sessions(conn, 'gw_meta_001')
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_session_has_message_count():
"""Gateway sessions report correct message_count from state.db."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='gw_msg_001', source='discord', title='Msg Count Test', message_count=5)
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
gw = next((s for s in sessions if s['session_id'] == 'gw_msg_001'), None)
assert gw is not None
assert gw.get('message_count') == 5, f"Expected message_count=5, got {gw.get('message_count')}"
finally:
try:
_remove_test_sessions(conn, 'gw_msg_001')
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_sessions_multiple_sources():
"""Sessions from multiple gateway sources (telegram, discord, slack) all appear."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='gw_multi_tg', source='telegram', title='TG Chat')
_insert_gateway_session(conn, session_id='gw_multi_dc', source='discord', title='DC Chat')
_insert_gateway_session(conn, session_id='gw_multi_sl', source='slack', title='SL Chat')
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
gw_ids = {s['session_id'] for s in sessions if s.get('session_id') in ('gw_multi_tg', 'gw_multi_dc', 'gw_multi_sl')}
assert len(gw_ids) == 3, f"Expected 3 gateway sessions, got {len(gw_ids)}: {gw_ids}"
finally:
try:
_remove_test_sessions(conn, 'gw_multi_tg', 'gw_multi_dc', 'gw_multi_sl')
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_session_messages_readable():
"""Gateway session messages can be loaded via /api/session."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='gw_read_001', source='telegram', title='Readable')
post('/api/settings', {'show_cli_sessions': True})
data, status = get(f'/api/session?session_id=gw_read_001')
assert status == 200
msgs = data.get('session', {}).get('messages', [])
assert len(msgs) >= 2, f"Expected at least 2 messages, got {len(msgs)}"
assert msgs[0].get('role') == 'user'
assert msgs[0].get('content') == 'Hello from Telegram'
finally:
try:
_remove_test_sessions(conn, 'gw_read_001')
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_importing_older_gateway_session_preserves_original_timestamps_and_order():
"""Importing an older gateway session should not bump it above newer WebUI sessions."""
conn = _ensure_state_db()
older_started_at = time.time() - 1800
imported_sid = 'gw_import_old_001'
newer_webui_sid = None
try:
newer_webui, status = post('/api/session/new', {'model': 'openai/gpt-5'})
assert status == 200, newer_webui
newer_webui_sid = newer_webui['session']['session_id']
rename, rename_status = post(
'/api/session/rename',
{'session_id': newer_webui_sid, 'title': 'Newer WebUI Session'},
)
assert rename_status == 200, rename
_insert_gateway_session(
conn,
session_id=imported_sid,
source='discord',
title='Older imported gateway session',
started_at=older_started_at,
)
post('/api/settings', {'show_cli_sessions': True})
imported, imported_status = post('/api/session/import_cli', {'session_id': imported_sid})
assert imported_status == 200, imported
imported_session = imported['session']
assert abs(imported_session['created_at'] - older_started_at) < 2, imported_session
assert abs(imported_session['updated_at'] - older_started_at) < 5, imported_session
sessions_payload, sessions_status = get('/api/sessions')
assert sessions_status == 200, sessions_payload
ordered_ids = [item['session_id'] for item in sessions_payload.get('sessions', [])]
assert newer_webui_sid in ordered_ids, ordered_ids
assert imported_sid in ordered_ids, ordered_ids
assert ordered_ids.index(newer_webui_sid) < ordered_ids.index(imported_sid), ordered_ids
finally:
try:
_remove_test_sessions(conn, imported_sid)
conn.close()
except Exception:
pass
if imported_sid:
try:
post('/api/session/delete', {'session_id': imported_sid})
except Exception:
pass
if newer_webui_sid:
try:
post('/api/session/delete', {'session_id': newer_webui_sid})
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_sse_stream_endpoint_exists():
"""GET /api/sessions/gateway/stream returns a response (200 or 200-range)."""
# The SSE endpoint requires show_cli_sessions to be enabled
post('/api/settings', {'show_cli_sessions': True})
try:
req = urllib.request.Request(BASE + '/api/sessions/gateway/stream')
with urllib.request.urlopen(req, timeout=5) as r:
assert r.status in (200, 204), f"Expected 200/204, got {r.status}"
# SSE should have content-type text/event-stream
ctype = r.headers.get('Content-Type', '')
assert 'text/event-stream' in ctype, f"Expected text/event-stream, got {ctype}"
except Exception as e:
# Timeout is acceptable — means the connection is held open (SSE behavior)
if 'timed out' in str(e).lower() or 'timeout' in str(e).lower():
pass # Good: SSE keeps the connection open
else:
raise
finally:
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_sse_stream_probe_reports_status():
"""Probe mode returns JSON watcher status instead of holding open an SSE stream."""
post('/api/settings', {'show_cli_sessions': True})
try:
req = urllib.request.Request(BASE + '/api/sessions/gateway/stream?probe=1')
with urllib.request.urlopen(req, timeout=5) as r:
assert r.status == 200, f"Expected 200, got {r.status}"
ctype = r.headers.get('Content-Type', '')
assert 'application/json' in ctype, f"Expected application/json, got {ctype}"
data = json.loads(r.read().decode('utf-8'))
assert data['enabled'] is True
assert 'watcher_running' in data
assert data['fallback_poll_ms'] == 30000
finally:
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_webui_sessions_not_duplicated():
"""If a session_id exists both in WebUI store and state.db, it's not duplicated."""
# Create a WebUI session with a known ID
body = {}
d, _ = post('/api/session/new', body)
webui_sid = d['session']['session_id']
try:
# Insert the same session_id into state.db as a gateway session
conn = _ensure_state_db()
_insert_gateway_session(conn, session_id=webui_sid, source='telegram', title='Dup Test')
conn.close()
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
matching = [s for s in sessions if s['session_id'] == webui_sid]
assert len(matching) == 1, f"Expected 1 entry for {webui_sid}, got {len(matching)}"
finally:
try:
conn2 = sqlite3.connect(str(_get_state_db_path()))
_remove_test_sessions(conn2, webui_sid)
conn2.close()
except Exception:
pass
post('/api/session/delete', {'session_id': webui_sid})
post('/api/settings', {'show_cli_sessions': False})
def test_gateway_sessions_no_state_db():
"""When state.db doesn't exist, /api/sessions works fine (no gateway sessions)."""
_cleanup_state_db()
post('/api/settings', {'show_cli_sessions': True})
try:
data, status = get('/api/sessions')
assert status == 200
# Should succeed with just webui sessions (or empty)
assert 'sessions' in data
finally:
post('/api/settings', {'show_cli_sessions': False})
def test_cli_sessions_still_work():
"""CLI sessions (source='cli') still appear alongside gateway sessions."""
conn = _ensure_state_db()
try:
_insert_gateway_session(conn, session_id='cli_legacy_001', source='cli', title='CLI Legacy')
_insert_gateway_session(conn, session_id='gw_new_001', source='telegram', title='GW New')
post('/api/settings', {'show_cli_sessions': True})
data, status = get('/api/sessions')
assert status == 200
sessions = data.get('sessions', [])
agent_ids = {s['session_id'] for s in sessions if s.get('session_id') in ('cli_legacy_001', 'gw_new_001')}
assert len(agent_ids) == 2, f"Expected 2 agent sessions (cli + gateway), got {len(agent_ids)}"
finally:
try:
_remove_test_sessions(conn, 'cli_legacy_001', 'gw_new_001')
conn.close()
except Exception:
pass
post('/api/settings', {'show_cli_sessions': False})
# ── Unit tests for _gateway_sse_probe_payload ────────────────────────────────
# These replace the deleted repo-root test_gateway_sse_probe_unit.py and account
# for the watcher_alive check (thread existence + is_alive()).
import sys
import threading
sys.path.insert(0, str(REPO_ROOT))
from api.routes import _gateway_sse_probe_payload
def test_probe_payload_when_disabled():
"""Probe returns 404 when show_cli_sessions is False."""
body, status = _gateway_sse_probe_payload({'show_cli_sessions': False}, watcher=None)
assert status == 404
assert body['ok'] is False
assert body['enabled'] is False
assert body['watcher_running'] is False
assert body['error'] == 'agent sessions not enabled'
assert body['fallback_poll_ms'] == 30000
def test_probe_payload_when_watcher_missing():
"""Probe returns 503 when enabled but no watcher instance."""
body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=None)
assert status == 503
assert body['ok'] is False
assert body['enabled'] is True
assert body['watcher_running'] is False
assert body['error'] == 'watcher not started'
assert body['fallback_poll_ms'] == 30000
def test_probe_payload_when_watcher_instance_no_thread():
"""Probe returns 503 when watcher exists but _thread attribute is missing/None."""
class _FakeWatcher:
_thread = None
body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=_FakeWatcher())
assert status == 503
assert body['watcher_running'] is False
def test_probe_payload_when_watcher_thread_alive():
"""Probe returns 200 when enabled and watcher thread is alive."""
class _FakeWatcher:
pass
w = _FakeWatcher()
t = threading.Thread(target=lambda: None)
t.daemon = True
t.start()
w._thread = t
# Thread may finish fast — loop-start a live daemon thread for reliability
import time as _time
done = threading.Event()
live = threading.Thread(target=done.wait, daemon=True)
live.start()
w._thread = live
try:
body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=w)
assert status == 200
assert body['ok'] is True
assert body['watcher_running'] is True
assert body['fallback_poll_ms'] == 30000
finally:
done.set()
live.join(timeout=1)
def test_probe_payload_when_watcher_thread_dead():
"""Probe returns 503 when watcher instance exists but thread has exited."""
class _FakeWatcher:
pass
w = _FakeWatcher()
t = threading.Thread(target=lambda: None)
t.start()
t.join() # wait for it to finish
w._thread = t
body, status = _gateway_sse_probe_payload({'show_cli_sessions': True}, watcher=w)
assert status == 503
assert body['watcher_running'] is False
assert body['ok'] is False
def test_gateway_watcher_is_alive_public_method():
"""GatewayWatcher.is_alive() is the public API the probe uses. Cover all
three states: before start(), while running, after stop()."""
from api.gateway_watcher import GatewayWatcher
w = GatewayWatcher()
# Before start(): no thread
assert w.is_alive() is False, "is_alive() must be False before start()"
# After start(): thread running
w.start()
try:
assert w.is_alive() is True, "is_alive() must be True while running"
finally:
w.stop()
# After stop(): thread cleared
assert w.is_alive() is False, "is_alive() must be False after stop()"
def test_probe_payload_prefers_public_is_alive():
"""Regression guard: _gateway_sse_probe_payload must call watcher.is_alive()
rather than poking at _thread directly when the public method exists."""
calls = []
class _WatcherWithPublicApi:
def is_alive(self):
calls.append('is_alive')
return True
# _thread is deliberately absent — must not be accessed.
body, status = _gateway_sse_probe_payload(
{'show_cli_sessions': True},
watcher=_WatcherWithPublicApi(),
)
assert status == 200
assert body['watcher_running'] is True
assert calls == ['is_alive'], (
"probe must prefer the public is_alive() method over poking _thread"
)