fix(cancel): preserve partial streamed response on Stop Generation (#893) (#902)

* fix(cancel): preserve partial streamed response on Stop Generation (#893)

* docs(cancel): fix misleading comment — partial message is NOT _error=True

The outer comment block claimed `_error=True so _sanitize_messages_for_api()
strips it from future conversation history`, but the actual append call
sets only `_partial=True` (correctly matching the inner comment six lines
below and the PR description). Updated the outer comment to match reality
so a future reader doesn't try to "fix" the code to match the wrong comment.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: nesquena-hermes <nesquena-hermes@users.noreply.github.com>
Co-authored-by: Nathan Esquenazi <nesquena@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
nesquena-hermes
2026-04-23 11:16:59 -07:00
committed by GitHub
parent 4089972b09
commit 9dd6e3f338
4 changed files with 349 additions and 9 deletions

View File

@@ -1662,6 +1662,7 @@ STREAMS: dict = {}
STREAMS_LOCK = threading.Lock()
CANCEL_FLAGS: dict = {}
AGENT_INSTANCES: dict = {} # stream_id -> AIAgent instance for interrupt propagation
STREAM_PARTIAL_TEXT: dict = {} # stream_id -> partial assistant text accumulated during streaming
SERVER_START_TIME = time.time()
# ── Thread-local env context ─────────────────────────────────────────────────

View File

@@ -17,7 +17,7 @@ from typing import Optional
logger = logging.getLogger(__name__)
from api.config import (
STREAMS, STREAMS_LOCK, CANCEL_FLAGS, AGENT_INSTANCES,
STREAMS, STREAMS_LOCK, CANCEL_FLAGS, AGENT_INSTANCES, STREAM_PARTIAL_TEXT,
LOCK, SESSIONS, SESSION_DIR,
_get_session_agent_lock, _set_thread_env, _clear_thread_env,
resolve_model_provider,
@@ -815,6 +815,7 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
cancel_event = threading.Event()
with STREAMS_LOCK:
CANCEL_FLAGS[stream_id] = cancel_event
STREAM_PARTIAL_TEXT[stream_id] = '' # start accumulating partial text (#893)
def put(event, data):
# If cancelled, drop all further events except the cancel event itself
@@ -961,6 +962,9 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
if text is None:
return # end-of-stream sentinel
_token_sent = True
# Accumulate partial text so cancel_stream() can persist it (#893)
if stream_id in STREAM_PARTIAL_TEXT:
STREAM_PARTIAL_TEXT[stream_id] += str(text)
put('token', {'text': text})
def on_reasoning(text):
@@ -1569,6 +1573,7 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
STREAMS.pop(stream_id, None)
CANCEL_FLAGS.pop(stream_id, None)
AGENT_INSTANCES.pop(stream_id, None) # Clean up agent instance reference
STREAM_PARTIAL_TEXT.pop(stream_id, None) # Clean up partial text buffer (#893)
# ============================================================
# SECTION: HTTP Request Handler
@@ -1643,13 +1648,18 @@ def cancel_stream(stream_id: str) -> bool:
STREAMS.pop(stream_id, None)
CANCEL_FLAGS.pop(stream_id, None)
AGENT_INSTANCES.pop(stream_id, None)
# STREAM_PARTIAL_TEXT is intentionally NOT popped here — the agent thread may
# still be appending tokens. We only read a snapshot of it further below (into
# _cancel_partial_text); the streaming finally block handles the cleanup when
# the thread exits.
# Capture session_id while holding STREAMS_LOCK (avoids a race where
# the agent thread deallocates the agent object after we release).
# Capture partial text and session_id while holding STREAMS_LOCK (avoids a
# race where the agent thread deallocates the agent object or clears the
# partial text after we release).
# Session cleanup (get_session + save) must happen OUTSIDE the lock —
# get_session() acquires LOCK, and the streaming thread does LOCK first
# then STREAMS_LOCK, so inverting the order here would cause deadlock.
_cancel_session_id = getattr(agent, 'session_id', None) if agent else None
_cancel_partial_text = STREAM_PARTIAL_TEXT.get(stream_id, '')
# Session cleanup outside STREAMS_LOCK to preserve lock ordering.
if _cancel_session_id:
@@ -1659,12 +1669,40 @@ def cancel_stream(stream_id: str) -> bool:
_cs.pending_user_message = None
_cs.pending_attachments = []
_cs.pending_started_at = None
# Add cancel message to session messages so client sees consistent state.
# _error=True flags this as a synthetic UI marker so
# _sanitize_messages_for_api() (line 591-593) strips it from the
# conversation_history passed to the agent on the NEXT user message —
# otherwise the model would see "Task cancelled." in its history as a
# prior assistant turn and could respond accordingly.
# Persist any partial assistant text that was streamed before cancel (#893).
# Preserving partial content means the user sees what the agent had
# produced rather than losing it entirely. The marker is _partial=True
# (for session/UI identification) — NOT _error=True — so the partial
# content IS kept in the history sent to the agent on the next user
# message, letting the model continue from where it was cut off.
# See the inner comment on the append call below for the rationale.
partial_text = _cancel_partial_text.strip() if _cancel_partial_text else ''
if partial_text:
import re as _re
# Strip thinking/reasoning markup from partial content before saving.
# First pass: remove complete <think>...</think> and <thinking>...</thinking> blocks.
_stripped = _re.sub(r'<think(?:ing)?\b[^>]*>.*?</think(?:ing)?>',
'', partial_text,
flags=_re.DOTALL | _re.IGNORECASE).strip()
# Second pass: strip trailing UNCLOSED think/thinking block (the common
# cancel case — user stops mid-reasoning before the close tag appears).
_stripped = _re.sub(r'<think(?:ing)?\b[^>]*>.*',
'', _stripped,
flags=_re.DOTALL | _re.IGNORECASE).strip()
if _stripped:
# Mark _partial=True for session/UI identification only.
# Deliberately NOT _error=True — the partial content is real model
# output and should be visible in conversation history so the model
# can continue from it on the next turn (#893).
_cs.messages.append({
'role': 'assistant',
'content': _stripped,
'_partial': True,
'timestamp': int(time.time()),
})
# Cancel marker — flagged _error=True so it is stripped from conversation
# history on the next turn (prevents model from seeing "Task cancelled."
# as a prior assistant reply).
_cs.messages.append({
'role': 'assistant',
'content': '*Task cancelled.*',