diff --git a/agent/quota_watchdog.py b/agent/quota_watchdog.py new file mode 100644 index 000000000..31144467d --- /dev/null +++ b/agent/quota_watchdog.py @@ -0,0 +1,409 @@ +"""Quota watchdog for paid coding providers. + +The watchdog combines provider account usage APIs (where available) with the +local SessionDB token ledger. It is intentionally conservative: once a +monitored provider crosses the critical threshold it persists a local fallback +model in config.yaml so future non-explicit sessions start on the cheap local +runtime instead of burning the last paid tokens. +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timedelta, timezone +from pathlib import Path +import calendar +import json +import os +import sqlite3 +from typing import Any, Iterable, Optional +from urllib.parse import urlparse + +from hermes_constants import get_hermes_home +from hermes_cli.config import cfg_get, load_config, read_raw_config, save_config + + +DEFAULT_WARNING_PCT = 80.0 +DEFAULT_CRITICAL_PCT = 95.0 +DEFAULT_FALLBACK_PROVIDER = "custom:gemma-local" +DEFAULT_FALLBACK_MODEL = "gemma-local" +DEFAULT_FALLBACK_BASE_URL = "http://127.0.0.1:8081/v1" +DEFAULT_MONTHLY_LIMITS = { + # OpenCode Go currently exposes an OpenAI-compatible inference endpoint but + # no documented quota endpoint. The user-facing plan is ~10M tokens/month, + # so use SessionDB accounting against that cap until an API appears. + "opencode-go": 10_000_000, + # Codex has an account-usage endpoint. If it is unavailable, SessionDB still + # reports raw monthly tokens but no percentage unless the user configures a + # monthly limit. 
@dataclass(frozen=True)
class WatchdogResult:
    """Outcome of a single watchdog evaluation pass.

    Bundles the quotas that were collected, the thresholds that were in
    effect, the fallback target, and what the pass produced (alerts, whether
    the fallback was switched on).
    """

    # One entry per monitored provider.
    quotas: tuple[ProviderQuota, ...]
    # Effective thresholds, in percent of quota used.
    warning_pct: float
    critical_pct: float
    # Fallback target that is (or would be) persisted at the critical level.
    fallback_provider: str
    fallback_model: str
    # True when the configuration currently forces the fallback.
    forced_fallback: bool
    # True when this pass is the one that turned the forced fallback on.
    switched: bool = False
    # Human-readable alert messages produced by this pass.
    alerts: tuple[str, ...] = ()
    errors: tuple[str, ...] = field(default_factory=tuple)

    @property
    def worst(self) -> Optional[ProviderQuota]:
        """Return the most depleted quota, or ``None`` when there are none.

        Quotas are ranked by severity level first (critical > warning > ok >
        unknown) and by used percentage second, so the result is the quota
        closest to exhaustion. Levels are derived from this result's own
        ``warning_pct`` / ``critical_pct`` so that the ranking agrees with the
        thresholds the evaluation actually used, not the module defaults.
        """

        def severity(quota: ProviderQuota) -> tuple[int, float]:
            pct = quota.used_percent
            if pct is None:
                # Unknown usage ranks below every quota with a known percentage.
                return (0, 0.0)
            if pct >= self.critical_pct:
                return (3, pct)
            if pct >= self.warning_pct:
                return (2, pct)
            return (1, pct)

        return max(self.quotas, key=severity, default=None)
"").lower() + base = (billing_base_url or "").lower() + model_l = (model or "").lower() + host = urlparse(base).hostname or "" + if provider == "opencode-go": + return "opencode" in bp or host in OPENCODE_HOSTS or "opencode.ai" in base or "opencode" in model_l + if provider == "openai-codex": + return bp == "openai-codex" or "codex" in bp or "codex" in base or "codex" in model_l + return provider in bp or provider in base or provider in model_l + + +def _sessiondb_monthly_tokens(provider: str, now: Optional[datetime] = None) -> int: + db_path = _session_db_path() + if not db_path.exists(): + return 0 + start, end = _month_bounds(now) + start_ts = start.timestamp() + end_ts = end.timestamp() + total = 0 + try: + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute( + """ + SELECT model, billing_provider, billing_base_url, + COALESCE(input_tokens, 0) AS input_tokens, + COALESCE(output_tokens, 0) AS output_tokens, + COALESCE(cache_read_tokens, 0) AS cache_read_tokens, + COALESCE(cache_write_tokens, 0) AS cache_write_tokens, + COALESCE(reasoning_tokens, 0) AS reasoning_tokens + FROM sessions + WHERE started_at >= ? AND started_at < ? 
def _monthly_limits(config: dict[str, Any]) -> dict[str, int]:
    """Return the monthly token cap per provider, keyed by lower-case name.

    Starts from ``DEFAULT_MONTHLY_LIMITS`` and overlays the mapping found at
    ``quota.monthly_token_limits`` in *config*. Values that cannot be read as
    an integer are skipped; negative values are clamped to 0.

    Keys are lower-cased because ``_monitor_providers`` lower-cases the
    provider names that are later used to look limits up; without this a
    mixed-case key in the config would never match.
    """
    limits = dict(DEFAULT_MONTHLY_LIMITS)
    overrides = cfg_get(config, "quota", "monthly_token_limits", default={})
    if not isinstance(overrides, dict):
        return limits
    for name, value in overrides.items():
        key = str(name).strip().lower()
        if not key:
            continue
        try:
            limits[key] = max(0, int(value or 0))
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int() of an infinite float (e.g. YAML ``.inf``).
            continue
    return limits
