fix(gateway): protect in-flight subagents from busy-mode interrupts (#30170)
When a user sends a conversational follow-up while delegate_task is
running, gateway/run.py calls running_agent.interrupt(event.text) on
the PARENT agent. AIAgent.interrupt() then cascades synchronously
through self._active_children and calls interrupt() on every child
subagent, aborting in-flight delegate_task work. The user sees the
fallback cascade with no root-cause in the gateway log, and minutes of
subagent progress are destroyed — the exact failure mode reported in #30170.
Add GatewayRunner._agent_has_active_subagents(running_agent) — a
static helper that returns True iff the parent is currently driving
subagents via delegate_task. The helper is type-defensive: it ignores
truthy MagicMock auto-attributes (so this doesn't accidentally fire
in every test mock that hits the busy path), the _AGENT_PENDING_SENTINEL
placeholder, and missing locks.
Wire the helper into both interrupt branches:
1. _handle_active_session_busy_message — the adapter-level busy
handler. When busy_input_mode == 'interrupt' AND the parent has
active subagents, demote to 'queue' semantics: skip the
parent.interrupt() call, merge the message into the pending
queue, and surface a dedicated ack ("⏳ Subagent working — your
message is queued for when it finishes (use /stop to cancel
everything).") so the operator knows the message wasn't lost and
discovers the explicit escape hatch.
2. The PRIORITY interrupt branch inside _handle_message — the
non-command fast path. Same rationale, same demotion. Routes
through _queue_or_replace_pending_event so the next-turn pickup
stays unchanged.
Explicit /stop and /new commands take a completely different path
(_interrupt_and_clear_session in the slash-command dispatch at line
~6771) and are NOT affected by this guard — the operator still has a
way to force-cancel everything when they actually mean it. Configured
'queue' and 'steer' modes are also untouched: 'queue' already does the
right thing, and 'steer' goes through running_agent.steer() which does
NOT cascade to children (so subagents survive a steer too).
This is Phase 1 of the fix outlined in #30170 — the minimum viable
change that stops subagent loss. Phase 2 (delegation-aware steer
forwarding to active children) and Phase 3 (async delegation, #11508)
are intentionally out of scope.
Refs #30170.
This commit is contained in:
parent
50aaf0c4ad
commit
99d62f6ba1
@ -3034,6 +3034,44 @@ class GatewayRunner:
|
|||||||
if agent is not _AGENT_PENDING_SENTINEL
|
if agent is not _AGENT_PENDING_SENTINEL
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@staticmethod
def _agent_has_active_subagents(running_agent: Any) -> bool:
    """Report whether *running_agent* has live ``delegate_task`` subagents.

    Context (#30170): ``AIAgent.interrupt()`` walks the parent's
    ``_active_children`` list and synchronously calls ``interrupt()`` on
    each child, which kills in-flight subagent work and yields a
    fallback cascade with no actionable signal. Callers use this helper
    to demote ``busy_input_mode='interrupt'`` to ``queue`` semantics
    while subagents are running, so a conversational follow-up cannot
    destroy their progress. The explicit ``/stop`` path (which goes
    through ``_interrupt_and_clear_session``) does not consult this
    helper and is therefore unaffected.

    The helper fails open: a missing attribute or a failure while
    entering/leaving the lock yields ``False``, so a missing or broken
    parent never blocks the existing interrupt path.

    Args:
        running_agent: The agent object for the session; may be
            ``None`` or the ``_AGENT_PENDING_SENTINEL`` placeholder.

    Returns:
        ``True`` only when ``_active_children`` is a real, non-empty
        ``list``/``tuple``/``set``; ``False`` in every other case.
    """
    # No agent yet, or only the "pending" placeholder: nothing to protect.
    if running_agent is None or running_agent is _AGENT_PENDING_SENTINEL:
        return False

    active = getattr(running_agent, "_active_children", None)
    # AIAgent always initialises this as a concrete list (see
    # agent/agent_init.py). Anything that is not a genuine collection is
    # rejected so that ``MagicMock()._active_children`` -- which
    # auto-creates a truthy stub in tests -- cannot trigger the demotion
    # for an agent that has no subagents. An empty collection likewise
    # means there is nothing to protect.
    if not (isinstance(active, (list, tuple, set)) and active):
        return False

    guard = getattr(running_agent, "_active_children_lock", None)
    if guard is None:
        # No lock available: answer from the unsynchronised view.
        return bool(active)

    # Re-read under the lock so the answer reflects a consistent view of
    # the collection; any failure entering or leaving the lock counts as
    # "no active subagents".
    try:
        with guard:
            has_children = bool(active)
    except Exception:
        return False
    return has_children
|
||||||
|
|
||||||
def _queue_or_replace_pending_event(self, session_key: str, event: MessageEvent) -> None:
|
def _queue_or_replace_pending_event(self, session_key: str, event: MessageEvent) -> None:
|
||||||
adapter = self.adapters.get(event.source.platform)
|
adapter = self.adapters.get(event.source.platform)
|
||||||
if not adapter:
|
if not adapter:
|
||||||
@ -3105,6 +3143,25 @@ class GatewayRunner:
|
|||||||
# queueing + interrupting. If the agent isn't running yet
|
# queueing + interrupting. If the agent isn't running yet
|
||||||
# (sentinel) or lacks steer(), or the payload is empty, fall back
|
# (sentinel) or lacks steer(), or the payload is empty, fall back
|
||||||
# to queue semantics so nothing is lost.
|
# to queue semantics so nothing is lost.
|
||||||
|
# #30170 — Subagent protection. ``AIAgent.interrupt()`` cascades
|
||||||
|
# to every entry in the parent's ``_active_children`` list and
|
||||||
|
# aborts in-flight ``delegate_task`` work. Demote ``interrupt``
|
||||||
|
# to ``queue`` when the parent is currently driving subagents so
|
||||||
|
# a conversational follow-up doesn't destroy minutes of subagent
|
||||||
|
# work. Explicit ``/stop`` and ``/new`` slash commands go through
|
||||||
|
# ``_interrupt_and_clear_session`` and are unaffected — the
|
||||||
|
# operator still has a way to force-cancel everything.
|
||||||
|
demoted_for_subagents = (
|
||||||
|
effective_mode == "interrupt"
|
||||||
|
and self._agent_has_active_subagents(running_agent)
|
||||||
|
)
|
||||||
|
if demoted_for_subagents:
|
||||||
|
logger.info(
|
||||||
|
"Demoting busy_input_mode 'interrupt' to 'queue' for session %s "
|
||||||
|
"because the running agent has active subagents (#30170)",
|
||||||
|
session_key,
|
||||||
|
)
|
||||||
|
effective_mode = "queue"
|
||||||
steered = False
|
steered = False
|
||||||
if effective_mode == "steer":
|
if effective_mode == "steer":
|
||||||
steer_text = (event.text or "").strip()
|
steer_text = (event.text or "").strip()
|
||||||
@ -3192,6 +3249,14 @@ class GatewayRunner:
|
|||||||
f"⏩ Steered into current run{status_detail}. "
|
f"⏩ Steered into current run{status_detail}. "
|
||||||
f"Your message arrives after the next tool call."
|
f"Your message arrives after the next tool call."
|
||||||
)
|
)
|
||||||
|
elif is_queue_mode and demoted_for_subagents:
|
||||||
|
# #30170 — explain the demotion so the user knows their
|
||||||
|
# follow-up didn't accidentally kill the subagent and
|
||||||
|
# discovers `/stop` as the explicit escape hatch.
|
||||||
|
message = (
|
||||||
|
f"⏳ Subagent working{status_detail} — your message is queued for "
|
||||||
|
f"when it finishes (use /stop to cancel everything)."
|
||||||
|
)
|
||||||
elif is_queue_mode:
|
elif is_queue_mode:
|
||||||
message = (
|
message = (
|
||||||
f"⏳ Queued for the next turn{status_detail}. "
|
f"⏳ Queued for the next turn{status_detail}. "
|
||||||
@ -7246,6 +7311,22 @@ class GatewayRunner:
|
|||||||
logger.debug("PRIORITY steer-fallback-to-queue for session %s", _quick_key)
|
logger.debug("PRIORITY steer-fallback-to-queue for session %s", _quick_key)
|
||||||
self._queue_or_replace_pending_event(_quick_key, event)
|
self._queue_or_replace_pending_event(_quick_key, event)
|
||||||
return None
|
return None
|
||||||
|
# #30170 — Subagent protection (PRIORITY path). Same rationale
|
||||||
|
# as ``_handle_active_session_busy_message``: an interrupt
|
||||||
|
# cascades through ``_active_children`` and aborts in-flight
|
||||||
|
# delegate_task work. Demote to queue semantics when the
|
||||||
|
# parent is currently driving subagents so a conversational
|
||||||
|
# follow-up doesn't destroy minutes of subagent progress.
|
||||||
|
# /stop reaches its dedicated handler above (line ~6771), so
|
||||||
|
# the operator still has a clean escape hatch.
|
||||||
|
if self._agent_has_active_subagents(running_agent):
|
||||||
|
logger.info(
|
||||||
|
"PRIORITY interrupt demoted to queue for session %s "
|
||||||
|
"because the running agent has active subagents (#30170)",
|
||||||
|
_quick_key,
|
||||||
|
)
|
||||||
|
self._queue_or_replace_pending_event(_quick_key, event)
|
||||||
|
return None
|
||||||
logger.debug("PRIORITY interrupt for session %s", _quick_key)
|
logger.debug("PRIORITY interrupt for session %s", _quick_key)
|
||||||
running_agent.interrupt(event.text)
|
running_agent.interrupt(event.text)
|
||||||
# NOTE: self._pending_messages was write-only (never consumed).
|
# NOTE: self._pending_messages was write-only (never consumed).
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user