fix(streaming): route mid-tool-call partial-stream-stub through length continuation (#31998) (#32012)

* fix(streaming): route mid-tool-call partial-stream-stub through length continuation (#31998)

When a stream stalls mid-tool-call (e.g. during a large write_file), the
partial-stream-stub recovery used finish_reason='stop', which caused the
conversation loop to treat the turn as complete and return only the
warning text. When the user then said 'continue', the model retried the
same large tool call, hit the same stale timeout, and looped indefinitely.

Changes:
- chat_completion_helpers.py: change _stub_finish_reason from 'stop' to
  'length' for mid-tool-call partials. The stub still has tool_calls=None
  so no tool auto-executes — the model gets a fresh API call through the
  existing length-continuation machinery (bounded to 3 retries).
  Also attach _dropped_tool_names to the stub for downstream use.
- conversation_loop.py: add a third continuation prompt branch for
  partial-stream-stubs with dropped tool calls. Instead of the generic
  'continue where you left off' (which would retry the same large call),
  tell the model to break the output into smaller tool calls (~8K
  tokens each) to avoid stream timeouts.
- test_partial_stream_finish_reason.py: update existing test from
  finish_reason='stop' to 'length', add _dropped_tool_names assertion,
  add new test_dropped_tool_call_uses_chunking_prompt for the 3-way
  prompt branching.

Safety: tool_calls=None is preserved on the stub, so the conversation
loop enters the text-continuation branch (line 1513), NOT the tool-call
execution branch (line 3246). No tool auto-executes. The model simply
gets another API call with targeted guidance.

* refactor: extract constants and continuation prompt helper

- Move magic strings to hermes_constants.py (PARTIAL_STREAM_STUB_ID,
  FINISH_REASON_LENGTH)
- Extract _get_continuation_prompt() in conversation_loop.py — removes the
  duplicated 3-way prompt branching and lets tests import the real function
- Trim verbose inline comments in chat_completion_helpers.py
- Tests import constants + helper instead of duplicating logic

---------

Co-authored-by: alt-glitch <balyan.sid@gmail.com>
This commit is contained in:
daimon-nous[bot] 2026-05-25 17:43:10 +05:30 committed by GitHub
parent 46d8b5dadf
commit ac5359a3f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 116 additions and 88 deletions

View File

@ -34,6 +34,7 @@ from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse, parse_qs, urlunparse from urllib.parse import urlparse, parse_qs, urlunparse
from hermes_cli.timeouts import get_provider_request_timeout, get_provider_stale_timeout from hermes_cli.timeouts import get_provider_request_timeout, get_provider_stale_timeout
from hermes_constants import PARTIAL_STREAM_STUB_ID, FINISH_REASON_LENGTH
from agent.error_classifier import classify_api_error, FailoverReason from agent.error_classifier import classify_api_error, FailoverReason
from agent.model_metadata import is_local_endpoint from agent.model_metadata import is_local_endpoint
from agent.message_sanitization import ( from agent.message_sanitization import (
@ -2172,37 +2173,15 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
if deltas_were_sent["yes"]: if deltas_were_sent["yes"]:
# Streaming failed AFTER some tokens were already delivered to # Streaming failed AFTER some tokens were already delivered to
# the platform. Re-raising would let the outer retry loop make # the platform. Re-raising would let the outer retry loop make
# a new API call, creating a duplicate message. Return a # Return a partial response stub with finish_reason="length"
# partial response stub instead and let the outer loop decide: # so the conversation loop's continuation machinery fires.
# # tool_calls=None prevents auto-execution of incomplete calls.
# - text-only partials → finish_reason="length" so the
# conversation loop persists the partial assistant content
# and asks the model to continue from where the stream
# died (issue #30963: partial stop misclassified as a
# clean completion was exiting the loop with budget
# remaining and an unfinished goal).
#
# - partial mid-tool-call → finish_reason="stop" stays.
# The user-visible warning we append says "Ask me to
# retry if you want to continue", so the agent should
# hand control back rather than auto-retry a tool call
# that may have side-effects.
#
# Recover whatever content was already streamed to the user.
# _current_streamed_assistant_text accumulates text fired
# through _fire_stream_delta, so it has exactly what the
# user saw before the connection died.
_partial_text = ( _partial_text = (
getattr(agent, "_current_streamed_assistant_text", "") or "" getattr(agent, "_current_streamed_assistant_text", "") or ""
).strip() or None ).strip() or None
# If the stream died while the model was emitting a tool call, # Append a user-visible warning if tool calls were dropped so
# the stub below will silently set `tool_calls=None` and the # the user and model both know what was attempted.
# agent loop will treat the turn as complete — the attempted
# action is lost with no user-facing signal. Append a
# human-visible warning to the stub content so (a) the user
# knows something failed, and (b) the next turn's model sees
# in conversation history what was attempted and can retry.
_partial_names = list(result.get("partial_tool_names") or []) _partial_names = list(result.get("partial_tool_names") or [])
if _partial_names: if _partial_names:
_name_str = ", ".join(_partial_names[:3]) _name_str = ", ".join(_partial_names[:3])
@ -2214,8 +2193,7 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
f"Ask me to retry if you want to continue." f"Ask me to retry if you want to continue."
) )
_partial_text = (_partial_text or "") + _warn _partial_text = (_partial_text or "") + _warn
# Also fire as a streaming delta so the user sees it now # Fire as streaming delta so the user sees it immediately.
# instead of only in the persisted transcript.
try: try:
agent._fire_stream_delta(_warn) agent._fire_stream_delta(_warn)
except Exception: except Exception:
@ -2225,7 +2203,7 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
"of text; surfaced warning to user: %s", "of text; surfaced warning to user: %s",
_partial_names, len(_partial_text or ""), result["error"], _partial_names, len(_partial_text or ""), result["error"],
) )
_stub_finish_reason = "stop" _stub_finish_reason = FINISH_REASON_LENGTH
else: else:
logger.warning( logger.warning(
"Partial stream delivered before error; returning " "Partial stream delivered before error; returning "
@ -2235,18 +2213,19 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
len(_partial_text or ""), len(_partial_text or ""),
result["error"], result["error"],
) )
_stub_finish_reason = "length" _stub_finish_reason = FINISH_REASON_LENGTH
_stub_msg = SimpleNamespace( _stub_msg = SimpleNamespace(
role="assistant", content=_partial_text, tool_calls=None, role="assistant", content=_partial_text, tool_calls=None,
reasoning_content=None, reasoning_content=None,
) )
return SimpleNamespace( return SimpleNamespace(
id="partial-stream-stub", id=PARTIAL_STREAM_STUB_ID,
model=getattr(agent, "model", "unknown"), model=getattr(agent, "model", "unknown"),
choices=[SimpleNamespace( choices=[SimpleNamespace(
index=0, message=_stub_msg, finish_reason=_stub_finish_reason, index=0, message=_stub_msg, finish_reason=_stub_finish_reason,
)], )],
usage=None, usage=None,
_dropped_tool_names=_partial_names or None,
) )
raise result["error"] raise result["error"]
return result["response"] return result["response"]

