feat: quota watchdog for OpenCode Go + Codex
- agent/quota_watchdog.py: core logic — SessionDB token tracking, Codex API integration, threshold comparison, auto-fallback - scripts/quota_watchdog.py: CLI entry point with --status, --dry-run, --quiet-unchanged (default in non-tty cron mode) - Config: quota section with thresholds in config.yaml - Cron: 30-min no_agent watchdog job (fb918d5e5dd1)
This commit is contained in:
parent
2517917de3
commit
907f660dfd
409
agent/quota_watchdog.py
Normal file
409
agent/quota_watchdog.py
Normal file
@ -0,0 +1,409 @@
|
||||
"""Quota watchdog for paid coding providers.
|
||||
|
||||
The watchdog combines provider account usage APIs (where available) with the
|
||||
local SessionDB token ledger. It is intentionally conservative: once a
|
||||
monitored provider crosses the critical threshold it persists a local fallback
|
||||
model in config.yaml so future non-explicit sessions start on the cheap local
|
||||
runtime instead of burning the last paid tokens.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
import calendar
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
from typing import Any, Iterable, Optional
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
from hermes_cli.config import cfg_get, load_config, read_raw_config, save_config
|
||||
|
||||
|
||||
DEFAULT_WARNING_PCT = 80.0
|
||||
DEFAULT_CRITICAL_PCT = 95.0
|
||||
DEFAULT_FALLBACK_PROVIDER = "custom:gemma-local"
|
||||
DEFAULT_FALLBACK_MODEL = "gemma-local"
|
||||
DEFAULT_FALLBACK_BASE_URL = "http://127.0.0.1:8081/v1"
|
||||
DEFAULT_MONTHLY_LIMITS = {
|
||||
# OpenCode Go currently exposes an OpenAI-compatible inference endpoint but
|
||||
# no documented quota endpoint. The user-facing plan is ~10M tokens/month,
|
||||
# so use SessionDB accounting against that cap until an API appears.
|
||||
"opencode-go": 10_000_000,
|
||||
# Codex has an account-usage endpoint. If it is unavailable, SessionDB still
|
||||
# reports raw monthly tokens but no percentage unless the user configures a
|
||||
# monthly limit.
|
||||
"openai-codex": 0,
|
||||
}
|
||||
OPENCODE_HOSTS = {"opencode.ai", "www.opencode.ai"}
|
||||
STATE_FILE = "quota_watchdog_state.json"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ProviderQuota:
|
||||
provider: str
|
||||
source: str
|
||||
used_percent: Optional[float]
|
||||
reset_at: Optional[datetime] = None
|
||||
used_tokens: int = 0
|
||||
limit_tokens: int = 0
|
||||
detail: str = ""
|
||||
available: bool = True
|
||||
|
||||
@property
|
||||
def level(self) -> str:
|
||||
if self.used_percent is None:
|
||||
return "unknown"
|
||||
if self.used_percent >= DEFAULT_CRITICAL_PCT:
|
||||
return "critical"
|
||||
if self.used_percent >= DEFAULT_WARNING_PCT:
|
||||
return "warning"
|
||||
return "ok"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class WatchdogResult:
|
||||
quotas: tuple[ProviderQuota, ...]
|
||||
warning_pct: float
|
||||
critical_pct: float
|
||||
fallback_provider: str
|
||||
fallback_model: str
|
||||
forced_fallback: bool
|
||||
switched: bool = False
|
||||
alerts: tuple[str, ...] = ()
|
||||
errors: tuple[str, ...] = field(default_factory=tuple)
|
||||
|
||||
@property
|
||||
def worst(self) -> Optional[ProviderQuota]:
|
||||
ranked = {"critical": 3, "warning": 2, "ok": 1, "unknown": 0}
|
||||
return max(self.quotas, key=lambda q: ranked.get(q.level, 0), default=None)
|
||||
|
||||
|
||||
def _utc_now() -> datetime:
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
def _month_bounds(now: Optional[datetime] = None) -> tuple[datetime, datetime]:
|
||||
now = now or _utc_now()
|
||||
start = datetime(now.year, now.month, 1, tzinfo=timezone.utc)
|
||||
if now.month == 12:
|
||||
end = datetime(now.year + 1, 1, 1, tzinfo=timezone.utc)
|
||||
else:
|
||||
end = datetime(now.year, now.month + 1, 1, tzinfo=timezone.utc)
|
||||
return start, end
|
||||
|
||||
|
||||
def eta_reset_hours(reset_at: Optional[datetime]) -> Optional[int]:
|
||||
if not reset_at:
|
||||
return None
|
||||
seconds = max(0, int((reset_at - _utc_now()).total_seconds()))
|
||||
return int((seconds + 3599) // 3600)
|
||||
|
||||
|
||||
def _session_db_path() -> Path:
|
||||
return get_hermes_home() / "state.db"
|
||||
|
||||
|
||||
def _provider_matches(provider: str, billing_provider: str, billing_base_url: str, model: str) -> bool:
|
||||
provider = provider.lower()
|
||||
bp = (billing_provider or "").lower()
|
||||
base = (billing_base_url or "").lower()
|
||||
model_l = (model or "").lower()
|
||||
host = urlparse(base).hostname or ""
|
||||
if provider == "opencode-go":
|
||||
return "opencode" in bp or host in OPENCODE_HOSTS or "opencode.ai" in base or "opencode" in model_l
|
||||
if provider == "openai-codex":
|
||||
return bp == "openai-codex" or "codex" in bp or "codex" in base or "codex" in model_l
|
||||
return provider in bp or provider in base or provider in model_l
|
||||
|
||||
|
||||
def _sessiondb_monthly_tokens(provider: str, now: Optional[datetime] = None) -> int:
|
||||
db_path = _session_db_path()
|
||||
if not db_path.exists():
|
||||
return 0
|
||||
start, end = _month_bounds(now)
|
||||
start_ts = start.timestamp()
|
||||
end_ts = end.timestamp()
|
||||
total = 0
|
||||
try:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
try:
|
||||
rows = conn.execute(
|
||||
"""
|
||||
SELECT model, billing_provider, billing_base_url,
|
||||
COALESCE(input_tokens, 0) AS input_tokens,
|
||||
COALESCE(output_tokens, 0) AS output_tokens,
|
||||
COALESCE(cache_read_tokens, 0) AS cache_read_tokens,
|
||||
COALESCE(cache_write_tokens, 0) AS cache_write_tokens,
|
||||
COALESCE(reasoning_tokens, 0) AS reasoning_tokens
|
||||
FROM sessions
|
||||
WHERE started_at >= ? AND started_at < ?
|
||||
""",
|
||||
(start_ts, end_ts),
|
||||
).fetchall()
|
||||
finally:
|
||||
conn.close()
|
||||
except sqlite3.Error:
|
||||
return 0
|
||||
for row in rows:
|
||||
if _provider_matches(provider, row["billing_provider"], row["billing_base_url"], row["model"]):
|
||||
total += int(row["input_tokens"] or 0)
|
||||
total += int(row["output_tokens"] or 0)
|
||||
total += int(row["cache_read_tokens"] or 0)
|
||||
total += int(row["cache_write_tokens"] or 0)
|
||||
total += int(row["reasoning_tokens"] or 0)
|
||||
return total
|
||||
|
||||
|
||||
def _config_section(config: dict[str, Any]) -> dict[str, Any]:
|
||||
section = config.get("quota")
|
||||
return section if isinstance(section, dict) else {}
|
||||
|
||||
|
||||
def _monthly_limits(config: dict[str, Any]) -> dict[str, int]:
|
||||
raw = cfg_get(config, "quota", "monthly_token_limits", default={})
|
||||
limits = dict(DEFAULT_MONTHLY_LIMITS)
|
||||
if isinstance(raw, dict):
|
||||
for key, value in raw.items():
|
||||
try:
|
||||
limits[str(key).strip()] = max(0, int(value or 0))
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
return limits
|
||||
|
||||
|
||||
def _monitor_providers(config: dict[str, Any]) -> tuple[str, ...]:
|
||||
raw = cfg_get(config, "quota", "providers", default=("opencode-go", "openai-codex"))
|
||||
if isinstance(raw, str):
|
||||
raw = [raw]
|
||||
if not isinstance(raw, Iterable):
|
||||
return ("opencode-go", "openai-codex")
|
||||
providers = tuple(str(p).strip().lower() for p in raw if str(p).strip())
|
||||
return providers or ("opencode-go", "openai-codex")
|
||||
|
||||
|
||||
def _next_month_reset() -> datetime:
|
||||
return _month_bounds()[1]
|
||||
|
||||
|
||||
def _quota_from_sessiondb(provider: str, config: dict[str, Any]) -> ProviderQuota:
|
||||
used = _sessiondb_monthly_tokens(provider)
|
||||
limit = _monthly_limits(config).get(provider, 0)
|
||||
pct = (used / limit * 100.0) if limit else None
|
||||
detail = "SessionDB monthly token ledger"
|
||||
if not limit:
|
||||
detail += " (set quota.monthly_token_limits.%s for percent thresholds)" % provider
|
||||
return ProviderQuota(
|
||||
provider=provider,
|
||||
source="sessiondb",
|
||||
used_percent=pct,
|
||||
reset_at=_next_month_reset(),
|
||||
used_tokens=used,
|
||||
limit_tokens=limit,
|
||||
detail=detail,
|
||||
available=True,
|
||||
)
|
||||
|
||||
|
||||
def _quota_from_codex_api(config: dict[str, Any]) -> Optional[ProviderQuota]:
|
||||
try:
|
||||
from agent.account_usage import fetch_account_usage
|
||||
|
||||
snapshot = fetch_account_usage("openai-codex")
|
||||
except Exception:
|
||||
return None
|
||||
if not snapshot or not snapshot.windows:
|
||||
return None
|
||||
# Prefer the most depleted window; Codex exposes session/week windows rather
|
||||
# than a monthly token bucket, but this still catches imminent exhaustion.
|
||||
windows = [w for w in snapshot.windows if w.used_percent is not None]
|
||||
if not windows:
|
||||
return None
|
||||
worst = max(windows, key=lambda w: float(w.used_percent or 0))
|
||||
return ProviderQuota(
|
||||
provider="openai-codex",
|
||||
source=snapshot.source or "usage_api",
|
||||
used_percent=float(worst.used_percent or 0),
|
||||
reset_at=worst.reset_at,
|
||||
detail=f"{worst.label} account window",
|
||||
available=True,
|
||||
)
|
||||
|
||||
|
||||
def collect_quotas(config: Optional[dict[str, Any]] = None) -> tuple[ProviderQuota, ...]:
|
||||
config = config or load_config()
|
||||
quotas: list[ProviderQuota] = []
|
||||
for provider in _monitor_providers(config):
|
||||
quota: Optional[ProviderQuota] = None
|
||||
if provider == "openai-codex":
|
||||
quota = _quota_from_codex_api(config)
|
||||
# OpenCode Go: no documented quota API found on the OpenAI-compatible
|
||||
# endpoint. Use SessionDB monthly tracking.
|
||||
if quota is None:
|
||||
quota = _quota_from_sessiondb(provider, config)
|
||||
quotas.append(quota)
|
||||
return tuple(quotas)
|
||||
|
||||
|
||||
def _configured_thresholds(config: dict[str, Any]) -> tuple[float, float]:
|
||||
def as_pct(path: str, default: float) -> float:
|
||||
value = cfg_get(config, "quota", path, default=default)
|
||||
try:
|
||||
return max(0.0, min(100.0, float(value)))
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
|
||||
return as_pct("warning_pct", DEFAULT_WARNING_PCT), as_pct("critical_pct", DEFAULT_CRITICAL_PCT)
|
||||
|
||||
|
||||
def quota_fallback_active(config: Optional[dict[str, Any]] = None) -> bool:
|
||||
config = config or load_config()
|
||||
if os.getenv("HERMES_QUOTA_BYPASS") or os.getenv("HERMES_QUOTA_CRITICAL"):
|
||||
return False
|
||||
return bool(cfg_get(config, "quota", "force_fallback", default=False))
|
||||
|
||||
|
||||
def quota_fallback_provider(config: Optional[dict[str, Any]] = None) -> str:
|
||||
config = config or load_config()
|
||||
return str(cfg_get(config, "quota", "fallback_provider", default=DEFAULT_FALLBACK_PROVIDER) or DEFAULT_FALLBACK_PROVIDER)
|
||||
|
||||
|
||||
def _ensure_gemma_provider(config: dict[str, Any], provider: str, model: str, base_url: str) -> None:
|
||||
if not provider.startswith("custom:"):
|
||||
return
|
||||
name = provider.split(":", 1)[1]
|
||||
providers = config.setdefault("providers", {})
|
||||
if not isinstance(providers, dict):
|
||||
providers = {}
|
||||
config["providers"] = providers
|
||||
entry = providers.get(name)
|
||||
if not isinstance(entry, dict):
|
||||
providers[name] = {
|
||||
"name": name,
|
||||
"base_url": base_url,
|
||||
"default_model": model,
|
||||
"api_key": "no-key-required",
|
||||
"transport": "chat_completions",
|
||||
}
|
||||
|
||||
|
||||
def _persist_forced_fallback(raw_config: dict[str, Any], *, provider: str, model: str, base_url: str) -> bool:
|
||||
quota = raw_config.setdefault("quota", {})
|
||||
if not isinstance(quota, dict):
|
||||
quota = {}
|
||||
raw_config["quota"] = quota
|
||||
already = bool(quota.get("force_fallback"))
|
||||
quota["force_fallback"] = True
|
||||
quota["forced_at"] = _utc_now().isoformat()
|
||||
quota["fallback_provider"] = provider
|
||||
quota["fallback_model"] = model
|
||||
quota["fallback_base_url"] = base_url
|
||||
_ensure_gemma_provider(raw_config, provider, model, base_url)
|
||||
save_config(raw_config)
|
||||
return not already
|
||||
|
||||
|
||||
def _alert_key(quota: ProviderQuota, level: str) -> str:
|
||||
reset = quota.reset_at.isoformat() if quota.reset_at else "unknown"
|
||||
pct = int(quota.used_percent or 0)
|
||||
return f"{quota.provider}:{level}:{pct // 5 * 5}:{reset}"
|
||||
|
||||
|
||||
def _state_path() -> Path:
|
||||
return get_hermes_home() / STATE_FILE
|
||||
|
||||
|
||||
def _read_state() -> dict[str, Any]:
|
||||
try:
|
||||
return json.loads(_state_path().read_text(encoding="utf-8"))
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
|
||||
def _write_state(state: dict[str, Any]) -> None:
|
||||
path = _state_path()
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(json.dumps(state, indent=2, sort_keys=True), encoding="utf-8")
|
||||
|
||||
|
||||
def evaluate_and_apply(*, apply: bool = True, quiet_unchanged: bool = False) -> WatchdogResult:
|
||||
config = load_config()
|
||||
raw_config = read_raw_config() or {}
|
||||
warning_pct, critical_pct = _configured_thresholds(config)
|
||||
fallback_provider = str(cfg_get(config, "quota", "fallback_provider", default=DEFAULT_FALLBACK_PROVIDER) or DEFAULT_FALLBACK_PROVIDER)
|
||||
fallback_model = str(cfg_get(config, "quota", "fallback_model", default=DEFAULT_FALLBACK_MODEL) or DEFAULT_FALLBACK_MODEL)
|
||||
fallback_base_url = str(cfg_get(config, "quota", "fallback_base_url", default=DEFAULT_FALLBACK_BASE_URL) or DEFAULT_FALLBACK_BASE_URL)
|
||||
quotas = collect_quotas(config)
|
||||
state = _read_state()
|
||||
emitted = set(state.get("emitted_alert_keys") or [])
|
||||
alerts: list[str] = []
|
||||
switched = False
|
||||
|
||||
for quota in quotas:
|
||||
if quota.used_percent is None:
|
||||
continue
|
||||
level = "critical" if quota.used_percent >= critical_pct else "warning" if quota.used_percent >= warning_pct else "ok"
|
||||
if level == "ok":
|
||||
continue
|
||||
eta = eta_reset_hours(quota.reset_at)
|
||||
eta_text = f"{eta} hours" if eta is not None else "unknown"
|
||||
switch_text = "switching to fallback" if level == "critical" else "recommend fallback"
|
||||
msg = f"Quota at {quota.used_percent:.0f}% for {quota.provider}, {switch_text}. ETA reset: {eta_text}"
|
||||
key = _alert_key(quota, level)
|
||||
if key not in emitted or not quiet_unchanged:
|
||||
alerts.append(msg)
|
||||
emitted.add(key)
|
||||
if level == "critical" and apply:
|
||||
switched = _persist_forced_fallback(raw_config, provider=fallback_provider, model=fallback_model, base_url=fallback_base_url) or switched
|
||||
|
||||
state["emitted_alert_keys"] = sorted(emitted)[-100:]
|
||||
state["last_checked_at"] = _utc_now().isoformat()
|
||||
state["last_quotas"] = [quota.__dict__ | {"reset_at": quota.reset_at.isoformat() if quota.reset_at else None} for quota in quotas]
|
||||
if apply:
|
||||
_write_state(state)
|
||||
forced = bool(cfg_get(load_config(), "quota", "force_fallback", default=False))
|
||||
return WatchdogResult(
|
||||
quotas=quotas,
|
||||
warning_pct=warning_pct,
|
||||
critical_pct=critical_pct,
|
||||
fallback_provider=fallback_provider,
|
||||
fallback_model=fallback_model,
|
||||
forced_fallback=forced,
|
||||
switched=switched,
|
||||
alerts=tuple(alerts),
|
||||
)
|
||||
|
||||
|
||||
def format_status(result: WatchdogResult, *, markdown: bool = False) -> list[str]:
|
||||
lines = ["🧯 Quota watchdog"]
|
||||
lines.append(f"Thresholds: warning {result.warning_pct:.0f}%, critical {result.critical_pct:.0f}%")
|
||||
lines.append(f"Fallback: {result.fallback_model} via {result.fallback_provider} ({'forced' if result.forced_fallback else 'standby'})")
|
||||
for quota in result.quotas:
|
||||
if quota.used_percent is None:
|
||||
pct = "unknown"
|
||||
else:
|
||||
pct = f"{quota.used_percent:.0f}% used"
|
||||
token_part = ""
|
||||
if quota.limit_tokens:
|
||||
token_part = f" • {quota.used_tokens:,}/{quota.limit_tokens:,} tokens"
|
||||
reset = eta_reset_hours(quota.reset_at)
|
||||
reset_part = f" • reset in {reset}h" if reset is not None else ""
|
||||
lines.append(f"{quota.provider}: {pct} ({quota.source}){token_part}{reset_part}")
|
||||
if result.alerts:
|
||||
lines.append("Alerts:")
|
||||
lines.extend(f"- {alert}" for alert in result.alerts)
|
||||
return lines
|
||||
|
||||
|
||||
__all__ = [
|
||||
"ProviderQuota",
|
||||
"WatchdogResult",
|
||||
"collect_quotas",
|
||||
"evaluate_and_apply",
|
||||
"format_status",
|
||||
"quota_fallback_active",
|
||||
"quota_fallback_provider",
|
||||
]
|
||||
47
scripts/quota_watchdog.py
Normal file
47
scripts/quota_watchdog.py
Normal file
@ -0,0 +1,47 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Run Hermes quota watchdog.
|
||||
|
||||
This script is safe for cron: with --quiet-unchanged it prints only new
|
||||
warning/critical alerts, so no_agent cron jobs stay silent when nothing changed.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Support running from ~/.hermes/scripts/quota_watchdog.py when Hermes is not
|
||||
# installed as an editable package but the source checkout exists in the usual
|
||||
# location.
|
||||
REPO = Path.home() / ".hermes" / "hermes-agent"
|
||||
if REPO.exists() and str(REPO) not in sys.path:
|
||||
sys.path.insert(0, str(REPO))
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="Check paid provider quotas and force local fallback at critical threshold.")
|
||||
parser.add_argument("--dry-run", action="store_true", help="do not write config/state")
|
||||
parser.add_argument("--quiet-unchanged", action="store_true", help="only print newly emitted alerts")
|
||||
parser.add_argument("--status", action="store_true", help="always print the current quota status")
|
||||
args = parser.parse_args()
|
||||
# In cron/non-interactive mode, default to quiet-unchanged to avoid spamming
|
||||
if not sys.stdin.isatty() and not args.status:
|
||||
args.quiet_unchanged = True
|
||||
|
||||
from agent.quota_watchdog import evaluate_and_apply, format_status
|
||||
|
||||
result = evaluate_and_apply(apply=not args.dry_run, quiet_unchanged=args.quiet_unchanged)
|
||||
lines: list[str]
|
||||
if args.status or not args.quiet_unchanged:
|
||||
lines = format_status(result)
|
||||
else:
|
||||
lines = list(result.alerts)
|
||||
if result.switched and not lines:
|
||||
lines = [f"Quota critical; forced fallback to {result.fallback_model} via {result.fallback_provider}"]
|
||||
if lines:
|
||||
print("\n".join(lines))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Loading…
Reference in New Issue
Block a user