diff --git a/CHANGELOG.md b/CHANGELOG.md index 13a760b..5063ca9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,11 @@ workspace subtree) and never enumerate blocked system roots. (`api/routes.py`, `api/workspace.py`, `static/panels.js`, `static/style.css`) (partial for #616) +## [v0.50.172] — 2026-04-23 + +### Fixed +- **Stop Generation preserves partial streamed content** — clicking Stop Generation previously discarded all text the agent had produced, showing only "*Task cancelled.*". The server now accumulates streamed tokens in a per-stream buffer and persists any partial assistant content to the session when a cancel fires. Thinking/reasoning blocks (`<think>...</think>`, including unclosed tags — the common cancel-mid-reasoning case) are stripped before saving. The partial content is flagged `_partial: true` and kept in conversation history so the model can continue from it on the next user message. (`api/config.py`, `api/streaming.py`) Closes #893. + ## [v0.50.171] — 2026-04-23 ### Fixed diff --git a/api/config.py b/api/config.py index 9eda921..aa2a486 100644 --- a/api/config.py +++ b/api/config.py @@ -1662,6 +1662,7 @@ STREAMS: dict = {} STREAMS_LOCK = threading.Lock() CANCEL_FLAGS: dict = {} AGENT_INSTANCES: dict = {} # stream_id -> AIAgent instance for interrupt propagation +STREAM_PARTIAL_TEXT: dict = {} # stream_id -> partial assistant text accumulated during streaming SERVER_START_TIME = time.time() # ── Thread-local env context ───────────────────────────────────────────────── diff --git a/api/streaming.py b/api/streaming.py index f079808..8ac8f32 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -17,7 +17,7 @@ from typing import Optional logger = logging.getLogger(__name__) from api.config import ( - STREAMS, STREAMS_LOCK, CANCEL_FLAGS, AGENT_INSTANCES, + STREAMS, STREAMS_LOCK, CANCEL_FLAGS, AGENT_INSTANCES, STREAM_PARTIAL_TEXT, LOCK, SESSIONS, SESSION_DIR, _get_session_agent_lock, _set_thread_env, _clear_thread_env, resolve_model_provider, @@ -815,6 
+815,7 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta cancel_event = threading.Event() with STREAMS_LOCK: CANCEL_FLAGS[stream_id] = cancel_event + STREAM_PARTIAL_TEXT[stream_id] = '' # start accumulating partial text (#893) def put(event, data): # If cancelled, drop all further events except the cancel event itself @@ -961,6 +962,9 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta if text is None: return # end-of-stream sentinel _token_sent = True + # Accumulate partial text so cancel_stream() can persist it (#893) + if stream_id in STREAM_PARTIAL_TEXT: + STREAM_PARTIAL_TEXT[stream_id] += str(text) put('token', {'text': text}) def on_reasoning(text): @@ -1569,6 +1573,7 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta STREAMS.pop(stream_id, None) CANCEL_FLAGS.pop(stream_id, None) AGENT_INSTANCES.pop(stream_id, None) # Clean up agent instance reference + STREAM_PARTIAL_TEXT.pop(stream_id, None) # Clean up partial text buffer (#893) # ============================================================ # SECTION: HTTP Request Handler @@ -1643,13 +1648,18 @@ def cancel_stream(stream_id: str) -> bool: STREAMS.pop(stream_id, None) CANCEL_FLAGS.pop(stream_id, None) AGENT_INSTANCES.pop(stream_id, None) + # STREAM_PARTIAL_TEXT is intentionally NOT popped here — the agent thread may + # still be appending tokens. We capture the snapshot two lines below; the + # streaming finally block handles the cleanup when the thread exits. - # Capture session_id while holding STREAMS_LOCK (avoids a race where - # the agent thread deallocates the agent object after we release). + # Capture partial text and session_id while holding STREAMS_LOCK (avoids a + # race where the agent thread deallocates the agent object or clears the + # partial text after we release). 
# Session cleanup (get_session + save) must happen OUTSIDE the lock — # get_session() acquires LOCK, and the streaming thread does LOCK first # then STREAMS_LOCK, so inverting the order here would cause deadlock. _cancel_session_id = getattr(agent, 'session_id', None) if agent else None + _cancel_partial_text = STREAM_PARTIAL_TEXT.get(stream_id, '') # Session cleanup outside STREAMS_LOCK to preserve lock ordering. if _cancel_session_id: @@ -1659,12 +1669,40 @@ def cancel_stream(stream_id: str) -> bool: _cs.pending_user_message = None _cs.pending_attachments = [] _cs.pending_started_at = None - # Add cancel message to session messages so client sees consistent state. - # _error=True flags this as a synthetic UI marker so - # _sanitize_messages_for_api() (line 591-593) strips it from the - # conversation_history passed to the agent on the NEXT user message — - # otherwise the model would see "Task cancelled." in its history as a - # prior assistant turn and could respond accordingly. + # Persist any partial assistant text that was streamed before cancel (#893). + # Preserving partial content means the user sees what the agent had + # produced rather than losing it entirely. The marker is _partial=True + # (for session/UI identification) — NOT _error=True — so the partial + # content IS kept in the history sent to the agent on the next user + # message, letting the model continue from where it was cut off. + # See the inner comment on the append call below for the rationale. + partial_text = _cancel_partial_text.strip() if _cancel_partial_text else '' + if partial_text: + import re as _re + # Strip thinking/reasoning markup from partial content before saving. + # First pass: remove complete ... and ... blocks. + _stripped = _re.sub(r']*>.*?', + '', partial_text, + flags=_re.DOTALL | _re.IGNORECASE).strip() + # Second pass: strip trailing UNCLOSED think/thinking block (the common + # cancel case — user stops mid-reasoning before the close tag appears). 
+ _stripped = _re.sub(r']*>.*', + '', _stripped, + flags=_re.DOTALL | _re.IGNORECASE).strip() + if _stripped: + # Mark _partial=True for session/UI identification only. + # Deliberately NOT _error=True — the partial content is real model + # output and should be visible in conversation history so the model + # can continue from it on the next turn (#893). + _cs.messages.append({ + 'role': 'assistant', + 'content': _stripped, + '_partial': True, + 'timestamp': int(time.time()), + }) + # Cancel marker — flagged _error=True so it is stripped from conversation + # history on the next turn (prevents model from seeing "Task cancelled." + # as a prior assistant reply). _cs.messages.append({ 'role': 'assistant', 'content': '*Task cancelled.*', diff --git a/tests/test_issue893_cancel_preserves_partial.py b/tests/test_issue893_cancel_preserves_partial.py new file mode 100644 index 0000000..df36f6a --- /dev/null +++ b/tests/test_issue893_cancel_preserves_partial.py @@ -0,0 +1,296 @@ +""" +Regression tests for #893 — cancel_stream() now preserves partial streamed +assistant content rather than discarding it. + +Before this fix, clicking Stop Generation threw away all streamed text. The +session was saved with only '*Task cancelled.*' appended, so the user lost +whatever the agent had produced up to that point. 
+ +After this fix: +- Partial text is accumulated in STREAM_PARTIAL_TEXT[stream_id] via on_token() +- cancel_stream() reads that buffer, strips thinking markup, and persists it + as a '_partial: True' assistant message before the cancel marker +- _sanitize_messages_for_api() does NOT strip _partial messages, so the model + sees the partial content as prior context on the next turn +- The cancel marker itself keeps _error=True so the model does not see it +""" +import threading +import time + +import pytest + +import api.config as config +import api.streaming as streaming +from api.config import STREAM_PARTIAL_TEXT, STREAMS_LOCK + + +@pytest.fixture(autouse=True) +def _isolate_stream_state(): + """Isolate shared stream state between tests.""" + STREAM_PARTIAL_TEXT.clear() + config.STREAMS.clear() + config.CANCEL_FLAGS.clear() + config.AGENT_INSTANCES.clear() + yield + STREAM_PARTIAL_TEXT.clear() + config.STREAMS.clear() + config.CANCEL_FLAGS.clear() + config.AGENT_INSTANCES.clear() + + +class TestStreamPartialTextAccumulation: + + def test_stream_partial_text_initialized_on_stream_creation(self, tmp_path, monkeypatch): + """STREAM_PARTIAL_TEXT[stream_id] starts empty when a stream is registered.""" + import queue + sid = 'test_init_stream' + q = queue.Queue() + cancel_event = threading.Event() + with STREAMS_LOCK: + config.STREAMS[sid] = q + config.CANCEL_FLAGS[sid] = cancel_event + STREAM_PARTIAL_TEXT[sid] = '' + assert STREAM_PARTIAL_TEXT.get(sid) == '' + + def test_stream_partial_text_cleaned_up_on_stream_end(self): + """STREAM_PARTIAL_TEXT[stream_id] is removed when the stream dict is cleaned up.""" + import queue + sid = 'test_cleanup_stream' + q = queue.Queue() + with STREAMS_LOCK: + config.STREAMS[sid] = q + STREAM_PARTIAL_TEXT[sid] = 'some partial text' + with STREAMS_LOCK: + config.STREAMS.pop(sid, None) + STREAM_PARTIAL_TEXT.pop(sid, None) + assert sid not in STREAM_PARTIAL_TEXT + + +class TestCancelStreamPreservesPartial: + + def 
test_cancel_stream_saves_partial_text_to_session(self, tmp_path, monkeypatch): + """cancel_stream() persists accumulated partial text as an assistant message.""" + import queue + from api.models import Session + from api.streaming import cancel_stream + + session_dir = tmp_path / 'sessions' + session_dir.mkdir() + import api.models as _models + monkeypatch.setattr(config, 'SESSION_DIR', session_dir) + monkeypatch.setattr(config, 'SESSION_INDEX_FILE', session_dir / '_index.json') + monkeypatch.setattr(_models, 'SESSION_DIR', session_dir) + monkeypatch.setattr(_models, 'SESSION_INDEX_FILE', session_dir / '_index.json') + config.SESSIONS.clear() + _models.SESSIONS.clear() + + # Create a session and a fake running stream + s = Session(session_id='sess_partial', title='Test') + s.messages.append({'role': 'user', 'content': 'Tell me about Python'}) + s.active_stream_id = 'stream_partial' + s.save() + config.SESSIONS['sess_partial'] = s + + q = queue.Queue() + cancel_event = threading.Event() + with STREAMS_LOCK: + config.STREAMS['stream_partial'] = q + config.CANCEL_FLAGS['stream_partial'] = cancel_event + STREAM_PARTIAL_TEXT['stream_partial'] = 'Python is a high-level programming language' + + # Fake agent with session_id attribute + class FakeAgent: + session_id = 'sess_partial' + def interrupt(self, _): pass + config.AGENT_INSTANCES['stream_partial'] = FakeAgent() + + result = cancel_stream('stream_partial') + + assert result is True + + # Reload the session and check messages + from api.models import Session + saved = Session.load('sess_partial') + assert saved is not None + + msg_contents = [m.get('content', '') for m in saved.messages] + # Should have: user message, partial assistant content, cancel marker + assert any('Python is a high-level programming language' in c for c in msg_contents), ( + f"Partial text not found in session messages: {msg_contents}" + ) + assert any('*Task cancelled.*' in c for c in msg_contents), ( + "Cancel marker missing from session 
messages" + ) + # Partial message should NOT have _error=True (it's real content) + partial_msg = next(m for m in saved.messages + if 'Python is a high-level' in m.get('content', '')) + assert partial_msg.get('_partial') is True + assert not partial_msg.get('_error') + # Cancel marker should have _error=True + cancel_msg = next(m for m in saved.messages if '*Task cancelled.*' in m.get('content', '')) + assert cancel_msg.get('_error') is True + + def test_cancel_stream_with_no_partial_text_still_saves_cancel_marker(self, tmp_path, monkeypatch): + """If no tokens were streamed before cancel, only the cancel marker is saved.""" + import queue + from api.models import Session + from api.streaming import cancel_stream + + session_dir = tmp_path / 'sessions' + session_dir.mkdir() + import api.models as _models + monkeypatch.setattr(config, 'SESSION_DIR', session_dir) + monkeypatch.setattr(config, 'SESSION_INDEX_FILE', session_dir / '_index.json') + monkeypatch.setattr(_models, 'SESSION_DIR', session_dir) + monkeypatch.setattr(_models, 'SESSION_INDEX_FILE', session_dir / '_index.json') + config.SESSIONS.clear() + _models.SESSIONS.clear() + + s = Session(session_id='sess_nopartial', title='Test') + s.messages.append({'role': 'user', 'content': 'Hello'}) + s.active_stream_id = 'stream_nopartial' + s.save() + config.SESSIONS['sess_nopartial'] = s + + q = queue.Queue() + cancel_event = threading.Event() + with STREAMS_LOCK: + config.STREAMS['stream_nopartial'] = q + config.CANCEL_FLAGS['stream_nopartial'] = cancel_event + STREAM_PARTIAL_TEXT['stream_nopartial'] = '' # empty — cancel before any tokens + + class FakeAgent: + session_id = 'sess_nopartial' + def interrupt(self, _): pass + config.AGENT_INSTANCES['stream_nopartial'] = FakeAgent() + + cancel_stream('stream_nopartial') + + saved = Session.load('sess_nopartial') + msg_contents = [m.get('content', '') for m in saved.messages] + assert any('*Task cancelled.*' in c for c in msg_contents) + # No extra partial message when 
there was nothing streamed + assert not any(m.get('_partial') for m in saved.messages), ( + "Should not add partial message when no tokens were streamed" + ) + + def test_cancel_stream_strips_thinking_markup_from_partial(self, tmp_path, monkeypatch): + """Thinking blocks in partial text are stripped before saving.""" + import queue + from api.models import Session + from api.streaming import cancel_stream + + session_dir = tmp_path / 'sessions' + session_dir.mkdir() + import api.models as _models + monkeypatch.setattr(config, 'SESSION_DIR', session_dir) + monkeypatch.setattr(config, 'SESSION_INDEX_FILE', session_dir / '_index.json') + monkeypatch.setattr(_models, 'SESSION_DIR', session_dir) + monkeypatch.setattr(_models, 'SESSION_INDEX_FILE', session_dir / '_index.json') + config.SESSIONS.clear() + _models.SESSIONS.clear() + + s = Session(session_id='sess_thinking', title='Test') + s.messages.append({'role': 'user', 'content': 'Think about this'}) + s.active_stream_id = 'stream_thinking' + s.save() + config.SESSIONS['sess_thinking'] = s + + q = queue.Queue() + cancel_event = threading.Event() + with STREAMS_LOCK: + config.STREAMS['stream_thinking'] = q + config.CANCEL_FLAGS['stream_thinking'] = cancel_event + STREAM_PARTIAL_TEXT['stream_thinking'] = ( + 'internal reasoning here\nThe answer is 42' + ) + + class FakeAgent: + session_id = 'sess_thinking' + def interrupt(self, _): pass + config.AGENT_INSTANCES['stream_thinking'] = FakeAgent() + + cancel_stream('stream_thinking') + + saved = Session.load('sess_thinking') + partial_msg = next( + (m for m in saved.messages if m.get('_partial')), None + ) + assert partial_msg is not None, "Partial message should be saved when content remains after stripping" + assert '' not in partial_msg['content'], "Closed thinking block should be stripped" + assert 'The answer is 42' in partial_msg['content'], "Visible content should be preserved" + + def test_cancel_stream_strips_unclosed_think_tag(self, tmp_path, monkeypatch): + 
"""The common cancel-mid-reasoning case: block without a closing tag.""" + import queue + from api.models import Session + from api.streaming import cancel_stream + + session_dir = tmp_path / 'sessions' + session_dir.mkdir() + import api.models as _models + monkeypatch.setattr(config, 'SESSION_DIR', session_dir) + monkeypatch.setattr(config, 'SESSION_INDEX_FILE', session_dir / '_index.json') + monkeypatch.setattr(_models, 'SESSION_DIR', session_dir) + monkeypatch.setattr(_models, 'SESSION_INDEX_FILE', session_dir / '_index.json') + config.SESSIONS.clear() + _models.SESSIONS.clear() + + s = Session(session_id='sess_unclosed', title='Test') + s.messages.append({'role': 'user', 'content': 'Please reason step by step'}) + s.active_stream_id = 'stream_unclosed' + s.save() + config.SESSIONS['sess_unclosed'] = s + + q = queue.Queue() + cancel_event = threading.Event() + with STREAMS_LOCK: + config.STREAMS['stream_unclosed'] = q + config.CANCEL_FLAGS['stream_unclosed'] = cancel_event + # Simulates user hitting Stop mid-reasoning — never closed + STREAM_PARTIAL_TEXT['stream_unclosed'] = ( + '\nStep 1: consider the problem...\nStep 2: the user wants' + ) + + class FakeAgent: + session_id = 'sess_unclosed' + def interrupt(self, _): pass + config.AGENT_INSTANCES['stream_unclosed'] = FakeAgent() + + cancel_stream('stream_unclosed') + + saved = Session.load('sess_unclosed') + # The entire content was inside an unclosed block — nothing visible + # remains after stripping, so no _partial message should be saved + partial_msg = next((m for m in saved.messages if m.get('_partial')), None) + assert partial_msg is None, ( + "Unclosed think block with no visible content should not produce a partial message" + ) + # Cancel marker should still be present + assert any('Task cancelled' in m.get('content', '') for m in saved.messages) + + +class TestPartialMessageInContext: + + def test_partial_message_included_in_api_sanitization(self): + """Partial messages (_partial=True) are included in 
API history (model should see them).""" + from api.streaming import _sanitize_messages_for_api + + messages = [ + {'role': 'user', 'content': 'Tell me about Python'}, + {'role': 'assistant', 'content': 'Python is a high-level', '_partial': True}, + {'role': 'assistant', 'content': '*Task cancelled.*', '_error': True}, + ] + clean = _sanitize_messages_for_api(messages) + roles = [m['role'] for m in clean] + contents = [m.get('content', '') for m in clean] + + # User message and partial assistant message should be included + assert 'user' in roles + assert any('Python is a high-level' in c for c in contents), ( + "Partial assistant content should be in API context so model can continue from it" + ) + # Cancel marker (_error=True) should be excluded + assert not any('Task cancelled' in c for c in contents), ( + "Cancel marker with _error=True must be stripped from API context" + )