Compare commits

..

1 Commits

Author SHA1 Message Date
d77f01e31a feat: /stats command — comprehensive system dashboard
Add /stats slash command showing model config, context engine
(Semantic RLE status), skills usage (from curator telemetry),
curator state, cron jobs, 24h activity, and system memory.

Pure local compute — no LLM call, no prompt-cache impact.

- COMMAND_REGISTRY: stats (Info category)
- gateway/run.py: dispatch + _handle_stats_command handler
- tests/gateway/test_stats_command.py: 7 tests
2026-05-29 15:22:27 +00:00
6 changed files with 380 additions and 271 deletions

View File

@ -580,13 +580,6 @@ class ContextCompressor(ContextEngine):
self.summary_model = summary_model_override or ""
# Compression-model fallback: set by check_compression_model_feasibility
# when the primary aux compression model fails the minimum context check.
# If set, _generate_summary uses this provider/model for the LLM call
# instead of the main compressor attributes. Dict keys:
# provider, model, base_url, api_key
self._compression_fallback: Optional[Dict[str, str]] = None
# Stores the previous compaction summary for iterative updates
self._previous_summary: Optional[str] = None
# Anti-thrashing: track whether last compression was effective
@ -1076,20 +1069,6 @@ The user has requested that this compaction PRIORITISE preserving all informatio
}
if self.summary_model:
call_kwargs["model"] = self.summary_model
# Compression-model fallback: when the primary aux compression
# model was rejected for insufficient context, the feasibility
# check stored a replacement provider/model here. Override the
# entire main_runtime so call_llm routes the summary request to
# the fallback provider instead of the main one.
if self._compression_fallback:
_fb = self._compression_fallback
call_kwargs["main_runtime"] = {
"model": _fb["model"],
"provider": _fb["provider"],
"base_url": _fb.get("base_url", ""),
"api_key": _fb.get("api_key", ""),
"api_mode": _fb.get("api_mode", self.api_mode),
}
response = call_llm(**call_kwargs)
content = response.choices[0].message.content
# Handle cases where content is not a string (e.g., dict from llama.cpp)

View File

