From f936968c5c9b92ea8ea70fddcc029ffb186f3ed5 Mon Sep 17 00:00:00 2001
From: Anton Palgunov
Date: Fri, 29 May 2026 16:31:33 +0000
Subject: [PATCH] feat: telemetry dashboard /stats command with CLI + gateway handlers

- agent/session_stats.py: SessionDB context/compression metrics
- agent/skill_stats.py: curator usage.json reader + prune history
- agent/system_health.py: gateway uptime, version, cron activity
- agent/stats_dashboard.py: Telegram-friendly bullet renderer
- cli.py: /stats dispatch + _handle_stats_command method
- gateway/run.py: /stats dispatch + _handle_stats_command for messaging platforms
- hermes_cli/commands.py: /stats CommandDef registration
---
 agent/session_stats.py   | 153 +++++++++++++++++++++++++++++++++++++++
 agent/skill_stats.py     | 121 +++++++++++++++++++++++++++++++
 agent/stats_dashboard.py |  89 +++++++++++++++++++++++
 agent/system_health.py   | 114 +++++++++++++++++++++++++++++
 cli.py                   |  20 +++++
 gateway/run.py           |  27 +++++++
 hermes_cli/commands.py   |   1 +
 7 files changed, 525 insertions(+)
 create mode 100644 agent/session_stats.py
 create mode 100644 agent/skill_stats.py
 create mode 100644 agent/stats_dashboard.py
 create mode 100644 agent/system_health.py

