Compare commits

...

1 Commits

Author SHA1 Message Date
907f660dfd feat: quota watchdog for OpenCode Go + Codex
- agent/quota_watchdog.py: core logic — SessionDB token tracking,
  Codex API integration, threshold comparison, auto-fallback
- scripts/quota_watchdog.py: CLI entry point with --status,
  --dry-run, --quiet-unchanged (default in non-tty cron mode)
- Config: quota section with thresholds in config.yaml
- Cron: 30-min no_agent watchdog job (fb918d5e5dd1)
2026-05-29 16:19:13 +00:00
2 changed files with 456 additions and 0 deletions

409
agent/quota_watchdog.py Normal file
View File

@ -0,0 +1,409 @@
"""Quota watchdog for paid coding providers.
The watchdog combines provider account usage APIs (where available) with the
local SessionDB token ledger. It is intentionally conservative: once a
monitored provider crosses the critical threshold it persists a local fallback
model in config.yaml so future non-explicit sessions start on the cheap local
runtime instead of burning the last paid tokens.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
import calendar
import json
import os
import sqlite3
from typing import Any, Iterable, Optional
from urllib.parse import urlparse
from hermes_constants import get_hermes_home
from hermes_cli.config import cfg_get, load_config, read_raw_config, save_config
# Percent-used thresholds used when the quota config does not override them.
DEFAULT_WARNING_PCT = 80.0
DEFAULT_CRITICAL_PCT = 95.0
# Fallback target used when the quota config does not name a provider, model,
# or base URL.
DEFAULT_FALLBACK_PROVIDER = "custom:gemma-local"
DEFAULT_FALLBACK_MODEL = "gemma-local"
DEFAULT_FALLBACK_BASE_URL = "http://127.0.0.1:8081/v1"
# Monthly token caps per provider. A cap of 0 means no percentage can be
# derived from the SessionDB ledger for that provider.
DEFAULT_MONTHLY_LIMITS = {
# OpenCode Go currently exposes an OpenAI-compatible inference endpoint but
# no documented quota endpoint. The user-facing plan is ~10M tokens/month,
# so use SessionDB accounting against that cap until an API appears.
"opencode-go": 10_000_000,
# Codex has an account-usage endpoint. If it is unavailable, SessionDB still
# reports raw monthly tokens but no percentage unless the user configures a
# monthly limit.
"openai-codex": 0,
}
# Hostnames of a billing base URL that identify a session as OpenCode traffic.
OPENCODE_HOSTS = {"opencode.ai", "www.opencode.ai"}
# File name, relative to the Hermes home directory, of the watchdog state file.
STATE_FILE = "quota_watchdog_state.json"
@dataclass(frozen=True)
class ProviderQuota:
    """Immutable snapshot of one provider's quota consumption.

    ``used_percent`` is ``None`` when no percentage could be derived (for
    example when no token limit is known for the provider).
    """

    provider: str
    source: str
    used_percent: Optional[float]
    reset_at: Optional[datetime] = None
    used_tokens: int = 0
    limit_tokens: int = 0
    detail: str = ""
    available: bool = True

    @property
    def level(self) -> str:
        """Classify usage against the module-level default thresholds.

        Returns ``"unknown"`` when no percentage is available, otherwise
        ``"critical"``, ``"warning"`` or ``"ok"``.
        """
        pct = self.used_percent
        if pct is None:
            return "unknown"
        # Checked from the most to the least severe threshold.
        for floor, label in (
            (DEFAULT_CRITICAL_PCT, "critical"),
            (DEFAULT_WARNING_PCT, "warning"),
        ):
            if pct >= floor:
                return label
        return "ok"
@dataclass(frozen=True)
class WatchdogResult:
    """Immutable outcome of one watchdog evaluation pass."""

    quotas: tuple[ProviderQuota, ...]
    warning_pct: float
    critical_pct: float
    fallback_provider: str
    fallback_model: str
    forced_fallback: bool
    switched: bool = False
    alerts: tuple[str, ...] = ()
    errors: tuple[str, ...] = field(default_factory=tuple)

    @property
    def worst(self) -> Optional[ProviderQuota]:
        """Return the quota with the most severe level, or ``None`` if empty.

        Ties keep the quota that appears first in ``quotas``.
        """
        severity = {"critical": 3, "warning": 2, "ok": 1, "unknown": 0}
        leader = None
        leader_score = None
        for candidate in self.quotas:
            score = severity.get(candidate.level, 0)
            # Strict comparison so the earliest of equally severe quotas wins.
            if leader_score is None or score > leader_score:
                leader, leader_score = candidate, score
        return leader
def _utc_now() -> datetime:
return datetime.now(timezone.utc)
def _month_bounds(now: Optional[datetime] = None) -> tuple[datetime, datetime]:
now = now or _utc_now()
start = datetime(now.year, now.month, 1, tzinfo=timezone.utc)
if now.month == 12:
end = datetime(now.year + 1, 1, 1, tzinfo=timezone.utc)
else:
end = datetime(now.year, now.month + 1, 1, tzinfo=timezone.utc)
return start, end
def eta_reset_hours(reset_at: Optional[datetime]) -> Optional[int]:
    """Return the whole hours remaining until *reset_at*, rounded up.

    Args:
        reset_at: Moment the quota window resets, or ``None`` when unknown.

    Returns:
        ``None`` when *reset_at* is missing; otherwise a non-negative number
        of hours (0 once the reset time has passed).
    """
    if not reset_at:
        return None
    if reset_at.tzinfo is None or reset_at.utcoffset() is None:
        # Subtracting an aware "now" from a naive datetime raises TypeError.
        # NOTE(review): naive values are assumed to be UTC, matching the rest
        # of this module - confirm against the usage API.
        reset_at = reset_at.replace(tzinfo=timezone.utc)
    remaining = (reset_at - _utc_now()).total_seconds()
    seconds = max(0, int(remaining))
    # Ceiling division so a partially elapsed hour still counts as one hour.
    return (seconds + 3599) // 3600
def _session_db_path() -> Path:
    """Return the path of the ``state.db`` file inside the Hermes home."""
    hermes_home = get_hermes_home()
    return hermes_home / "state.db"
def _provider_matches(provider: str, billing_provider: str, billing_base_url: str, model: str) -> bool:
provider = provider.lower()
bp = (billing_provider or "").lower()
base = (billing_base_url or "").lower()
model_l = (model or "").lower()
host = urlparse(base).hostname or ""
if provider == "opencode-go":
return "opencode" in bp or host in OPENCODE_HOSTS or "opencode.ai" in base or "opencode" in model_l
if provider == "openai-codex":
return bp == "openai-codex" or "codex" in bp or "codex" in base or "codex" in model_l
return provider in bp or provider in base or provider in model_l
def _sessiondb_monthly_tokens(provider: str, now: Optional[datetime] = None) -> int:
    """Sum the tokens recorded this month for sessions attributed to *provider*.

    Reads the ``sessions`` table of the SessionDB file and adds up input,
    output, cache-read, cache-write and reasoning tokens for rows whose
    ``started_at`` falls inside the month containing *now*. Returns 0 when the
    database file is missing or any SQLite error occurs.
    """
    ledger = _session_db_path()
    if not ledger.exists():
        return 0

    window_start, window_end = _month_bounds(now)
    window = (window_start.timestamp(), window_end.timestamp())
    token_columns = (
        "input_tokens",
        "output_tokens",
        "cache_read_tokens",
        "cache_write_tokens",
        "reasoning_tokens",
    )

    try:
        connection = sqlite3.connect(str(ledger))
        connection.row_factory = sqlite3.Row
        try:
            records = connection.execute(
                """
                SELECT model, billing_provider, billing_base_url,
                COALESCE(input_tokens, 0) AS input_tokens,
                COALESCE(output_tokens, 0) AS output_tokens,
                COALESCE(cache_read_tokens, 0) AS cache_read_tokens,
                COALESCE(cache_write_tokens, 0) AS cache_write_tokens,
                COALESCE(reasoning_tokens, 0) AS reasoning_tokens
                FROM sessions
                WHERE started_at >= ? AND started_at < ?
                """,
                window,
            ).fetchall()
        finally:
            connection.close()
    except sqlite3.Error:
        # Best effort: an unreadable ledger counts as zero usage.
        return 0

    # Provider attribution happens in Python because it is substring based.
    return sum(
        int(record[column] or 0)
        for record in records
        if _provider_matches(provider, record["billing_provider"], record["billing_base_url"], record["model"])
        for column in token_columns
    )
def _config_section(config: dict[str, Any]) -> dict[str, Any]:
section = config.get("quota")
return section if isinstance(section, dict) else {}
def _monthly_limits(config: dict[str, Any]) -> dict[str, int]:
    """Return the monthly token cap per provider.

    Starts from ``DEFAULT_MONTHLY_LIMITS`` and overlays the entries found
    under ``quota.monthly_token_limits``. Keys are stripped and lower-cased so
    they line up with the lower-cased names from ``_monitor_providers``;
    values are coerced to non-negative integers. Entries that cannot be
    converted are skipped and leave any default in place.
    """
    limits = dict(DEFAULT_MONTHLY_LIMITS)
    raw = cfg_get(config, "quota", "monthly_token_limits", default={})
    if not isinstance(raw, dict):
        return limits
    for key, value in raw.items():
        # Lookups use lower-cased provider names, so normalise the key too.
        name = str(key).strip().lower()
        try:
            limits[name] = max(0, int(value or 0))
        except (TypeError, ValueError):
            continue
    return limits
def _monitor_providers(config: dict[str, Any]) -> tuple[str, ...]:
    """Return the lower-cased provider names to monitor.

    Reads ``quota.providers``; a single string is treated as a one-item list.
    Falls back to the two built-in providers when the setting is missing, not
    iterable, or has no non-blank entries.
    """
    builtin = ("opencode-go", "openai-codex")
    configured = cfg_get(config, "quota", "providers", default=("opencode-go", "openai-codex"))
    if isinstance(configured, str):
        configured = [configured]
    elif not isinstance(configured, Iterable):
        return builtin

    names = []
    for item in configured:
        text = str(item).strip()
        if text:
            names.append(text.lower())
    return tuple(names) or builtin
def _next_month_reset() -> datetime:
    """Return the start of the next calendar month (UTC)."""
    _, month_end = _month_bounds()
    return month_end
def _quota_from_sessiondb(provider: str, config: dict[str, Any]) -> ProviderQuota:
    """Build a quota snapshot for *provider* from the SessionDB token ledger.

    The percentage is ``None`` when no monthly limit is known for the
    provider; the detail text then tells the user which setting to add.
    """
    consumed = _sessiondb_monthly_tokens(provider)
    cap = _monthly_limits(config).get(provider, 0)

    if cap:
        percent = consumed / cap * 100.0
        note = "SessionDB monthly token ledger"
    else:
        percent = None
        note = (
            "SessionDB monthly token ledger"
            f" (set quota.monthly_token_limits.{provider} for percent thresholds)"
        )

    return ProviderQuota(
        provider=provider,
        source="sessiondb",
        used_percent=percent,
        reset_at=_next_month_reset(),
        used_tokens=consumed,
        limit_tokens=cap,
        detail=note,
        available=True,
    )
def _quota_from_codex_api(config: dict[str, Any]) -> Optional[ProviderQuota]:
    """Build a Codex quota snapshot from the account-usage API, if possible.

    Returns ``None`` when the usage helper cannot be imported or called, when
    it returns no windows, or when no window carries a percentage. *config*
    is accepted but not read here.
    """
    try:
        from agent.account_usage import fetch_account_usage

        usage = fetch_account_usage("openai-codex")
    except Exception:
        # Best effort: any failure means the caller uses another source.
        return None

    if not usage or not usage.windows:
        return None

    measured = [window for window in usage.windows if window.used_percent is not None]
    if not measured:
        return None

    def depletion(window: Any) -> float:
        return float(window.used_percent or 0)

    # Prefer the most depleted window; Codex exposes session/week windows
    # rather than a monthly token bucket, but this still catches imminent
    # exhaustion.
    most_depleted = max(measured, key=depletion)
    return ProviderQuota(
        provider="openai-codex",
        source=usage.source or "usage_api",
        used_percent=depletion(most_depleted),
        reset_at=most_depleted.reset_at,
        detail=f"{most_depleted.label} account window",
        available=True,
    )
def collect_quotas(config: Optional[dict[str, Any]] = None) -> tuple[ProviderQuota, ...]:
    """Collect one quota snapshot per monitored provider.

    Codex is queried through its usage API first. Every provider, Codex
    included, falls back to SessionDB accounting when no API snapshot is
    available. A falsy *config* is replaced by the loaded configuration.
    """
    settings = config or load_config()
    collected: list[ProviderQuota] = []
    for name in _monitor_providers(settings):
        # OpenCode Go: no documented quota API found on the OpenAI-compatible
        # endpoint, so only Codex has an API path.
        reading = _quota_from_codex_api(settings) if name == "openai-codex" else None
        if reading is None:
            reading = _quota_from_sessiondb(name, settings)
        collected.append(reading)
    return tuple(collected)
def _configured_thresholds(config: dict[str, Any]) -> tuple[float, float]:
    """Return ``(warning_pct, critical_pct)`` from config, clamped to 0-100.

    A value that cannot be converted to ``float`` falls back to the matching
    module default.
    """

    def clamp_pct(key: str, fallback: float) -> float:
        raw = cfg_get(config, "quota", key, default=fallback)
        try:
            number = float(raw)
        except (TypeError, ValueError):
            return fallback
        return max(0.0, min(100.0, number))

    warning = clamp_pct("warning_pct", DEFAULT_WARNING_PCT)
    critical = clamp_pct("critical_pct", DEFAULT_CRITICAL_PCT)
    return warning, critical
def quota_fallback_active(config: Optional[dict[str, Any]] = None) -> bool:
    """Report whether the forced fallback should currently be honoured.

    Returns ``False`` whenever ``HERMES_QUOTA_BYPASS`` or
    ``HERMES_QUOTA_CRITICAL`` is set to a non-empty value; otherwise returns
    the truthiness of ``quota.force_fallback``.
    """
    settings = config or load_config()
    override_vars = ("HERMES_QUOTA_BYPASS", "HERMES_QUOTA_CRITICAL")
    if any(os.getenv(name) for name in override_vars):
        return False
    forced = cfg_get(settings, "quota", "force_fallback", default=False)
    return bool(forced)
def quota_fallback_provider(config: Optional[dict[str, Any]] = None) -> str:
    """Return the configured fallback provider, or the module default if unset or empty."""
    settings = config or load_config()
    chosen = cfg_get(settings, "quota", "fallback_provider", default=DEFAULT_FALLBACK_PROVIDER)
    if not chosen:
        chosen = DEFAULT_FALLBACK_PROVIDER
    return str(chosen)
def _ensure_gemma_provider(config: dict[str, Any], provider: str, model: str, base_url: str) -> None:
if not provider.startswith("custom:"):
return
name = provider.split(":", 1)[1]
providers = config.setdefault("providers", {})
if not isinstance(providers, dict):
providers = {}
config["providers"] = providers
entry = providers.get(name)
if not isinstance(entry, dict):
providers[name] = {
"name": name,
"base_url": base_url,
"default_model": model,
"api_key": "no-key-required",
"transport": "chat_completions",
}
def _persist_forced_fallback(raw_config: dict[str, Any], *, provider: str, model: str, base_url: str) -> bool:
    """Record the forced fallback in *raw_config* and save it.

    Sets ``quota.force_fallback`` plus the fallback provider, model and base
    URL, registers a ``custom:`` provider entry when needed, and writes the
    configuration through ``save_config``.

    ``quota.forced_at`` is stamped only when the fallback is newly forced, or
    when the timestamp is missing, so repeated runs while the quota stays
    critical keep the time of the original switch.

    Returns:
        ``True`` when this call switched the fallback on, ``False`` when it
        was already forced.
    """
    quota = raw_config.setdefault("quota", {})
    if not isinstance(quota, dict):
        quota = {}
        raw_config["quota"] = quota
    already = bool(quota.get("force_fallback"))
    quota["force_fallback"] = True
    if not already or not quota.get("forced_at"):
        quota["forced_at"] = _utc_now().isoformat()
    quota["fallback_provider"] = provider
    quota["fallback_model"] = model
    quota["fallback_base_url"] = base_url
    _ensure_gemma_provider(raw_config, provider, model, base_url)
    save_config(raw_config)
    return not already
def _alert_key(quota: ProviderQuota, level: str) -> str:
reset = quota.reset_at.isoformat() if quota.reset_at else "unknown"
pct = int(quota.used_percent or 0)
return f"{quota.provider}:{level}:{pct // 5 * 5}:{reset}"
def _state_path() -> Path:
    """Return the path of the watchdog state file inside the Hermes home."""
    hermes_home = get_hermes_home()
    return hermes_home / STATE_FILE
def _read_state() -> dict[str, Any]:
    """Load the persisted watchdog state.

    Returns:
        The decoded JSON object, or ``{}`` when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
    """
    try:
        loaded = json.loads(_state_path().read_text(encoding="utf-8"))
    except Exception:
        # Deliberately best effort: losing the state only means alerts may be
        # emitted again, which is safer than aborting the watchdog.
        return {}
    # Callers use dict methods on the result, so reject lists, strings, etc.
    return loaded if isinstance(loaded, dict) else {}
def _write_state(state: dict[str, Any]) -> None:
    """Persist *state* as pretty-printed JSON to the watchdog state file.

    The payload is written to a sibling temporary file and then moved into
    place with ``os.replace``, so an interrupted write does not leave a
    truncated state file behind.

    Raises:
        OSError: If the directory or file cannot be written.
        TypeError: If *state* contains values that are not JSON serialisable.
    """
    path = _state_path()
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(state, indent=2, sort_keys=True)
    scratch = path.with_name(f"{path.name}.tmp")
    scratch.write_text(payload, encoding="utf-8")
    os.replace(scratch, path)
def evaluate_and_apply(*, apply: bool = True, quiet_unchanged: bool = False) -> WatchdogResult:
    """Check every monitored quota, emit alerts and force the fallback if needed.

    Args:
        apply: When ``True``, persist the forced fallback to the config (if
            any provider is critical) and write the watchdog state file. When
            ``False`` nothing is written.
        quiet_unchanged: When ``True``, suppress alerts whose key was already
            emitted on an earlier run.

    Returns:
        A ``WatchdogResult`` with the collected quotas, the effective
        thresholds, the fallback target, and the alerts produced by this run.
    """
    config = load_config()
    raw_config = read_raw_config() or {}
    warning_pct, critical_pct = _configured_thresholds(config)

    def setting(key: str, default: str) -> str:
        return str(cfg_get(config, "quota", key, default=default) or default)

    fallback_provider = setting("fallback_provider", DEFAULT_FALLBACK_PROVIDER)
    fallback_model = setting("fallback_model", DEFAULT_FALLBACK_MODEL)
    fallback_base_url = setting("fallback_base_url", DEFAULT_FALLBACK_BASE_URL)

    quotas = collect_quotas(config)

    state = _read_state()
    if not isinstance(state, dict):
        # A state file holding valid JSON that is not an object is unusable.
        state = {}
    stored_keys = state.get("emitted_alert_keys")
    # Keep the keys in emission order so trimming drops the oldest ones
    # rather than whichever sort first alphabetically.
    emitted_order = list(dict.fromkeys(str(k) for k in stored_keys)) if isinstance(stored_keys, list) else []
    emitted = set(emitted_order)

    alerts: list[str] = []
    needs_fallback = False
    for quota in quotas:
        if quota.used_percent is None:
            continue
        if quota.used_percent >= critical_pct:
            level = "critical"
        elif quota.used_percent >= warning_pct:
            level = "warning"
        else:
            continue
        eta = eta_reset_hours(quota.reset_at)
        eta_text = f"{eta} hours" if eta is not None else "unknown"
        switch_text = "switching to fallback" if level == "critical" else "recommend fallback"
        msg = f"Quota at {quota.used_percent:.0f}% for {quota.provider}, {switch_text}. ETA reset: {eta_text}"
        key = _alert_key(quota, level)
        if key not in emitted or not quiet_unchanged:
            alerts.append(msg)
        if key not in emitted:
            emitted.add(key)
            emitted_order.append(key)
        if level == "critical":
            needs_fallback = True

    # Persist once, however many providers are critical: only the first call
    # can report a switch, and each call rewrites the configuration.
    switched = False
    if needs_fallback and apply:
        switched = _persist_forced_fallback(
            raw_config,
            provider=fallback_provider,
            model=fallback_model,
            base_url=fallback_base_url,
        )

    state["emitted_alert_keys"] = emitted_order[-100:]
    state["last_checked_at"] = _utc_now().isoformat()
    state["last_quotas"] = [
        quota.__dict__ | {"reset_at": quota.reset_at.isoformat() if quota.reset_at else None}
        for quota in quotas
    ]
    if apply:
        _write_state(state)

    # Re-read the configuration so the result reflects what is on disk now.
    forced = bool(cfg_get(load_config(), "quota", "force_fallback", default=False))
    return WatchdogResult(
        quotas=quotas,
        warning_pct=warning_pct,
        critical_pct=critical_pct,
        fallback_provider=fallback_provider,
        fallback_model=fallback_model,
        forced_fallback=forced,
        switched=switched,
        alerts=tuple(alerts),
    )
def format_status(result: WatchdogResult, *, markdown: bool = False) -> list[str]:
    """Render a watchdog result as human-readable status lines.

    Args:
        result: The evaluation outcome to describe.
        markdown: Accepted for interface compatibility; not used by the
            current rendering.

    Returns:
        One line per item: a header, the thresholds, the fallback target, one
        line per provider, and the alerts (if any). Segments of a provider
        line are joined with a bullet separator.
    """
    lines = ["🧯 Quota watchdog"]
    lines.append(f"Thresholds: warning {result.warning_pct:.0f}%, critical {result.critical_pct:.0f}%")
    lines.append(f"Fallback: {result.fallback_model} via {result.fallback_provider} ({'forced' if result.forced_fallback else 'standby'})")
    for quota in result.quotas:
        usage = "unknown" if quota.used_percent is None else f"{quota.used_percent:.0f}% used"
        segments = [f"{quota.provider}: {usage} ({quota.source})"]
        if quota.limit_tokens:
            # Its own segment, so the counts get the same separator as the
            # reset hint instead of running into the closing parenthesis.
            segments.append(f"{quota.used_tokens:,}/{quota.limit_tokens:,} tokens")
        hours = eta_reset_hours(quota.reset_at)
        if hours is not None:
            segments.append(f"reset in {hours}h")
        lines.append(" • ".join(segments))
    if result.alerts:
        lines.append("Alerts:")
        lines.extend(f"- {alert}" for alert in result.alerts)
    return lines
# Names exported by ``from agent.quota_watchdog import *``.
__all__ = [
"ProviderQuota",
"WatchdogResult",
"collect_quotas",
"evaluate_and_apply",
"format_status",
"quota_fallback_active",
"quota_fallback_provider",
]

47
scripts/quota_watchdog.py Normal file
View File

@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""Run Hermes quota watchdog.
This script is safe for cron: with --quiet-unchanged it prints only new
warning/critical alerts, so no_agent cron jobs stay silent when nothing changed.
"""
from __future__ import annotations
import argparse
import sys
from pathlib import Path
# Allow this script to run from ~/.hermes/scripts/quota_watchdog.py when Hermes
# is not installed as an editable package: if the source checkout exists in
# its usual location, put it at the front of the import path.
REPO = Path.home() / ".hermes" / "hermes-agent"
if REPO.exists():
    if str(REPO) not in sys.path:
        sys.path.insert(0, str(REPO))
def main() -> int:
    """Parse the command line, run the watchdog and print its output.

    Returns:
        Always 0.
    """
    parser = argparse.ArgumentParser(description="Check paid provider quotas and force local fallback at critical threshold.")
    switches = (
        ("--dry-run", "do not write config/state"),
        ("--quiet-unchanged", "only print newly emitted alerts"),
        ("--status", "always print the current quota status"),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)
    options = parser.parse_args()

    # In cron/non-interactive mode, default to quiet-unchanged to avoid
    # spamming.
    if not sys.stdin.isatty() and not options.status:
        options.quiet_unchanged = True

    # Imported here, after the module-level sys.path adjustment has run.
    from agent.quota_watchdog import evaluate_and_apply, format_status

    result = evaluate_and_apply(apply=not options.dry_run, quiet_unchanged=options.quiet_unchanged)

    show_full_status = options.status or not options.quiet_unchanged
    output: list[str] = format_status(result) if show_full_status else list(result.alerts)
    if result.switched and not output:
        output = [f"Quota critical; forced fallback to {result.fallback_model} via {result.fallback_provider}"]
    if output:
        print("\n".join(output))
    return 0
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())