Compare commits

...

1 Commits

Author SHA1 Message Date
d77f01e31a feat: /stats command — comprehensive system dashboard
Add /stats slash command showing model config, context engine
(Semantic RLE status), skills usage (from curator telemetry),
curator state, cron jobs, 24h activity, and system memory.

Pure local compute — no LLM call, no prompt-cache impact.

- COMMAND_REGISTRY: stats (Info category)
- gateway/run.py: dispatch + _handle_stats_command handler
- tests/gateway/test_stats_command.py: 7 tests
2026-05-29 15:22:27 +00:00
3 changed files with 368 additions and 0 deletions

View File

@ -7535,6 +7535,9 @@ class GatewayRunner:
if canonical == "insights":
return await self._handle_insights_command(event)
if canonical == "stats":
return await self._handle_stats_command(event)
if canonical == "reload-mcp":
return await self._handle_reload_mcp_command(event)
@ -13234,6 +13237,166 @@ class GatewayRunner:
logger.error("Insights command error: %s", e, exc_info=True)
return t("gateway.insights.error", error=e)
async def _handle_stats_command(self, event: MessageEvent) -> str:
"""Handle /stats command — comprehensive system and model report."""
import json, time as _time, os as _os
from datetime import datetime, timezone, timedelta
source = event.source
session_entry = self.session_store.get_or_create_session(source)
lines: list[str] = []
_now = datetime.now(timezone.utc)
lines.append("📊 **Hermes Stats**")
lines.append("")
# ── Model ──
cfg = self._read_user_config()
model_cfg = cfg.get("model", {}) if isinstance(cfg, dict) else {}
main_model = model_cfg.get("default", "") if isinstance(model_cfg, dict) else str(model_cfg)
main_provider = model_cfg.get("provider", "") if isinstance(model_cfg, dict) else ""
fallback_chain = cfg.get("fallback_providers", []) if isinstance(cfg, dict) else []
fb_models = [f"{fb.get('model','?')} ({fb.get('provider','?')})" for fb in fallback_chain[:3]]
ctx_len = "?"
ctx_engine_name = "compressor"
try:
_cache_lock = getattr(self, "_agent_cache_lock", None)
if _cache_lock:
with _cache_lock:
_cached = self._agent_cache.get(session_entry.session_key)
if _cached:
_agent, _, _ts = _cached
_cc = getattr(_agent, "context_compressor", None)
if _cc:
ctx_len = f"{getattr(_cc, 'context_length', 0):,}"
_ce = getattr(_cc, "name", None) or type(_cc).__name__
ctx_engine_name = _ce.lower() if _ce != "ContextCompressor" else "compressor"
except Exception:
pass
lines.append("**🤖 Model**")
lines.append(f" Main: `{main_model}` ({main_provider})")
if fb_models:
lines.append(f" Fallback: {', '.join(fb_models)}")
lines.append(f" Context: {ctx_len} tokens | Engine: {ctx_engine_name}")
lines.append("")
# ── Context Engine (Semantic RLE) ──
_rle_dir = _os.path.expanduser("~/.hermes/hermes-agent/plugins/context_engine/semantic_rle")
lines.append("**🧠 Context Engine**")
if _os.path.isdir(_rle_dir):
lines.append(" Semantic RLE: installed ✓")
else:
lines.append(" Semantic RLE: not installed")
lines.append("")
# ── Skills Usage ──
_skills_usage_path = _os.path.expanduser("~/.hermes/skills/.usage.json")
skills_active = 0
skills_agent = 0
skills_top = []
if _os.path.isfile(_skills_usage_path):
try:
with open(_skills_usage_path) as f:
_su = json.load(f)
for _sn, _sd in _su.items():
if _sd.get("state") == "active":
skills_active += 1
if _sd.get("created_by") == "agent":
skills_agent += 1
_by_use = sorted(_su.items(), key=lambda x: x[1].get("use_count", 0), reverse=True)
skills_top = [(_sn, _sd.get("use_count", 0)) for _sn, _sd in _by_use[:5]]
except Exception:
pass
lines.append("**📚 Skills**")
lines.append(f" Active: {skills_active} | Agent-created: {skills_agent}")
if skills_top:
_top_str = ", ".join(f"`{n}` ({c})" for n, c in skills_top)
lines.append(f" Top: {_top_str}")
lines.append("")
# ── Curator ──
_curator_state_path = _os.path.expanduser("~/.hermes/skills/.curator_state.json")
curator_runs = 0
curator_last = "never"
curator_archived = 0
if _os.path.isfile(_curator_state_path):
try:
with open(_curator_state_path) as f:
_cs = json.load(f)
curator_runs = _cs.get("runs", 0)
_cls = _cs.get("last_run")
if _cls:
_dt = datetime.fromisoformat(_cls.replace("Z", "+00:00"))
_delta = _now - _dt
if _delta.days > 0:
curator_last = f"{_delta.days}d ago"
elif _delta.seconds > 3600:
curator_last = f"{_delta.seconds // 3600}h ago"
else:
curator_last = f"{_delta.seconds // 60}m ago"
curator_archived = _cs.get("archived_count", 0)
except Exception:
pass
lines.append("**🌱 Curator**")
lines.append(f" Runs: {curator_runs} | Last: {curator_last} | Archived: {curator_archived}")
lines.append("")
# ── Cron ──
try:
from cron.jobs import load_jobs
_jobs = load_jobs()
_active_jobs = [j for j in _jobs if j.get("paused") != True]
_job_names = [j.get("name", j.get("job_id", "?")[:8]) for j in _active_jobs[:5]]
lines.append("**⏰ Cron**")
if _job_names:
lines.append(f" Active: {len(_active_jobs)}{', '.join(_job_names)}")
else:
lines.append(" Active: 0")
lines.append("")
except Exception:
pass
# ── Recent Activity ──
try:
from hermes_state import SessionDB
_db = SessionDB()
_recent = _db.list_sessions(limit=5, offset=0)
_db.close()
if _recent:
_recent_24h = 0
_recent_msgs = 0
_cutoff = _now - timedelta(hours=24)
for _rs in _recent:
_t = _rs.get("updated_at") or _rs.get("created_at")
if _t:
_dt = datetime.fromisoformat(str(_t).replace("Z", "+00:00")) if isinstance(_t, str) else _t
if _dt > _cutoff:
_recent_24h += 1
_recent_msgs += _rs.get("message_count", 0)
lines.append("**⏱️ Activity (24h)**")
lines.append(f" Sessions: {_recent_24h} | Messages: {_recent_msgs}")
_latest = _recent[0] if _recent else {}
_latest_preview = (_latest.get("preview") or "")[:80]
if _latest_preview:
lines.append(f" Latest: _{_latest_preview}_")
lines.append("")
except Exception:
pass
# ── System ──
try:
import psutil
_proc = psutil.Process()
_mem_mb = _proc.memory_info().rss / 1024 / 1024
lines.append("**🔧 System**")
lines.append(f" Memory: {_mem_mb:.0f} MB")
except ImportError:
pass
except Exception:
pass
return "\n".join(lines)
async def _handle_reload_mcp_command(self, event: MessageEvent) -> Optional[str]:
"""Handle /reload-mcp — reconnect MCP servers and rebuild the cached agent.

View File

@ -98,6 +98,7 @@ COMMAND_REGISTRY: list[CommandDef] = [
aliases=("bg", "btw"), args_hint="<prompt>"),
CommandDef("agents", "Show active agents and running tasks", "Session",
aliases=("tasks",)),
CommandDef("stats", "Show comprehensive system stats — model, skills, curator, cron, activity", "Info"),
CommandDef("queue", "Queue a prompt for the next turn (doesn't interrupt)", "Session",
aliases=("q",), args_hint="<prompt>"),
CommandDef("steer", "Inject a message after the next tool call without interrupting", "Session",

View File

@ -0,0 +1,204 @@
"""Tests for /stats command — comprehensive system report."""
from unittest.mock import MagicMock, AsyncMock, patch
import json, os, tempfile, time
from datetime import datetime, timezone, timedelta
import pytest
# ---- Helpers ----
def _mock_event(command="stats", args=""):
"""Build a minimal MessageEvent for slash-command tests."""
event = MagicMock()
event.source.platform.value = "telegram"
event.source.chat_id = "test-chat"
event.source.chat_type = "dm"
event.get_command.return_value = command
event.get_command_args.return_value = args
return event
def _make_runner():
"""Build a GatewayRunner with minimum wiring for _handle_stats_command."""
from gateway.run import GatewayRunner
runner = GatewayRunner.__new__(GatewayRunner)
# Session store mocks
runner.session_store = MagicMock()
session_entry = MagicMock()
session_entry.session_key = "agent:main:telegram:dm:test-chat"
session_entry.session_id = "20260529_test"
runner.session_store.get_or_create_session.return_value = session_entry
# Config mock
runner._read_user_config = MagicMock(return_value={
"model": {"default": "deepseek-v4-pro", "provider": "opencode_go"},
"fallback_providers": [
{"provider": "opencode_go", "model": "deepseek-v4-pro"},
],
})
# Agent cache
runner._agent_cache = {}
runner._agent_cache_lock = MagicMock()
runner._agent_cache_lock.__enter__ = lambda s: None
runner._agent_cache_lock.__exit__ = lambda s, *a: None
return runner
# ---- Tests ----
class TestStatsCommand:
"""Unit tests for _handle_stats_command."""
@pytest.mark.asyncio
async def test_output_has_expected_sections(self):
"""Output contains all major dashboard sections."""
runner = _make_runner()
event = _mock_event()
with patch("hermes_state.SessionDB") as mock_db_cls:
mock_db = MagicMock()
mock_db.list_sessions.return_value = [
{"session_id": "s1", "updated_at": datetime.now(timezone.utc).isoformat(), "message_count": 5}
]
mock_db_cls.return_value = mock_db
result = await runner._handle_stats_command(event)
# All sections should be present for a working config
assert "Hermes Stats" in result
assert "Model" in result
assert "deepseek-v4-pro" in result
assert "opencode_go" in result
assert "Context Engine" in result
assert "Skills" in result
assert "Curator" in result
assert "Cron" in result
assert "Activity" in result
@pytest.mark.asyncio
async def test_model_section_shows_fallback(self):
"""Fallback chain is rendered when configured."""
runner = _make_runner()
runner._read_user_config.return_value["fallback_providers"] = [
{"provider": "custom", "model": "gemma-local"},
]
event = _mock_event()
with patch("hermes_state.SessionDB") as mock_db_cls:
mock_db = MagicMock()
mock_db.list_sessions.return_value = []
mock_db_cls.return_value = mock_db
result = await runner._handle_stats_command(event)
assert "gemma-local" in result
assert "custom" in result
@pytest.mark.asyncio
async def test_no_fallback_shown_when_empty(self):
"""No fallback line when chain is empty."""
runner = _make_runner()
runner._read_user_config.return_value["fallback_providers"] = []
event = _mock_event()
with patch("hermes_state.SessionDB") as mock_db_cls:
mock_db = MagicMock()
mock_db.list_sessions.return_value = []
mock_db_cls.return_value = mock_db
result = await runner._handle_stats_command(event)
assert "Fallback" not in result
@pytest.mark.asyncio
async def test_curator_section_no_state_file(self):
"""Curator section shows defaults when no state file exists."""
runner = _make_runner()
event = _mock_event()
with (
patch("hermes_state.SessionDB") as mock_db_cls,
patch("os.path.isfile", return_value=False),
):
mock_db = MagicMock()
mock_db.list_sessions.return_value = []
mock_db_cls.return_value = mock_db
result = await runner._handle_stats_command(event)
assert "Runs: 0" in result
assert "Archived: 0" in result
@pytest.mark.asyncio
async def test_activity_section_with_recent_sessions(self):
"""Activity section shows 24h stats from session DB."""
runner = _make_runner()
event = _mock_event()
now = datetime.now(timezone.utc)
recent = [
{
"session_id": "s1", "title": "Debug session",
"updated_at": now.isoformat(),
"message_count": 42,
"preview": "Fixed the bug",
},
{
"session_id": "s2",
"updated_at": (now - timedelta(hours=48)).isoformat(),
"message_count": 10,
},
]
with patch("hermes_state.SessionDB") as mock_db_cls:
mock_db = MagicMock()
mock_db.list_sessions.return_value = recent
mock_db_cls.return_value = mock_db
result = await runner._handle_stats_command(event)
# First session is within 24h
assert "Sessions: 1" in result
assert "Messages: 42" in result
@pytest.mark.asyncio
async def test_cron_section_handles_import_error(self):
"""Cron section is skipped gracefully when cron module unavailable."""
runner = _make_runner()
event = _mock_event()
with (
patch("hermes_state.SessionDB") as mock_db_cls,
patch("cron.jobs.load_jobs", side_effect=ImportError("no cron")),
):
mock_db = MagicMock()
mock_db.list_sessions.return_value = []
mock_db_cls.return_value = mock_db
result = await runner._handle_stats_command(event)
assert isinstance(result, str)
assert "Hermes Stats" in result
# Cron section should be absent (not crash)
assert "Cron" not in result
@pytest.mark.asyncio
async def test_rle_section_not_installed(self):
"""RLE section shows 'not installed' when plugin dir missing."""
runner = _make_runner()
event = _mock_event()
with (
patch("hermes_state.SessionDB") as mock_db_cls,
patch("os.path.isdir", return_value=False),
):
mock_db = MagicMock()
mock_db.list_sessions.return_value = [
{"session_id": "s1", "updated_at": datetime.now(timezone.utc).isoformat(), "message_count": 0}
]
mock_db_cls.return_value = mock_db
result = await runner._handle_stats_command(event)
assert "not installed" in result