Compare commits

..

1 Commits

Author SHA1 Message Date
907f660dfd feat: quota watchdog for OpenCode Go + Codex
- agent/quota_watchdog.py: core logic — SessionDB token tracking,
  Codex API integration, threshold comparison, auto-fallback
- scripts/quota_watchdog.py: CLI entry point with --status,
  --dry-run, --quiet-unchanged (default in non-tty cron mode)
- Config: quota section with thresholds in config.yaml
- Cron: 30-min no_agent watchdog job (fb918d5e5dd1)
2026-05-29 16:19:13 +00:00
5 changed files with 468 additions and 271 deletions

View File

@ -580,13 +580,6 @@ class ContextCompressor(ContextEngine):
self.summary_model = summary_model_override or ""
# Compression-model fallback: set by check_compression_model_feasibility
# when the primary aux compression model fails the minimum context check.
# If set, _generate_summary uses this provider/model for the LLM call
# instead of the main compressor attributes. Dict keys:
# provider, model, base_url, api_key
self._compression_fallback: Optional[Dict[str, str]] = None
# Stores the previous compaction summary for iterative updates
self._previous_summary: Optional[str] = None
# Anti-thrashing: track whether last compression was effective
@ -1076,20 +1069,6 @@ The user has requested that this compaction PRIORITISE preserving all informatio
}
if self.summary_model:
call_kwargs["model"] = self.summary_model
# Compression-model fallback: when the primary aux compression
# model was rejected for insufficient context, the feasibility
# check stored a replacement provider/model here. Override the
# entire main_runtime so call_llm routes the summary request to
# the fallback provider instead of the main one.
if self._compression_fallback:
_fb = self._compression_fallback
call_kwargs["main_runtime"] = {
"model": _fb["model"],
"provider": _fb["provider"],
"base_url": _fb.get("base_url", ""),
"api_key": _fb.get("api_key", ""),
"api_mode": _fb.get("api_mode", self.api_mode),
}
response = call_llm(**call_kwargs)
content = response.choices[0].message.content
# Handle cases where content is not a string (e.g., dict from llama.cpp)

View File