View File

@ -65,7 +65,7 @@ from agent.prompt_caching import apply_anthropic_cache_control
from agent.retry_utils import jittered_backoff from agent.retry_utils import jittered_backoff
from agent.trajectory import has_incomplete_scratchpad from agent.trajectory import has_incomplete_scratchpad
from agent.usage_pricing import estimate_usage_cost, normalize_usage from agent.usage_pricing import estimate_usage_cost, normalize_usage
from hermes_constants import display_hermes_home as _dhh_fn from hermes_constants import display_hermes_home as _dhh_fn, PARTIAL_STREAM_STUB_ID
from hermes_logging import set_session_context from hermes_logging import set_session_context
from tools.schema_sanitizer import strip_pattern_and_format from tools.schema_sanitizer import strip_pattern_and_format
from tools.skill_provenance import set_current_write_origin from tools.skill_provenance import set_current_write_origin
@ -229,6 +229,37 @@ def _restore_or_build_system_prompt(agent, system_message, conversation_history)
) )
def _get_continuation_prompt(is_partial_stub: bool, dropped_tools: Optional[List[str]] = None) -> str:
if is_partial_stub and dropped_tools:
tool_list = ", ".join(dropped_tools[:3])
return (
"[System: Your previous tool call "
f"({tool_list}) was too large and "
"the stream timed out before it "
"could be delivered. Do NOT retry "
"the same tool call with the same "
"large content. Instead, break the "
"content into multiple smaller tool "
"calls (e.g. use multiple patch calls "
"or write smaller files). Each tool "
"call's arguments must be under ~8K "
"tokens to avoid stream timeouts.]"
)
elif is_partial_stub:
return (
"[System: The previous response was cut off by a "
"network error mid-stream. Continue exactly where "
"you left off. Do not restart or repeat prior text. "
"Finish the answer directly.]"
)
else:
return (
"[System: Your previous response was truncated by the output "
"length limit. Continue exactly where you left off. Do not "
"restart or repeat prior text. Finish the answer directly.]"
)
def run_conversation( def run_conversation(
agent, agent,
user_message: str, user_message: str,
@ -1414,7 +1445,7 @@ def run_conversation(
finish_reason = "length" finish_reason = "length"
if finish_reason == "length": if finish_reason == "length":
if getattr(response, "id", "") == "partial-stream-stub": if getattr(response, "id", "") == PARTIAL_STREAM_STUB_ID:
agent._vprint( agent._vprint(
f"{agent.log_prefix}⚠️ Stream interrupted by network error " f"{agent.log_prefix}⚠️ Stream interrupted by network error "
f"(finish_reason='length' on partial-stream-stub)", f"(finish_reason='length' on partial-stream-stub)",
@ -1518,37 +1549,36 @@ def run_conversation(
truncated_response_parts.append(assistant_message.content) truncated_response_parts.append(assistant_message.content)
if length_continue_retries < 3: if length_continue_retries < 3:
# Distinguish a real output-token truncation
# from a partial-stream-stub network error
# (#30963). Same continuation machinery,
# but the prompt has to tell the truth or
# the model goes off rails ("I wasn't
# truncated, I'm done").
_is_partial_stream_stub = ( _is_partial_stream_stub = (
getattr(response, "id", "") == "partial-stream-stub" getattr(response, "id", "") == PARTIAL_STREAM_STUB_ID
) )
if _is_partial_stream_stub: _dropped_tools = getattr(
response, "_dropped_tool_names", None
)
if _is_partial_stream_stub and _dropped_tools:
_tool_list = ", ".join(_dropped_tools[:3])
agent._vprint(
f"{agent.log_prefix}↻ Stream interrupted mid "
f"tool-call ({_tool_list}) — requesting "
f"chunked retry "
f"({length_continue_retries}/3)..."
)
elif _is_partial_stream_stub:
agent._vprint( agent._vprint(
f"{agent.log_prefix}↻ Stream interrupted — " f"{agent.log_prefix}↻ Stream interrupted — "
f"requesting continuation " f"requesting continuation "
f"({length_continue_retries}/3)..." f"({length_continue_retries}/3)..."
) )
_continue_content = (
"[System: The previous response was cut off by a "
"network error mid-stream. Continue exactly where "
"you left off. Do not restart or repeat prior text. "
"Finish the answer directly.]"
)
else: else:
agent._vprint( agent._vprint(
f"{agent.log_prefix}↻ Requesting continuation " f"{agent.log_prefix}↻ Requesting continuation "
f"({length_continue_retries}/3)..." f"({length_continue_retries}/3)..."
) )
_continue_content = (
"[System: Your previous response was truncated by the output " _continue_content = _get_continuation_prompt(
"length limit. Continue exactly where you left off. Do not " _is_partial_stream_stub, _dropped_tools
"restart or repeat prior text. Finish the answer directly.]" )
)
continue_msg = { continue_msg = {
"role": "user", "role": "user",
"content": _continue_content, "content": _continue_content,

View File

@ -432,6 +432,14 @@ def apply_ipv4_preference(force: bool = False) -> None:
socket.getaddrinfo = _ipv4_getaddrinfo # type: ignore[assignment] socket.getaddrinfo = _ipv4_getaddrinfo # type: ignore[assignment]
# ─── Streaming Response Constants ────────────────────────────────────────────
# Response ID for partial stream stubs used during error recovery
PARTIAL_STREAM_STUB_ID = "partial-stream-stub"
FINISH_REASON_LENGTH = "length"
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1" OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
OPENROUTER_MODELS_URL = f"{OPENROUTER_BASE_URL}/models" OPENROUTER_MODELS_URL = f"{OPENROUTER_BASE_URL}/models"

View File

@ -5,9 +5,9 @@ Pins the contract:
- text-only partial stream stub.finish_reason == "length" so the - text-only partial stream stub.finish_reason == "length" so the
conversation loop's existing length-continuation path can keep the conversation loop's existing length-continuation path can keep the
agent moving against an unfinished goal. agent moving against an unfinished goal.
- partial mid-tool-call stub.finish_reason == "stop" so the loop - partial mid-tool-call stub.finish_reason == "length" so the loop
hands control back to the user (matches the user-visible warning triggers continuation machinery with targeted chunking guidance
"Ask me to retry if you want to continue"). instead of ending the turn immediately.
- conversation_loop's length-continuation prompt distinguishes a real - conversation_loop's length-continuation prompt distinguishes a real
output-length truncation from a partial-stream-stub network error output-length truncation from a partial-stream-stub network error
via response.id. via response.id.
@ -20,6 +20,9 @@ from unittest.mock import MagicMock, patch
import pytest import pytest
from hermes_constants import PARTIAL_STREAM_STUB_ID, FINISH_REASON_LENGTH
from agent.conversation_loop import _get_continuation_prompt
# ── Helpers (mirrors test_streaming.py) ──────────────────────────────────── # ── Helpers (mirrors test_streaming.py) ────────────────────────────────────
@ -78,8 +81,8 @@ class TestPartialStreamStubFinishReason:
monkeypatch.setenv("HERMES_STREAM_RETRIES", "0") monkeypatch.setenv("HERMES_STREAM_RETRIES", "0")
response = agent._interruptible_streaming_api_call({}) response = agent._interruptible_streaming_api_call({})
assert response.id == "partial-stream-stub" assert response.id == PARTIAL_STREAM_STUB_ID
assert response.choices[0].finish_reason == "length", ( assert response.choices[0].finish_reason == FINISH_REASON_LENGTH, (
"Text-only partial streams must use finish_reason=length so the " "Text-only partial streams must use finish_reason=length so the "
"conversation loop continues from where the network died " "conversation loop continues from where the network died "
"(issue #30963)." "(issue #30963)."
@ -89,9 +92,11 @@ class TestPartialStreamStubFinishReason:
@patch("run_agent.AIAgent._create_request_openai_client") @patch("run_agent.AIAgent._create_request_openai_client")
@patch("run_agent.AIAgent._close_request_openai_client") @patch("run_agent.AIAgent._close_request_openai_client")
def test_partial_tool_call_keeps_stop(self, _mock_close, mock_create, monkeypatch): def test_partial_tool_call_uses_length(self, _mock_close, mock_create, monkeypatch):
"""Mid-tool-call partials keep finish_reason=stop on purpose — the """Mid-tool-call partials now use finish_reason=length so the
warning text asks the user to drive the retry, not the agent.""" conversation loop's continuation machinery fires — bounded 3-retry
with guidance to break output into smaller chunks (#31998).
tool_calls=None is preserved, so no tool auto-executes."""
def _stalling_stream(): def _stalling_stream():
yield _make_stream_chunk(content="Let me write the audit: ") yield _make_stream_chunk(content="Let me write the audit: ")
@ -114,12 +119,18 @@ class TestPartialStreamStubFinishReason:
monkeypatch.setenv("HERMES_STREAM_RETRIES", "0") monkeypatch.setenv("HERMES_STREAM_RETRIES", "0")
response = agent._interruptible_streaming_api_call({}) response = agent._interruptible_streaming_api_call({})
assert response.id == "partial-stream-stub" assert response.id == PARTIAL_STREAM_STUB_ID
assert response.choices[0].finish_reason == "stop", ( assert response.choices[0].finish_reason == FINISH_REASON_LENGTH, (
"Partial mid-tool-call must keep finish_reason=stop — the warning " "Partial mid-tool-call must use finish_reason=length so the "
"appended to content asks the user to retry, so the agent must " "continuation machinery fires instead of ending the turn "
"not auto-replay a tool call with possible side-effects." "immediately (#31998)."
) )
assert response.choices[0].message.tool_calls is None, (
"tool_calls must remain None (no auto-execution of side-effectful "
"tool calls)."
)
# The stub should carry dropped tool names for continuation prompt
assert getattr(response, "_dropped_tool_names", None) == ["write_file"]
content = response.choices[0].message.content or "" content = response.choices[0].message.content or ""
assert "Stream stalled mid tool-call" in content assert "Stream stalled mid tool-call" in content
assert "write_file" in content assert "write_file" in content
@ -129,30 +140,17 @@ class TestPartialStreamStubFinishReason:
class TestLengthContinuationPromptBranching: class TestLengthContinuationPromptBranching:
"""When finish_reason=length, the continuation prompt that reaches the """When finish_reason=length, the continuation prompt that reaches the
model has to tell the truth: real truncation vs. network interruption. model has to tell the truth: real truncation vs. network interruption
Lying ("you were truncated") on a partial-stream stub leads the model vs. dropped tool call (#31998). Three distinct prompts now exist."""
to no-op ("I wasn't truncated, I'm done"), defeating recovery."""
def _simulate_branch(self, response_id: str) -> str: def _simulate_branch(self, response_id: str, dropped_tools=None) -> str:
"""Return the continuation prompt text the loop would inject for """Return the continuation prompt text the loop would inject for
a `finish_reason=length` response with the given id. Mirrors the a `finish_reason=length` response with the given id."""
exact branch in agent/conversation_loop.py.""" is_partial = response_id == PARTIAL_STREAM_STUB_ID
response = SimpleNamespace(id=response_id) return _get_continuation_prompt(is_partial, dropped_tools)
if getattr(response, "id", "") == "partial-stream-stub":
return (
"[System: The previous response was cut off by a "
"network error mid-stream. Continue exactly where "
"you left off. Do not restart or repeat prior text. "
"Finish the answer directly.]"
)
return (
"[System: Your previous response was truncated by the output "
"length limit. Continue exactly where you left off. Do not "
"restart or repeat prior text. Finish the answer directly.]"
)
def test_partial_stream_stub_uses_network_prompt(self): def test_partial_stream_stub_uses_network_prompt(self):
prompt = self._simulate_branch("partial-stream-stub") prompt = self._simulate_branch(PARTIAL_STREAM_STUB_ID)
assert "network error mid-stream" in prompt assert "network error mid-stream" in prompt
assert "output length limit" not in prompt assert "output length limit" not in prompt
@ -165,6 +163,19 @@ class TestLengthContinuationPromptBranching:
prompt = self._simulate_branch("") prompt = self._simulate_branch("")
assert "output length limit" in prompt assert "output length limit" in prompt
def test_dropped_tool_call_uses_chunking_prompt(self):
"""When the stub dropped a tool call, the continuation prompt
must guide the model to break its output into smaller chunks
instead of retrying the same large tool call (#31998)."""
prompt = self._simulate_branch(
PARTIAL_STREAM_STUB_ID, dropped_tools=["write_file"],
)
assert "too large" in prompt
assert "break" in prompt.lower()
assert "write_file" in prompt
assert "network error" not in prompt
assert "output length limit" not in prompt
# ── Integration: live conversation loop ─────────────────────────────────── # ── Integration: live conversation loop ───────────────────────────────────
@ -208,12 +219,12 @@ class TestConversationLoopPartialStreamContinuation:
# First API call: the partial-stream stub (length on partial-stream-stub id). # First API call: the partial-stream stub (length on partial-stream-stub id).
partial_stub = SimpleNamespace( partial_stub = SimpleNamespace(
id="partial-stream-stub", id=PARTIAL_STREAM_STUB_ID,
model="test/model", model="test/model",
choices=[SimpleNamespace( choices=[SimpleNamespace(
index=0, index=0,
message=_mock_assistant_msg(content="The first half of "), message=_mock_assistant_msg(content="The first half of "),
finish_reason="length", finish_reason=FINISH_REASON_LENGTH,
)], )],
usage=None, usage=None,
) )