"""Tests for _check_compression_model_feasibility() — warns when the auxiliary compression model's context is smaller than the main model's compression threshold. Two-phase design: 1. __init__ → runs the check, prints via _vprint (CLI), stores warning 2. run_conversation (first call) → replays stored warning through status_callback (gateway platforms) """ from unittest.mock import MagicMock, patch import pytest from run_agent import AIAgent from agent.context_compressor import ContextCompressor @pytest.fixture(autouse=True) def _stable_aux_provider_config(): """Keep feasibility tests independent from the developer's config.yaml.""" with patch( "agent.auxiliary_client._resolve_task_provider_model", return_value=("auto", None, None, None, None), ): yield def _make_agent( *, compression_enabled: bool = True, threshold_percent: float = 0.50, main_context: int = 200_000, ) -> AIAgent: """Build a minimal AIAgent with a compressor, skipping __init__.""" agent = AIAgent.__new__(AIAgent) agent.model = "test-main-model" agent.provider = "openrouter" agent.base_url = "https://openrouter.ai/api/v1" agent.api_key = "sk-test" agent.api_mode = "chat_completions" agent.quiet_mode = True agent.log_prefix = "" agent.compression_enabled = compression_enabled agent._print_fn = None agent.suppress_status_output = False agent._stream_consumers = [] agent._executing_tools = False agent._mute_post_response = False agent.status_callback = None agent.tool_progress_callback = None agent._compression_warning = None agent._aux_compression_context_length_config = None agent._custom_providers = [] agent.tools = [] compressor = MagicMock(spec=ContextCompressor) compressor.context_length = main_context compressor.threshold_tokens = int(main_context * threshold_percent) compressor._compression_fallback = None agent.context_compressor = compressor return agent # ── Core warning logic ────────────────────────────────────────────── @patch("agent.model_metadata.get_model_context_length", return_value=80_000) @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_auto_corrects_threshold_when_aux_context_below_threshold(mock_get_client, mock_ctx_len): """Auto-correction: aux >= 64K floor but < threshold → lower threshold to aux_context so compression still works this session.""" agent = _make_agent(main_context=200_000, threshold_percent=0.50) # threshold = 100,000 — aux has 80,000 (above 64K floor, below threshold) mock_client = MagicMock() mock_client.base_url = "https://openrouter.ai/api/v1" mock_client.api_key = "sk-aux" mock_get_client.return_value = (mock_client, "google/gemini-3-flash-preview") messages = [] agent._emit_status = lambda msg: messages.append(msg) agent._check_compression_model_feasibility() assert len(messages) == 1 assert "Compression model" in messages[0] assert "80,000" in messages[0] # aux context assert "100,000" in messages[0] # old threshold assert "Auto-lowered" in messages[0] # Actionable persistence guidance included assert "config.yaml" in messages[0] assert "auxiliary:" in messages[0] assert "compression:" in messages[0] assert "threshold:" in messages[0] # Warning stored for gateway replay assert agent._compression_warning is not None # Threshold on the live compressor was actually lowered to aux_context. assert agent.context_compressor.threshold_tokens == 80_000 @patch("agent.model_metadata.get_model_context_length", return_value=32_768) @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_rejects_aux_below_minimum_context(mock_get_client, mock_ctx_len): """When aux context < MINIMUM_CONTEXT_LENGTH (64K) and no fallback providers are configured, a warning is emitted and compression will operate without summaries. Previously this raised ValueError; now it degrades gracefully so a model switch doesn't kill the session.""" agent = _make_agent(main_context=200_000, threshold_percent=0.50) mock_client = MagicMock() mock_client.base_url = "https://openrouter.ai/api/v1" mock_client.api_key = "sk-aux" mock_get_client.return_value = (mock_client, "tiny-aux-model") messages = [] agent._emit_status = lambda msg: messages.append(msg) # No fallback chain → should warn, not raise agent._fallback_chain = [] agent._check_compression_model_feasibility() assert len(messages) == 1 assert "No suitable compression model" in messages[0] assert "tiny-aux-model" in messages[0] assert "32,768" in messages[0] assert "64,000" in messages[0] assert agent._compression_warning is not None @patch("agent.model_metadata.get_model_context_length") @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_falls_back_to_chain_when_aux_below_minimum(mock_get_client, mock_ctx_len): """When the primary aux model fails the context-length floor, the feasibility check tries each fallback provider in order, using the first one that meets MINIMUM_CONTEXT_LENGTH.""" agent = _make_agent(main_context=200_000, threshold_percent=0.50) # Primary aux model: too small (32K) mock_primary_client = MagicMock() mock_primary_client.base_url = "https://openrouter.ai/api/v1" mock_primary_client.api_key = "sk-aux" mock_get_client.return_value = (mock_primary_client, "tiny-aux-model") # Fallback chain: two providers, first one meets the floor agent._fallback_chain = [ {"provider": "opencode_go", "model": "deepseek-v4-pro"}, {"provider": "custom", "model": "gemma-local", "base_url": "http://127.0.0.1:8081/v1", "api_key": "no-key"}, ] # Mock resolve_provider_client for the fallback resolution mock_fb_client = MagicMock() mock_fb_client.base_url = "https://api.opencode.ai/v1" mock_fb_client.api_key = "sk-fallback" # get_model_context_length: first return 32K (primary fail), # then return 128K (fallback success) mock_ctx_len.side_effect = [32_768, 128_000] messages = [] agent._emit_status = lambda msg: messages.append(msg) with patch("agent.auxiliary_client.resolve_provider_client", return_value=(mock_fb_client, "deepseek-v4-pro")) as mock_resolve: agent._check_compression_model_feasibility() # Should have resolved the fallback provider mock_resolve.assert_called_once() # First two positional args: provider, model assert mock_resolve.call_args[0][0] == "opencode_go" assert mock_resolve.call_args[0][1] == "deepseek-v4-pro" # Warning should mention the fallback choice assert len(messages) == 1 assert "Falling back to" in messages[0] assert "deepseek-v4-pro" in messages[0] assert "opencode_go" in messages[0] # Fallback dict stored on compressor fb = agent.context_compressor._compression_fallback assert fb is not None assert fb["provider"] == "opencode_go" assert fb["model"] == "deepseek-v4-pro" @patch("agent.model_metadata.get_model_context_length") @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_falls_back_past_unavailable_provider(mock_get_client, mock_ctx_len): """When the first fallback provider is unavailable, skip it and try the next one.""" agent = _make_agent(main_context=200_000, threshold_percent=0.50) mock_primary_client = MagicMock() mock_primary_client.base_url = "https://openrouter.ai/api/v1" mock_primary_client.api_key = "sk-aux" mock_get_client.return_value = (mock_primary_client, "tiny") # Fallback chain: first unavailable, second works agent._fallback_chain = [ {"provider": "broken-provider", "model": "broken-model"}, {"provider": "opencode_go", "model": "deepseek-v4-pro"}, ] mock_fb_client = MagicMock() mock_fb_client.base_url = "https://api.opencode.ai/v1" mock_fb_client.api_key = "sk-fallback" # Primary: 32K (fail), broken-provider: unavailable, opencode_go: 128K mock_ctx_len.side_effect = [32_768, None, 128_000] messages = [] agent._emit_status = lambda msg: messages.append(msg) # First resolve returns None (unavailable), second returns client mock_resolve_values = [(None, None), (mock_fb_client, "deepseek-v4-pro")] with patch("agent.auxiliary_client.resolve_provider_client", side_effect=mock_resolve_values) as mock_resolve: agent._check_compression_model_feasibility() # Should have tried both fallbacks assert mock_resolve.call_count == 2 # Should succeed with the second fallback fb = agent.context_compressor._compression_fallback assert fb is not None assert fb["provider"] == "opencode_go" @patch("agent.model_metadata.get_model_context_length") @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_warns_when_all_fallbacks_exhausted(mock_get_client, mock_ctx_len): """When every fallback provider also fails the context floor or is unavailable, emit a warning and degrade to no-summary mode without raising.""" agent = _make_agent(main_context=200_000, threshold_percent=0.50) mock_primary_client = MagicMock() mock_primary_client.base_url = "https://openrouter.ai/api/v1" mock_primary_client.api_key = "sk-aux" mock_get_client.return_value = (mock_primary_client, "tiny-main") agent._fallback_chain = [ {"provider": "small-provider", "model": "small-model"}, ] # Fallback also too small mock_fb_client = MagicMock() mock_fb_client.base_url = "https://small.api/v1" mock_fb_client.api_key = "sk-small" mock_ctx_len.side_effect = [32_768, 16_384] messages = [] agent._emit_status = lambda msg: messages.append(msg) # Mock compressor won't have _compression_fallback until set — # initialize it so the final assertion works. agent.context_compressor._compression_fallback = None with patch("agent.auxiliary_client.resolve_provider_client", return_value=(mock_fb_client, "small-model")): agent._check_compression_model_feasibility() assert len(messages) == 1 assert "No suitable compression model" in messages[0] assert "small-model" in messages[0] assert agent._compression_warning is not None # No fallback on compressor assert agent.context_compressor._compression_fallback is None @patch("agent.model_metadata.get_model_context_length", return_value=200_000) @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_no_warning_when_aux_context_sufficient(mock_get_client, mock_ctx_len): """No warning when aux model context >= main model threshold.""" agent = _make_agent(main_context=200_000, threshold_percent=0.50) # threshold = 100,000 — aux has 200,000 (sufficient) mock_client = MagicMock() mock_client.base_url = "https://openrouter.ai/api/v1" mock_client.api_key = "sk-aux" mock_get_client.return_value = (mock_client, "google/gemini-2.5-flash") messages = [] agent._emit_status = lambda msg: messages.append(msg) agent._check_compression_model_feasibility() assert len(messages) == 0 assert agent._compression_warning is None def test_feasibility_check_passes_live_main_runtime(): """Compression feasibility should probe using the live session runtime.""" agent = _make_agent(main_context=200_000, threshold_percent=0.50) agent.model = "gpt-5.4" agent.provider = "openai-codex" agent.base_url = "https://chatgpt.com/backend-api/codex" agent.api_key = "codex-token" agent.api_mode = "codex_responses" mock_client = MagicMock() mock_client.base_url = "https://chatgpt.com/backend-api/codex" mock_client.api_key = "codex-token" with patch("agent.auxiliary_client.get_text_auxiliary_client", return_value=(mock_client, "gpt-5.4")) as mock_get_client, \ patch("agent.model_metadata.get_model_context_length", return_value=200_000): agent._emit_status = lambda msg: None agent._check_compression_model_feasibility() mock_get_client.assert_called_once_with( "compression", main_runtime={ "model": "gpt-5.4", "provider": "openai-codex", "base_url": "https://chatgpt.com/backend-api/codex", "api_key": "codex-token", "api_mode": "codex_responses", }, ) @patch("agent.model_metadata.get_model_context_length", return_value=1_000_000) @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_feasibility_check_passes_config_context_length(mock_get_client, mock_ctx_len): """auxiliary.compression.context_length from config is forwarded to get_model_context_length so custom endpoints that lack /models still report the correct context window (fixes #8499).""" agent = _make_agent(main_context=200_000, threshold_percent=0.85) agent._aux_compression_context_length_config = 1_000_000 mock_client = MagicMock() mock_client.base_url = "http://custom-endpoint:8080/v1" mock_client.api_key = "sk-custom" mock_get_client.return_value = (mock_client, "custom/big-model") agent._emit_status = lambda msg: None agent._check_compression_model_feasibility() mock_ctx_len.assert_called_once_with( "custom/big-model", base_url="http://custom-endpoint:8080/v1", api_key="sk-custom", config_context_length=1_000_000, provider="openrouter", custom_providers=[], ) @patch("agent.model_metadata.get_model_context_length", return_value=128_000) @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_feasibility_check_ignores_invalid_context_length(mock_get_client, mock_ctx_len): """Non-integer context_length in config is silently ignored.""" agent = _make_agent(main_context=200_000, threshold_percent=0.50) agent._aux_compression_context_length_config = None mock_client = MagicMock() mock_client.base_url = "http://custom:8080/v1" mock_client.api_key = "sk-test" mock_get_client.return_value = (mock_client, "custom/model") agent._emit_status = lambda msg: None agent._check_compression_model_feasibility() mock_ctx_len.assert_called_once_with( "custom/model", base_url="http://custom:8080/v1", api_key="sk-test", config_context_length=None, provider="openrouter", custom_providers=[], ) def test_init_feasibility_check_uses_aux_context_override_from_config(): """Lazy feasibility check should cache and forward auxiliary.compression.context_length. NB: feasibility check is deferred from AIAgent.__init__ to the first actual compression attempt (saves ~400ms cold startup on short sessions that never trigger compression). The test drives the check explicitly via ``agent._check_compression_model_feasibility()`` to assert the config-override threading. """ class _StubCompressor: def __init__(self, *args, **kwargs): self.context_length = 200_000 self.threshold_tokens = 100_000 self.threshold_percent = 0.50 def get_tool_schemas(self): return [] def on_session_start(self, *args, **kwargs): return None cfg = { "auxiliary": { "compression": { "context_length": 1_000_000, }, }, } mock_client = MagicMock() mock_client.base_url = "http://custom-endpoint:8080/v1" mock_client.api_key = "sk-custom" with ( patch("hermes_cli.config.load_config", return_value=cfg), patch("run_agent.get_tool_definitions", return_value=[]), patch("run_agent.check_toolset_requirements", return_value={}), patch("run_agent.OpenAI"), patch("run_agent.ContextCompressor", new=_StubCompressor), patch("agent.auxiliary_client.get_text_auxiliary_client", return_value=(mock_client, "custom/big-model")), patch("agent.model_metadata.get_model_context_length", return_value=1_000_000) as mock_ctx_len, ): agent = AIAgent( api_key="test-key-1234567890", base_url="https://openrouter.ai/api/v1", quiet_mode=True, skip_context_files=True, skip_memory=True, ) # Config override is captured eagerly in __init__ (still needed # because the threshold-derivation logic at construction time # consults it). assert agent._aux_compression_context_length_config == 1_000_000 # The expensive feasibility probe is deferred. Drive it manually # to validate the call shape still forwards the override correctly. agent._check_compression_model_feasibility() mock_ctx_len.assert_called_once_with( "custom/big-model", base_url="http://custom-endpoint:8080/v1", api_key="sk-custom", config_context_length=1_000_000, provider="", custom_providers=[], ) @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_warns_when_no_auxiliary_provider(mock_get_client): """Warning emitted when no auxiliary provider is configured.""" agent = _make_agent() mock_get_client.return_value = (None, None) messages = [] agent._emit_status = lambda msg: messages.append(msg) agent._check_compression_model_feasibility() assert len(messages) == 1 assert "No auxiliary LLM provider" in messages[0] assert agent._compression_warning is not None def test_skips_check_when_compression_disabled(): """No check performed when compression is disabled.""" agent = _make_agent(compression_enabled=False) messages = [] agent._emit_status = lambda msg: messages.append(msg) agent._check_compression_model_feasibility() assert len(messages) == 0 assert agent._compression_warning is None @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_exception_does_not_crash(mock_get_client): """Exceptions in the check are caught — never blocks startup.""" agent = _make_agent() mock_get_client.side_effect = RuntimeError("boom") messages = [] agent._emit_status = lambda msg: messages.append(msg) # Should not raise agent._check_compression_model_feasibility() # No user-facing message (error is debug-logged) assert len(messages) == 0 @patch("agent.model_metadata.get_model_context_length", return_value=100_000) @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_exact_threshold_boundary_no_warning(mock_get_client, mock_ctx_len): """No warning when aux context exactly equals the threshold.""" agent = _make_agent(main_context=200_000, threshold_percent=0.50) mock_client = MagicMock() mock_client.base_url = "https://openrouter.ai/api/v1" mock_client.api_key = "sk-aux" mock_get_client.return_value = (mock_client, "test-model") messages = [] agent._emit_status = lambda msg: messages.append(msg) agent._check_compression_model_feasibility() assert len(messages) == 0 @patch("agent.model_metadata.get_model_context_length", return_value=99_999) @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_just_below_threshold_auto_corrects(mock_get_client, mock_ctx_len): """Auto-correct fires when aux context is one token below the threshold (and above the 64K hard floor).""" agent = _make_agent(main_context=200_000, threshold_percent=0.50) mock_client = MagicMock() mock_client.base_url = "https://openrouter.ai/api/v1" mock_client.api_key = "sk-aux" mock_get_client.return_value = (mock_client, "small-model") messages = [] agent._emit_status = lambda msg: messages.append(msg) agent._check_compression_model_feasibility() assert len(messages) == 1 assert "small-model" in messages[0] assert "Auto-lowered" in messages[0] assert agent.context_compressor.threshold_tokens == 99_999 # ── Two-phase: __init__ + run_conversation replay ─────────────────── @patch("agent.model_metadata.get_model_context_length", return_value=80_000) @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_warning_stored_for_gateway_replay(mock_get_client, mock_ctx_len): """__init__ stores the warning; _replay sends it through status_callback.""" agent = _make_agent(main_context=200_000, threshold_percent=0.50) mock_client = MagicMock() mock_client.base_url = "https://openrouter.ai/api/v1" mock_client.api_key = "sk-aux" mock_get_client.return_value = (mock_client, "google/gemini-3-flash-preview") # Phase 1: __init__ — _emit_status prints (CLI) but callback is None vprint_messages = [] agent._emit_status = lambda msg: vprint_messages.append(msg) agent._check_compression_model_feasibility() assert len(vprint_messages) == 1 # CLI got it assert agent._compression_warning is not None # stored for replay # Phase 2: gateway wires callback post-init, then run_conversation replays callback_events = [] agent.status_callback = lambda ev, msg: callback_events.append((ev, msg)) agent._replay_compression_warning() assert any( ev == "lifecycle" and "Auto-lowered" in msg for ev, msg in callback_events ) @patch("agent.model_metadata.get_model_context_length", return_value=200_000) @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_no_replay_when_no_warning(mock_get_client, mock_ctx_len): """_replay_compression_warning is a no-op when there's no stored warning.""" agent = _make_agent(main_context=200_000, threshold_percent=0.50) mock_client = MagicMock() mock_client.base_url = "https://openrouter.ai/api/v1" mock_client.api_key = "sk-aux" mock_get_client.return_value = (mock_client, "big-model") agent._emit_status = lambda msg: None agent._check_compression_model_feasibility() assert agent._compression_warning is None callback_events = [] agent.status_callback = lambda ev, msg: callback_events.append((ev, msg)) agent._replay_compression_warning() assert len(callback_events) == 0 def test_replay_without_callback_is_noop(): """_replay_compression_warning doesn't crash when status_callback is None.""" agent = _make_agent() agent._compression_warning = "some warning" agent.status_callback = None # Should not raise agent._replay_compression_warning() @patch("agent.model_metadata.get_model_context_length", return_value=80_000) @patch("agent.auxiliary_client.get_text_auxiliary_client") def test_run_conversation_clears_warning_after_replay(mock_get_client, mock_ctx_len): """After replay in run_conversation, _compression_warning is cleared so the warning is not sent again on subsequent turns.""" agent = _make_agent(main_context=200_000, threshold_percent=0.50) mock_client = MagicMock() mock_client.base_url = "https://openrouter.ai/api/v1" mock_client.api_key = "sk-aux" mock_get_client.return_value = (mock_client, "small-model") agent._emit_status = lambda msg: None agent._check_compression_model_feasibility() assert agent._compression_warning is not None # Simulate what run_conversation does callback_events = [] agent.status_callback = lambda ev, msg: callback_events.append((ev, msg)) if agent._compression_warning: agent._replay_compression_warning() agent._compression_warning = None # as in run_conversation assert len(callback_events) == 1 # Second turn — nothing replayed callback_events.clear() if agent._compression_warning: agent._replay_compression_warning() agent._compression_warning = None assert len(callback_events) == 0