hermes-agent-features/agent/session_stats.py
Anton Palgunov f936968c5c feat: telemetry dashboard /stats command with CLI + gateway handlers
- agent/session_stats.py: SessionDB context/compression metrics
- agent/skill_stats.py: curator usage.json reader + prune history
- agent/system_health.py: gateway uptime, version, cron activity
- agent/stats_dashboard.py: Telegram-friendly bullet renderer
- cli.py: /stats dispatch + _handle_stats_command method
- gateway/run.py: /stats dispatch + _handle_stats_command for messaging platforms
- hermes_cli/commands.py: /stats CommandDef registration
2026-05-29 16:31:33 +00:00

130 lines
4.8 KiB
Python

"""Session telemetry collectors for the /stats dashboard."""
from __future__ import annotations
import logging
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
# Names of the per-session token counters that _sum_tokens() adds together
# to get a session's total token usage. The same names are used as column
# names in the SQL issued by collect_semantic_rle_stats().
_TOKEN_FIELDS = (
    "input_tokens",
    "output_tokens",
    "cache_read_tokens",
    "cache_write_tokens",
    "reasoning_tokens",
)
def _coerce_int(value: Any, default: int = 0) -> int:
try:
return int(value or 0)
except (TypeError, ValueError):
return default
def _sum_tokens(row: Any) -> int:
if not row:
return 0
total = 0
for field in _TOKEN_FIELDS:
try:
value = row.get(field) if hasattr(row, "get") else row[field]
except Exception:
value = 0
total += _coerce_int(value)
return total
def _query_one(session_db: Any, sql: str, params: tuple = ()) -> Optional[dict]:
conn = getattr(session_db, "_conn", None)
if conn is None:
return None
cur = conn.execute(sql, params)
row = cur.fetchone()
return dict(row) if row is not None else None
def collect_context_stats(*, agent: Any = None, session_db: Any = None, session_id: str | None = None) -> Dict[str, Any]:
    """Return current model/provider/context telemetry from live agent + SessionDB.

    The live *agent* is the primary source. SessionDB is consulted only when
    the agent reports no tokens for the session; in that case the stored
    token counters are summed and, if the agent did not name a model, the
    stored model is used.

    Args:
        agent: Live agent object; attributes are read with ``getattr`` so
            ``None`` or a partially populated object is acceptable.
        session_db: Object offering ``get_session(session_id)``, or ``None``.
        session_id: Identifier of the current session, if known.

    Returns:
        Dict with keys ``model``, ``provider``, ``context_length``,
        ``threshold_tokens``, ``total_tokens``, ``usage_percent`` (``None``
        when the context length is unknown) and ``fallback_chain`` (list of
        ``{"provider", "model"}`` dicts).
    """
    model = getattr(agent, "model", None) or "unknown"
    provider = getattr(agent, "provider", None) or "unknown"

    compressor = getattr(agent, "context_compressor", None)
    context_length = _coerce_int(getattr(compressor, "context_length", 0))
    threshold_tokens = _coerce_int(getattr(compressor, "threshold_tokens", 0))
    total_tokens = _coerce_int(getattr(agent, "session_total_tokens", 0))

    if session_db is not None and session_id and total_tokens <= 0:
        try:
            row = session_db.get_session(session_id)
            total_tokens = _sum_tokens(row)
            if row and model == "unknown":
                # Accept both dict-like rows and subscript-only rows, the same
                # way _sum_tokens() reads its counters.
                stored_model = row.get("model") if hasattr(row, "get") else row["model"]
                model = stored_model or model
        except Exception as exc:
            # Telemetry is best-effort: keep whatever was gathered so far.
            logger.debug("Failed to read current session token totals: %s", exc, exc_info=True)

    usage_percent = (total_tokens / context_length * 100.0) if context_length else None

    fallback_chain = []
    for entry in getattr(agent, "_fallback_chain", []) or []:
        if not isinstance(entry, dict):
            continue
        fb_provider = str(entry.get("provider") or "").strip()
        fb_model = str(entry.get("model") or "").strip()
        # Skip entries that name neither a provider nor a model.
        if fb_provider or fb_model:
            fallback_chain.append({"provider": fb_provider, "model": fb_model})

    return {
        "model": model,
        "provider": provider,
        "context_length": context_length,
        "threshold_tokens": threshold_tokens,
        "total_tokens": total_tokens,
        "usage_percent": usage_percent,
        "fallback_chain": fallback_chain,
    }