diff --git a/agent/session_stats.py b/agent/session_stats.py
new file mode 100644
index 000000000..2bcb8ea4a
--- /dev/null
+++ b/agent/session_stats.py
@@ -0,0 +1,153 @@
+"""Session telemetry collectors for the /stats dashboard."""
+
+from __future__ import annotations
+
+import logging
+from typing import Any, Dict, Optional
+
+logger = logging.getLogger(__name__)
+
+_TOKEN_FIELDS = (
+    "input_tokens",
+    "output_tokens",
+    "cache_read_tokens",
+    "cache_write_tokens",
+    "reasoning_tokens",
+)
+
+
+def _coerce_int(value: Any, default: int = 0) -> int:
+    """Return ``int(value)``; falsy input gives 0, unconvertible input *default*."""
+    try:
+        return int(value or 0)
+    except (TypeError, ValueError, OverflowError):
+        # OverflowError: int(float("inf")) - keep the dashboard rendering.
+        return default
+
+
+def _token_sum_sql(prefix: str = "") -> str:
+    """Build a NULL-safe SQL expression that adds up every token column.
+
+    ``a + NULL`` is NULL in SQL and ``SUM()`` skips NULL values, so a single
+    unset token column would drop that whole session from the aggregate.
+    Column names come from ``_TOKEN_FIELDS`` only, never from user input.
+    """
+    return " + ".join(f"COALESCE({prefix}{field}, 0)" for field in _TOKEN_FIELDS)
+
+
+def _sum_tokens(row: Any) -> int:
+    """Add up the ``_TOKEN_FIELDS`` counters of one session row."""
+    if not row:
+        return 0
+    total = 0
+    for field in _TOKEN_FIELDS:
+        try:
+            value = row.get(field) if hasattr(row, "get") else row[field]
+        except Exception:
+            # Best effort: a field the row does not carry counts as zero.
+            value = 0
+        total += _coerce_int(value)
+    return total
+
+
+def _query_one(session_db: Any, sql: str, params: tuple = ()) -> Optional[dict]:
+    """Run *sql* on ``session_db._conn`` and return the first row as a dict.
+
+    Returns ``None`` when there is no connection or the query yields no row.
+    """
+    conn = getattr(session_db, "_conn", None)
+    if conn is None:
+        return None
+    cur = conn.execute(sql, params)
+    row = cur.fetchone()
+    return dict(row) if row is not None else None
+
+
+def collect_context_stats(*, agent: Any = None, session_db: Any = None, session_id: str | None = None) -> Dict[str, Any]:
+    """Return current model/provider/context telemetry from live agent + SessionDB."""
+    model = getattr(agent, "model", None) or "unknown"
+    provider = getattr(agent, "provider", None) or "unknown"
+    context_length = _coerce_int(getattr(getattr(agent, "context_compressor", None), "context_length", 0))
+    threshold_tokens = _coerce_int(getattr(getattr(agent, "context_compressor", None), "threshold_tokens", 0))
+    total_tokens = _coerce_int(getattr(agent, "session_total_tokens", 0))
+
+    if session_db is not None and session_id and total_tokens <= 0:
+        try:
+            row = session_db.get_session(session_id)
+            total_tokens = _sum_tokens(row)
+            if row and (model == "unknown"):
+                model = row.get("model") or model
+        except Exception as exc:
+            logger.debug("Failed to read current session token totals: %s", exc, exc_info=True)
+
+    usage_percent = (total_tokens / context_length * 100.0) if context_length else None
+    fallback_chain = []
+    for entry in getattr(agent, "_fallback_chain", []) or []:
+        if isinstance(entry, dict):
+            fb_provider = str(entry.get("provider") or "").strip()
+            fb_model = str(entry.get("model") or "").strip()
+            if fb_provider or fb_model:
+                fallback_chain.append({"provider": fb_provider, "model": fb_model})
+
+    return {
+        "model": model,
+        "provider": provider,
+        "context_length": context_length,
+        "threshold_tokens": threshold_tokens,
+        "total_tokens": total_tokens,
+        "usage_percent": usage_percent,
+        "fallback_chain": fallback_chain,
+    }
+
+
+def collect_semantic_rle_stats(session_db: Any = None) -> Dict[str, Any]:
+    """Approximate compression/RLE savings from real SessionDB compression chains.
+
+    Hermes persists context-compression continuations as sessions whose parent
+    ended with ``end_reason='compression'``. We derive counts and token deltas
+    from those persisted parent/child rows instead of inventing counters.
+
+    Returns a dict with ``sessions_compressed``, ``compression_ratio`` (child
+    tokens / parent tokens, ``None`` without parent tokens), ``avg_tokens_saved``
+    and ``source``; zeroed values are returned if the DB is missing or fails.
+    """
+    if session_db is None or getattr(session_db, "_conn", None) is None:
+        return {"sessions_compressed": 0, "compression_ratio": None, "avg_tokens_saved": 0, "source": "SessionDB unavailable"}
+
+    parent_sum = _token_sum_sql()
+    child_sum = _token_sum_sql("c.")
+    try:
+        row = _query_one(
+            session_db,
+            f"""
+            SELECT COUNT(*) AS n,
+                   COALESCE(SUM({parent_sum}), 0) AS parent_tokens
+            FROM sessions
+            WHERE end_reason = 'compression'
+            """,
+        ) or {}
+        compressed = _coerce_int(row.get("n"))
+        parent_tokens = _coerce_int(row.get("parent_tokens"))
+
+        child = _query_one(
+            session_db,
+            f"""
+            SELECT COALESCE(SUM({child_sum}), 0) AS child_tokens
+            FROM sessions p
+            JOIN sessions c ON c.parent_session_id = p.id
+            WHERE p.end_reason = 'compression'
+            """,
+        ) or {}
+        child_tokens = _coerce_int(child.get("child_tokens"))
+    except Exception as exc:
+        logger.debug("Failed to collect compression stats: %s", exc, exc_info=True)
+        return {"sessions_compressed": 0, "compression_ratio": None, "avg_tokens_saved": 0, "source": "SessionDB query failed"}
+
+    saved = max(parent_tokens - child_tokens, 0)
+    ratio = (child_tokens / parent_tokens) if parent_tokens else None
+    return {
+        "sessions_compressed": compressed,
+        "compression_ratio": ratio,
+        "avg_tokens_saved": int(saved / compressed) if compressed else 0,
+        "source": "SessionDB compression chains",
+    }
diff --git a/agent/skill_stats.py b/agent/skill_stats.py
new file mode 100644
index 000000000..50c02f84a
--- /dev/null
+++ b/agent/skill_stats.py
@@ -0,0 +1,121 @@
+"""Skill and curator telemetry collectors for the /stats dashboard."""
+
+from __future__ import annotations
+
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from typing import Any, Dict, List
+
+from hermes_constants import get_hermes_home
+from tools.skill_usage import (
+    STATE_ARCHIVED,
+    activity_count,
+    latest_activity_at,
+    load_usage,
+)
+
+
+def _parse_dt(value: Any) -> datetime | None:
+    """Parse an ISO-8601 timestamp into an aware datetime, else ``None``.
+
+    Naive timestamps are treated as UTC.
+    """
+    if not value:
+        return None
+    text = str(value).strip()
+    if text.endswith(("Z", "z")):
+        # datetime.fromisoformat() only accepts a "Z" suffix on Python 3.11+.
+        text = f"{text[:-1]}+00:00"
+    try:
+        dt = datetime.fromisoformat(text)
+    except (TypeError, ValueError):
+        return None
+    if dt.tzinfo is None:
+        dt = dt.replace(tzinfo=timezone.utc)
+    return dt
+
+
+def _as_int(value: Any) -> int:
+    """Coerce a usage counter to int; missing or malformed values become 0."""
+    try:
+        return int(value or 0)
+    except (TypeError, ValueError, OverflowError):
+        return 0
+
+
+def _activity_count(record: Dict[str, Any]) -> int:
+    """Delegate to ``tools.skill_usage.activity_count``."""
+    return activity_count(record)
+
+
+def collect_skill_stats(limit: int = 5) -> Dict[str, Any]:
+    """Return the *limit* most active skills and the number of usage records.
+
+    Rows are ordered by activity count (highest first), then by name (A-Z).
+    """
+    usage = load_usage()
+    rows: List[Dict[str, Any]] = []
+    for name, record in usage.items():
+        if not isinstance(record, dict):
+            continue
+        rows.append({
+            "name": str(name),
+            "activity_count": _activity_count(record),
+            "use_count": _as_int(record.get("use_count")),
+            "view_count": _as_int(record.get("view_count")),
+            "patch_count": _as_int(record.get("patch_count")),
+            "last_activity_at": latest_activity_at(record),
+            "state": record.get("state") or "active",
+        })
+    # Two stable passes (name asc, then count desc) keep ties alphabetical;
+    # a single reverse=True sort on (count, name) listed tied names Z-A.
+    rows.sort(key=lambda r: r["name"])
+    rows.sort(key=lambda r: r["activity_count"], reverse=True)
+    return {"top_skills": rows[:max(limit, 0)], "usage_records": len(rows)}
+
+
+def collect_curator_prunes(days: int = 7, limit: int = 3) -> Dict[str, Any]:
+    """Return skills archived within the last *days* days, newest first.
+
+    Sources: usage records in ``STATE_ARCHIVED`` and, for names not already
+    listed, sub-directories of ``<hermes home>/skills/.archive`` dated by mtime.
+    Archived records without a parsable timestamp are kept and sorted last.
+    """
+    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
+    usage = load_usage()
+    archived = []
+    for name, record in usage.items():
+        if not isinstance(record, dict):
+            continue
+        if record.get("state") != STATE_ARCHIVED:
+            continue
+        ts = record.get("archived_at") or record.get("last_patched_at") or record.get("created_at")
+        dt = _parse_dt(ts)
+        if dt is not None and dt < cutoff:
+            continue
+        archived.append({"name": str(name), "archived_at": ts})
+
+    # Also inspect the archive directory so manually restored/old usage sidecars
+    # still have a real filesystem source for the dashboard.
+    archive_dir = get_hermes_home() / "skills" / ".archive"
+    seen = {row["name"] for row in archived}
+    try:
+        # is_dir() (not exists()): a plain file at this path must not raise.
+        entries = list(archive_dir.iterdir()) if archive_dir.is_dir() else []
+    except OSError:
+        entries = []
+    for path in entries:
+        try:
+            if not path.is_dir():
+                continue
+            dt = datetime.fromtimestamp(path.stat().st_mtime, timezone.utc)
+        except (OSError, OverflowError, ValueError):
+            continue
+        if dt >= cutoff and path.name not in seen:
+            seen.add(path.name)
+            archived.append({"name": path.name, "archived_at": dt.isoformat()})
+
+    # Order by parsed time, not raw text, so mixed UTC-offset spellings compare.
+    unknown_last = datetime(1970, 1, 1, tzinfo=timezone.utc)
+    archived.sort(key=lambda r: _parse_dt(r.get("archived_at")) or unknown_last, reverse=True)
+    return {"recent_prunes": archived[:max(limit, 0)], "recent_prune_count": len(archived), "days": days}
diff --git a/agent/stats_dashboard.py b/agent/stats_dashboard.py
new file mode 100644
index 000000000..d216fc8bc
--- /dev/null
+++ b/agent/stats_dashboard.py
@@ -0,0 +1,89 @@
+"""Renderer for the Telegram-friendly /stats dashboard."""
+
+from __future__ import annotations
+
+from typing import Any
+
+from agent.session_stats import collect_context_stats, collect_semantic_rle_stats
+from agent.skill_stats import collect_curator_prunes, collect_skill_stats
+from agent.system_health import collect_system_health
+
+
+def _fmt_int(value: Any) -> str:
+    """Format *value* with thousands separators; unusable input gives "0"."""
+    try:
+        return f"{int(value):,}"
+    except (TypeError, ValueError, OverflowError):
+        return "0"
+
+
+def _fmt_pct(value: Any) -> str:
+    """Format *value* as a one-decimal percentage, or "unknown" if unusable."""
+    if value is None:
+        return "unknown"
+    try:
+        return f"{float(value):.1f}%"
+    except (TypeError, ValueError):
+        return "unknown"
+
+
+def _fallback_text(chain: list[dict]) -> str:
+    """Render the fallback chain as "model (provider)" items, or "none"."""
+    if not chain:
+        return "none"
+    return " → ".join(
+        f"{item.get('model') or '?'} ({item.get('provider') or '?'})"
+        for item in chain
+    )
+
+
+def format_stats_dashboard(*, agent: Any = None, session_db: Any = None, session_id: str | None = None, started_at: Any = None, start_monotonic: float | None = None) -> str:
+    """Collect every telemetry section and render it as a plain-text bullet list.
+
+    ``start_monotonic`` takes precedence over ``started_at`` for the uptime line.
+    """
+    context = collect_context_stats(agent=agent, session_db=session_db, session_id=session_id)
+    rle = collect_semantic_rle_stats(session_db=session_db)
+    skills = collect_skill_stats(limit=5)
+    prunes = collect_curator_prunes(days=7, limit=3)
+    health = collect_system_health(started_at=started_at, start_monotonic=start_monotonic)
+    cron = health.get("cron") or {}
+
+    ratio = rle.get("compression_ratio")
+    ratio_pct = ratio * 100 if ratio is not None else None
+    version = health.get("version")
+    # Do not render "vunknown" when the version lookup failed.
+    version_text = f"v{version}" if version and version != "unknown" else "unknown"
+
+    lines = [
+        "📊 Hermes stats",
+        "",
+        f"• Model: {context['model']} ({context['provider']})",
+        f"• Fallback: {_fallback_text(context.get('fallback_chain') or [])}",
+        f"• Context: {_fmt_int(context.get('total_tokens'))}/{_fmt_int(context.get('context_length'))} tokens ({_fmt_pct(context.get('usage_percent'))})",
+        f"• Semantic RLE: {rle.get('sessions_compressed', 0)} sessions · ratio {_fmt_pct(ratio_pct)} · avg saved {_fmt_int(rle.get('avg_tokens_saved'))} tokens",
+        "",
+        "• Top skills:",
+    ]
+
+    top = skills.get("top_skills") or []
+    if top:
+        for row in top:
+            lines.append(f" - {row['name']}: {row['activity_count']} activity ({row['use_count']} use / {row['view_count']} view / {row['patch_count']} patch)")
+    else:
+        lines.append(" - no skill usage telemetry yet")
+
+    lines.append("• Gardener prunes (7d):")
+    recent_prunes = prunes.get("recent_prunes") or []
+    if recent_prunes:
+        for row in recent_prunes:
+            stamp = str(row.get("archived_at") or "unknown").split("T", 1)[0]
+            lines.append(f" - {row.get('name')}: {stamp}")
+    else:
+        lines.append(" - none")
+
+    lines.extend([
+        f"• Nightly/cron 24h: {cron.get('runs', 0)} runs · {cron.get('ok', 0)} ok · {cron.get('error', 0)} errors · {cron.get('health_checks', 0)} health checks",
+        f"• Uptime/version: {health.get('uptime')} · {version_text} · pid {health.get('pid')}",
+    ])
+    return "\n".join(lines)
diff --git a/agent/system_health.py b/agent/system_health.py
new file mode 100644
index 000000000..032e7760a
--- /dev/null
+++ b/agent/system_health.py
@@ -0,0 +1,114 @@
+"""System health and cron telemetry collectors for /stats."""
+
+from __future__ import annotations
+
+import logging
+import os
+import time
+from datetime import datetime, timedelta, timezone
+from typing import Any, Dict
+
+logger = logging.getLogger(__name__)
+
+
+def _parse_dt(value: Any) -> datetime | None:
+    """Parse an ISO-8601 timestamp into an aware datetime, else ``None``.
+
+    Naive timestamps are treated as UTC.
+    """
+    if not value:
+        return None
+    text = str(value).strip()
+    if text.endswith(("Z", "z")):
+        # datetime.fromisoformat() only accepts a "Z" suffix on Python 3.11+.
+        text = f"{text[:-1]}+00:00"
+    try:
+        dt = datetime.fromisoformat(text)
+    except (TypeError, ValueError):
+        return None
+    if dt.tzinfo is None:
+        dt = dt.replace(tzinfo=timezone.utc)
+    return dt
+
+
+def format_duration(seconds: int | float | None) -> str:
+    """Render a duration as "Nd Nh", "Nh Nm" or "Nm"; ``None`` gives "unknown"."""
+    if seconds is None:
+        return "unknown"
+    seconds = max(0, int(seconds))
+    days, rem = divmod(seconds, 86400)
+    hours, rem = divmod(rem, 3600)
+    minutes, _ = divmod(rem, 60)
+    if days:
+        return f"{days}d {hours}h"
+    if hours:
+        return f"{hours}h {minutes}m"
+    return f"{minutes}m"
+
+
+def collect_cron_activity(hours: int = 24) -> Dict[str, Any]:
+    """Summarise cron jobs whose last run is within the past *hours* hours.
+
+    Every job counts once, by its ``last_run_at``; ``runs`` is therefore the
+    number of recently-run jobs rather than the number of executions.
+    """
+    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
+    try:
+        from cron.jobs import list_jobs
+        jobs = list_jobs(include_disabled=True) or []
+    except Exception as exc:
+        # Best effort: /stats must still render when the cron store is unusable.
+        logger.debug("Failed to list cron jobs: %s", exc, exc_info=True)
+        jobs = []
+
+    recent = []
+    ok = error = health_checks = 0
+    for job in jobs:
+        if not isinstance(job, dict):
+            continue
+        dt = _parse_dt(job.get("last_run_at"))
+        if dt is None or dt < cutoff:
+            continue
+        status = str(job.get("last_status") or "unknown")
+        if status == "ok":
+            ok += 1
+        elif status == "error":
+            error += 1
+        haystack = " ".join(str(job.get(k) or "") for k in ("name", "prompt", "id")).lower()
+        if "health" in haystack or "doctor" in haystack:
+            health_checks += 1
+        recent.append({"id": job.get("id"), "name": job.get("name"), "status": status, "last_run_at": job.get("last_run_at")})
+    recent.sort(key=lambda r: str(r.get("last_run_at") or ""), reverse=True)
+    return {"hours": hours, "runs": len(recent), "ok": ok, "error": error, "health_checks": health_checks, "recent": recent[:5]}
+
+
+def collect_system_health(*, started_at: Any = None, start_monotonic: float | None = None) -> Dict[str, Any]:
+    """Return version, pid, uptime and 24h cron activity for the dashboard.
+
+    Uptime comes from *start_monotonic* when given, otherwise from *started_at*
+    (epoch seconds or a datetime; naive datetimes are treated as UTC).
+    """
+    try:
+        from hermes_cli import __version__ as version
+    except Exception:
+        version = "unknown"
+
+    uptime_seconds = None
+    if start_monotonic is not None:
+        uptime_seconds = time.monotonic() - float(start_monotonic)
+    else:
+        dt = started_at
+        if isinstance(dt, (int, float)):
+            uptime_seconds = time.time() - float(dt)
+        elif isinstance(dt, datetime):
+            if dt.tzinfo is None:
+                dt = dt.replace(tzinfo=timezone.utc)
+            uptime_seconds = (datetime.now(timezone.utc) - dt).total_seconds()
+
+    return {
+        "version": version,
+        "pid": os.getpid(),
+        "uptime_seconds": max(0, int(uptime_seconds)) if uptime_seconds is not None else None,
+        "uptime": format_duration(uptime_seconds),
+        "cron": collect_cron_activity(hours=24),
+    }
diff --git a/cli.py b/cli.py
index c05c361a7..1f22257ee 100644
--- a/cli.py
+++ b/cli.py
@@ -5988,6 +5988,24 @@ class HermesCLI:
 
         self._console_print("\n".join(lines), highlight=False, markup=False)
 
