Compare commits

..

2 Commits

Author SHA1 Message Date
f936968c5c feat: telemetry dashboard /stats command with CLI + gateway handlers
- agent/session_stats.py: SessionDB context/compression metrics
- agent/skill_stats.py: curator usage.json reader + prune history
- agent/system_health.py: gateway uptime, version, cron activity
- agent/stats_dashboard.py: Telegram-friendly bullet renderer
- cli.py: /stats dispatch + _handle_stats_command method
- gateway/run.py: /stats dispatch + _handle_stats_command for messaging platforms
- hermes_cli/commands.py: /stats CommandDef registration
2026-05-29 16:31:33 +00:00
e5d74fa32a feat(auxiliary): self-healing fallback chain with proactive context-length checks
- Add _context_length_error() for pre-API-call context window validation
- Add _is_context_length_error() to detect context-length API errors
- Enhance _try_configured_fallback_chain() with failed_model/messages/max_tokens
- Fix skip logic: same provider + different model = valid self-healing rung
- Integrate context-length checks into call_llm() and async_call_llm()
- Trigger fallback chain on context_length errors (not just payment/connection)
2026-05-29 16:15:00 +00:00
12 changed files with 644 additions and 279 deletions

View File

@ -2831,16 +2831,74 @@ def _try_main_agent_model_fallback(
return client, resolved_model or main_model, label return client, resolved_model or main_model, label
def _coerce_positive_int(value: Any) -> Optional[int]:
try:
parsed = int(value)
except (TypeError, ValueError):
return None
return parsed if parsed > 0 else None
def _estimate_auxiliary_request_tokens(messages: list, max_tokens: Optional[int] = None) -> int:
"""Rough token estimate for local auxiliary context-window checks."""
try:
from agent.model_metadata import estimate_messages_tokens_rough
input_tokens = estimate_messages_tokens_rough(messages or [])
except Exception:
input_tokens = 0
for msg in messages or []:
content = msg.get("content", "") if isinstance(msg, dict) else str(msg)
input_tokens += max(1, len(str(content)) // 4)
return input_tokens + (_coerce_positive_int(max_tokens) or 0)
def _context_length_error(
*,
task: str,
provider: str,
model: Optional[str],
context_length: Optional[int],
messages: list,
max_tokens: Optional[int],
) -> Optional[ValueError]:
ctx = _coerce_positive_int(context_length)
if not ctx:
return None
estimated = _estimate_auxiliary_request_tokens(messages, max_tokens)
if estimated <= ctx:
return None
return ValueError(
f"Auxiliary {task or 'call'} request needs ~{estimated} tokens, "
f"exceeding configured context_length={ctx} for "
f"{provider or 'auto'}/{model or 'default'}"
)
def _is_context_length_error(exc: Exception) -> bool:
text = str(exc).lower()
return (
"context_length" in text
or "context length" in text
or "context window" in text
or "too many tokens" in text
or "exceeding configured context" in text
or "exceeds the max_model_len" in text
)
def _try_configured_fallback_chain( def _try_configured_fallback_chain(
task: str, task: str,
failed_provider: str, failed_provider: str,
reason: str = "error", reason: str = "error",
failed_model: Optional[str] = None,
messages: Optional[list] = None,
max_tokens: Optional[int] = None,
) -> Tuple[Optional[Any], Optional[str], str]: ) -> Tuple[Optional[Any], Optional[str], str]:
"""Try user-configured fallback_chain for a specific auxiliary task. """Try user-configured fallback_chain for a specific auxiliary task.
Reads auxiliary.<task>.fallback_chain from config.yaml and tries each Reads auxiliary.<task>.fallback_chain from config.yaml and tries each
entry in order. Each entry must have at least ``provider``; ``model``, entry in order. Each entry must have at least ``provider``; ``model``,
``base_url``, and ``api_key`` are optional. ``base_url``, ``api_key``, and ``context_length`` are optional.
Returns: Returns:
(client, model, provider_label) or (None, None, "") if no fallback. (client, model, provider_label) or (None, None, "") if no fallback.
@ -2853,21 +2911,46 @@ def _try_configured_fallback_chain(
if not chain or not isinstance(chain, list): if not chain or not isinstance(chain, list):
return None, None, "" return None, None, ""
skip = failed_provider.lower().strip() skip_provider = failed_provider.lower().strip()
skip_model = str(failed_model or "").lower().strip()
tried = [] tried = []
for i, entry in enumerate(chain): for i, entry in enumerate(chain):
if not isinstance(entry, dict): if not isinstance(entry, dict):
continue continue
fb_provider = str(entry.get("provider", "")).strip() fb_provider = str(entry.get("provider", "")).strip()
if not fb_provider or fb_provider.lower() == skip: if not fb_provider:
continue continue
fb_model = str(entry.get("model", "")).strip() or None fb_model = str(entry.get("model", "")).strip() or None
# Skip only the exact failed provider+model pair. Same provider with a
# different model is a valid self-healing rung (e.g. opencode_go
# deepseek-v4-pro -> opencode_go gpt-5.5).
if fb_provider.lower() == skip_provider and (
not skip_model or (fb_model or "").lower() == skip_model
):
continue
fb_base_url = str(entry.get("base_url", "")).strip() or None fb_base_url = str(entry.get("base_url", "")).strip() or None
fb_api_key = str(entry.get("api_key", "")).strip() or None fb_api_key = str(entry.get("api_key", "")).strip() or None
label = f"fallback_chain[{i}]({fb_provider})" label = f"fallback_chain[{i}]({fb_provider})"
if messages is not None:
context_err = _context_length_error(
task=task,
provider=fb_provider,
model=fb_model,
context_length=entry.get("context_length"),
messages=messages,
max_tokens=max_tokens,
)
if context_err is not None:
logger.info(
"Auxiliary %s: skipping %s (%s) because it also exceeds context_length: %s",
task, label, fb_model or "default", context_err,
)
tried.append(label)
continue
try: try:
fb_client = _resolve_single_provider( fb_client = _resolve_single_provider(
fb_provider, fb_model, fb_base_url, fb_api_key) fb_provider, fb_model, fb_base_url, fb_api_key)
@ -2889,7 +2972,6 @@ def _try_configured_fallback_chain(
) )
return None, None, "" return None, None, ""
def _resolve_single_provider( def _resolve_single_provider(
provider: str, provider: str,
model: Optional[str] = None, model: Optional[str] = None,
@ -4889,6 +4971,17 @@ def call_llm(
# Handle unsupported temperature, max_tokens vs max_completion_tokens retry, # Handle unsupported temperature, max_tokens vs max_completion_tokens retry,
# then payment fallback. # then payment fallback.
try: try:
task_context = _get_auxiliary_task_config(task).get("context_length") if task else None
context_err = _context_length_error(
task=task or "call",
provider=resolved_provider,
model=final_model,
context_length=task_context,
messages=messages,
max_tokens=max_tokens,
)
if context_err is not None:
raise context_err
return _validate_llm_response( return _validate_llm_response(
client.chat.completions.create(**kwargs), task) client.chat.completions.create(**kwargs), task)
except Exception as first_err: except Exception as first_err:
@ -5072,6 +5165,7 @@ def call_llm(
_is_payment_error(first_err) _is_payment_error(first_err)
or _is_connection_error(first_err) or _is_connection_error(first_err)
or _is_rate_limit_error(first_err) or _is_rate_limit_error(first_err)
or _is_context_length_error(first_err)
) )
# Respect explicit provider choice for transient errors (auth, request # Respect explicit provider choice for transient errors (auth, request
# validation, etc.) but allow fallback when the provider clearly cannot # validation, etc.) but allow fallback when the provider clearly cannot
@ -5082,7 +5176,11 @@ def call_llm(
is_auto = resolved_provider in {"auto", "", None} is_auto = resolved_provider in {"auto", "", None}
# Capacity errors bypass the explicit-provider gate: the provider # Capacity errors bypass the explicit-provider gate: the provider
# literally cannot serve this request regardless of user intent. # literally cannot serve this request regardless of user intent.
is_capacity_error = _is_payment_error(first_err) or _is_connection_error(first_err) is_capacity_error = (
_is_payment_error(first_err)
or _is_connection_error(first_err)
or _is_context_length_error(first_err)
)
if should_fallback and (is_auto or is_capacity_error): if should_fallback and (is_auto or is_capacity_error):
if _is_payment_error(first_err): if _is_payment_error(first_err):
reason = "payment error" reason = "payment error"
@ -5095,6 +5193,8 @@ def call_llm(
) )
elif _is_rate_limit_error(first_err): elif _is_rate_limit_error(first_err):
reason = "rate limit" reason = "rate limit"
elif _is_context_length_error(first_err):
reason = "context length"
else: else:
reason = "connection error" reason = "connection error"
logger.info("Auxiliary %s: %s on %s (%s), trying fallback", logger.info("Auxiliary %s: %s on %s (%s), trying fallback",
@ -5112,7 +5212,8 @@ def call_llm(
resolved_provider, task, reason=reason) resolved_provider, task, reason=reason)
else: else:
fb_client, fb_model, fb_label = _try_configured_fallback_chain( fb_client, fb_model, fb_label = _try_configured_fallback_chain(
task, resolved_provider or "auto", reason=reason) task, resolved_provider or "auto", reason=reason,
failed_model=final_model, messages=messages, max_tokens=max_tokens)
if fb_client is None: if fb_client is None:
fb_client, fb_model, fb_label = _try_main_agent_model_fallback( fb_client, fb_model, fb_label = _try_main_agent_model_fallback(
resolved_provider, task, reason=reason) resolved_provider, task, reason=reason)
@ -5295,6 +5396,17 @@ async def async_call_llm(
kwargs["messages"] = _convert_openai_images_to_anthropic(kwargs["messages"]) kwargs["messages"] = _convert_openai_images_to_anthropic(kwargs["messages"])
try: try:
task_context = _get_auxiliary_task_config(task).get("context_length") if task else None
context_err = _context_length_error(
task=task or "call",
provider=resolved_provider,
model=final_model,
context_length=task_context,
messages=messages,
max_tokens=max_tokens,
)
if context_err is not None:
raise context_err
return _validate_llm_response( return _validate_llm_response(
await client.chat.completions.create(**kwargs), task) await client.chat.completions.create(**kwargs), task)
except Exception as first_err: except Exception as first_err:
@ -5446,12 +5558,17 @@ async def async_call_llm(
_is_payment_error(first_err) _is_payment_error(first_err)
or _is_connection_error(first_err) or _is_connection_error(first_err)
or _is_rate_limit_error(first_err) or _is_rate_limit_error(first_err)
or _is_context_length_error(first_err)
) )
# Capacity errors (payment/quota/connection) bypass the explicit-provider # Capacity errors (payment/quota/connection) bypass the explicit-provider
# gate — the provider cannot serve the request regardless of user intent. # gate — the provider cannot serve the request regardless of user intent.
# See #26803: daily token quota must fall back like a 402 credit error. # See #26803: daily token quota must fall back like a 402 credit error.
is_auto = resolved_provider in {"auto", "", None} is_auto = resolved_provider in {"auto", "", None}
is_capacity_error = _is_payment_error(first_err) or _is_connection_error(first_err) is_capacity_error = (
_is_payment_error(first_err)
or _is_connection_error(first_err)
or _is_context_length_error(first_err)
)
if should_fallback and (is_auto or is_capacity_error): if should_fallback and (is_auto or is_capacity_error):
if _is_payment_error(first_err): if _is_payment_error(first_err):
reason = "payment error" reason = "payment error"
@ -5460,6 +5577,8 @@ async def async_call_llm(
) )
elif _is_rate_limit_error(first_err): elif _is_rate_limit_error(first_err):
reason = "rate limit" reason = "rate limit"
elif _is_context_length_error(first_err):
reason = "context length"
else: else:
reason = "connection error" reason = "connection error"
logger.info("Auxiliary %s (async): %s on %s (%s), trying fallback", logger.info("Auxiliary %s (async): %s on %s (%s), trying fallback",
@ -5476,7 +5595,8 @@ async def async_call_llm(
resolved_provider, task, reason=reason) resolved_provider, task, reason=reason)
else: else:
fb_client, fb_model, fb_label = _try_configured_fallback_chain( fb_client, fb_model, fb_label = _try_configured_fallback_chain(
task, resolved_provider or "auto", reason=reason) task, resolved_provider or "auto", reason=reason,
failed_model=final_model, messages=messages, max_tokens=max_tokens)
if fb_client is None: if fb_client is None:
fb_client, fb_model, fb_label = _try_main_agent_model_fallback( fb_client, fb_model, fb_label = _try_main_agent_model_fallback(
resolved_provider, task, reason=reason) resolved_provider, task, reason=reason)

View File

@ -580,13 +580,6 @@ class ContextCompressor(ContextEngine):
self.summary_model = summary_model_override or "" self.summary_model = summary_model_override or ""
# Compression-model fallback: set by check_compression_model_feasibility
# when the primary aux compression model fails the minimum context check.
# If set, _generate_summary uses this provider/model for the LLM call
# instead of the main compressor attributes. Dict keys:
# provider, model, base_url, api_key
self._compression_fallback: Optional[Dict[str, str]] = None
# Stores the previous compaction summary for iterative updates # Stores the previous compaction summary for iterative updates
self._previous_summary: Optional[str] = None self._previous_summary: Optional[str] = None
# Anti-thrashing: track whether last compression was effective # Anti-thrashing: track whether last compression was effective
@ -1076,20 +1069,6 @@ The user has requested that this compaction PRIORITISE preserving all informatio
} }
if self.summary_model: if self.summary_model:
call_kwargs["model"] = self.summary_model call_kwargs["model"] = self.summary_model
# Compression-model fallback: when the primary aux compression
# model was rejected for insufficient context, the feasibility
# check stored a replacement provider/model here. Override the
# entire main_runtime so call_llm routes the summary request to
# the fallback provider instead of the main one.
if self._compression_fallback:
_fb = self._compression_fallback
call_kwargs["main_runtime"] = {
"model": _fb["model"],
"provider": _fb["provider"],
"base_url": _fb.get("base_url", ""),
"api_key": _fb.get("api_key", ""),
"api_mode": _fb.get("api_mode", self.api_mode),
}
response = call_llm(**call_kwargs) response = call_llm(**call_kwargs)
content = response.choices[0].message.content content = response.choices[0].message.content
# Handle cases where content is not a string (e.g., dict from llama.cpp) # Handle cases where content is not a string (e.g., dict from llama.cpp)

View File

@ -221,101 +221,9 @@ def check_compression_model_feasibility(agent: Any) -> None:
new_threshold, new_threshold,
) )
except ValueError: except ValueError:
# Primary compression model failed the minimum context check # Hard rejections (aux below minimum context) must propagate
# (context_length < MINIMUM_CONTEXT_LENGTH). Before giving up, # so the session refuses to start.
# try the user's fallback provider chain so a model switch or raise
# provider outage doesn't silently disable compression.
_fallback_chain = getattr(agent, '_fallback_chain', None) or []
_tried = [f"{aux_model} ({_aux_cfg_provider or 'auto'}): {aux_context:,} ctx < {MINIMUM_CONTEXT_LENGTH:,}"]
for _fb_entry in _fallback_chain:
_fb_provider = _fb_entry.get("provider", "")
_fb_model = _fb_entry.get("model", "")
if not _fb_provider or not _fb_model:
continue
try:
from agent.auxiliary_client import resolve_provider_client
_fb_client, _fb_resolved_model = resolve_provider_client(
_fb_provider,
_fb_model,
explicit_base_url=_fb_entry.get("base_url", ""),
explicit_api_key=_fb_entry.get("api_key", ""),
main_runtime=agent._current_main_runtime(),
)
if _fb_client is None or not _fb_resolved_model:
_tried.append(f"{_fb_model} ({_fb_provider}): unavailable")
continue
_fb_base_url = str(getattr(_fb_client, "base_url", ""))
_fb_api_key_raw = getattr(_fb_client, "api_key", "")
_fb_api_key = (
""
if callable(_fb_api_key_raw) and not isinstance(_fb_api_key_raw, str)
else str(_fb_api_key_raw or "")
)
_fb_context = get_model_context_length(
_fb_resolved_model,
base_url=_fb_base_url,
api_key=_fb_api_key,
provider=_fb_provider,
custom_providers=getattr(agent, "_custom_providers", None),
)
if _fb_context and _fb_context < MINIMUM_CONTEXT_LENGTH:
_tried.append(
f"{_fb_resolved_model} ({_fb_provider}): "
f"{_fb_context:,} ctx < {MINIMUM_CONTEXT_LENGTH:,}"
)
continue
# ── Found a suitable fallback ──────────────────────────
logger.warning(
"Compression model %s (%s) has only %d token context "
"(minimum %d). Falling back to %s (%s) with %d token context.",
aux_model, _aux_cfg_provider or "auto", aux_context,
MINIMUM_CONTEXT_LENGTH, _fb_resolved_model, _fb_provider,
_fb_context or 0,
)
agent.context_compressor._compression_fallback = {
"provider": _fb_provider,
"model": _fb_resolved_model,
"base_url": _fb_base_url,
"api_key": _fb_api_key,
}
_msg = (
f"⚠ Compression model {aux_model} has only "
f"{aux_context:,} token context (minimum "
f"{MINIMUM_CONTEXT_LENGTH:,} required). "
f"Falling back to {_fb_resolved_model} ({_fb_provider}) "
f"for summaries."
)
agent._compression_warning = _msg
agent._emit_status(_msg)
return
except Exception as _fb_err:
_tried.append(f"{_fb_model} ({_fb_provider}): {_fb_err}")
continue
# No fallback worked — warn and let compression run without
# summaries (same behavior as 'no auxiliary LLM' above).
_all_tried = "; ".join(_tried)
_msg = (
f"⚠ No suitable compression model available. "
f"Tried: {_all_tried}. "
f"Compression will drop middle turns without summaries. "
f"Run `hermes setup` or set "
f"auxiliary.compression.model in config.yaml."
)
agent._compression_warning = _msg
agent._emit_status(_msg)
logger.warning("Compression model fallback exhausted: %s", _all_tried)
return
except Exception as exc: except Exception as exc:
logger.debug( logger.debug(
"Compression feasibility check failed (non-fatal): %s", exc "Compression feasibility check failed (non-fatal): %s", exc

129
agent/session_stats.py Normal file
View File

@ -0,0 +1,129 @@
"""Session telemetry collectors for the /stats dashboard."""
from __future__ import annotations
import logging
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
_TOKEN_FIELDS = (
"input_tokens",
"output_tokens",
"cache_read_tokens",
"cache_write_tokens",
"reasoning_tokens",
)
def _coerce_int(value: Any, default: int = 0) -> int:
try:
return int(value or 0)
except (TypeError, ValueError):
return default
def _sum_tokens(row: Any) -> int:
if not row:
return 0
total = 0
for field in _TOKEN_FIELDS:
try:
value = row.get(field) if hasattr(row, "get") else row[field]
except Exception:
value = 0
total += _coerce_int(value)
return total
def _query_one(session_db: Any, sql: str, params: tuple = ()) -> Optional[dict]:
conn = getattr(session_db, "_conn", None)
if conn is None:
return None
cur = conn.execute(sql, params)
row = cur.fetchone()
return dict(row) if row is not None else None
def collect_context_stats(*, agent: Any = None, session_db: Any = None, session_id: str | None = None) -> Dict[str, Any]:
"""Return current model/provider/context telemetry from live agent + SessionDB."""
model = getattr(agent, "model", None) or "unknown"
provider = getattr(agent, "provider", None) or "unknown"
context_length = _coerce_int(getattr(getattr(agent, "context_compressor", None), "context_length", 0))
threshold_tokens = _coerce_int(getattr(getattr(agent, "context_compressor", None), "threshold_tokens", 0))
total_tokens = _coerce_int(getattr(agent, "session_total_tokens", 0))
if session_db is not None and session_id and total_tokens <= 0:
try:
row = session_db.get_session(session_id)
total_tokens = _sum_tokens(row)
if row and (model == "unknown"):
model = row.get("model") or model
except Exception as exc:
logger.debug("Failed to read current session token totals: %s", exc, exc_info=True)
usage_percent = (total_tokens / context_length * 100.0) if context_length else None
fallback_chain = []
for entry in getattr(agent, "_fallback_chain", []) or []:
if isinstance(entry, dict):
fb_provider = str(entry.get("provider") or "").strip()
fb_model = str(entry.get("model") or "").strip()
if fb_provider or fb_model:
fallback_chain.append({"provider": fb_provider, "model": fb_model})
return {
"model": model,
"provider": provider,
"context_length": context_length,
"threshold_tokens": threshold_tokens,
"total_tokens": total_tokens,
"usage_percent": usage_percent,
"fallback_chain": fallback_chain,
}
def collect_semantic_rle_stats(session_db: Any = None) -> Dict[str, Any]:
"""Approximate compression/RLE savings from real SessionDB compression chains.
Hermes persists context-compression continuations as sessions whose parent
ended with ``end_reason='compression'``. We derive counts and token deltas
from those persisted parent/child rows instead of inventing counters.
"""
if session_db is None or getattr(session_db, "_conn", None) is None:
return {"sessions_compressed": 0, "compression_ratio": None, "avg_tokens_saved": 0, "source": "SessionDB unavailable"}
try:
row = _query_one(
session_db,
"""
SELECT COUNT(*) AS n,
COALESCE(SUM(input_tokens + output_tokens + cache_read_tokens + cache_write_tokens + reasoning_tokens), 0) AS parent_tokens
FROM sessions
WHERE end_reason = 'compression'
""",
) or {}
compressed = _coerce_int(row.get("n"))
parent_tokens = _coerce_int(row.get("parent_tokens"))
child = _query_one(
session_db,
"""
SELECT COALESCE(SUM(c.input_tokens + c.output_tokens + c.cache_read_tokens + c.cache_write_tokens + c.reasoning_tokens), 0) AS child_tokens
FROM sessions p
JOIN sessions c ON c.parent_session_id = p.id
WHERE p.end_reason = 'compression'
""",
) or {}
child_tokens = _coerce_int(child.get("child_tokens"))
except Exception as exc:
logger.debug("Failed to collect compression stats: %s", exc, exc_info=True)
return {"sessions_compressed": 0, "compression_ratio": None, "avg_tokens_saved": 0, "source": "SessionDB query failed"}
saved = max(parent_tokens - child_tokens, 0)
ratio = (child_tokens / parent_tokens) if parent_tokens else None
return {
"sessions_compressed": compressed,
"compression_ratio": ratio,
"avg_tokens_saved": int(saved / compressed) if compressed else 0,
"source": "SessionDB compression chains",
}

84
agent/skill_stats.py Normal file
View File

@ -0,0 +1,84 @@
"""Skill and curator telemetry collectors for the /stats dashboard."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List
from hermes_constants import get_hermes_home
from tools.skill_usage import (
STATE_ARCHIVED,
activity_count,
latest_activity_at,
load_usage,
)
def _parse_dt(value: Any):
if not value:
return None
try:
dt = datetime.fromisoformat(str(value))
except (TypeError, ValueError):
return None
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
def _activity_count(record: Dict[str, Any]) -> int:
return activity_count(record)
def collect_skill_stats(limit: int = 5) -> Dict[str, Any]:
usage = load_usage()
rows: List[Dict[str, Any]] = []
for name, record in usage.items():
if not isinstance(record, dict):
continue
count = _activity_count(record)
rows.append({
"name": str(name),
"activity_count": count,
"use_count": int(record.get("use_count") or 0),
"view_count": int(record.get("view_count") or 0),
"patch_count": int(record.get("patch_count") or 0),
"last_activity_at": latest_activity_at(record),
"state": record.get("state") or "active",
})
rows.sort(key=lambda r: (r["activity_count"], r["name"]), reverse=True)
return {"top_skills": rows[:limit], "usage_records": len(rows)}
def collect_curator_prunes(days: int = 7, limit: int = 3) -> Dict[str, Any]:
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
usage = load_usage()
archived = []
for name, record in usage.items():
if not isinstance(record, dict):
continue
if record.get("state") != STATE_ARCHIVED:
continue
ts = record.get("archived_at") or record.get("last_patched_at") or record.get("created_at")
dt = _parse_dt(ts)
if dt is not None and dt < cutoff:
continue
archived.append({"name": str(name), "archived_at": ts})
# Also inspect the archive directory so manually restored/old usage sidecars
# still have a real filesystem source for the dashboard.
archive_dir = get_hermes_home() / "skills" / ".archive"
if archive_dir.exists():
for path in archive_dir.iterdir():
if not path.is_dir():
continue
try:
dt = datetime.fromtimestamp(path.stat().st_mtime, timezone.utc)
except OSError:
continue
if dt >= cutoff and not any(row["name"] == path.name for row in archived):
archived.append({"name": path.name, "archived_at": dt.isoformat()})
archived.sort(key=lambda r: str(r.get("archived_at") or ""), reverse=True)
return {"recent_prunes": archived[:limit], "recent_prune_count": len(archived), "days": days}

76
agent/stats_dashboard.py Normal file
View File

@ -0,0 +1,76 @@
"""Renderer for the Telegram-friendly /stats dashboard."""
from __future__ import annotations
from typing import Any
from agent.session_stats import collect_context_stats, collect_semantic_rle_stats
from agent.skill_stats import collect_curator_prunes, collect_skill_stats
from agent.system_health import collect_system_health
def _fmt_int(value: Any) -> str:
try:
return f"{int(value):,}"
except (TypeError, ValueError):
return "0"
def _fmt_pct(value: Any) -> str:
if value is None:
return "unknown"
try:
return f"{float(value):.1f}%"
except (TypeError, ValueError):
return "unknown"
def _fallback_text(chain: list[dict]) -> str:
if not chain:
return "none"
return "".join(
f"{item.get('model') or '?'} ({item.get('provider') or '?'})"
for item in chain
)
def format_stats_dashboard(*, agent: Any = None, session_db: Any = None, session_id: str | None = None, started_at: Any = None, start_monotonic: float | None = None) -> str:
context = collect_context_stats(agent=agent, session_db=session_db, session_id=session_id)
rle = collect_semantic_rle_stats(session_db=session_db)
skills = collect_skill_stats(limit=5)
prunes = collect_curator_prunes(days=7, limit=3)
health = collect_system_health(started_at=started_at, start_monotonic=start_monotonic)
cron = health.get("cron") or {}
lines = [
"📊 Hermes stats",
"",
f"• Model: {context['model']} ({context['provider']})",
f"• Fallback: {_fallback_text(context.get('fallback_chain') or [])}",
f"• Context: {_fmt_int(context.get('total_tokens'))}/{_fmt_int(context.get('context_length'))} tokens ({_fmt_pct(context.get('usage_percent'))})",
f"• Semantic RLE: {rle.get('sessions_compressed', 0)} sessions · ratio {_fmt_pct((rle.get('compression_ratio') or 0) * 100 if rle.get('compression_ratio') is not None else None)} · avg saved {_fmt_int(rle.get('avg_tokens_saved'))} tokens",
"",
"• Top skills:",
]
top = skills.get("top_skills") or []
if top:
for row in top[:5]:
lines.append(f" - {row['name']}: {row['activity_count']} activity ({row['use_count']} use / {row['view_count']} view / {row['patch_count']} patch)")
else:
lines.append(" - no skill usage telemetry yet")
lines.append("• Gardener prunes (7d):")
recent_prunes = prunes.get("recent_prunes") or []
if recent_prunes:
for row in recent_prunes[:3]:
stamp = str(row.get("archived_at") or "unknown").split("T", 1)[0]
lines.append(f" - {row.get('name')}: {stamp}")
else:
lines.append(" - none")
lines.extend([
f"• Nightly/cron 24h: {cron.get('runs', 0)} runs · {cron.get('ok', 0)} ok · {cron.get('error', 0)} errors · {cron.get('health_checks', 0)} health checks",
f"• Uptime/version: {health.get('uptime')} · v{health.get('version')} · pid {health.get('pid')}",
])
return "\n".join(lines)

90
agent/system_health.py Normal file
View File

@ -0,0 +1,90 @@
"""System health and cron telemetry collectors for /stats."""
from __future__ import annotations
import os
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict
def _parse_dt(value: Any):
if not value:
return None
try:
dt = datetime.fromisoformat(str(value))
except (TypeError, ValueError):
return None
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
def format_duration(seconds: int | float | None) -> str:
if seconds is None:
return "unknown"
seconds = max(0, int(seconds))
days, rem = divmod(seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, _ = divmod(rem, 60)
if days:
return f"{days}d {hours}h"
if hours:
return f"{hours}h {minutes}m"
return f"{minutes}m"
def collect_cron_activity(hours: int = 24) -> Dict[str, Any]:
cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
try:
from cron.jobs import list_jobs
jobs = list_jobs(include_disabled=True)
except Exception:
jobs = []
recent = []
ok = error = health_checks = 0
for job in jobs:
if not isinstance(job, dict):
continue
dt = _parse_dt(job.get("last_run_at"))
if dt is None or dt < cutoff:
continue
status = str(job.get("last_status") or "unknown")
if status == "ok":
ok += 1
elif status == "error":
error += 1
haystack = " ".join(str(job.get(k) or "") for k in ("name", "prompt", "id")).lower()
if "health" in haystack or "doctor" in haystack:
health_checks += 1
recent.append({"id": job.get("id"), "name": job.get("name"), "status": status, "last_run_at": job.get("last_run_at")})
recent.sort(key=lambda r: str(r.get("last_run_at") or ""), reverse=True)
return {"hours": hours, "runs": len(recent), "ok": ok, "error": error, "health_checks": health_checks, "recent": recent[:5]}
def collect_system_health(*, started_at: Any = None, start_monotonic: float | None = None) -> Dict[str, Any]:
try:
from hermes_cli import __version__ as version
except Exception:
version = "unknown"
uptime_seconds = None
if start_monotonic is not None:
uptime_seconds = time.monotonic() - float(start_monotonic)
else:
dt = started_at
if isinstance(dt, (int, float)):
uptime_seconds = time.time() - float(dt)
elif isinstance(dt, datetime):
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
uptime_seconds = (datetime.now(timezone.utc) - dt).total_seconds()
return {
"version": version,
"pid": os.getpid(),
"uptime_seconds": int(uptime_seconds) if uptime_seconds is not None else None,
"uptime": format_duration(uptime_seconds),
"cron": collect_cron_activity(hours=24),
}

20
cli.py
View File

@ -5988,6 +5988,24 @@ class HermesCLI:
self._console_print("\n".join(lines), highlight=False, markup=False) self._console_print("\n".join(lines), highlight=False, markup=False)
def _handle_stats_command(self):
"""Show comprehensive system stats — model, skills, curator, cron, uptime."""
try:
from agent.stats_dashboard import format_stats_dashboard
except ImportError as exc:
self._console_print(f"stats module unavailable: {exc}", highlight=False, markup=False)
return
agent = getattr(self, "agent", None)
uptime_start = getattr(self, "session_start", None)
dashboard = format_stats_dashboard(
agent=agent,
session_db=getattr(self, "_session_db", None),
session_id=self.session_id,
started_at=uptime_start,
)
self._console_print(dashboard, highlight=False, markup=False)
def _fast_command_available(self) -> bool: def _fast_command_available(self) -> bool:
try: try:
from hermes_cli.models import model_supports_fast_mode from hermes_cli.models import model_supports_fast_mode
@ -8489,6 +8507,8 @@ class HermesCLI:
self._handle_skills_command(cmd_original) self._handle_skills_command(cmd_original)
elif canonical == "platforms": elif canonical == "platforms":
self._show_gateway_status() self._show_gateway_status()
elif canonical == "stats":
self._handle_stats_command()
elif canonical == "status": elif canonical == "status":
self._show_session_status() self._show_session_status()
elif canonical == "statusbar": elif canonical == "statusbar":

View File

@ -7470,6 +7470,9 @@ class GatewayRunner:
if canonical == "status": if canonical == "status":
return await self._handle_status_command(event) return await self._handle_status_command(event)
if canonical == "stats":
return await self._handle_stats_command(event)
if canonical == "agents": if canonical == "agents":
return await self._handle_agents_command(event) return await self._handle_agents_command(event)
@ -9707,6 +9710,30 @@ class GatewayRunner:
return "\n".join(lines) return "\n".join(lines)
async def _handle_stats_command(self, event: MessageEvent) -> str:
"""Handle /stats command — comprehensive system telemetry dashboard."""
try:
from agent.stats_dashboard import format_stats_dashboard
except ImportError as exc:
logger.debug("stats module unavailable: %s", exc)
return "stats module unavailable"
source = event.source
session_entry = self.session_store.get_or_create_session(source)
session_key = self._session_key_for_source(source)
# Try to get the running agent for this session
agent = self._running_agents.get(session_key)
started_at = self._running_agents_ts.get(session_key)
dashboard = format_stats_dashboard(
agent=agent,
session_db=self._session_db,
session_id=session_entry.session_id,
started_at=started_at,
)
return dashboard
async def _handle_agents_command(self, event: MessageEvent) -> str: async def _handle_agents_command(self, event: MessageEvent) -> str:
"""Handle /agents command - list active agents and running tasks.""" """Handle /agents command - list active agents and running tasks."""
from tools.process_registry import format_uptime_short, process_registry from tools.process_registry import format_uptime_short, process_registry

View File

@ -107,6 +107,7 @@ COMMAND_REGISTRY: list[CommandDef] = [
CommandDef("subgoal", "Add or manage extra criteria on the active goal", "Session", CommandDef("subgoal", "Add or manage extra criteria on the active goal", "Session",
args_hint="[text | remove N | clear]"), args_hint="[text | remove N | clear]"),
CommandDef("status", "Show session info", "Session"), CommandDef("status", "Show session info", "Session"),
CommandDef("stats", "Show comprehensive system stats — model, skills, curator, cron, uptime", "Info"),
CommandDef("whoami", "Show your slash command access (admin / user)", "Info"), CommandDef("whoami", "Show your slash command access (admin / user)", "Info"),
CommandDef("profile", "Show active profile name and home directory", "Info"), CommandDef("profile", "Show active profile name and home directory", "Info"),
CommandDef("sethome", "Set this chat as the home channel", "Session", CommandDef("sethome", "Set this chat as the home channel", "Session",

View File

@ -1402,6 +1402,83 @@ class TestAuxiliaryFallbackLayering:
assert main_client.chat.completions.create.called assert main_client.chat.completions.create.called
def test_context_length_failure_uses_configured_chain_same_provider_different_model(self, monkeypatch):
"""Local auxiliary context_length failure should self-heal via fallback_chain."""
monkeypatch.setenv("OPENCODE_GO_API_KEY", "go-key")
primary_client = MagicMock()
chain_client = MagicMock()
chain_client.chat.completions.create.return_value = MagicMock(choices=[
MagicMock(message=MagicMock(content="from opencode gpt fallback"))
])
task_cfg = {
"provider": "opencode_go",
"model": "deepseek-v4-pro",
"context_length": 100,
"fallback_chain": [
{"provider": "opencode_go", "model": "gpt-5.5", "context_length": 10000},
],
}
def resolve_single(provider, model=None, base_url=None, api_key=None):
assert provider == "opencode_go"
assert model == "gpt-5.5"
return chain_client
with patch("agent.auxiliary_client._get_cached_client",
return_value=(primary_client, "deepseek-v4-pro")), \
patch("agent.auxiliary_client._resolve_task_provider_model",
return_value=("opencode_go", "deepseek-v4-pro", None, None, None)), \
patch("agent.auxiliary_client._get_auxiliary_task_config",
return_value=task_cfg), \
patch("agent.auxiliary_client._resolve_single_provider",
side_effect=resolve_single), \
patch("agent.auxiliary_client._try_main_agent_model_fallback") as main_fb:
result = call_llm(
task="compression",
messages=[{"role": "user", "content": "x" * 1000}],
max_tokens=2000,
)
primary_client.chat.completions.create.assert_not_called()
chain_client.chat.completions.create.assert_called_once()
main_fb.assert_not_called()
assert result.choices[0].message.content == "from opencode gpt fallback"
def test_configured_chain_skips_too_small_fallback_context(self):
"""fallback_chain should continue past entries that cannot fit the request."""
from agent.auxiliary_client import _try_configured_fallback_chain
too_small = MagicMock()
fits = MagicMock()
task_cfg = {
"fallback_chain": [
{"provider": "custom", "model": "tiny", "context_length": 100},
{"provider": "custom", "model": "gemma-local", "context_length": 10000},
]
}
def resolve_single(provider, model=None, base_url=None, api_key=None):
return too_small if model == "tiny" else fits
with patch("agent.auxiliary_client._get_auxiliary_task_config",
return_value=task_cfg), \
patch("agent.auxiliary_client._resolve_single_provider",
side_effect=resolve_single):
client, model, label = _try_configured_fallback_chain(
"compression",
"opencode_go",
reason="context length",
failed_model="deepseek-v4-pro",
messages=[{"role": "user", "content": "x" * 1000}],
max_tokens=2000,
)
assert client is fits
assert model == "gemma-local"
assert label == "fallback_chain[1](custom)"
def test_warning_emitted_when_all_fallbacks_exhausted(self, monkeypatch, caplog): def test_warning_emitted_when_all_fallbacks_exhausted(self, monkeypatch, caplog):
"""When chain AND main model both fail, a user-visible warning fires before re-raise.""" """When chain AND main model both fail, a user-visible warning fires before re-raise."""
monkeypatch.setenv("OPENROUTER_API_KEY", "or-key") monkeypatch.setenv("OPENROUTER_API_KEY", "or-key")

View File

@ -57,7 +57,6 @@ def _make_agent(
compressor = MagicMock(spec=ContextCompressor) compressor = MagicMock(spec=ContextCompressor)
compressor.context_length = main_context compressor.context_length = main_context
compressor.threshold_tokens = int(main_context * threshold_percent) compressor.threshold_tokens = int(main_context * threshold_percent)
compressor._compression_fallback = None
agent.context_compressor = compressor agent.context_compressor = compressor
return agent return agent
@ -102,169 +101,24 @@ def test_auto_corrects_threshold_when_aux_context_below_threshold(mock_get_clien
@patch("agent.model_metadata.get_model_context_length", return_value=32_768) @patch("agent.model_metadata.get_model_context_length", return_value=32_768)
@patch("agent.auxiliary_client.get_text_auxiliary_client") @patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_rejects_aux_below_minimum_context(mock_get_client, mock_ctx_len): def test_rejects_aux_below_minimum_context(mock_get_client, mock_ctx_len):
"""When aux context < MINIMUM_CONTEXT_LENGTH (64K) and no fallback """Hard floor: aux context < MINIMUM_CONTEXT_LENGTH (64K) → session
providers are configured, a warning is emitted and compression will refuses to start (ValueError), mirroring the main-model rejection."""
operate without summaries. Previously this raised ValueError; now it
degrades gracefully so a model switch doesn't kill the session."""
agent = _make_agent(main_context=200_000, threshold_percent=0.50) agent = _make_agent(main_context=200_000, threshold_percent=0.50)
mock_client = MagicMock() mock_client = MagicMock()
mock_client.base_url = "https://openrouter.ai/api/v1" mock_client.base_url = "https://openrouter.ai/api/v1"
mock_client.api_key = "sk-aux" mock_client.api_key = "sk-aux"
mock_get_client.return_value = (mock_client, "tiny-aux-model") mock_get_client.return_value = (mock_client, "tiny-aux-model")
messages = [] agent._emit_status = lambda msg: None
agent._emit_status = lambda msg: messages.append(msg)
# No fallback chain → should warn, not raise with pytest.raises(ValueError) as exc_info:
agent._fallback_chain = []
agent._check_compression_model_feasibility()
assert len(messages) == 1
assert "No suitable compression model" in messages[0]
assert "tiny-aux-model" in messages[0]
assert "32,768" in messages[0]
assert "64,000" in messages[0]
assert agent._compression_warning is not None
@patch("agent.model_metadata.get_model_context_length")
@patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_falls_back_to_chain_when_aux_below_minimum(mock_get_client, mock_ctx_len):
"""When the primary aux model fails the context-length floor, the
feasibility check tries each fallback provider in order, using the
first one that meets MINIMUM_CONTEXT_LENGTH."""
agent = _make_agent(main_context=200_000, threshold_percent=0.50)
# Primary aux model: too small (32K)
mock_primary_client = MagicMock()
mock_primary_client.base_url = "https://openrouter.ai/api/v1"
mock_primary_client.api_key = "sk-aux"
mock_get_client.return_value = (mock_primary_client, "tiny-aux-model")
# Fallback chain: two providers, first one meets the floor
agent._fallback_chain = [
{"provider": "opencode_go", "model": "deepseek-v4-pro"},
{"provider": "custom", "model": "gemma-local",
"base_url": "http://127.0.0.1:8081/v1", "api_key": "no-key"},
]
# Mock resolve_provider_client for the fallback resolution
mock_fb_client = MagicMock()
mock_fb_client.base_url = "https://api.opencode.ai/v1"
mock_fb_client.api_key = "sk-fallback"
# get_model_context_length: first return 32K (primary fail),
# then return 128K (fallback success)
mock_ctx_len.side_effect = [32_768, 128_000]
messages = []
agent._emit_status = lambda msg: messages.append(msg)
with patch("agent.auxiliary_client.resolve_provider_client",
return_value=(mock_fb_client, "deepseek-v4-pro")) as mock_resolve:
agent._check_compression_model_feasibility() agent._check_compression_model_feasibility()
# Should have resolved the fallback provider err = str(exc_info.value)
mock_resolve.assert_called_once() assert "tiny-aux-model" in err
# First two positional args: provider, model assert "32,768" in err
assert mock_resolve.call_args[0][0] == "opencode_go" assert "64,000" in err
assert mock_resolve.call_args[0][1] == "deepseek-v4-pro" assert "below the minimum" in err
# Warning should mention the fallback choice
assert len(messages) == 1
assert "Falling back to" in messages[0]
assert "deepseek-v4-pro" in messages[0]
assert "opencode_go" in messages[0]
# Fallback dict stored on compressor
fb = agent.context_compressor._compression_fallback
assert fb is not None
assert fb["provider"] == "opencode_go"
assert fb["model"] == "deepseek-v4-pro"
@patch("agent.model_metadata.get_model_context_length")
@patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_falls_back_past_unavailable_provider(mock_get_client, mock_ctx_len):
"""When the first fallback provider is unavailable, skip it and
try the next one."""
agent = _make_agent(main_context=200_000, threshold_percent=0.50)
mock_primary_client = MagicMock()
mock_primary_client.base_url = "https://openrouter.ai/api/v1"
mock_primary_client.api_key = "sk-aux"
mock_get_client.return_value = (mock_primary_client, "tiny")
# Fallback chain: first unavailable, second works
agent._fallback_chain = [
{"provider": "broken-provider", "model": "broken-model"},
{"provider": "opencode_go", "model": "deepseek-v4-pro"},
]
mock_fb_client = MagicMock()
mock_fb_client.base_url = "https://api.opencode.ai/v1"
mock_fb_client.api_key = "sk-fallback"
# Primary: 32K (fail), broken-provider: unavailable, opencode_go: 128K
mock_ctx_len.side_effect = [32_768, None, 128_000]
messages = []
agent._emit_status = lambda msg: messages.append(msg)
# First resolve returns None (unavailable), second returns client
mock_resolve_values = [(None, None), (mock_fb_client, "deepseek-v4-pro")]
with patch("agent.auxiliary_client.resolve_provider_client",
side_effect=mock_resolve_values) as mock_resolve:
agent._check_compression_model_feasibility()
# Should have tried both fallbacks
assert mock_resolve.call_count == 2
# Should succeed with the second fallback
fb = agent.context_compressor._compression_fallback
assert fb is not None
assert fb["provider"] == "opencode_go"
@patch("agent.model_metadata.get_model_context_length")
@patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_warns_when_all_fallbacks_exhausted(mock_get_client, mock_ctx_len):
"""When every fallback provider also fails the context floor or is
unavailable, emit a warning and degrade to no-summary mode without
raising."""
agent = _make_agent(main_context=200_000, threshold_percent=0.50)
mock_primary_client = MagicMock()
mock_primary_client.base_url = "https://openrouter.ai/api/v1"
mock_primary_client.api_key = "sk-aux"
mock_get_client.return_value = (mock_primary_client, "tiny-main")
agent._fallback_chain = [
{"provider": "small-provider", "model": "small-model"},
]
# Fallback also too small
mock_fb_client = MagicMock()
mock_fb_client.base_url = "https://small.api/v1"
mock_fb_client.api_key = "sk-small"
mock_ctx_len.side_effect = [32_768, 16_384]
messages = []
agent._emit_status = lambda msg: messages.append(msg)
# Mock compressor won't have _compression_fallback until set —
# initialize it so the final assertion works.
agent.context_compressor._compression_fallback = None
with patch("agent.auxiliary_client.resolve_provider_client",
return_value=(mock_fb_client, "small-model")):
agent._check_compression_model_feasibility()
assert len(messages) == 1
assert "No suitable compression model" in messages[0]
assert "small-model" in messages[0]
assert agent._compression_warning is not None
# No fallback on compressor
assert agent.context_compressor._compression_fallback is None
@patch("agent.model_metadata.get_model_context_length", return_value=200_000) @patch("agent.model_metadata.get_model_context_length", return_value=200_000)