detail=detail, + available=True, + ) + + +def _quota_from_codex_api(config: dict[str, Any]) -> Optional[ProviderQuota]: + try: + from agent.account_usage import fetch_account_usage + + snapshot = fetch_account_usage("openai-codex") + except Exception: + return None + if not snapshot or not snapshot.windows: + return None + # Prefer the most depleted window; Codex exposes session/week windows rather + # than a monthly token bucket, but this still catches imminent exhaustion. + windows = [w for w in snapshot.windows if w.used_percent is not None] + if not windows: + return None + worst = max(windows, key=lambda w: float(w.used_percent or 0)) + return ProviderQuota( + provider="openai-codex", + source=snapshot.source or "usage_api", + used_percent=float(worst.used_percent or 0), + reset_at=worst.reset_at, + detail=f"{worst.label} account window", + available=True, + ) + + +def collect_quotas(config: Optional[dict[str, Any]] = None) -> tuple[ProviderQuota, ...]: + config = config or load_config() + quotas: list[ProviderQuota] = [] + for provider in _monitor_providers(config): + quota: Optional[ProviderQuota] = None + if provider == "openai-codex": + quota = _quota_from_codex_api(config) + # OpenCode Go: no documented quota API found on the OpenAI-compatible + # endpoint. Use SessionDB monthly tracking. 
+ if quota is None: + quota = _quota_from_sessiondb(provider, config) + quotas.append(quota) + return tuple(quotas) + + +def _configured_thresholds(config: dict[str, Any]) -> tuple[float, float]: + def as_pct(path: str, default: float) -> float: + value = cfg_get(config, "quota", path, default=default) + try: + return max(0.0, min(100.0, float(value))) + except (TypeError, ValueError): + return default + + return as_pct("warning_pct", DEFAULT_WARNING_PCT), as_pct("critical_pct", DEFAULT_CRITICAL_PCT) + + +def quota_fallback_active(config: Optional[dict[str, Any]] = None) -> bool: + config = config or load_config() + if os.getenv("HERMES_QUOTA_BYPASS") or os.getenv("HERMES_QUOTA_CRITICAL"): + return False + return bool(cfg_get(config, "quota", "force_fallback", default=False)) + + +def quota_fallback_provider(config: Optional[dict[str, Any]] = None) -> str: + config = config or load_config() + return str(cfg_get(config, "quota", "fallback_provider", default=DEFAULT_FALLBACK_PROVIDER) or DEFAULT_FALLBACK_PROVIDER) + + +def _ensure_gemma_provider(config: dict[str, Any], provider: str, model: str, base_url: str) -> None: + if not provider.startswith("custom:"): + return + name = provider.split(":", 1)[1] + providers = config.setdefault("providers", {}) + if not isinstance(providers, dict): + providers = {} + config["providers"] = providers + entry = providers.get(name) + if not isinstance(entry, dict): + providers[name] = { + "name": name, + "base_url": base_url, + "default_model": model, + "api_key": "no-key-required", + "transport": "chat_completions", + } + + +def _persist_forced_fallback(raw_config: dict[str, Any], *, provider: str, model: str, base_url: str) -> bool: + quota = raw_config.setdefault("quota", {}) + if not isinstance(quota, dict): + quota = {} + raw_config["quota"] = quota + already = bool(quota.get("force_fallback")) + quota["force_fallback"] = True + quota["forced_at"] = _utc_now().isoformat() + quota["fallback_provider"] = provider + 
def evaluate_and_apply(*, apply: bool = True, quiet_unchanged: bool = False) -> WatchdogResult:
    """Check every monitored quota, raise alerts, and force the fallback if critical.

    Args:
        apply: When true, persist the forced fallback into the configuration
            as soon as a provider is at or above the critical threshold, and
            write the watchdog state file. When false nothing is written.
        quiet_unchanged: When true, suppress alerts whose key is already
            recorded in the state file from an earlier run.

    Returns:
        A ``WatchdogResult`` holding the collected quotas, the effective
        thresholds, the fallback target, the alerts produced by this call and
        whether this call switched the forced fallback on.
    """
    max_remembered_keys = 100

    config = load_config()
    raw_config = read_raw_config() or {}
    warning_pct, critical_pct = _configured_thresholds(config)
    fallback_provider = str(cfg_get(config, "quota", "fallback_provider", default=DEFAULT_FALLBACK_PROVIDER) or DEFAULT_FALLBACK_PROVIDER)
    fallback_model = str(cfg_get(config, "quota", "fallback_model", default=DEFAULT_FALLBACK_MODEL) or DEFAULT_FALLBACK_MODEL)
    fallback_base_url = str(cfg_get(config, "quota", "fallback_base_url", default=DEFAULT_FALLBACK_BASE_URL) or DEFAULT_FALLBACK_BASE_URL)
    quotas = collect_quotas(config)

    # The state file is plain JSON on disk; tolerate content that parses but
    # is not the expected object / list shape instead of crashing on it.
    state = _read_state()
    if not isinstance(state, dict):
        state = {}
    stored_keys = state.get("emitted_alert_keys")
    if not isinstance(stored_keys, list):
        stored_keys = []
    # A dict keeps insertion order, so the oldest keys can be evicted first
    # when the history is trimmed below.
    emitted: dict[str, None] = dict.fromkeys(str(key) for key in stored_keys)

    alerts: list[str] = []
    switched = False
    persisted = False

    for quota in quotas:
        if quota.used_percent is None:
            continue
        if quota.used_percent >= critical_pct:
            level = "critical"
        elif quota.used_percent >= warning_pct:
            level = "warning"
        else:
            continue

        eta = eta_reset_hours(quota.reset_at)
        eta_text = f"{eta} hours" if eta is not None else "unknown"
        switch_text = "switching to fallback" if level == "critical" else "recommend fallback"
        msg = f"Quota at {quota.used_percent:.0f}% for {quota.provider}, {switch_text}. ETA reset: {eta_text}"

        key = _alert_key(quota, level)
        if key not in emitted or not quiet_unchanged:
            alerts.append(msg)
        # Re-insert so a key that is still firing counts as recent.
        emitted.pop(key, None)
        emitted[key] = None

        # One write is enough however many providers are critical; repeating
        # it would only rewrite the same configuration again.
        if level == "critical" and apply and not persisted:
            switched = _persist_forced_fallback(
                raw_config,
                provider=fallback_provider,
                model=fallback_model,
                base_url=fallback_base_url,
            )
            persisted = True

    state["emitted_alert_keys"] = list(emitted)[-max_remembered_keys:]
    state["last_checked_at"] = _utc_now().isoformat()
    state["last_quotas"] = [
        {**vars(quota), "reset_at": quota.reset_at.isoformat() if quota.reset_at else None}
        for quota in quotas
    ]
    if apply:
        _write_state(state)

    # Re-read the configuration so the flag reflects what is on disk now.
    forced = bool(cfg_get(load_config(), "quota", "force_fallback", default=False))
    return WatchdogResult(
        quotas=quotas,
        warning_pct=warning_pct,
        critical_pct=critical_pct,
        fallback_provider=fallback_provider,
        fallback_model=fallback_model,
        forced_fallback=forced,
        switched=switched,
        alerts=tuple(alerts),
    )