+    def _handle_stats_command(self):
+        """Show comprehensive system stats — model, skills, curator, cron, uptime."""
+        try:
+            from agent.stats_dashboard import format_stats_dashboard
+        except ImportError as exc:
+            self._console_print(f"stats module unavailable: {exc}", highlight=False, markup=False)
+            return
+
+        agent = getattr(self, "agent", None)
+        uptime_start = getattr(self, "session_start", None)
+        dashboard = format_stats_dashboard(
+            agent=agent,
+            session_db=getattr(self, "_session_db", None),
+            session_id=self.session_id,
+            started_at=uptime_start,
+        )
+        self._console_print(dashboard, highlight=False, markup=False)
+
     def _fast_command_available(self) -> bool:
         try:
             from hermes_cli.models import model_supports_fast_mode
@@ -8489,6 +8507,8 @@ class HermesCLI:
             self._handle_skills_command(cmd_original)
         elif canonical == "platforms":
             self._show_gateway_status()
+        elif canonical == "stats":
+            self._handle_stats_command()
         elif canonical == "status":
             self._show_session_status()
         elif canonical == "statusbar":
diff --git a/gateway/run.py b/gateway/run.py
index 7b5ace070..72d744247 100644
--- a/gateway/run.py
+++ b/gateway/run.py
@@ -7470,6 +7470,9 @@ class GatewayRunner:
         if canonical == "status":
             return await self._handle_status_command(event)
 