@ -221,101 +221,9 @@ def check_compression_model_feasibility(agent: Any) -> None:
new_threshold,
)
except ValueError:
# Primary compression model failed the minimum context check
# (context_length < MINIMUM_CONTEXT_LENGTH). Before giving up,
# try the user's fallback provider chain so a model switch or
# provider outage doesn't silently disable compression.
_fallback_chain = getattr(agent, '_fallback_chain', None) or []
_tried = [f"{aux_model} ({_aux_cfg_provider or 'auto'}): {aux_context:,} ctx < {MINIMUM_CONTEXT_LENGTH:,}"]
for _fb_entry in _fallback_chain:
_fb_provider = _fb_entry.get("provider", "")
_fb_model = _fb_entry.get("model", "")
if not _fb_provider or not _fb_model:
continue
try:
from agent.auxiliary_client import resolve_provider_client
_fb_client, _fb_resolved_model = resolve_provider_client(
_fb_provider,
_fb_model,
explicit_base_url=_fb_entry.get("base_url", ""),
explicit_api_key=_fb_entry.get("api_key", ""),
main_runtime=agent._current_main_runtime(),
)
if _fb_client is None or not _fb_resolved_model:
_tried.append(f"{_fb_model} ({_fb_provider}): unavailable")
continue
_fb_base_url = str(getattr(_fb_client, "base_url", ""))
_fb_api_key_raw = getattr(_fb_client, "api_key", "")
_fb_api_key = (
""
if callable(_fb_api_key_raw) and not isinstance(_fb_api_key_raw, str)
else str(_fb_api_key_raw or "")
)
_fb_context = get_model_context_length(
_fb_resolved_model,
base_url=_fb_base_url,
api_key=_fb_api_key,
provider=_fb_provider,
custom_providers=getattr(agent, "_custom_providers", None),
)
if _fb_context and _fb_context < MINIMUM_CONTEXT_LENGTH:
_tried.append(
f"{_fb_resolved_model} ({_fb_provider}): "
f"{_fb_context:,} ctx < {MINIMUM_CONTEXT_LENGTH:,}"
)
continue
# ── Found a suitable fallback ──────────────────────────
logger.warning(
"Compression model %s (%s) has only %d token context "
"(minimum %d). Falling back to %s (%s) with %d token context.",
aux_model, _aux_cfg_provider or "auto", aux_context,
MINIMUM_CONTEXT_LENGTH, _fb_resolved_model, _fb_provider,
_fb_context or 0,
)
agent.context_compressor._compression_fallback = {
"provider": _fb_provider,
"model": _fb_resolved_model,
"base_url": _fb_base_url,
"api_key": _fb_api_key,
}
_msg = (
f"⚠ Compression model {aux_model} has only "
f"{aux_context:,} token context (minimum "
f"{MINIMUM_CONTEXT_LENGTH:,} required). "
f"Falling back to {_fb_resolved_model} ({_fb_provider}) "
f"for summaries."
)
agent._compression_warning = _msg
agent._emit_status(_msg)
return
except Exception as _fb_err:
_tried.append(f"{_fb_model} ({_fb_provider}): {_fb_err}")
continue
# No fallback worked — warn and let compression run without
# summaries (same behavior as 'no auxiliary LLM' above).
_all_tried = "; ".join(_tried)
_msg = (
f"⚠ No suitable compression model available. "
f"Tried: {_all_tried}. "
f"Compression will drop middle turns without summaries. "
f"Run `hermes setup` or set "
f"auxiliary.compression.model in config.yaml."
)
agent._compression_warning = _msg
agent._emit_status(_msg)
logger.warning("Compression model fallback exhausted: %s", _all_tried)
return
# Hard rejections (aux below minimum context) must propagate
# so the session refuses to start.
raise
except Exception as exc:
logger.debug(
"Compression feasibility check failed (non-fatal): %s", exc

View File

@ -7535,6 +7535,9 @@ class GatewayRunner:
if canonical == "insights":
return await self._handle_insights_command(event)
if canonical == "stats":
return await self._handle_stats_command(event)
if canonical == "reload-mcp":
return await self._handle_reload_mcp_command(event)
@ -13234,6 +13237,166 @@ class GatewayRunner:
logger.error("Insights command error: %s", e, exc_info=True)
return t("gateway.insights.error", error=e)
async def _handle_stats_command(self, event: MessageEvent) -> str:
"""Handle /stats command — comprehensive system and model report."""
import json, time as _time, os as _os
from datetime import datetime, timezone, timedelta
source = event.source
session_entry = self.session_store.get_or_create_session(source)
lines: list[str] = []
_now = datetime.now(timezone.utc)
lines.append("📊 **Hermes Stats**")
lines.append("")
# ── Model ──
cfg = self._read_user_config()
model_cfg = cfg.get("model", {}) if isinstance(cfg, dict) else {}
main_model = model_cfg.get("default", "") if isinstance(model_cfg, dict) else str(model_cfg)
main_provider = model_cfg.get("provider", "") if isinstance(model_cfg, dict) else ""
fallback_chain = cfg.get("fallback_providers", []) if isinstance(cfg, dict) else []
fb_models = [f"{fb.get('model','?')} ({fb.get('provider','?')})" for fb in fallback_chain[:3]]
ctx_len = "?"
ctx_engine_name = "compressor"
try:
_cache_lock = getattr(self, "_agent_cache_lock", None)
if _cache_lock:
with _cache_lock:
_cached = self._agent_cache.get(session_entry.session_key)
if _cached:
_agent, _, _ts = _cached
_cc = getattr(_agent, "context_compressor", None)
if _cc:
ctx_len = f"{getattr(_cc, 'context_length', 0):,}"
_ce = getattr(_cc, "name", None) or type(_cc).__name__
ctx_engine_name = _ce.lower() if _ce != "ContextCompressor" else "compressor"
except Exception:
pass
lines.append("**🤖 Model**")
lines.append(f" Main: `{main_model}` ({main_provider})")
if fb_models:
lines.append(f" Fallback: {', '.join(fb_models)}")
lines.append(f" Context: {ctx_len} tokens | Engine: {ctx_engine_name}")
lines.append("")
# ── Context Engine (Semantic RLE) ──
_rle_dir = _os.path.expanduser("~/.hermes/hermes-agent/plugins/context_engine/semantic_rle")
lines.append("**🧠 Context Engine**")
if _os.path.isdir(_rle_dir):
lines.append(" Semantic RLE: installed ✓")
else:
lines.append(" Semantic RLE: not installed")
lines.append("")
# ── Skills Usage ──
_skills_usage_path = _os.path.expanduser("~/.hermes/skills/.usage.json")
skills_active = 0
skills_agent = 0
skills_top = []
if _os.path.isfile(_skills_usage_path):
try:
with open(_skills_usage_path) as f:
_su = json.load(f)
for _sn, _sd in _su.items():
if _sd.get("state") == "active":
skills_active += 1
if _sd.get("created_by") == "agent":
skills_agent += 1
_by_use = sorted(_su.items(), key=lambda x: x[1].get("use_count", 0), reverse=True)
skills_top = [(_sn, _sd.get("use_count", 0)) for _sn, _sd in _by_use[:5]]
except Exception:
pass
lines.append("**📚 Skills**")
lines.append(f" Active: {skills_active} | Agent-created: {skills_agent}")
if skills_top:
_top_str = ", ".join(f"`{n}` ({c})" for n, c in skills_top)
lines.append(f" Top: {_top_str}")
lines.append("")
# ── Curator ──
_curator_state_path = _os.path.expanduser("~/.hermes/skills/.curator_state.json")
curator_runs = 0
curator_last = "never"
curator_archived = 0
if _os.path.isfile(_curator_state_path):
try:
with open(_curator_state_path) as f:
_cs = json.load(f)
curator_runs = _cs.get("runs", 0)
_cls = _cs.get("last_run")
if _cls:
_dt = datetime.fromisoformat(_cls.replace("Z", "+00:00"))
_delta = _now - _dt
if _delta.days > 0:
curator_last = f"{_delta.days}d ago"
elif _delta.seconds > 3600:
curator_last = f"{_delta.seconds // 3600}h ago"
else:
curator_last = f"{_delta.seconds // 60}m ago"
curator_archived = _cs.get("archived_count", 0)
except Exception:
pass
lines.append("**🌱 Curator**")
lines.append(f" Runs: {curator_runs} | Last: {curator_last} | Archived: {curator_archived}")
lines.append("")
# ── Cron ──
try:
from cron.jobs import load_jobs
_jobs = load_jobs()
_active_jobs = [j for j in _jobs if j.get("paused") != True]
_job_names = [j.get("name", j.get("job_id", "?")[:8]) for j in _active_jobs[:5]]
lines.append("**⏰ Cron**")
if _job_names:
lines.append(f" Active: {len(_active_jobs)}{', '.join(_job_names)}")
else:
lines.append(" Active: 0")
lines.append("")
except Exception:
pass
# ── Recent Activity ──
try:
from hermes_state import SessionDB
_db = SessionDB()
_recent = _db.list_sessions(limit=5, offset=0)
_db.close()
if _recent:
_recent_24h = 0
_recent_msgs = 0
_cutoff = _now - timedelta(hours=24)
for _rs in _recent:
_t = _rs.get("updated_at") or _rs.get("created_at")
if _t:
_dt = datetime.fromisoformat(str(_t).replace("Z", "+00:00")) if isinstance(_t, str) else _t
if _dt > _cutoff:
_recent_24h += 1
_recent_msgs += _rs.get("message_count", 0)
lines.append("**⏱️ Activity (24h)**")
lines.append(f" Sessions: {_recent_24h} | Messages: {_recent_msgs}")
_latest = _recent[0] if _recent else {}
_latest_preview = (_latest.get("preview") or "")[:80]
if _latest_preview:
lines.append(f" Latest: _{_latest_preview}_")
lines.append("")
except Exception:
pass
# ── System ──
try:
import psutil
_proc = psutil.Process()
_mem_mb = _proc.memory_info().rss / 1024 / 1024
lines.append("**🔧 System**")
lines.append(f" Memory: {_mem_mb:.0f} MB")
except ImportError:
pass
except Exception:
pass
return "\n".join(lines)
async def _handle_reload_mcp_command(self, event: MessageEvent) -> Optional[str]:
"""Handle /reload-mcp — reconnect MCP servers and rebuild the cached agent.

View File

@ -98,6 +98,7 @@ COMMAND_REGISTRY: list[CommandDef] = [
aliases=("bg", "btw"), args_hint="<prompt>"),
CommandDef("agents", "Show active agents and running tasks", "Session",
aliases=("tasks",)),
CommandDef("stats", "Show comprehensive system stats — model, skills, curator, cron, activity", "Info"),
CommandDef("queue", "Queue a prompt for the next turn (doesn't interrupt)", "Session",
aliases=("q",), args_hint="<prompt>"),
CommandDef("steer", "Inject a message after the next tool call without interrupting", "Session",

View File

@ -0,0 +1,204 @@
"""Tests for /stats command — comprehensive system report."""
from unittest.mock import MagicMock, AsyncMock, patch
import json, os, tempfile, time
from datetime import datetime, timezone, timedelta
import pytest
# ---- Helpers ----
def _mock_event(command="stats", args=""):
"""Build a minimal MessageEvent for slash-command tests."""
event = MagicMock()
event.source.platform.value = "telegram"
event.source.chat_id = "test-chat"
event.source.chat_type = "dm"
event.get_command.return_value = command
event.get_command_args.return_value = args
return event
def _make_runner():
"""Build a GatewayRunner with minimum wiring for _handle_stats_command."""
from gateway.run import GatewayRunner
runner = GatewayRunner.__new__(GatewayRunner)
# Session store mocks
runner.session_store = MagicMock()
session_entry = MagicMock()
session_entry.session_key = "agent:main:telegram:dm:test-chat"
session_entry.session_id = "20260529_test"
runner.session_store.get_or_create_session.return_value = session_entry
# Config mock
runner._read_user_config = MagicMock(return_value={
"model": {"default": "deepseek-v4-pro", "provider": "opencode_go"},
"fallback_providers": [
{"provider": "opencode_go", "model": "deepseek-v4-pro"},
],
})
# Agent cache
runner._agent_cache = {}
runner._agent_cache_lock = MagicMock()
runner._agent_cache_lock.__enter__ = lambda s: None
runner._agent_cache_lock.__exit__ = lambda s, *a: None
return runner
# ---- Tests ----
class TestStatsCommand:
"""Unit tests for _handle_stats_command."""
@pytest.mark.asyncio
async def test_output_has_expected_sections(self):
"""Output contains all major dashboard sections."""
runner = _make_runner()
event = _mock_event()
with patch("hermes_state.SessionDB") as mock_db_cls:
mock_db = MagicMock()
mock_db.list_sessions.return_value = [
{"session_id": "s1", "updated_at": datetime.now(timezone.utc).isoformat(), "message_count": 5}
]
mock_db_cls.return_value = mock_db
result = await runner._handle_stats_command(event)
# All sections should be present for a working config
assert "Hermes Stats" in result
assert "Model" in result
assert "deepseek-v4-pro" in result
assert "opencode_go" in result
assert "Context Engine" in result
assert "Skills" in result
assert "Curator" in result
assert "Cron" in result
assert "Activity" in result
@pytest.mark.asyncio
async def test_model_section_shows_fallback(self):
"""Fallback chain is rendered when configured."""
runner = _make_runner()
runner._read_user_config.return_value["fallback_providers"] = [
{"provider": "custom", "model": "gemma-local"},
]
event = _mock_event()
with patch("hermes_state.SessionDB") as mock_db_cls:
mock_db = MagicMock()
mock_db.list_sessions.return_value = []
mock_db_cls.return_value = mock_db
result = await runner._handle_stats_command(event)
assert "gemma-local" in result
assert "custom" in result
@pytest.mark.asyncio
async def test_no_fallback_shown_when_empty(self):
"""No fallback line when chain is empty."""
runner = _make_runner()
runner._read_user_config.return_value["fallback_providers"] = []
event = _mock_event()
with patch("hermes_state.SessionDB") as mock_db_cls:
mock_db = MagicMock()
mock_db.list_sessions.return_value = []
mock_db_cls.return_value = mock_db
result = await runner._handle_stats_command(event)
assert "Fallback" not in result
@pytest.mark.asyncio
async def test_curator_section_no_state_file(self):
"""Curator section shows defaults when no state file exists."""
runner = _make_runner()
event = _mock_event()
with (
patch("hermes_state.SessionDB") as mock_db_cls,
patch("os.path.isfile", return_value=False),
):
mock_db = MagicMock()
mock_db.list_sessions.return_value = []
mock_db_cls.return_value = mock_db
result = await runner._handle_stats_command(event)
assert "Runs: 0" in result
assert "Archived: 0" in result
@pytest.mark.asyncio
async def test_activity_section_with_recent_sessions(self):
"""Activity section shows 24h stats from session DB."""
runner = _make_runner()
event = _mock_event()
now = datetime.now(timezone.utc)
recent = [
{
"session_id": "s1", "title": "Debug session",
"updated_at": now.isoformat(),
"message_count": 42,
"preview": "Fixed the bug",
},
{
"session_id": "s2",
"updated_at": (now - timedelta(hours=48)).isoformat(),
"message_count": 10,
},
]
with patch("hermes_state.SessionDB") as mock_db_cls:
mock_db = MagicMock()
mock_db.list_sessions.return_value = recent
mock_db_cls.return_value = mock_db
result = await runner._handle_stats_command(event)
# First session is within 24h
assert "Sessions: 1" in result
assert "Messages: 42" in result
@pytest.mark.asyncio
async def test_cron_section_handles_import_error(self):
"""Cron section is skipped gracefully when cron module unavailable."""
runner = _make_runner()
event = _mock_event()
with (
patch("hermes_state.SessionDB") as mock_db_cls,
patch("cron.jobs.load_jobs", side_effect=ImportError("no cron")),
):
mock_db = MagicMock()
mock_db.list_sessions.return_value = []
mock_db_cls.return_value = mock_db
result = await runner._handle_stats_command(event)
assert isinstance(result, str)
assert "Hermes Stats" in result
# Cron section should be absent (not crash)
assert "Cron" not in result
@pytest.mark.asyncio
async def test_rle_section_not_installed(self):
"""RLE section shows 'not installed' when plugin dir missing."""
runner = _make_runner()
event = _mock_event()
with (
patch("hermes_state.SessionDB") as mock_db_cls,
patch("os.path.isdir", return_value=False),
):
mock_db = MagicMock()
mock_db.list_sessions.return_value = [
{"session_id": "s1", "updated_at": datetime.now(timezone.utc).isoformat(), "message_count": 0}
]
mock_db_cls.return_value = mock_db
result = await runner._handle_stats_command(event)
assert "not installed" in result

View File

@ -57,7 +57,6 @@ def _make_agent(
compressor = MagicMock(spec=ContextCompressor)
compressor.context_length = main_context
compressor.threshold_tokens = int(main_context * threshold_percent)
compressor._compression_fallback = None
agent.context_compressor = compressor
return agent
@ -102,169 +101,24 @@ def test_auto_corrects_threshold_when_aux_context_below_threshold(mock_get_clien
@patch("agent.model_metadata.get_model_context_length", return_value=32_768)
@patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_rejects_aux_below_minimum_context(mock_get_client, mock_ctx_len):
"""When aux context < MINIMUM_CONTEXT_LENGTH (64K) and no fallback
providers are configured, a warning is emitted and compression will
operate without summaries. Previously this raised ValueError; now it
degrades gracefully so a model switch doesn't kill the session."""
"""Hard floor: aux context < MINIMUM_CONTEXT_LENGTH (64K) → session
refuses to start (ValueError), mirroring the main-model rejection."""
agent = _make_agent(main_context=200_000, threshold_percent=0.50)
mock_client = MagicMock()
mock_client.base_url = "https://openrouter.ai/api/v1"
mock_client.api_key = "sk-aux"
mock_get_client.return_value = (mock_client, "tiny-aux-model")
messages = []
agent._emit_status = lambda msg: messages.append(msg)
agent._emit_status = lambda msg: None
# No fallback chain → should warn, not raise
agent._fallback_chain = []
agent._check_compression_model_feasibility()
assert len(messages) == 1
assert "No suitable compression model" in messages[0]
assert "tiny-aux-model" in messages[0]
assert "32,768" in messages[0]
assert "64,000" in messages[0]
assert agent._compression_warning is not None
@patch("agent.model_metadata.get_model_context_length")
@patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_falls_back_to_chain_when_aux_below_minimum(mock_get_client, mock_ctx_len):
"""When the primary aux model fails the context-length floor, the
feasibility check tries each fallback provider in order, using the
first one that meets MINIMUM_CONTEXT_LENGTH."""
agent = _make_agent(main_context=200_000, threshold_percent=0.50)
# Primary aux model: too small (32K)
mock_primary_client = MagicMock()
mock_primary_client.base_url = "https://openrouter.ai/api/v1"
mock_primary_client.api_key = "sk-aux"
mock_get_client.return_value = (mock_primary_client, "tiny-aux-model")
# Fallback chain: two providers, first one meets the floor
agent._fallback_chain = [
{"provider": "opencode_go", "model": "deepseek-v4-pro"},
{"provider": "custom", "model": "gemma-local",
"base_url": "http://127.0.0.1:8081/v1", "api_key": "no-key"},
]
# Mock resolve_provider_client for the fallback resolution
mock_fb_client = MagicMock()
mock_fb_client.base_url = "https://api.opencode.ai/v1"
mock_fb_client.api_key = "sk-fallback"
# get_model_context_length: first return 32K (primary fail),
# then return 128K (fallback success)
mock_ctx_len.side_effect = [32_768, 128_000]
messages = []
agent._emit_status = lambda msg: messages.append(msg)
with patch("agent.auxiliary_client.resolve_provider_client",
return_value=(mock_fb_client, "deepseek-v4-pro")) as mock_resolve:
with pytest.raises(ValueError) as exc_info:
agent._check_compression_model_feasibility()
# Should have resolved the fallback provider
mock_resolve.assert_called_once()
# First two positional args: provider, model
assert mock_resolve.call_args[0][0] == "opencode_go"
assert mock_resolve.call_args[0][1] == "deepseek-v4-pro"
# Warning should mention the fallback choice
assert len(messages) == 1
assert "Falling back to" in messages[0]
assert "deepseek-v4-pro" in messages[0]
assert "opencode_go" in messages[0]
# Fallback dict stored on compressor
fb = agent.context_compressor._compression_fallback
assert fb is not None
assert fb["provider"] == "opencode_go"
assert fb["model"] == "deepseek-v4-pro"
@patch("agent.model_metadata.get_model_context_length")
@patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_falls_back_past_unavailable_provider(mock_get_client, mock_ctx_len):
"""When the first fallback provider is unavailable, skip it and
try the next one."""
agent = _make_agent(main_context=200_000, threshold_percent=0.50)
mock_primary_client = MagicMock()
mock_primary_client.base_url = "https://openrouter.ai/api/v1"
mock_primary_client.api_key = "sk-aux"
mock_get_client.return_value = (mock_primary_client, "tiny")
# Fallback chain: first unavailable, second works
agent._fallback_chain = [
{"provider": "broken-provider", "model": "broken-model"},
{"provider": "opencode_go", "model": "deepseek-v4-pro"},
]
mock_fb_client = MagicMock()
mock_fb_client.base_url = "https://api.opencode.ai/v1"
mock_fb_client.api_key = "sk-fallback"
# Primary: 32K (fail), broken-provider: unavailable, opencode_go: 128K
mock_ctx_len.side_effect = [32_768, None, 128_000]
messages = []
agent._emit_status = lambda msg: messages.append(msg)
# First resolve returns None (unavailable), second returns client
mock_resolve_values = [(None, None), (mock_fb_client, "deepseek-v4-pro")]
with patch("agent.auxiliary_client.resolve_provider_client",
side_effect=mock_resolve_values) as mock_resolve:
agent._check_compression_model_feasibility()
# Should have tried both fallbacks
assert mock_resolve.call_count == 2
# Should succeed with the second fallback
fb = agent.context_compressor._compression_fallback
assert fb is not None
assert fb["provider"] == "opencode_go"
@patch("agent.model_metadata.get_model_context_length")
@patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_warns_when_all_fallbacks_exhausted(mock_get_client, mock_ctx_len):
"""When every fallback provider also fails the context floor or is
unavailable, emit a warning and degrade to no-summary mode without
raising."""
agent = _make_agent(main_context=200_000, threshold_percent=0.50)
mock_primary_client = MagicMock()
mock_primary_client.base_url = "https://openrouter.ai/api/v1"
mock_primary_client.api_key = "sk-aux"
mock_get_client.return_value = (mock_primary_client, "tiny-main")
agent._fallback_chain = [
{"provider": "small-provider", "model": "small-model"},
]
# Fallback also too small
mock_fb_client = MagicMock()
mock_fb_client.base_url = "https://small.api/v1"
mock_fb_client.api_key = "sk-small"
mock_ctx_len.side_effect = [32_768, 16_384]
messages = []
agent._emit_status = lambda msg: messages.append(msg)
# Mock compressor won't have _compression_fallback until set —
# initialize it so the final assertion works.
agent.context_compressor._compression_fallback = None
with patch("agent.auxiliary_client.resolve_provider_client",
return_value=(mock_fb_client, "small-model")):
agent._check_compression_model_feasibility()
assert len(messages) == 1
assert "No suitable compression model" in messages[0]
assert "small-model" in messages[0]
assert agent._compression_warning is not None
# No fallback on compressor
assert agent.context_compressor._compression_fallback is None
err = str(exc_info.value)
assert "tiny-aux-model" in err
assert "32,768" in err
assert "64,000" in err
assert "below the minimum" in err
@patch("agent.model_metadata.get_model_context_length", return_value=200_000)