fix(codex): add time-to-first-byte watchdog for stalled Codex streams
The chatgpt.com/backend-api/codex endpoint has an intermittent failure mode where it accepts the connection but never emits a single stream event — the socket just hangs. Direct sequential probing reproduces it (0 events, no HTTP status), and a fresh reconnect then succeeds in ~2s. Today the only guard is the wall-clock stale timeout in interruptible_api_call, so a dead-on-arrival connection is held for the full stale window (90-900s depending on context / config) before the retry loop can reconnect — minutes of wasted wall time per stall, at a rate of ~20% of calls during affected windows. Add a TTFB watchdog scoped to the codex_responses path: - codex_runtime.run_codex_stream stamps agent._codex_stream_last_event_ts on *every* stream event (not just output-text deltas), so reasoning-only and tool-call-only turns are not mistaken for a stall. - interruptible_api_call resets that marker before the worker starts and, while it is still None, kills the connection once elapsed exceeds the TTFB cutoff (default 45s, tunable via HERMES_CODEX_TTFB_TIMEOUT_SECONDS, 0 disables). The raised TimeoutError flows through the existing retry path unchanged. Once any event has arrived the stream is healthy and only the existing wall-clock stale timeout applies, so legitimate long generations are never interrupted. Gated to codex_responses; the chat_completions non-stream, anthropic and bedrock branches have no first-event signal and are untouched. Adds tests/agent/test_codex_ttfb_watchdog.py covering the stall kill, the events-flowing pass-through, and the env-disable path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
a989a79c0c
commit
8601c4d44c
@ -256,6 +256,33 @@ def interruptible_api_call(agent, api_kwargs: dict):
|
|||||||
# apply richer recovery (credential rotation, provider fallback).
|
# apply richer recovery (credential rotation, provider fallback).
|
||||||
_stale_timeout = agent._compute_non_stream_stale_timeout(api_kwargs)
|
_stale_timeout = agent._compute_non_stream_stale_timeout(api_kwargs)
|
||||||
|
|
||||||
|
# ── Time-to-first-byte (TTFB) watchdog for the Codex Responses stream ──
|
||||||
|
# The chatgpt.com/backend-api/codex endpoint has an intermittent failure
|
||||||
|
# mode where it accepts the connection but never emits a single stream
|
||||||
|
# event (observed directly: 0 events, no HTTP status, the socket just
|
||||||
|
# hangs). A fresh reconnect succeeds in ~2s, but the wall-clock stale
|
||||||
|
# timeout (often 180–900s) makes us wait minutes before retrying. While no
|
||||||
|
# stream event has arrived yet we apply a much shorter TTFB cutoff so the
|
||||||
|
# main retry loop can reconnect promptly. Once the first event arrives the
|
||||||
|
# stream is healthy, so we fall back to the wall-clock stale timeout and
|
||||||
|
# never interrupt a legitimate long generation. Gated to codex_responses:
|
||||||
|
# only that path streams events incrementally (the chat_completions
|
||||||
|
# non-stream, anthropic and bedrock branches here have no first-event
|
||||||
|
# signal). The marker advances on *any* event (see codex_runtime), so
|
||||||
|
# reasoning-only / tool-call-only turns are not mistaken for a stall.
|
||||||
|
# Operators can tune via HERMES_CODEX_TTFB_TIMEOUT_SECONDS (0 disables).
|
||||||
|
_ttfb_enabled = agent.api_mode == "codex_responses"
|
||||||
|
try:
|
||||||
|
_ttfb_timeout = float(os.getenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "45"))
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
_ttfb_timeout = 45.0
|
||||||
|
if _ttfb_timeout <= 0:
|
||||||
|
_ttfb_enabled = False
|
||||||
|
if _ttfb_enabled:
|
||||||
|
# Reset before the worker starts so a marker left over from a previous
|
||||||
|
# call on this agent can't be misread as first-byte for this one.
|
||||||
|
agent._codex_stream_last_event_ts = None
|
||||||
|
|
||||||
_call_start = time.time()
|
_call_start = time.time()
|
||||||
agent._touch_activity("waiting for non-streaming API response")
|
agent._touch_activity("waiting for non-streaming API response")
|
||||||
|
|
||||||
@ -274,9 +301,48 @@ def interruptible_api_call(agent, api_kwargs: dict):
|
|||||||
f"waiting for non-streaming response ({int(_elapsed)}s elapsed)"
|
f"waiting for non-streaming response ({int(_elapsed)}s elapsed)"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
_elapsed = time.time() - _call_start
|
||||||
|
|
||||||
|
# TTFB detector: the Codex stream has produced no event at all and
|
||||||
|
# we're past the first-byte cutoff → the backend opened the
|
||||||
|
# connection but isn't responding. Kill it so the retry loop can
|
||||||
|
# reconnect (a fresh connection typically succeeds in seconds),
|
||||||
|
# instead of waiting out the much longer wall-clock stale timeout.
|
||||||
|
if (
|
||||||
|
_ttfb_enabled
|
||||||
|
and _elapsed > _ttfb_timeout
|
||||||
|
and getattr(agent, "_codex_stream_last_event_ts", None) is None
|
||||||
|
):
|
||||||
|
logger.warning(
|
||||||
|
"Codex stream produced no bytes within TTFB cutoff "
|
||||||
|
"(%.0fs > %.0fs, model=%s). Backend accepted the connection "
|
||||||
|
"but sent no stream events. Killing connection so the retry "
|
||||||
|
"loop can reconnect.",
|
||||||
|
_elapsed, _ttfb_timeout, api_kwargs.get("model", "unknown"),
|
||||||
|
)
|
||||||
|
agent._emit_status(
|
||||||
|
f"⚠️ No first byte from provider in {int(_elapsed)}s "
|
||||||
|
f"(codex stream, model: {api_kwargs.get('model', 'unknown')}). "
|
||||||
|
f"Reconnecting."
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
_close_request_client_once("codex_ttfb_kill")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
agent._touch_activity(
|
||||||
|
f"codex stream killed after {int(_elapsed)}s with no first byte"
|
||||||
|
)
|
||||||
|
# Wait briefly for the worker to notice the closed connection.
|
||||||
|
t.join(timeout=2.0)
|
||||||
|
if result["error"] is None and result["response"] is None:
|
||||||
|
result["error"] = TimeoutError(
|
||||||
|
f"Codex stream produced no bytes within {int(_elapsed)}s "
|
||||||
|
f"(TTFB threshold: {int(_ttfb_timeout)}s)"
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
# Stale-call detector: kill the connection if no response
|
# Stale-call detector: kill the connection if no response
|
||||||
# arrives within the configured timeout.
|
# arrives within the configured timeout.
|
||||||
_elapsed = time.time() - _call_start
|
|
||||||
if _elapsed > _stale_timeout:
|
if _elapsed > _stale_timeout:
|
||||||
_est_ctx = estimate_request_context_tokens(api_kwargs)
|
_est_ctx = estimate_request_context_tokens(api_kwargs)
|
||||||
_silent_hint: Optional[str] = None
|
_silent_hint: Optional[str] = None
|
||||||
|
|||||||
@ -19,6 +19,7 @@ from __future__ import annotations
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import time
|
||||||
from types import SimpleNamespace
|
from types import SimpleNamespace
|
||||||
from typing import Any, Dict, List
|
from typing import Any, Dict, List
|
||||||
|
|
||||||
@ -194,6 +195,11 @@ def run_codex_stream(agent, api_kwargs: dict, client: Any = None, on_first_delta
|
|||||||
try:
|
try:
|
||||||
with active_client.responses.stream(**api_kwargs) as stream:
|
with active_client.responses.stream(**api_kwargs) as stream:
|
||||||
for event in stream:
|
for event in stream:
|
||||||
|
# Mark stream activity for the TTFB watchdog in
|
||||||
|
# interruptible_api_call. The Codex backend can accept the
|
||||||
|
# connection but never emit a single event; this timestamp
|
||||||
|
# staying None tells the watchdog no bytes are flowing.
|
||||||
|
agent._codex_stream_last_event_ts = time.time()
|
||||||
agent._touch_activity("receiving stream response")
|
agent._touch_activity("receiving stream response")
|
||||||
if agent._interrupt_requested:
|
if agent._interrupt_requested:
|
||||||
break
|
break
|
||||||
|
|||||||
175
tests/agent/test_codex_ttfb_watchdog.py
Normal file
175
tests/agent/test_codex_ttfb_watchdog.py
Normal file
@ -0,0 +1,175 @@
|
|||||||
|
"""Regression tests for the Codex time-to-first-byte (TTFB) watchdog.
|
||||||
|
|
||||||
|
The chatgpt.com/backend-api/codex endpoint has an intermittent failure mode
|
||||||
|
where it accepts the connection but never emits a single stream event. The
|
||||||
|
watchdog in ``interruptible_api_call`` kills such a connection at a short TTFB
|
||||||
|
cutoff (instead of waiting out the much longer wall-clock stale timeout) so the
|
||||||
|
retry loop can reconnect promptly. Once any stream event arrives, the stream is
|
||||||
|
considered healthy and only the wall-clock stale timeout applies — long
|
||||||
|
generations must never be interrupted by the TTFB cutoff.
|
||||||
|
|
||||||
|
The "bytes flowing" signal is ``agent._codex_stream_last_event_ts``, set on
|
||||||
|
*any* event by ``codex_runtime.run_codex_stream`` — so reasoning-only or
|
||||||
|
tool-call-only turns (which emit no output-text deltas) are not mistaken for a
|
||||||
|
stall.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import types
|
||||||
|
from types import SimpleNamespace
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# Stub optional heavy imports so run_agent imports cleanly in isolation.
|
||||||
|
sys.modules.setdefault("fire", types.SimpleNamespace(Fire=lambda *a, **k: None))
|
||||||
|
sys.modules.setdefault("firecrawl", types.SimpleNamespace(Firecrawl=object))
|
||||||
|
sys.modules.setdefault("fal_client", types.SimpleNamespace())
|
||||||
|
|
||||||
|
|
||||||
|
def _make_codex_agent(tmp_path, monkeypatch):
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||||
|
(tmp_path / ".env").write_text("", encoding="utf-8")
|
||||||
|
(tmp_path / "config.yaml").write_text("{}\n", encoding="utf-8")
|
||||||
|
from run_agent import AIAgent
|
||||||
|
|
||||||
|
agent = AIAgent(
|
||||||
|
model="gpt-5.5",
|
||||||
|
provider="openai-codex",
|
||||||
|
api_key="sk-dummy",
|
||||||
|
base_url="https://chatgpt.com/backend-api/codex",
|
||||||
|
quiet_mode=True,
|
||||||
|
skip_context_files=True,
|
||||||
|
skip_memory=True,
|
||||||
|
platform="cli",
|
||||||
|
)
|
||||||
|
# The watchdog is gated on the codex_responses api_mode; assert/force it so
|
||||||
|
# the test is robust to detection-logic changes elsewhere.
|
||||||
|
agent.api_mode = "codex_responses"
|
||||||
|
monkeypatch.setattr(agent, "_emit_status", lambda *a, **k: None)
|
||||||
|
# Keep the wall-clock stale timeout high so any early kill is unambiguously
|
||||||
|
# the TTFB path, not the stale-call path.
|
||||||
|
monkeypatch.setattr(
|
||||||
|
agent, "_compute_non_stream_stale_timeout", lambda *a, **k: 60.0
|
||||||
|
)
|
||||||
|
return agent
|
||||||
|
|
||||||
|
|
||||||
|
def test_ttfb_kills_when_no_stream_event(tmp_path, monkeypatch):
|
||||||
|
"""Backend accepts the connection but emits no event -> killed at the TTFB
|
||||||
|
cutoff, well before the 60s wall-clock stale timeout, with a retryable
|
||||||
|
TimeoutError and a ``codex_ttfb_kill`` close reason."""
|
||||||
|
from agent import chat_completion_helpers as h
|
||||||
|
|
||||||
|
agent = _make_codex_agent(tmp_path, monkeypatch)
|
||||||
|
monkeypatch.setenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "1")
|
||||||
|
|
||||||
|
closes: list = []
|
||||||
|
dummy_client = SimpleNamespace()
|
||||||
|
monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
agent, "_abort_request_openai_client",
|
||||||
|
lambda c, reason=None: closes.append(reason),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
agent, "_close_request_openai_client",
|
||||||
|
lambda c, reason=None: closes.append(reason),
|
||||||
|
)
|
||||||
|
|
||||||
|
stop = {"flag": False}
|
||||||
|
|
||||||
|
def fake_hang(api_kwargs, client=None, on_first_delta=None):
|
||||||
|
# Never set _codex_stream_last_event_ts: simulate zero events arriving.
|
||||||
|
deadline = time.time() + 30
|
||||||
|
while time.time() < deadline and not stop["flag"] and not agent._interrupt_requested:
|
||||||
|
time.sleep(0.02)
|
||||||
|
raise RuntimeError("connection closed")
|
||||||
|
|
||||||
|
monkeypatch.setattr(agent, "_run_codex_stream", fake_hang)
|
||||||
|
|
||||||
|
t0 = time.time()
|
||||||
|
try:
|
||||||
|
with pytest.raises(TimeoutError) as excinfo:
|
||||||
|
h.interruptible_api_call(agent, {"model": "gpt-5.5", "input": "hi"})
|
||||||
|
elapsed = time.time() - t0
|
||||||
|
assert "TTFB" in str(excinfo.value)
|
||||||
|
assert "codex_ttfb_kill" in closes
|
||||||
|
# ~1s cutoff + 2s join grace; must be far under the 60s stale timeout.
|
||||||
|
assert elapsed < 15, f"TTFB watchdog took {elapsed:.1f}s"
|
||||||
|
finally:
|
||||||
|
stop["flag"] = True
|
||||||
|
|
||||||
|
|
||||||
|
def test_ttfb_does_not_kill_when_events_flow(tmp_path, monkeypatch):
|
||||||
|
"""Once a stream event has arrived, a generation that runs past the TTFB
|
||||||
|
cutoff is NOT killed by the watchdog — it completes normally."""
|
||||||
|
from agent import chat_completion_helpers as h
|
||||||
|
|
||||||
|
agent = _make_codex_agent(tmp_path, monkeypatch)
|
||||||
|
monkeypatch.setenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "1")
|
||||||
|
|
||||||
|
closes: list = []
|
||||||
|
dummy_client = SimpleNamespace()
|
||||||
|
monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
agent, "_abort_request_openai_client",
|
||||||
|
lambda c, reason=None: closes.append(reason),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
agent, "_close_request_openai_client",
|
||||||
|
lambda c, reason=None: closes.append(reason),
|
||||||
|
)
|
||||||
|
|
||||||
|
sentinel = SimpleNamespace(ok=True)
|
||||||
|
|
||||||
|
def fake_stream(api_kwargs, client=None, on_first_delta=None):
|
||||||
|
# Bytes flowing: mark stream activity right away, then keep generating
|
||||||
|
# past the 1s TTFB cutoff before returning a real response.
|
||||||
|
agent._codex_stream_last_event_ts = time.time()
|
||||||
|
if on_first_delta:
|
||||||
|
on_first_delta()
|
||||||
|
time.sleep(2.0)
|
||||||
|
return sentinel
|
||||||
|
|
||||||
|
monkeypatch.setattr(agent, "_run_codex_stream", fake_stream)
|
||||||
|
|
||||||
|
resp = h.interruptible_api_call(agent, {"model": "gpt-5.5", "input": "hi"})
|
||||||
|
assert resp is sentinel
|
||||||
|
assert "codex_ttfb_kill" not in closes
|
||||||
|
|
||||||
|
|
||||||
|
def test_ttfb_disabled_via_env_zero(tmp_path, monkeypatch):
|
||||||
|
"""Setting HERMES_CODEX_TTFB_TIMEOUT_SECONDS=0 disables the TTFB watchdog;
|
||||||
|
a no-event stall then falls through to the (here, 60s) stale timeout, so a
|
||||||
|
short hang is NOT killed by TTFB."""
|
||||||
|
from agent import chat_completion_helpers as h
|
||||||
|
|
||||||
|
agent = _make_codex_agent(tmp_path, monkeypatch)
|
||||||
|
monkeypatch.setenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "0")
|
||||||
|
|
||||||
|
closes: list = []
|
||||||
|
dummy_client = SimpleNamespace()
|
||||||
|
monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
agent, "_abort_request_openai_client",
|
||||||
|
lambda c, reason=None: closes.append(reason),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
agent, "_close_request_openai_client",
|
||||||
|
lambda c, reason=None: closes.append(reason),
|
||||||
|
)
|
||||||
|
|
||||||
|
sentinel = SimpleNamespace(ok=True)
|
||||||
|
|
||||||
|
def fake_stream(api_kwargs, client=None, on_first_delta=None):
|
||||||
|
# No event marker, but only briefly — well under the 60s stale timeout.
|
||||||
|
time.sleep(2.0)
|
||||||
|
return sentinel
|
||||||
|
|
||||||
|
monkeypatch.setattr(agent, "_run_codex_stream", fake_stream)
|
||||||
|
|
||||||
|
resp = h.interruptible_api_call(agent, {"model": "gpt-5.5", "input": "hi"})
|
||||||
|
assert resp is sentinel
|
||||||
|
assert "codex_ttfb_kill" not in closes
|
||||||
Loading…
Reference in New Issue
Block a user