def collect_semantic_rle_stats(session_db: Any = None) -> Dict[str, Any]:
    """Approximate compression/RLE savings from real SessionDB compression chains.

    Hermes persists context-compression continuations as sessions whose parent
    ended with ``end_reason='compression'``. We derive counts and token deltas
    from those persisted parent/child rows instead of inventing counters.

    Args:
        session_db: Object whose ``_conn`` attribute is the DB connection, or
            ``None``.

    Returns:
        Dict with keys:

        * ``sessions_compressed`` - number of sessions that ended with
          ``end_reason='compression'``.
        * ``compression_ratio`` - child tokens divided by parent tokens, or
          ``None`` when there are no parent tokens.
        * ``avg_tokens_saved`` - ``max(parent - child, 0)`` divided by the
          number of compressed sessions (0 when there are none).
        * ``source`` - short label describing where the numbers came from, or
          why they are unavailable.
    """
    if session_db is None or getattr(session_db, "_conn", None) is None:
        return {"sessions_compressed": 0, "compression_ratio": None, "avg_tokens_saved": 0, "source": "SessionDB unavailable"}

    try:
        # Every counter is wrapped in COALESCE: in SQL, NULL + n is NULL, so a
        # single NULL column would otherwise drop the whole row from SUM().
        parent = _query_one(
            session_db,
            """
            SELECT COUNT(*) AS n,
                   COALESCE(SUM(
                       COALESCE(input_tokens, 0)
                       + COALESCE(output_tokens, 0)
                       + COALESCE(cache_read_tokens, 0)
                       + COALESCE(cache_write_tokens, 0)
                       + COALESCE(reasoning_tokens, 0)
                   ), 0) AS parent_tokens
            FROM sessions
            WHERE end_reason = 'compression'
            """,
        ) or {}
        compressed = _coerce_int(parent.get("n"))
        parent_tokens = _coerce_int(parent.get("parent_tokens"))

        child = _query_one(
            session_db,
            """
            SELECT COALESCE(SUM(
                       COALESCE(c.input_tokens, 0)
                       + COALESCE(c.output_tokens, 0)
                       + COALESCE(c.cache_read_tokens, 0)
                       + COALESCE(c.cache_write_tokens, 0)
                       + COALESCE(c.reasoning_tokens, 0)
                   ), 0) AS child_tokens
            FROM sessions p
            JOIN sessions c ON c.parent_session_id = p.id
            WHERE p.end_reason = 'compression'
            """,
        ) or {}
        child_tokens = _coerce_int(child.get("child_tokens"))
    except Exception as exc:
        # Telemetry is best-effort: report the failure instead of raising.
        logger.debug("Failed to collect compression stats: %s", exc, exc_info=True)
        return {"sessions_compressed": 0, "compression_ratio": None, "avg_tokens_saved": 0, "source": "SessionDB query failed"}

    # Clamp at zero so continuations larger than their parents never report
    # negative savings.
    saved = max(parent_tokens - child_tokens, 0)
    ratio = (child_tokens / parent_tokens) if parent_tokens else None
    return {
        "sessions_compressed": compressed,
        "compression_ratio": ratio,
        # Floor division is exact for these non-negative integers.
        "avg_tokens_saved": saved // compressed if compressed else 0,
        "source": "SessionDB compression chains",
    }