def main() -> int:
    """Run one watchdog pass from the command line and print its output.

    Flags:
        --dry-run: evaluate without applying anything.
        --quiet-unchanged: print only alerts that were not emitted before.
        --status: always print the full status report.

    When standard input is not a terminal and ``--status`` was not given,
    ``--quiet-unchanged`` is turned on automatically.

    Returns:
        Always 0, used as the process exit status.
    """
    parser = argparse.ArgumentParser(description="Check paid provider quotas and force local fallback at critical threshold.")
    parser.add_argument("--dry-run", action="store_true", help="do not write config/state")
    parser.add_argument("--quiet-unchanged", action="store_true", help="only print newly emitted alerts")
    parser.add_argument("--status", action="store_true", help="always print the current quota status")
    args = parser.parse_args()

    # In cron/non-interactive mode, default to quiet-unchanged to avoid
    # spamming. sys.stdin can be None (no stdin attached) and isatty() raises
    # ValueError on a closed stream; both count as non-interactive.
    stdin = sys.stdin
    try:
        interactive = stdin is not None and stdin.isatty()
    except ValueError:
        interactive = False
    if not interactive and not args.status:
        args.quiet_unchanged = True

    # Imported here, after the module-level sys.path adjustment has run.
    from agent.quota_watchdog import evaluate_and_apply, format_status

    result = evaluate_and_apply(apply=not args.dry_run, quiet_unchanged=args.quiet_unchanged)
    lines: list[str]
    if args.status or not args.quiet_unchanged:
        lines = format_status(result)
    else:
        lines = list(result.alerts)
        if result.switched and not lines:
            lines = [f"Quota critical; forced fallback to {result.fallback_model} via {result.fallback_provider}"]
    if lines:
        print("\n".join(lines))
    return 0