@ -221,101 +221,9 @@ def check_compression_model_feasibility(agent: Any) -> None:
new_threshold,
)
except ValueError:
# Primary compression model failed the minimum context check
# (context_length < MINIMUM_CONTEXT_LENGTH). Before giving up,
# try the user's fallback provider chain so a model switch or
# provider outage doesn't silently disable compression.
_fallback_chain = getattr(agent, '_fallback_chain', None) or []
_tried = [f"{aux_model} ({_aux_cfg_provider or 'auto'}): {aux_context:,} ctx < {MINIMUM_CONTEXT_LENGTH:,}"]
for _fb_entry in _fallback_chain:
_fb_provider = _fb_entry.get("provider", "")
_fb_model = _fb_entry.get("model", "")
if not _fb_provider or not _fb_model:
continue
try:
from agent.auxiliary_client import resolve_provider_client
_fb_client, _fb_resolved_model = resolve_provider_client(
_fb_provider,
_fb_model,
explicit_base_url=_fb_entry.get("base_url", ""),
explicit_api_key=_fb_entry.get("api_key", ""),
main_runtime=agent._current_main_runtime(),
)
if _fb_client is None or not _fb_resolved_model:
_tried.append(f"{_fb_model} ({_fb_provider}): unavailable")
continue
_fb_base_url = str(getattr(_fb_client, "base_url", ""))
_fb_api_key_raw = getattr(_fb_client, "api_key", "")
_fb_api_key = (
""
if callable(_fb_api_key_raw) and not isinstance(_fb_api_key_raw, str)
else str(_fb_api_key_raw or "")
)
_fb_context = get_model_context_length(
_fb_resolved_model,
base_url=_fb_base_url,
api_key=_fb_api_key,
provider=_fb_provider,
custom_providers=getattr(agent, "_custom_providers", None),
)
if _fb_context and _fb_context < MINIMUM_CONTEXT_LENGTH:
_tried.append(
f"{_fb_resolved_model} ({_fb_provider}): "
f"{_fb_context:,} ctx < {MINIMUM_CONTEXT_LENGTH:,}"
)
continue
# ── Found a suitable fallback ──────────────────────────
logger.warning(
"Compression model %s (%s) has only %d token context "
"(minimum %d). Falling back to %s (%s) with %d token context.",
aux_model, _aux_cfg_provider or "auto", aux_context,
MINIMUM_CONTEXT_LENGTH, _fb_resolved_model, _fb_provider,
_fb_context or 0,
)
agent.context_compressor._compression_fallback = {
"provider": _fb_provider,
"model": _fb_resolved_model,
"base_url": _fb_base_url,
"api_key": _fb_api_key,
}
_msg = (
f"⚠ Compression model {aux_model} has only "
f"{aux_context:,} token context (minimum "
f"{MINIMUM_CONTEXT_LENGTH:,} required). "
f"Falling back to {_fb_resolved_model} ({_fb_provider}) "
f"for summaries."
)
agent._compression_warning = _msg
agent._emit_status(_msg)
return
except Exception as _fb_err:
_tried.append(f"{_fb_model} ({_fb_provider}): {_fb_err}")
continue
# No fallback worked — warn and let compression run without
# summaries (same behavior as 'no auxiliary LLM' above).
_all_tried = "; ".join(_tried)
_msg = (
f"⚠ No suitable compression model available. "
f"Tried: {_all_tried}. "
f"Compression will drop middle turns without summaries. "
f"Run `hermes setup` or set "
f"auxiliary.compression.model in config.yaml."
)
agent._compression_warning = _msg
agent._emit_status(_msg)
logger.warning("Compression model fallback exhausted: %s", _all_tried)
return
# Hard rejections (aux below minimum context) must propagate
# so the session refuses to start.
raise
except Exception as exc:
logger.debug(
"Compression feasibility check failed (non-fatal): %s", exc

409
agent/quota_watchdog.py Normal file
View File

@ -0,0 +1,409 @@
"""Quota watchdog for paid coding providers.
The watchdog combines provider account usage APIs (where available) with the
local SessionDB token ledger. It is intentionally conservative: once a
monitored provider crosses the critical threshold it persists a local fallback
model in config.yaml so future non-explicit sessions start on the cheap local
runtime instead of burning the last paid tokens.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
import calendar
import json
import os
import sqlite3
from typing import Any, Iterable, Optional
from urllib.parse import urlparse
from hermes_constants import get_hermes_home
from hermes_cli.config import cfg_get, load_config, read_raw_config, save_config
DEFAULT_WARNING_PCT = 80.0
DEFAULT_CRITICAL_PCT = 95.0
DEFAULT_FALLBACK_PROVIDER = "custom:gemma-local"
DEFAULT_FALLBACK_MODEL = "gemma-local"
DEFAULT_FALLBACK_BASE_URL = "http://127.0.0.1:8081/v1"
DEFAULT_MONTHLY_LIMITS = {
# OpenCode Go currently exposes an OpenAI-compatible inference endpoint but
# no documented quota endpoint. The user-facing plan is ~10M tokens/month,
# so use SessionDB accounting against that cap until an API appears.
"opencode-go": 10_000_000,
# Codex has an account-usage endpoint. If it is unavailable, SessionDB still
# reports raw monthly tokens but no percentage unless the user configures a
# monthly limit.
"openai-codex": 0,
}
OPENCODE_HOSTS = {"opencode.ai", "www.opencode.ai"}
STATE_FILE = "quota_watchdog_state.json"
@dataclass(frozen=True)
class ProviderQuota:
provider: str
source: str
used_percent: Optional[float]
reset_at: Optional[datetime] = None
used_tokens: int = 0
limit_tokens: int = 0
detail: str = ""
available: bool = True
@property
def level(self) -> str:
if self.used_percent is None:
return "unknown"
if self.used_percent >= DEFAULT_CRITICAL_PCT:
return "critical"
if self.used_percent >= DEFAULT_WARNING_PCT:
return "warning"
return "ok"
@dataclass(frozen=True)
class WatchdogResult:
quotas: tuple[ProviderQuota, ...]
warning_pct: float
critical_pct: float
fallback_provider: str
fallback_model: str
forced_fallback: bool
switched: bool = False
alerts: tuple[str, ...] = ()
errors: tuple[str, ...] = field(default_factory=tuple)
@property
def worst(self) -> Optional[ProviderQuota]:
ranked = {"critical": 3, "warning": 2, "ok": 1, "unknown": 0}
return max(self.quotas, key=lambda q: ranked.get(q.level, 0), default=None)
def _utc_now() -> datetime:
return datetime.now(timezone.utc)
def _month_bounds(now: Optional[datetime] = None) -> tuple[datetime, datetime]:
now = now or _utc_now()
start = datetime(now.year, now.month, 1, tzinfo=timezone.utc)
if now.month == 12:
end = datetime(now.year + 1, 1, 1, tzinfo=timezone.utc)
else:
end = datetime(now.year, now.month + 1, 1, tzinfo=timezone.utc)
return start, end
def eta_reset_hours(reset_at: Optional[datetime]) -> Optional[int]:
if not reset_at:
return None
seconds = max(0, int((reset_at - _utc_now()).total_seconds()))
return int((seconds + 3599) // 3600)
def _session_db_path() -> Path:
return get_hermes_home() / "state.db"
def _provider_matches(provider: str, billing_provider: str, billing_base_url: str, model: str) -> bool:
provider = provider.lower()
bp = (billing_provider or "").lower()
base = (billing_base_url or "").lower()
model_l = (model or "").lower()
host = urlparse(base).hostname or ""
if provider == "opencode-go":
return "opencode" in bp or host in OPENCODE_HOSTS or "opencode.ai" in base or "opencode" in model_l
if provider == "openai-codex":
return bp == "openai-codex" or "codex" in bp or "codex" in base or "codex" in model_l
return provider in bp or provider in base or provider in model_l
def _sessiondb_monthly_tokens(provider: str, now: Optional[datetime] = None) -> int:
db_path = _session_db_path()
if not db_path.exists():
return 0
start, end = _month_bounds(now)
start_ts = start.timestamp()
end_ts = end.timestamp()
total = 0
try:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(
"""
SELECT model, billing_provider, billing_base_url,
COALESCE(input_tokens, 0) AS input_tokens,
COALESCE(output_tokens, 0) AS output_tokens,
COALESCE(cache_read_tokens, 0) AS cache_read_tokens,
COALESCE(cache_write_tokens, 0) AS cache_write_tokens,
COALESCE(reasoning_tokens, 0) AS reasoning_tokens
FROM sessions
WHERE started_at >= ? AND started_at < ?
""",
(start_ts, end_ts),
).fetchall()
finally:
conn.close()
except sqlite3.Error:
return 0
for row in rows:
if _provider_matches(provider, row["billing_provider"], row["billing_base_url"], row["model"]):
total += int(row["input_tokens"] or 0)
total += int(row["output_tokens"] or 0)
total += int(row["cache_read_tokens"] or 0)
total += int(row["cache_write_tokens"] or 0)
total += int(row["reasoning_tokens"] or 0)
return total
def _config_section(config: dict[str, Any]) -> dict[str, Any]:
section = config.get("quota")
return section if isinstance(section, dict) else {}
def _monthly_limits(config: dict[str, Any]) -> dict[str, int]:
raw = cfg_get(config, "quota", "monthly_token_limits", default={})
limits = dict(DEFAULT_MONTHLY_LIMITS)
if isinstance(raw, dict):
for key, value in raw.items():
try:
limits[str(key).strip()] = max(0, int(value or 0))
except (TypeError, ValueError):
continue
return limits
def _monitor_providers(config: dict[str, Any]) -> tuple[str, ...]:
raw = cfg_get(config, "quota", "providers", default=("opencode-go", "openai-codex"))
if isinstance(raw, str):
raw = [raw]
if not isinstance(raw, Iterable):
return ("opencode-go", "openai-codex")
providers = tuple(str(p).strip().lower() for p in raw if str(p).strip())
return providers or ("opencode-go", "openai-codex")
def _next_month_reset() -> datetime:
return _month_bounds()[1]
def _quota_from_sessiondb(provider: str, config: dict[str, Any]) -> ProviderQuota:
used = _sessiondb_monthly_tokens(provider)
limit = _monthly_limits(config).get(provider, 0)
pct = (used / limit * 100.0) if limit else None
detail = "SessionDB monthly token ledger"
if not limit:
detail += " (set quota.monthly_token_limits.%s for percent thresholds)" % provider
return ProviderQuota(
provider=provider,
source="sessiondb",
used_percent=pct,
reset_at=_next_month_reset(),
used_tokens=used,
limit_tokens=limit,
detail=detail,
available=True,
)
def _quota_from_codex_api(config: dict[str, Any]) -> Optional[ProviderQuota]:
try:
from agent.account_usage import fetch_account_usage
snapshot = fetch_account_usage("openai-codex")
except Exception:
return None
if not snapshot or not snapshot.windows:
return None
# Prefer the most depleted window; Codex exposes session/week windows rather
# than a monthly token bucket, but this still catches imminent exhaustion.
windows = [w for w in snapshot.windows if w.used_percent is not None]
if not windows:
return None
worst = max(windows, key=lambda w: float(w.used_percent or 0))
return ProviderQuota(
provider="openai-codex",
source=snapshot.source or "usage_api",
used_percent=float(worst.used_percent or 0),
reset_at=worst.reset_at,
detail=f"{worst.label} account window",
available=True,
)
def collect_quotas(config: Optional[dict[str, Any]] = None) -> tuple[ProviderQuota, ...]:
config = config or load_config()
quotas: list[ProviderQuota] = []
for provider in _monitor_providers(config):
quota: Optional[ProviderQuota] = None
if provider == "openai-codex":
quota = _quota_from_codex_api(config)
# OpenCode Go: no documented quota API found on the OpenAI-compatible
# endpoint. Use SessionDB monthly tracking.
if quota is None:
quota = _quota_from_sessiondb(provider, config)
quotas.append(quota)
return tuple(quotas)
def _configured_thresholds(config: dict[str, Any]) -> tuple[float, float]:
def as_pct(path: str, default: float) -> float:
value = cfg_get(config, "quota", path, default=default)
try:
return max(0.0, min(100.0, float(value)))
except (TypeError, ValueError):
return default
return as_pct("warning_pct", DEFAULT_WARNING_PCT), as_pct("critical_pct", DEFAULT_CRITICAL_PCT)
def quota_fallback_active(config: Optional[dict[str, Any]] = None) -> bool:
config = config or load_config()
if os.getenv("HERMES_QUOTA_BYPASS") or os.getenv("HERMES_QUOTA_CRITICAL"):
return False
return bool(cfg_get(config, "quota", "force_fallback", default=False))
def quota_fallback_provider(config: Optional[dict[str, Any]] = None) -> str:
config = config or load_config()
return str(cfg_get(config, "quota", "fallback_provider", default=DEFAULT_FALLBACK_PROVIDER) or DEFAULT_FALLBACK_PROVIDER)
def _ensure_gemma_provider(config: dict[str, Any], provider: str, model: str, base_url: str) -> None:
if not provider.startswith("custom:"):
return
name = provider.split(":", 1)[1]
providers = config.setdefault("providers", {})
if not isinstance(providers, dict):
providers = {}
config["providers"] = providers
entry = providers.get(name)
if not isinstance(entry, dict):
providers[name] = {
"name": name,
"base_url": base_url,
"default_model": model,
"api_key": "no-key-required",
"transport": "chat_completions",
}
def _persist_forced_fallback(raw_config: dict[str, Any], *, provider: str, model: str, base_url: str) -> bool:
quota = raw_config.setdefault("quota", {})
if not isinstance(quota, dict):
quota = {}
raw_config["quota"] = quota
already = bool(quota.get("force_fallback"))
quota["force_fallback"] = True
quota["forced_at"] = _utc_now().isoformat()
quota["fallback_provider"] = provider
quota["fallback_model"] = model
quota["fallback_base_url"] = base_url
_ensure_gemma_provider(raw_config, provider, model, base_url)
save_config(raw_config)
return not already
def _alert_key(quota: ProviderQuota, level: str) -> str:
reset = quota.reset_at.isoformat() if quota.reset_at else "unknown"
pct = int(quota.used_percent or 0)
return f"{quota.provider}:{level}:{pct // 5 * 5}:{reset}"
def _state_path() -> Path:
return get_hermes_home() / STATE_FILE
def _read_state() -> dict[str, Any]:
try:
return json.loads(_state_path().read_text(encoding="utf-8"))
except Exception:
return {}
def _write_state(state: dict[str, Any]) -> None:
path = _state_path()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(state, indent=2, sort_keys=True), encoding="utf-8")
def evaluate_and_apply(*, apply: bool = True, quiet_unchanged: bool = False) -> WatchdogResult:
config = load_config()
raw_config = read_raw_config() or {}
warning_pct, critical_pct = _configured_thresholds(config)
fallback_provider = str(cfg_get(config, "quota", "fallback_provider", default=DEFAULT_FALLBACK_PROVIDER) or DEFAULT_FALLBACK_PROVIDER)
fallback_model = str(cfg_get(config, "quota", "fallback_model", default=DEFAULT_FALLBACK_MODEL) or DEFAULT_FALLBACK_MODEL)
fallback_base_url = str(cfg_get(config, "quota", "fallback_base_url", default=DEFAULT_FALLBACK_BASE_URL) or DEFAULT_FALLBACK_BASE_URL)
quotas = collect_quotas(config)
state = _read_state()
emitted = set(state.get("emitted_alert_keys") or [])
alerts: list[str] = []
switched = False
for quota in quotas:
if quota.used_percent is None:
continue
level = "critical" if quota.used_percent >= critical_pct else "warning" if quota.used_percent >= warning_pct else "ok"
if level == "ok":
continue
eta = eta_reset_hours(quota.reset_at)
eta_text = f"{eta} hours" if eta is not None else "unknown"
switch_text = "switching to fallback" if level == "critical" else "recommend fallback"
msg = f"Quota at {quota.used_percent:.0f}% for {quota.provider}, {switch_text}. ETA reset: {eta_text}"
key = _alert_key(quota, level)
if key not in emitted or not quiet_unchanged:
alerts.append(msg)
emitted.add(key)
if level == "critical" and apply:
switched = _persist_forced_fallback(raw_config, provider=fallback_provider, model=fallback_model, base_url=fallback_base_url) or switched
state["emitted_alert_keys"] = sorted(emitted)[-100:]
state["last_checked_at"] = _utc_now().isoformat()
state["last_quotas"] = [quota.__dict__ | {"reset_at": quota.reset_at.isoformat() if quota.reset_at else None} for quota in quotas]
if apply:
_write_state(state)
forced = bool(cfg_get(load_config(), "quota", "force_fallback", default=False))
return WatchdogResult(
quotas=quotas,
warning_pct=warning_pct,
critical_pct=critical_pct,
fallback_provider=fallback_provider,
fallback_model=fallback_model,
forced_fallback=forced,
switched=switched,
alerts=tuple(alerts),
)
def format_status(result: WatchdogResult, *, markdown: bool = False) -> list[str]:
lines = ["🧯 Quota watchdog"]
lines.append(f"Thresholds: warning {result.warning_pct:.0f}%, critical {result.critical_pct:.0f}%")
lines.append(f"Fallback: {result.fallback_model} via {result.fallback_provider} ({'forced' if result.forced_fallback else 'standby'})")
for quota in result.quotas:
if quota.used_percent is None:
pct = "unknown"
else:
pct = f"{quota.used_percent:.0f}% used"
token_part = ""
if quota.limit_tokens:
token_part = f"{quota.used_tokens:,}/{quota.limit_tokens:,} tokens"
reset = eta_reset_hours(quota.reset_at)
reset_part = f" • reset in {reset}h" if reset is not None else ""
lines.append(f"{quota.provider}: {pct} ({quota.source}){token_part}{reset_part}")
if result.alerts:
lines.append("Alerts:")
lines.extend(f"- {alert}" for alert in result.alerts)
return lines
__all__ = [
"ProviderQuota",
"WatchdogResult",
"collect_quotas",
"evaluate_and_apply",
"format_status",
"quota_fallback_active",
"quota_fallback_provider",
]

47
scripts/quota_watchdog.py Normal file
View File

@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""Run Hermes quota watchdog.
This script is safe for cron: with --quiet-unchanged it prints only new
warning/critical alerts, so no_agent cron jobs stay silent when nothing changed.
"""
from __future__ import annotations
import argparse
import sys
from pathlib import Path
# Support running from ~/.hermes/scripts/quota_watchdog.py when Hermes is not
# installed as an editable package but the source checkout exists in the usual
# location.
REPO = Path.home() / ".hermes" / "hermes-agent"
if REPO.exists() and str(REPO) not in sys.path:
sys.path.insert(0, str(REPO))
def main() -> int:
parser = argparse.ArgumentParser(description="Check paid provider quotas and force local fallback at critical threshold.")
parser.add_argument("--dry-run", action="store_true", help="do not write config/state")
parser.add_argument("--quiet-unchanged", action="store_true", help="only print newly emitted alerts")
parser.add_argument("--status", action="store_true", help="always print the current quota status")
args = parser.parse_args()
# In cron/non-interactive mode, default to quiet-unchanged to avoid spamming
if not sys.stdin.isatty() and not args.status:
args.quiet_unchanged = True
from agent.quota_watchdog import evaluate_and_apply, format_status
result = evaluate_and_apply(apply=not args.dry_run, quiet_unchanged=args.quiet_unchanged)
lines: list[str]
if args.status or not args.quiet_unchanged:
lines = format_status(result)
else:
lines = list(result.alerts)
if result.switched and not lines:
lines = [f"Quota critical; forced fallback to {result.fallback_model} via {result.fallback_provider}"]
if lines:
print("\n".join(lines))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@ -57,7 +57,6 @@ def _make_agent(
compressor = MagicMock(spec=ContextCompressor)
compressor.context_length = main_context
compressor.threshold_tokens = int(main_context * threshold_percent)
compressor._compression_fallback = None
agent.context_compressor = compressor
return agent
@ -102,169 +101,24 @@ def test_auto_corrects_threshold_when_aux_context_below_threshold(mock_get_clien
@patch("agent.model_metadata.get_model_context_length", return_value=32_768)
@patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_rejects_aux_below_minimum_context(mock_get_client, mock_ctx_len):
"""When aux context < MINIMUM_CONTEXT_LENGTH (64K) and no fallback
providers are configured, a warning is emitted and compression will
operate without summaries. Previously this raised ValueError; now it
degrades gracefully so a model switch doesn't kill the session."""
"""Hard floor: aux context < MINIMUM_CONTEXT_LENGTH (64K) → session
refuses to start (ValueError), mirroring the main-model rejection."""
agent = _make_agent(main_context=200_000, threshold_percent=0.50)
mock_client = MagicMock()
mock_client.base_url = "https://openrouter.ai/api/v1"
mock_client.api_key = "sk-aux"
mock_get_client.return_value = (mock_client, "tiny-aux-model")
messages = []
agent._emit_status = lambda msg: messages.append(msg)
agent._emit_status = lambda msg: None
# No fallback chain → should warn, not raise
agent._fallback_chain = []
agent._check_compression_model_feasibility()
assert len(messages) == 1
assert "No suitable compression model" in messages[0]
assert "tiny-aux-model" in messages[0]
assert "32,768" in messages[0]
assert "64,000" in messages[0]
assert agent._compression_warning is not None
@patch("agent.model_metadata.get_model_context_length")
@patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_falls_back_to_chain_when_aux_below_minimum(mock_get_client, mock_ctx_len):
"""When the primary aux model fails the context-length floor, the
feasibility check tries each fallback provider in order, using the
first one that meets MINIMUM_CONTEXT_LENGTH."""
agent = _make_agent(main_context=200_000, threshold_percent=0.50)
# Primary aux model: too small (32K)
mock_primary_client = MagicMock()
mock_primary_client.base_url = "https://openrouter.ai/api/v1"
mock_primary_client.api_key = "sk-aux"
mock_get_client.return_value = (mock_primary_client, "tiny-aux-model")
# Fallback chain: two providers, first one meets the floor
agent._fallback_chain = [
{"provider": "opencode_go", "model": "deepseek-v4-pro"},
{"provider": "custom", "model": "gemma-local",
"base_url": "http://127.0.0.1:8081/v1", "api_key": "no-key"},
]
# Mock resolve_provider_client for the fallback resolution
mock_fb_client = MagicMock()
mock_fb_client.base_url = "https://api.opencode.ai/v1"
mock_fb_client.api_key = "sk-fallback"
# get_model_context_length: first return 32K (primary fail),
# then return 128K (fallback success)
mock_ctx_len.side_effect = [32_768, 128_000]
messages = []
agent._emit_status = lambda msg: messages.append(msg)
with patch("agent.auxiliary_client.resolve_provider_client",
return_value=(mock_fb_client, "deepseek-v4-pro")) as mock_resolve:
with pytest.raises(ValueError) as exc_info:
agent._check_compression_model_feasibility()
# Should have resolved the fallback provider
mock_resolve.assert_called_once()
# First two positional args: provider, model
assert mock_resolve.call_args[0][0] == "opencode_go"
assert mock_resolve.call_args[0][1] == "deepseek-v4-pro"
# Warning should mention the fallback choice
assert len(messages) == 1
assert "Falling back to" in messages[0]
assert "deepseek-v4-pro" in messages[0]
assert "opencode_go" in messages[0]
# Fallback dict stored on compressor
fb = agent.context_compressor._compression_fallback
assert fb is not None
assert fb["provider"] == "opencode_go"
assert fb["model"] == "deepseek-v4-pro"
@patch("agent.model_metadata.get_model_context_length")
@patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_falls_back_past_unavailable_provider(mock_get_client, mock_ctx_len):
"""When the first fallback provider is unavailable, skip it and
try the next one."""
agent = _make_agent(main_context=200_000, threshold_percent=0.50)
mock_primary_client = MagicMock()
mock_primary_client.base_url = "https://openrouter.ai/api/v1"
mock_primary_client.api_key = "sk-aux"
mock_get_client.return_value = (mock_primary_client, "tiny")
# Fallback chain: first unavailable, second works
agent._fallback_chain = [
{"provider": "broken-provider", "model": "broken-model"},
{"provider": "opencode_go", "model": "deepseek-v4-pro"},
]
mock_fb_client = MagicMock()
mock_fb_client.base_url = "https://api.opencode.ai/v1"
mock_fb_client.api_key = "sk-fallback"
# Primary: 32K (fail), broken-provider: unavailable, opencode_go: 128K
mock_ctx_len.side_effect = [32_768, None, 128_000]
messages = []
agent._emit_status = lambda msg: messages.append(msg)
# First resolve returns None (unavailable), second returns client
mock_resolve_values = [(None, None), (mock_fb_client, "deepseek-v4-pro")]
with patch("agent.auxiliary_client.resolve_provider_client",
side_effect=mock_resolve_values) as mock_resolve:
agent._check_compression_model_feasibility()
# Should have tried both fallbacks
assert mock_resolve.call_count == 2
# Should succeed with the second fallback
fb = agent.context_compressor._compression_fallback
assert fb is not None
assert fb["provider"] == "opencode_go"
@patch("agent.model_metadata.get_model_context_length")
@patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_warns_when_all_fallbacks_exhausted(mock_get_client, mock_ctx_len):
"""When every fallback provider also fails the context floor or is
unavailable, emit a warning and degrade to no-summary mode without
raising."""
agent = _make_agent(main_context=200_000, threshold_percent=0.50)
mock_primary_client = MagicMock()
mock_primary_client.base_url = "https://openrouter.ai/api/v1"
mock_primary_client.api_key = "sk-aux"
mock_get_client.return_value = (mock_primary_client, "tiny-main")
agent._fallback_chain = [
{"provider": "small-provider", "model": "small-model"},
]
# Fallback also too small
mock_fb_client = MagicMock()
mock_fb_client.base_url = "https://small.api/v1"
mock_fb_client.api_key = "sk-small"
mock_ctx_len.side_effect = [32_768, 16_384]
messages = []
agent._emit_status = lambda msg: messages.append(msg)
# Mock compressor won't have _compression_fallback until set —
# initialize it so the final assertion works.
agent.context_compressor._compression_fallback = None
with patch("agent.auxiliary_client.resolve_provider_client",
return_value=(mock_fb_client, "small-model")):
agent._check_compression_model_feasibility()
assert len(messages) == 1
assert "No suitable compression model" in messages[0]
assert "small-model" in messages[0]
assert agent._compression_warning is not None
# No fallback on compressor
assert agent.context_compressor._compression_fallback is None
err = str(exc_info.value)
assert "tiny-aux-model" in err
assert "32,768" in err
assert "64,000" in err
assert "below the minimum" in err
@patch("agent.model_metadata.get_model_context_length", return_value=200_000)