+        if canonical == "stats":
+            return await self._handle_stats_command(event)
+
         if canonical == "agents":
             return await self._handle_agents_command(event)
 
@@ -9707,6 +9710,30 @@ class GatewayRunner:
 
         return "\n".join(lines)
 
+    async def _handle_stats_command(self, event: MessageEvent) -> str:
+        """Handle /stats command — comprehensive system telemetry dashboard."""
+        try:
+            from agent.stats_dashboard import format_stats_dashboard
+        except ImportError as exc:
+            logger.debug("stats module unavailable: %s", exc)
+            return "stats module unavailable"
+
+        source = event.source
+        session_entry = self.session_store.get_or_create_session(source)
+        session_key = self._session_key_for_source(source)
+
+        # Running agent for this session, if any. NOTE(review): uptime is per-agent here, not gateway boot - confirm.
+        agent = self._running_agents.get(session_key)
+        started_at = self._running_agents_ts.get(session_key)
+
+        dashboard = format_stats_dashboard(
+            agent=agent,
+            session_db=self._session_db,
+            session_id=session_entry.session_id,
+            started_at=started_at,
+        )
+        return dashboard
+
     async def _handle_agents_command(self, event: MessageEvent) -> str:
         """Handle /agents command - list active agents and running tasks."""
         from tools.process_registry import format_uptime_short, process_registry
diff --git a/hermes_cli/commands.py b/hermes_cli/commands.py
index f58924862..649931dbc 100644
--- a/hermes_cli/commands.py
+++ b/hermes_cli/commands.py
@@ -107,6 +107,7 @@ COMMAND_REGISTRY: list[CommandDef] = [
     CommandDef("subgoal", "Add or manage extra criteria on the active goal", "Session",
                args_hint="[text | remove N | clear]"),
     CommandDef("status", "Show session info", "Session"),
+    CommandDef("stats", "Show comprehensive system stats — model, skills, curator, cron, uptime", "Info"),
     CommandDef("whoami", "Show your slash command access (admin / user)", "Info"),
     CommandDef("profile", "Show active profile name and home directory", "Info"),
     CommandDef("sethome", "Set this chat as the home channel", "Session",