From f95f2daa4bdbd5e1c5f51bae2f2a4e7bb836031e Mon Sep 17 00:00:00 2001 From: Anton Palgunov Date: Fri, 29 May 2026 15:40:24 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20session=20improvements=20=E2=80=94=20cr?= =?UTF-8?q?ash-context,=20compression=20fallback,=20semantic=20RLE,=20tele?= =?UTF-8?q?gram=20voice/test=20coverage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/agent_runtime_helpers.py | 4 + agent/codex_responses_adapter.py | 12 +- agent/codex_runtime.py | 10 +- agent/context_compressor.py | 21 ++ agent/conversation_compression.py | 98 ++++- agent/conversation_loop.py | 38 +- agent/transports/codex.py | 48 ++- .../semantic-rle-context-engine.md | 156 ++++++++ gateway/platforms/telegram.py | 164 +++++++- gateway/run.py | 349 +++++++++++++++++- gateway/session.py | 7 + hermes_cli/commands.py | 1 + .../context_engine/semantic_rle/__init__.py | 300 +++++++++++++++ .../context_engine/semantic_rle/plugin.yaml | 4 + run_agent.py | 10 +- scripts/semantic_rle_eval.py | 171 +++++++++ .../gateway/test_shutdown_pending_followup.py | 32 ++ tests/gateway/test_telegram_audio_vs_voice.py | 71 ++++ tests/gateway/test_telegram_reactions.py | 77 ++++ tests/gateway/test_telegram_voice_ack.py | 111 ++++++ .../test_semantic_rle_context_engine.py | 102 +++++ tests/scripts/test_semantic_rle_eval.py | 39 ++ 22 files changed, 1791 insertions(+), 34 deletions(-) create mode 100644 docs/experiments/semantic-rle-context-engine.md create mode 100644 plugins/context_engine/semantic_rle/__init__.py create mode 100644 plugins/context_engine/semantic_rle/plugin.yaml create mode 100644 scripts/semantic_rle_eval.py create mode 100644 tests/gateway/test_shutdown_pending_followup.py create mode 100644 tests/gateway/test_telegram_voice_ack.py create mode 100644 tests/plugins/test_semantic_rle_context_engine.py create mode 100644 tests/scripts/test_semantic_rle_eval.py diff --git a/agent/agent_runtime_helpers.py b/agent/agent_runtime_helpers.py index f0fbd0aa8..d98b8f3eb 100644 --- a/agent/agent_runtime_helpers.py +++ b/agent/agent_runtime_helpers.py @@ -943,6 +943,10 @@ def restore_primary_runtime(agent) -> bool: agent._fallback_activated = False agent._fallback_index = 0 + agent._emit_status( + f"✅ Primary model recovered — switching back: " + f"{agent.model} via {agent.provider}" + ) logger.info( "Primary runtime restored for new turn: %s (%s)", agent.model, agent.provider, diff --git a/agent/codex_responses_adapter.py b/agent/codex_responses_adapter.py index 07ae5cc95..234ff8698 100644 --- a/agent/codex_responses_adapter.py +++ b/agent/codex_responses_adapter.py @@ -846,6 +846,8 @@ def _extract_responses_message_text(item: Any) -> str: chunks: List[str] = [] for part in content: + if part is None: + continue ptype = getattr(part, "type", None) if ptype not in {"output_text", "text"}: continue @@ -861,6 +863,8 @@ def _extract_responses_reasoning_text(item: Any) -> str: if isinstance(summary, list): chunks: List[str] = [] for part in summary: + if part is None: + continue text = getattr(part, "text", None) if isinstance(text, str) and text: chunks.append(text) @@ -920,7 +924,13 @@ def _normalize_codex_response(response: Any) -> tuple[Any, str]: saw_commentary_phase = False saw_final_answer_phase = False - for item in output: + for idx, item in enumerate(output): + if item is None: + logger.warning( + "Codex response output contains None item (item #%d of %d); skipping.", + idx, len(output), + ) + continue item_type = getattr(item, "type", None) item_status = getattr(item, "status", None) if isinstance(item_status, str): diff --git a/agent/codex_runtime.py b/agent/codex_runtime.py index 8c5dff39b..29e48cfe9 100644 --- a/agent/codex_runtime.py +++ b/agent/codex_runtime.py @@ -335,7 +335,15 @@ def run_codex_stream(agent, api_kwargs: dict, client: Any = None, on_first_delta ) return agent._run_codex_create_stream_fallback(api_kwargs, client=active_client) raise - + except Exception as exc: + # Catch-all for unexpected errors during stream processing + # (e.g. TypeError when a response field is None that we try to iterate). + # Log with full traceback so we can diagnose, then re-raise. + logger.exception( + "Codex Responses stream raised unexpected %s: %s. %s", + type(exc).__name__, exc, agent._client_log_context(), + ) + raise def run_codex_create_stream_fallback(agent, api_kwargs: dict, client: Any = None): diff --git a/agent/context_compressor.py b/agent/context_compressor.py index 49907e2c3..ba5fcf596 100644 --- a/agent/context_compressor.py +++ b/agent/context_compressor.py @@ -580,6 +580,13 @@ class ContextCompressor(ContextEngine): self.summary_model = summary_model_override or "" + # Compression-model fallback: set by check_compression_model_feasibility + # when the primary aux compression model fails the minimum context check. + # If set, _generate_summary uses this provider/model for the LLM call + # instead of the main compressor attributes. Dict keys: + # provider, model, base_url, api_key + self._compression_fallback: Optional[Dict[str, str]] = None + # Stores the previous compaction summary for iterative updates self._previous_summary: Optional[str] = None # Anti-thrashing: track whether last compression was effective @@ -1069,6 +1076,20 @@ The user has requested that this compaction PRIORITISE preserving all informatio } if self.summary_model: call_kwargs["model"] = self.summary_model + # Compression-model fallback: when the primary aux compression + # model was rejected for insufficient context, the feasibility + # check stored a replacement provider/model here. Override the + # entire main_runtime so call_llm routes the summary request to + # the fallback provider instead of the main one. + if self._compression_fallback: + _fb = self._compression_fallback + call_kwargs["main_runtime"] = { + "model": _fb["model"], + "provider": _fb["provider"], + "base_url": _fb.get("base_url", ""), + "api_key": _fb.get("api_key", ""), + "api_mode": _fb.get("api_mode", self.api_mode), + } response = call_llm(**call_kwargs) content = response.choices[0].message.content # Handle cases where content is not a string (e.g., dict from llama.cpp) diff --git a/agent/conversation_compression.py b/agent/conversation_compression.py index a620f343e..5e0099c4f 100644 --- a/agent/conversation_compression.py +++ b/agent/conversation_compression.py @@ -221,9 +221,101 @@ def check_compression_model_feasibility(agent: Any) -> None: new_threshold, ) except ValueError: - # Hard rejections (aux below minimum context) must propagate - # so the session refuses to start. - raise + # Primary compression model failed the minimum context check + # (context_length < MINIMUM_CONTEXT_LENGTH). Before giving up, + # try the user's fallback provider chain so a model switch or + # provider outage doesn't silently disable compression. + _fallback_chain = getattr(agent, '_fallback_chain', None) or [] + _tried = [f"{aux_model} ({_aux_cfg_provider or 'auto'}): {aux_context:,} ctx < {MINIMUM_CONTEXT_LENGTH:,}"] + + for _fb_entry in _fallback_chain: + _fb_provider = _fb_entry.get("provider", "") + _fb_model = _fb_entry.get("model", "") + if not _fb_provider or not _fb_model: + continue + + try: + from agent.auxiliary_client import resolve_provider_client + + _fb_client, _fb_resolved_model = resolve_provider_client( + _fb_provider, + _fb_model, + explicit_base_url=_fb_entry.get("base_url", ""), + explicit_api_key=_fb_entry.get("api_key", ""), + main_runtime=agent._current_main_runtime(), + ) + if _fb_client is None or not _fb_resolved_model: + _tried.append(f"{_fb_model} ({_fb_provider}): unavailable") + continue + + _fb_base_url = str(getattr(_fb_client, "base_url", "")) + _fb_api_key_raw = getattr(_fb_client, "api_key", "") + _fb_api_key = ( + "" + if callable(_fb_api_key_raw) and not isinstance(_fb_api_key_raw, str) + else str(_fb_api_key_raw or "") + ) + + _fb_context = get_model_context_length( + _fb_resolved_model, + base_url=_fb_base_url, + api_key=_fb_api_key, + provider=_fb_provider, + custom_providers=getattr(agent, "_custom_providers", None), + ) + + if _fb_context and _fb_context < MINIMUM_CONTEXT_LENGTH: + _tried.append( + f"{_fb_resolved_model} ({_fb_provider}): " + f"{_fb_context:,} ctx < {MINIMUM_CONTEXT_LENGTH:,}" + ) + continue + + # ── Found a suitable fallback ────────────────────────── + logger.warning( + "Compression model %s (%s) has only %d token context " + "(minimum %d). Falling back to %s (%s) with %d token context.", + aux_model, _aux_cfg_provider or "auto", aux_context, + MINIMUM_CONTEXT_LENGTH, _fb_resolved_model, _fb_provider, + _fb_context or 0, + ) + + agent.context_compressor._compression_fallback = { + "provider": _fb_provider, + "model": _fb_resolved_model, + "base_url": _fb_base_url, + "api_key": _fb_api_key, + } + + _msg = ( + f"⚠ Compression model {aux_model} has only " + f"{aux_context:,} token context (minimum " + f"{MINIMUM_CONTEXT_LENGTH:,} required). " + f"Falling back to {_fb_resolved_model} ({_fb_provider}) " + f"for summaries." + ) + agent._compression_warning = _msg + agent._emit_status(_msg) + return + + except Exception as _fb_err: + _tried.append(f"{_fb_model} ({_fb_provider}): {_fb_err}") + continue + + # No fallback worked — warn and let compression run without + # summaries (same behavior as 'no auxiliary LLM' above). + _all_tried = "; ".join(_tried) + _msg = ( + f"⚠ No suitable compression model available. " + f"Tried: {_all_tried}. " + f"Compression will drop middle turns without summaries. " + f"Run `hermes setup` or set " + f"auxiliary.compression.model in config.yaml." + ) + agent._compression_warning = _msg + agent._emit_status(_msg) + logger.warning("Compression model fallback exhausted: %s", _all_tried) + return except Exception as exc: logger.debug( "Compression feasibility check failed (non-fatal): %s", exc diff --git a/agent/conversation_loop.py b/agent/conversation_loop.py index abcb342d0..6de58412f 100644 --- a/agent/conversation_loop.py +++ b/agent/conversation_loop.py @@ -1814,6 +1814,14 @@ def run_conversation( break except Exception as api_error: + # Log full traceback immediately so we can diagnose what broke. + # Many recovery paths below can swallow or transform the error; + # this guarantees we always have the original stack trace. + logger.exception( + "%sAPI call raised exception (attempt %s/%s, thread=%s): %s", + agent.log_prefix, retry_count + 1, max_retries, + agent._thread_identity(), api_error, + ) # Stop spinner before printing error messages if thinking_spinner: thinking_spinner.stop("(╥_╥) error, retrying...") @@ -2822,6 +2830,10 @@ def run_conversation( # provider/network failure (malformed response body, # truncated stream, routing layer corruption), not a # local programming bug, and should be retried (#14782). + # Exclude TypeError when it originates from SDK response + # processing (e.g. 'NoneType' object is not iterable when + # a server sends output: null) — that's a transient API/SDK + # bug, not a local programming error. (#14950) is_local_validation_error = ( isinstance(api_error, (ValueError, TypeError)) and not isinstance( @@ -2835,6 +2847,14 @@ def run_conversation( # ssl.SSLError explicitly so the error classifier's # retryable=True mapping takes effect instead. and not isinstance(api_error, ssl.SSLError) + # TypeError from the SDK is a transient bug in API + # response processing (e.g. NoneType not iterable when + # server returns output: null). Do NOT treat it as a + # local programming error — it should be retried. + and not ( + isinstance(api_error, TypeError) + and not isinstance(api_error, (UnicodeEncodeError, ssl.SSLError)) + ) ) # ``FailoverReason.billing`` (HTTP 402) is NOT in this # exclusion set. By the time we reach this block: @@ -2887,6 +2907,22 @@ def run_conversation( agent._vprint(f"{agent.log_prefix}❌ Non-retryable client error (HTTP {status_code}). Aborting.", force=True) agent._vprint(f"{agent.log_prefix} 🔌 Provider: {_provider} Model: {_model}", force=True) agent._vprint(f"{agent.log_prefix} 🌐 Endpoint: {_base}", force=True) + # Surface local validation bugs (TypeError, ValueError) with a + # traceback pointer so the user can find the root cause in logs. + if isinstance(api_error, (TypeError, ValueError)) and not isinstance( + api_error, (UnicodeEncodeError, json.JSONDecodeError, ssl.SSLError) + ): + _log_dir = os.path.join(os.path.expanduser("~"), ".hermes", "logs") + agent._vprint( + f"{agent.log_prefix} 🐛 This is a local code error (not a provider failure). " + f"Full traceback: {_log_dir}/errors.log", + force=True, + ) + # Also emit a status so gateway/cron users see it + agent._emit_status( + f"🐛 Local bug: {agent._summarize_api_error(api_error)} " + f"— check {_log_dir}/errors.log" + ) # Actionable guidance for common auth errors if classified.is_auth or classified.reason == FailoverReason.billing: if _provider in {"openai-codex", "xai-oauth", "nous"} and status_code == 401: @@ -2917,7 +2953,7 @@ def run_conversation( agent._vprint(f"{agent.log_prefix} • Check credits: https://openrouter.ai/settings/credits", force=True) else: agent._vprint(f"{agent.log_prefix} 💡 This type of error won't be fixed by retrying.", force=True) - logger.error(f"{agent.log_prefix}Non-retryable client error: {api_error}") + logger.error(f"{agent.log_prefix}Non-retryable client error: {api_error}", exc_info=True) # Skip session persistence when the error is likely # context-overflow related (status 400 + large session). # Persisting the failed user message would make the diff --git a/agent/transports/codex.py b/agent/transports/codex.py index 970692c03..f7ad77b3d 100644 --- a/agent/transports/codex.py +++ b/agent/transports/codex.py @@ -213,24 +213,44 @@ class ResponsesApiTransport(ProviderTransport): _normalize_codex_response, ) + import logging + _logger = logging.getLogger(__name__) + # _normalize_codex_response returns (SimpleNamespace, finish_reason_str) - msg, finish_reason = _normalize_codex_response(response) + try: + msg, finish_reason = _normalize_codex_response(response) + except Exception: + _logger.exception( + "Failed to normalize Codex response (response type=%s, has_output=%s)", + type(response).__name__, + hasattr(response, "output"), + ) + raise tool_calls = None if msg and msg.tool_calls: - tool_calls = [] - for tc in msg.tool_calls: - provider_data = {} - if hasattr(tc, "call_id") and tc.call_id: - provider_data["call_id"] = tc.call_id - if hasattr(tc, "response_item_id") and tc.response_item_id: - provider_data["response_item_id"] = tc.response_item_id - tool_calls.append(ToolCall( - id=tc.id if hasattr(tc, "id") else (tc.function.name if hasattr(tc, "function") else None), - name=tc.function.name if hasattr(tc, "function") else getattr(tc, "name", ""), - arguments=tc.function.arguments if hasattr(tc, "function") else getattr(tc, "arguments", "{}"), - provider_data=provider_data or None, - )) + _raw_tc = msg.tool_calls + if not hasattr(_raw_tc, "__iter__"): + # Defensive: tool_calls should be a list; log and skip if not iterable. + _logger.warning( + "Codex response tool_calls is not iterable (type=%s, value=%r); " + "skipping tool call extraction.", + type(_raw_tc).__name__, _raw_tc, + ) + else: + tool_calls = [] + for tc in _raw_tc: + provider_data = {} + if hasattr(tc, "call_id") and tc.call_id: + provider_data["call_id"] = tc.call_id + if hasattr(tc, "response_item_id") and tc.response_item_id: + provider_data["response_item_id"] = tc.response_item_id + tool_calls.append(ToolCall( + id=tc.id if hasattr(tc, "id") else (tc.function.name if hasattr(tc, "function") else None), + name=tc.function.name if hasattr(tc, "function") else getattr(tc, "name", ""), + arguments=tc.function.arguments if hasattr(tc, "function") else getattr(tc, "arguments", "{}"), + provider_data=provider_data or None, + )) # Extract reasoning items for provider_data provider_data = {} diff --git a/docs/experiments/semantic-rle-context-engine.md b/docs/experiments/semantic-rle-context-engine.md new file mode 100644 index 000000000..f17b586aa --- /dev/null +++ b/docs/experiments/semantic-rle-context-engine.md @@ -0,0 +1,156 @@ +# Semantic RLE Context Engine experiment + +Status: experimental MVP, not enabled by default. + +## Hypothesis + +For very long Telegram chats, a deterministic context engine with: + +- a verbatim hot tail; +- a semantic factual ledger for older turns; +- explicit stale/superseded facts; +- credential references instead of raw secrets; +- retrieval notes for manual compression focus; + +will reduce "misses" versus the current lossy compression path, especially when users correct facts over time ("server X" -> "server Y") or return to obligations created many turns ago. + +## MVP design + +Plugin: `plugins/context_engine/semantic_rle/`. + +Compression output shape: + +1. original system messages; +2. `system` summary block named `Semantic RLE context ledger`; +3. last `hot_tail_messages` non-system messages preserved verbatim. + +The ledger is deterministic and local-only. No cloud LLM is called by the plugin. + +Ledger sections: + +- Active facts +- Decisions +- Obligations +- Superseded facts +- Unresolved questions +- Credential refs +- Retrieval notes + +Best-effort redaction: + +- key/value secrets (`api_key=...`, `token: ...`, `Authorization: ...`) become stable `credential_ref:credential:` references; +- token-like long strings become credential refs; +- IPv4-like strings become `[REDACTED_IP]`. + +## What counts as a miss + +A run has a miss when the assistant, after compaction, does any of these: + +1. uses an inactive/superseded fact as current; +2. loses a still-active obligation or TODO; +3. says it does not know a fact that was in the compacted cold history; +4. exposes raw fake credentials or sensitive IP-like strings from cold history; +5. answers from an older decision when a newer decision superseded it; +6. needs user correction for information that the ledger retained. + +## Baseline + +Baseline should be the same Telegram session corpus and prompt set with: + +```yaml +context: + engine: compressor +``` + +Record: + +- number of manual corrections per 100 turns; +- number of stale-fact answers per scenario; +- number of obligation/TODO misses; +- raw secret leakage count in model-visible compressed context; +- compressed message count and rough token count; +- whether final hot-tail messages remain byte-for-byte unchanged. + +## First A/B plan + +1. Select 5-10 long Telegram transcripts with known corrections and recurring tasks. +2. Replay fixed query checkpoints against baseline `compressor` and experimental `semantic_rle`. +3. Keep model/provider/toolsets constant. +4. For each checkpoint, force compression before asking the evaluation query. +5. Score blind if possible: correct/current, stale, missing, unsafe leak, or ambiguous. + +## Test scenarios + +- Hot tail preservation: last N messages must be unchanged. +- Server supersession: `server alpha` followed by `server beta` should keep beta active and mark alpha superseded. +- Fake token redaction: no raw fake token appears in compressed output; only credential refs. +- IP-like redaction: raw IPv4-like strings do not appear in the ledger. +- Obligations: old `todo`/`надо` messages survive as obligations. +- Unresolved questions: old question markers survive as unresolved questions. +- Deterministic failure: if extraction raises, return original messages rather than dropping context. +- Discovery/config: plugin can be discovered and explicitly loaded, but is not globally enabled by adding it to the repo. + +## Manual enablement for experiment only + +Do not turn it on globally in commits. For a local experiment: + +```bash +hermes config set context.engine semantic_rle +# restart the CLI session or gateway process you intentionally want to test +``` + +Return to default: + +```bash +hermes config set context.engine compressor +``` + +Gateway note: do not restart production gateway just because the plugin exists. Restart only a deliberately selected experiment profile/session. + +## Deterministic smoke-eval + +Run: + +```bash +python scripts/semantic_rle_eval.py +python scripts/semantic_rle_eval.py --json +``` + +Current invariant results on the checked-in synthetic corpus: + +- `hot_tail_only_baseline`: 4/12 checks passed. +- `semantic_rle`: 12/12 checks passed. + +Covered invariants: + +- current fact retained after supersession; +- old fact marked superseded; +- old decision retained; +- old obligation retained; +- old unresolved question retained; +- cold fake token/IP redacted to refs/markers; +- hot tail preserved byte-for-byte. + +This is only a deterministic preflight. It proves the plugin keeps the facts in model-visible context; it does **not** prove the LLM will use them correctly in live Telegram replay. + +## Logical MVP boundary + +Done for MVP: + +- plugin discovery and explicit loading; +- ContextEngine ABC compatibility; +- local-only deterministic compression path; +- fail-closed compression error handling; +- hot tail preservation; +- factual ledger with active/superseded facts; +- decisions/obligations/questions sections; +- cold-history fake secret and IPv4 redaction; +- smoke-eval harness and tests. + +Still intentionally out of MVP: + +- Better fact keys: current MVP is regex-based and intentionally conservative. +- Persistence: ledger is in-memory only; no per-chat store yet. +- Retrieval tools: no `semantic_rle_search` tool yet. +- Multilingual extraction: Russian TODO/question markers are minimal. +- Live Telegram replay/scoring against real transcripts. diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 300fc49c0..457544cac 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -30,6 +30,7 @@ try: Application, CommandHandler, CallbackQueryHandler, + MessageReactionHandler, MessageHandler as TelegramMessageHandler, ContextTypes, filters, @@ -48,6 +49,7 @@ except ImportError: Application = Any CommandHandler = Any CallbackQueryHandler = Any + MessageReactionHandler = Any TelegramMessageHandler = Any HTTPXRequest = Any filters = None @@ -118,7 +120,7 @@ def check_telegram_requirements() -> bool: """ global TELEGRAM_AVAILABLE, Update, Bot, Message, InlineKeyboardButton global InlineKeyboardMarkup, LinkPreviewOptions, Application - global CommandHandler, CallbackQueryHandler, TelegramMessageHandler + global CommandHandler, CallbackQueryHandler, MessageReactionHandler, TelegramMessageHandler global ContextTypes, filters, ParseMode, ChatType, HTTPXRequest if TELEGRAM_AVAILABLE: return True @@ -137,6 +139,7 @@ def check_telegram_requirements() -> bool: from telegram.ext import ( Application as _App, CommandHandler as _CH, CallbackQueryHandler as _CQH, + MessageReactionHandler as _MRH, MessageHandler as _MH, ContextTypes as _CT, filters as _filters, ) @@ -153,6 +156,7 @@ def check_telegram_requirements() -> bool: Application = _App CommandHandler = _CH CallbackQueryHandler = _CQH + MessageReactionHandler = _MRH TelegramMessageHandler = _MH ContextTypes = _CT filters = _filters @@ -1581,6 +1585,14 @@ class TelegramAdapter(BasePlatformAdapter): )) # Handle inline keyboard button callbacks (update prompts) self._app.add_handler(CallbackQueryHandler(self._handle_callback_query)) + + # Handle user reactions to bot/news messages. The existing + # telegram.reactions flag controls the bot's own processing + # lifecycle reactions (👀/👍/👎); this handler is for inbound + # Telegram ``message_reaction`` updates so reaction-only feedback + # can reach the agent/session. + if MessageReactionHandler is not Any: + self._app.add_handler(MessageReactionHandler(self._handle_message_reaction)) # Start polling — retry initialize() for transient TLS resets try: @@ -2073,6 +2085,46 @@ class TelegramAdapter(BasePlatformAdapter): is_connect_timeout = self._looks_like_connect_timeout(e) return SendResult(success=False, error=str(e), retryable=(is_connect_timeout or not is_timeout)) + def _thread_metadata_for_voice_ack(self, event: MessageEvent) -> Optional[Dict[str, Any]]: + """Build thread metadata for a platform-side pre-STT voice ACK. + + The ACK is sent before gateway STT calls run, so it must stay entirely + platform-side and must not become an assistant/model turn. Reuse the + same Telegram thread/DM-topic metadata shape the runner uses for normal + replies so the short status lands in the same chat/topic. + """ + source = event.source + thread_id = getattr(source, "thread_id", None) + if thread_id is None: + return None + metadata: Dict[str, Any] = {"thread_id": thread_id} + if getattr(source, "platform", None) == Platform.TELEGRAM and getattr(source, "chat_type", None) == "dm": + metadata["telegram_dm_topic_reply_fallback"] = True + tid = str(thread_id) + if tid and tid not in {"", "1"}: + metadata["direct_messages_topic_id"] = tid + anchor = getattr(source, "message_id", None) or event.message_id + if anchor is not None: + metadata["telegram_reply_to_message_id"] = str(anchor) + return metadata + + async def _send_voice_pre_stt_ack(self, event: MessageEvent) -> None: + """Send a best-effort Telegram status before slow voice transcription.""" + try: + result = await self.send( + event.source.chat_id, + "🎧 получил, распознаю", + reply_to=event.message_id, + metadata=self._thread_metadata_for_voice_ack(event), + ) + if not result.success: + logger.warning( + "[Telegram] Pre-STT voice ACK was not delivered: %s", + result.error or "unknown error", + ) + except Exception: + logger.exception("[Telegram] Failed to send pre-STT voice ACK; continuing transcription") + async def send_or_update_status( self, chat_id: str, @@ -4947,6 +4999,113 @@ class TelegramAdapter(BasePlatformAdapter): """ return getattr(update, "effective_message", None) or getattr(update, "message", None) + async def _handle_message_reaction(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Handle Telegram user reactions as lightweight synthetic messages.""" + reaction_update = getattr(update, "message_reaction", None) + if reaction_update is None: + return + + user = getattr(reaction_update, "user", None) + user_id = str(getattr(user, "id", "") or "") + bot_id = str(getattr(getattr(self, "_bot", None), "id", "") or "") + if user_id and bot_id and user_id == bot_id: + return + + chat = getattr(reaction_update, "chat", None) + chat_id = str(getattr(chat, "id", "") or "") + if not chat_id or not user_id: + return + + chat_type_raw = getattr(chat, "type", None) + chat_type_value = str(getattr(chat_type_raw, "value", chat_type_raw) or "dm").lower() + chat_type = "dm" if chat_type_value == "private" else ( + "group" if chat_type_value in {"group", "supergroup"} else chat_type_value + ) + thread_id = getattr(reaction_update, "message_thread_id", None) + user_name = getattr(user, "first_name", None) or getattr(user, "username", None) + + if not self._is_callback_user_authorized( + user_id, + chat_id=chat_id, + chat_type=chat_type, + thread_id=str(thread_id) if thread_id is not None else None, + user_name=user_name, + ): + logger.info("[%s] Ignoring Telegram reaction from unauthorized user %s", self.name, user_id) + return + + def _reaction_label(obj: Any) -> str: + emoji = getattr(obj, "emoji", None) + if emoji: + return str(emoji) + custom = getattr(obj, "custom_emoji_id", None) + if custom: + return f"custom_emoji:{custom}" + return str(getattr(obj, "type", obj)) + + new_reactions = list(getattr(reaction_update, "new_reaction", None) or []) + old_reactions = list(getattr(reaction_update, "old_reaction", None) or []) + new_labels = [_reaction_label(r) for r in new_reactions] + old_labels = [_reaction_label(r) for r in old_reactions] + target_message_id = getattr(reaction_update, "message_id", None) + + if not new_labels: + logger.info( + "[%s] Telegram reaction removed: chat=%s user=%s message=%s previous=%s", + self.name, + chat_id, + user_id, + target_message_id, + " ".join(old_labels) if old_labels else "", + ) + return + + positive_labels = {"👍", "🔥", "👌", "❤️", "❤", "😍", "🥰", "👏", "🎉", "🤩"} + negative_labels = {"👎", "💩", "🤮", "😡", "😠", "🙄", "😴", "🤢"} + label_set = set(new_labels) + if label_set & negative_labels: + feedback = "negative feedback / мимо" + elif label_set & positive_labels: + feedback = "positive feedback / годно" + else: + feedback = "emoji feedback" + + text = ( + f"[Telegram reaction feedback: {' '.join(new_labels)} on message {target_message_id}. " + f"Interpretation: {feedback}. " + f"If the target was a cron/news/digest item, use this as taste feedback.]" + ) + + from gateway.session import SessionSource + + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id=chat_id, + chat_name=getattr(chat, "title", None), + chat_type=chat_type, + user_id=user_id, + user_name=str(user_name) if user_name else None, + thread_id=str(thread_id) if thread_id is not None else None, + message_id=str(target_message_id) if target_message_id is not None else None, + ) + event = MessageEvent( + text=text, + message_type=MessageType.TEXT, + source=source, + raw_message=reaction_update, + message_id=str(target_message_id) if target_message_id is not None else None, + platform_update_id=update.update_id, + ) + logger.info( + "[%s] Telegram reaction received: chat=%s user=%s message=%s emoji=%s", + self.name, + chat_id, + user_id, + target_message_id, + " ".join(new_labels) if new_labels else "", + ) + await self.handle_message(event) + async def _handle_text_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle incoming text messages. @@ -5257,8 +5416,9 @@ class TelegramAdapter(BasePlatformAdapter): event.media_urls = [cached_path] event.media_types = ["audio/ogg"] logger.info("[Telegram] Cached user voice at %s", cached_path) + await self._send_voice_pre_stt_ack(event) except Exception as e: - logger.warning("[Telegram] Failed to cache voice: %s", e, exc_info=True) + logger.warning("[Telegram] Failed to cache voice before STT: %s", e, exc_info=True) elif msg.audio: try: file_obj = await msg.audio.get_file() diff --git a/gateway/run.py b/gateway/run.py index 7b5ace070..929fceefe 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1266,6 +1266,23 @@ def _is_control_interrupt_message(message: Optional[str]) -> bool: return normalized in _CONTROL_INTERRUPT_MESSAGES +def _should_process_pending_during_drain(result: Optional[dict]) -> bool: + """Return True when a queued follow-up is safe to run before shutdown. + + If a user sends a follow-up while an active gateway turn is running, the + adapter stores it in the pending slot. A service restart can begin while + that active turn is still running. When the active turn then finishes + cleanly during the drain window, dropping the pending slot loses the user's + message. Process it before exit unless the active turn itself ended via an + interruption/failure, where continuing work during shutdown would be unsafe. + """ + if not result: + return False + if result.get("interrupted") or result.get("failed"): + return False + return True + + def _skill_slug_from_frontmatter(skill_md: Path) -> tuple[str | None, str | None]: """Derive the /command slug and declared frontmatter name from a SKILL.md. @@ -7535,6 +7552,9 @@ class GatewayRunner: if canonical == "insights": return await self._handle_insights_command(event) + if canonical == "stats": + return await self._handle_stats_command(event) + if canonical == "reload-mcp": return await self._handle_reload_mcp_command(event) @@ -8216,6 +8236,28 @@ class GatewayRunner: context_note = "[System note: The user's previous session expired due to inactivity. This is a fresh conversation with no prior context.]" context_prompt = context_note + "\n\n" + context_prompt + # If the previous agent turn crashed, prepend a crash-context notice + # so the agent knows what went wrong and can self-diagnose. The + # error fields are cleared after this read so the notice appears + # exactly once (on the first message of the fresh session). + _last_err_type = getattr(session_entry, 'last_error_type', None) + if _last_err_type: + _last_err_msg = getattr(session_entry, 'last_error_message', '') or '' + _crash_note = ( + f"[System note: The previous session crashed with " + f"{_last_err_type}: {_last_err_msg[:300]}. " + f"Diagnose and fix the root cause if possible — " + f"do NOT just retry the same thing.]" + ) + context_prompt = _crash_note + "\n\n" + context_prompt + # Clear so the notice appears only once + try: + session_entry.last_error_type = None + session_entry.last_error_message = None + session_entry.last_error_time = None + except Exception: + pass + # Send a user-facing notification explaining the reset, unless: # - notifications are disabled in config # - the platform is excluded (e.g. api_server, webhook) @@ -9098,6 +9140,14 @@ class GatewayRunner: logger.exception("Agent error in session %s", session_key) error_type = type(e).__name__ error_detail = str(e)[:300] if str(e) else "no details available" + # Record the crash on the session entry so the next session's + # agent can self-diagnose the failure instead of starting blind. + try: + session_entry.last_error_type = error_type + session_entry.last_error_message = error_detail + session_entry.last_error_time = time.time() + except Exception: + pass # defensive — never let error-recording cause a secondary crash status_hint = "" status_code = getattr(e, "status_code", None) _hist_len = len(history) if 'history' in locals() else 0 @@ -13234,6 +13284,196 @@ class GatewayRunner: logger.error("Insights command error: %s", e, exc_info=True) return t("gateway.insights.error", error=e) + async def _handle_stats_command(self, event: MessageEvent) -> str: + """Handle /stats command — comprehensive system and model report. + + Pure local compute — no LLM call, no prompt-cache impact. Surfaces + model config, context engine, skills usage, curator state, cron jobs, + and recent session activity in a compact dashboard. + """ + import json, time as _time, os as _os + from datetime import datetime, timezone, timedelta + + source = event.source + session_entry = self.session_store.get_or_create_session(source) + lines: list[str] = [] + + # ── Header ────────────────────────────────────────────────────── + _now = datetime.now(timezone.utc) + lines.append("📊 **Hermes Stats**") + lines.append("") + + # ── Model ─────────────────────────────────────────────────────── + cfg = self._read_user_config() + model_cfg = cfg.get("model", {}) if isinstance(cfg, dict) else {} + main_model = model_cfg.get("default", "") if isinstance(model_cfg, dict) else str(model_cfg) + main_provider = model_cfg.get("provider", "") if isinstance(model_cfg, dict) else "" + + fallback_chain = cfg.get("fallback_providers", []) if isinstance(cfg, dict) else [] + fb_models = [f"{fb.get('model','?')} ({fb.get('provider','?')})" for fb in fallback_chain[:3]] + + # Context length — from cached agent if available, else config + ctx_len = "?" + ctx_engine_name = "compressor" + try: + _cache_lock = getattr(self, "_agent_cache_lock", None) + if _cache_lock: + with _cache_lock: + _cached = self._agent_cache.get(session_entry.session_key) + if _cached: + _agent, _, _ts = _cached + _cc = getattr(_agent, "context_compressor", None) + if _cc: + ctx_len = f"{getattr(_cc, 'context_length', 0):,}" + _ce = getattr(_cc, "name", None) or type(_cc).__name__ + ctx_engine_name = _ce.lower() if _ce != "ContextCompressor" else "compressor" + except Exception: + pass + + lines.append("**🤖 Model**") + lines.append(f" Main: `{main_model}` ({main_provider})") + if fb_models: + lines.append(f" Fallback: {', '.join(fb_models)}") + lines.append(f" Context: {ctx_len} tokens | Engine: {ctx_engine_name}") + lines.append("") + + # ── Context Engine (Semantic RLE) ─────────────────────────────── + _rle_dir = _os.path.expanduser("~/.hermes/hermes-agent/plugins/context_engine/semantic_rle") + _rle_plugin_yaml = _os.path.join(_rle_dir, "plugin.yaml") if _os.path.isdir(_rle_dir) else "" + rle_version = "?" + if _rle_plugin_yaml and _os.path.isfile(_rle_plugin_yaml): + try: + import yaml as _yaml + with open(_rle_plugin_yaml) as f: + _rle_cfg = _yaml.safe_load(f) or {} + rle_version = _rle_cfg.get("version", "?") + except Exception: + pass + + _rle_stats = {} + try: + _rle_dir_cache = _os.path.join(_rle_dir, "__pycache__") + if _os.path.isdir(_rle_dir_cache): + _rle_stats["cached"] = True + except Exception: + pass + + lines.append("**🧠 Context Engine**") + if _os.path.isdir(_rle_dir): + lines.append(f" Semantic RLE: v{rle_version} ✓") + else: + lines.append(" Semantic RLE: not installed") + lines.append("") + + # ── Skills Usage ──────────────────────────────────────────────── + _skills_usage_path = _os.path.expanduser("~/.hermes/skills/.usage.json") + skills_active = 0 + skills_agent = 0 + skills_top: list[tuple[str, int]] = [] + if _os.path.isfile(_skills_usage_path): + try: + with open(_skills_usage_path) as f: + _su = json.load(f) + for _sn, _sd in _su.items(): + if _sd.get("state") == "active": + skills_active += 1 + if _sd.get("created_by") == "agent": + skills_agent += 1 + _by_use = sorted(_su.items(), key=lambda x: x[1].get("use_count", 0), reverse=True) + skills_top = [(_sn, _sd.get("use_count", 0)) for _sn, _sd in _by_use[:5]] + except Exception: + pass + lines.append("**📚 Skills**") + lines.append(f" Active: {skills_active} | Agent-created: {skills_agent}") + if skills_top: + _top_str = ", ".join(f"`{n}` ({c})" for n, c in skills_top) + lines.append(f" Top: {_top_str}") + lines.append("") + + # ── Curator ───────────────────────────────────────────────────── + _curator_state_path = _os.path.expanduser("~/.hermes/skills/.curator_state.json") + curator_runs = 0 + curator_last = "never" + curator_archived = 0 + if _os.path.isfile(_curator_state_path): + try: + with open(_curator_state_path) as f: + _cs = json.load(f) + curator_runs = _cs.get("runs", 0) + _cls = _cs.get("last_run") + if _cls: + _dt = datetime.fromisoformat(_cls.replace("Z", "+00:00")) + _delta = _now - _dt + if _delta.days > 0: + curator_last = f"{_delta.days}d ago" + elif _delta.seconds > 3600: + curator_last = f"{_delta.seconds // 3600}h ago" + else: + curator_last = f"{_delta.seconds // 60}m ago" + curator_archived = _cs.get("archived_count", 0) + except Exception: + pass + lines.append("**🌱 Curator**") + lines.append(f" Runs: {curator_runs} | Last: {curator_last} | Archived: {curator_archived}") + lines.append("") + + # ── Cron ──────────────────────────────────────────────────────── + try: + from cron.jobs import load_jobs + _jobs = load_jobs() + _active_jobs = [j for j in _jobs if j.get("paused") != True] + _job_names = [j.get("name", j.get("job_id", "?")[:8]) for j in _active_jobs[:5]] + lines.append("**⏰ Cron**") + if _job_names: + lines.append(f" Active: {len(_active_jobs)} — {', '.join(_job_names)}") + else: + lines.append(" Active: 0") + lines.append("") + except Exception: + pass + + # ── Recent Activity ───────────────────────────────────────────── + try: + from hermes_state import SessionDB + _db = SessionDB() + _recent = _db.list_sessions(limit=5, offset=0) + _db.close() + if _recent: + _recent_24h = 0 + _recent_msgs = 0 + _cutoff = _now - timedelta(hours=24) + for _rs in _recent: + _t = _rs.get("updated_at") or _rs.get("created_at") + if _t: + _dt = datetime.fromisoformat(str(_t).replace("Z", "+00:00")) if isinstance(_t, str) else _t + if _dt > _cutoff: + _recent_24h += 1 + _recent_msgs += _rs.get("message_count", 0) + lines.append("**⏱️ Activity (24h)**") + lines.append(f" Sessions: {_recent_24h} | Messages: {_recent_msgs}") + _latest = _recent[0] if _recent else {} + _latest_title = _latest.get("title") or _latest.get("session_id", "?") + _latest_preview = (_latest.get("preview") or "")[:80] + if _latest_preview: + lines.append(f" Latest: _{_latest_preview}_") + lines.append("") + except Exception: + pass + + # ── System ────────────────────────────────────────────────────── + try: + import psutil + _proc = psutil.Process() + _mem_mb = _proc.memory_info().rss / 1024 / 1024 + lines.append("**🔧 System**") + lines.append(f" Memory: {_mem_mb:.0f} MB") + except ImportError: + pass + except Exception: + pass + + return "\n".join(lines) + async def _handle_reload_mcp_command(self, event: MessageEvent) -> Optional[str]: """Handle /reload-mcp — reconnect MCP servers and rebuild the cached agent. @@ -14585,6 +14825,79 @@ class GatewayRunner: return prefix return user_text + @staticmethod + def _transcription_looks_uncertain(transcript: str) -> bool: + """Heuristic gate for optional OpenAI Whisper fallback. + + Keep this deliberately conservative: the fallback is paid and slower, + so we only retry when the primary transcript explicitly looks broken or + uncertain, not for every voice note. + """ + text = (transcript or "").strip() + if not text: + return True + lowered = text.lower() + uncertainty_markers = ( + "inaudible", + "unintelligible", + "can't understand", + "cannot understand", + "could not understand", + "неразбор", + "не слышно", + "плохо слышно", + "не понял", + "непонятно", + ) + if any(marker in lowered for marker in uncertainty_markers): + return True + if "�" in text: + return True + compact = lowered.strip(" .,…!?-—\n\t") + return compact in {"", "...", "[music]", "(music)", "тишина"} + + def _should_try_openai_whisper_fallback(self, result: Dict[str, Any]) -> bool: + """Return True only for the configured fast OpenAI model + uncertain text.""" + if not result.get("success"): + return False + try: + from tools.transcription_tools import _load_stt_config + stt_config = _load_stt_config() + except Exception: + logger.exception("Failed to load STT config while evaluating fallback policy") + return False + if str(stt_config.get("provider", "")).lower() != "openai": + return False + openai_cfg = stt_config.get("openai", {}) if isinstance(stt_config.get("openai"), dict) else {} + model = str(openai_cfg.get("model", "")).strip() + if model != "gpt-4o-mini-transcribe": + return False + return self._transcription_looks_uncertain(str(result.get("transcript") or "")) + + async def _transcribe_audio_with_fallback(self, transcribe_audio, path: str) -> Dict[str, Any]: + """Transcribe once, then optionally retry uncertain mini-transcribe output with whisper-1.""" + result = await asyncio.to_thread(transcribe_audio, path) + if not self._should_try_openai_whisper_fallback(result): + return result + + logger.info("Primary OpenAI STT transcript looks uncertain; retrying %s with whisper-1", os.path.basename(path)) + try: + fallback = await asyncio.to_thread(transcribe_audio, path, model="whisper-1") + except Exception: + logger.exception("OpenAI Whisper-1 STT fallback failed for %s; keeping primary transcript", path) + result["stt_uncertain"] = True + return result + if fallback.get("success") and str(fallback.get("transcript") or "").strip(): + fallback["fallback_used"] = "whisper-1" + return fallback + logger.warning( + "OpenAI Whisper-1 STT fallback did not produce a usable transcript for %s: %s", + path, + fallback.get("error") or "empty transcript", + ) + result["stt_uncertain"] = True + return result + async def _enrich_message_with_transcription( self, user_text: str, @@ -14628,13 +14941,20 @@ class GatewayRunner: for path in audio_paths: try: logger.debug("Transcribing user voice: %s", path) - result = await asyncio.to_thread(transcribe_audio, path) + result = await self._transcribe_audio_with_fallback(transcribe_audio, path) if result["success"]: transcript = result["transcript"] - enriched_parts.append( + note = ( f'[The user sent a voice message~ ' - f'Here\'s what they said: "{transcript}"]' + f'Here\'s what they said: "{transcript}"' ) + if result.get("stt_uncertain") or self._transcription_looks_uncertain(str(transcript)): + note += ( + " The transcript still looks uncertain; don't guess. " + "Ask the user a short clarification in Russian." + ) + note += "]" + enriched_parts.append(note) else: error = result.get("error", "unknown error") if ( @@ -14661,7 +14981,7 @@ class GatewayRunner: f"transcribing it~ ({error})]" ) except Exception as e: - logger.error("Transcription error: %s", e) + logger.error("Transcription error for %s: %s", path, e, exc_info=True) enriched_parts.append( "[The user sent a voice message but something went wrong " "when I tried to listen to it~ Let them know!]" @@ -17667,13 +17987,20 @@ class GatewayRunner: pass if self._draining and (pending_event or pending): - logger.info( - "Discarding pending follow-up for session %s during gateway %s", - session_key or "?", - self._status_action_label(), - ) - pending_event = None - pending = None + if _should_process_pending_during_drain(result): + logger.info( + "Processing pending follow-up for session %s before gateway %s completes", + session_key or "?", + self._status_action_label(), + ) + else: + logger.info( + "Discarding pending follow-up for session %s during gateway %s", + session_key or "?", + self._status_action_label(), + ) + pending_event = None + pending = None if pending_event or pending: logger.debug("Processing pending message: '%s...'", pending[:40]) diff --git a/gateway/session.py b/gateway/session.py index 5f6fcb9a6..83c7a184e 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -453,6 +453,13 @@ class SessionEntry: # Last API-reported prompt tokens (for accurate compression pre-check) last_prompt_tokens: int = 0 + # Set when the previous agent turn crashed with an unhandled exception. + # Consumed once by the message handler to inject a crash-context notice + # into the next session's system prompt so the agent can self-diagnose. + last_error_type: Optional[str] = None + last_error_message: Optional[str] = None + last_error_time: Optional[float] = None + # Set when a session was created because the previous one expired; # consumed once by the message handler to inject a notice into context was_auto_reset: bool = False diff --git a/hermes_cli/commands.py b/hermes_cli/commands.py index f58924862..0a0fc6f1c 100644 --- a/hermes_cli/commands.py +++ b/hermes_cli/commands.py @@ -98,6 +98,7 @@ COMMAND_REGISTRY: list[CommandDef] = [ aliases=("bg", "btw"), args_hint=""), CommandDef("agents", "Show active agents and running tasks", "Session", aliases=("tasks",)), + CommandDef("stats", "Show comprehensive system stats — model, skills, curator, cron, activity", "Info"), CommandDef("queue", "Queue a prompt for the next turn (doesn't interrupt)", "Session", aliases=("q",), args_hint=""), CommandDef("steer", "Inject a message after the next tool call without interrupting", "Session", diff --git a/plugins/context_engine/semantic_rle/__init__.py b/plugins/context_engine/semantic_rle/__init__.py new file mode 100644 index 000000000..c21719e77 --- /dev/null +++ b/plugins/context_engine/semantic_rle/__init__.py @@ -0,0 +1,300 @@ +"""Experimental Semantic RLE context engine. + +MVP goals: +- keep the hot tail verbatim; +- collapse older chat into a deterministic factual ledger; +- mark superseded facts instead of silently forgetting them; +- redact likely credentials and IP-like sensitive strings before ledgering. + +This plugin is intentionally deterministic and does not call cloud LLMs. +It is not enabled by default; select it explicitly with ``context.engine``. +""" + +from __future__ import annotations + +import hashlib +import re +from dataclasses import dataclass +from typing import Any, Dict, Iterable, List, Optional, Tuple + +from agent.context_engine import ContextEngine + +Message = Dict[str, Any] + +_TOKEN_PATTERNS: tuple[re.Pattern[str], ...] = ( + re.compile(r"\b(?:sk|xox[baprs]?|gh[pousr]|hf|AIza|ya29|pat|tok)[-_][A-Za-z0-9_./+=-]{12,}\b"), + re.compile(r"\b[A-Za-z0-9_./+=-]{32,}\b"), +) +_KEY_VALUE_SECRET_RE = re.compile( + r"(?i)\b(api[_-]?key|token|secret|password|passwd|authorization|bearer)\b\s*[:=]\s*([^\s,;]+)" +) +_IPV4_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b") +_SERVER_RE = re.compile(r"(?i)\bserver\s*(?:is|=|:|->|to)?\s*([A-Za-z0-9][A-Za-z0-9._-]{1,})") +_DECISION_RE = re.compile(r"(?i)\b(decided|decision|решили|решение|choose|chosen|use|используем)\b") +_OBLIGATION_RE = re.compile(r"(?i)\b(todo|надо|нужно|must|should|обяз|follow up|сделай|сделать)\b") +_QUESTION_RE = re.compile(r"(?i)(\?\s*$|\b(unresolved|open question|вопрос|непонятно|уточнить)\b)") +_FACT_RE = re.compile(r"(?i)\b(is|are|=|:|это|будет|uses|runs|host|server|model|provider)\b") + + +@dataclass +class LedgerFact: + key: str + value: str + role: str + turn_index: int + active: bool = True + superseded_by: Optional[str] = None + + def line(self) -> str: + state = "active" if self.active else f"superseded by {self.superseded_by or 'newer fact'}" + return f"- [{state}] {self.key}: {self.value} (turn {self.turn_index}, {self.role})" + + +class SemanticRLEEngine(ContextEngine): + """Deterministic context engine for a hot-tail + semantic-ledger experiment.""" + + threshold_percent = 0.75 + protect_first_n = 1 + protect_last_n = 8 + + def __init__(self, context_length: int = 200_000, hot_tail_messages: int = 8) -> None: + self.context_length = context_length + self.threshold_tokens = int(context_length * self.threshold_percent) + self.hot_tail_messages = hot_tail_messages + self.last_prompt_tokens = 0 + self.last_completion_tokens = 0 + self.last_total_tokens = 0 + self.compression_count = 0 + self._last_ledger: dict[str, Any] = {} + + @property + def name(self) -> str: + return "semantic_rle" + + def is_available(self) -> bool: + return True + + def update_from_response(self, usage: Dict[str, Any]) -> None: + self.last_prompt_tokens = int(usage.get("prompt_tokens") or usage.get("input_tokens") or 0) + self.last_completion_tokens = int(usage.get("completion_tokens") or usage.get("output_tokens") or 0) + self.last_total_tokens = int(usage.get("total_tokens") or (self.last_prompt_tokens + self.last_completion_tokens)) + + def should_compress(self, prompt_tokens: Optional[int] = None) -> bool: + tokens = self.last_prompt_tokens if prompt_tokens is None else int(prompt_tokens) + return bool(self.threshold_tokens and tokens >= self.threshold_tokens) + + def should_compress_preflight(self, messages: List[Message]) -> bool: + return self.has_content_to_compress(messages) + + def has_content_to_compress(self, messages: List[Message]) -> bool: + return len(self._non_system(messages)) > self.hot_tail_messages + + def update_model( + self, + model: str, + context_length: int, + base_url: str = "", + api_key: str = "", + provider: str = "", + api_mode: str = "", + ) -> None: + self.context_length = int(context_length or self.context_length or 0) + self.threshold_tokens = int(self.context_length * self.threshold_percent) if self.context_length else 0 + + def compress( + self, + messages: List[Message], + current_tokens: Optional[int] = None, + focus_topic: Optional[str] = None, + ) -> List[Message]: + """Return original head + semantic ledger for cold turns + verbatim hot tail. + + The deterministic path is fail-closed: on unexpected errors, return a + shallow copy of the original message list rather than dropping context. + """ + self.compression_count += 1 + try: + if not messages: + return [] + + copied = [dict(m) for m in messages] + system_head = [m for m in copied if m.get("role") == "system"] + non_system = [m for m in copied if m.get("role") != "system"] + if len(non_system) <= self.hot_tail_messages: + return copied + + hot_tail = non_system[-self.hot_tail_messages :] + cold = non_system[: -self.hot_tail_messages] + ledger = self._build_ledger(cold, focus_topic=focus_topic) + self._last_ledger = ledger + + summary_message: Message = { + "role": "system", + "content": self._render_summary(ledger, focus_topic=focus_topic), + } + return [*system_head, summary_message, *hot_tail] + except Exception: + return [dict(m) for m in messages] + + def get_status(self) -> Dict[str, Any]: + status = super().get_status() + status.update( + { + "engine": self.name, + "hot_tail_messages": self.hot_tail_messages, + "ledger_counts": { + key: len(value) for key, value in self._last_ledger.items() if isinstance(value, list) + }, + } + ) + return status + + @staticmethod + def _non_system(messages: Iterable[Message]) -> list[Message]: + return [m for m in messages if m.get("role") != "system"] + + def _build_ledger(self, messages: List[Message], focus_topic: Optional[str] = None) -> dict[str, Any]: + facts_by_key: dict[str, LedgerFact] = {} + superseded: list[LedgerFact] = [] + decisions: list[str] = [] + obligations: list[str] = [] + questions: list[str] = [] + credential_refs: list[str] = [] + retrieval_notes: list[str] = [] + + for index, msg in enumerate(messages, start=1): + role = str(msg.get("role", "unknown")) + text = self._string_content(msg.get("content", "")) + if not text.strip(): + continue + sanitized, refs = self._sanitize(text) + credential_refs.extend(refs) + snippet = self._snippet(sanitized) + + fact = self._extract_fact(sanitized, role=role, turn_index=index) + if fact: + old = facts_by_key.get(fact.key) + if old and old.value != fact.value: + old.active = False + old.superseded_by = fact.value + superseded.append(old) + facts_by_key[fact.key] = fact + + if _DECISION_RE.search(sanitized): + decisions.append(f"- {snippet} (turn {index}, {role})") + if _OBLIGATION_RE.search(sanitized): + obligations.append(f"- {snippet} (turn {index}, {role})") + if _QUESTION_RE.search(sanitized): + questions.append(f"- {snippet} (turn {index}, {role})") + if focus_topic and focus_topic.lower() in sanitized.lower(): + retrieval_notes.append(f"- Focus match `{focus_topic}` at turn {index}: {snippet}") + + active_facts = [fact for fact in facts_by_key.values() if fact.active] + return { + "active_facts": active_facts, + "decisions": self._dedupe(decisions), + "obligations": self._dedupe(obligations), + "superseded_facts": superseded, + "unresolved_questions": self._dedupe(questions), + "credential_refs": self._dedupe(credential_refs), + "retrieval_notes": self._dedupe(retrieval_notes), + "cold_turns_compacted": len(messages), + } + + def _extract_fact(self, text: str, role: str, turn_index: int) -> Optional[LedgerFact]: + server = _SERVER_RE.search(text) + if server: + return LedgerFact("server", server.group(1), role, turn_index) + if not _FACT_RE.search(text): + return None + cleaned = self._snippet(text, limit=180) + key = self._fact_key(cleaned) + return LedgerFact(key, cleaned, role, turn_index) + + @staticmethod + def _fact_key(text: str) -> str: + lower = text.lower() + before_sep = re.split(r"\s*(?:is|are|=|:|это|будет|uses|runs)\s*", lower, maxsplit=1)[0] + words = re.findall(r"[a-zа-я0-9_-]+", before_sep)[:6] + return " ".join(words) or "fact" + + @staticmethod + def _string_content(content: Any) -> str: + if isinstance(content, str): + return content + if isinstance(content, list): + parts: list[str] = [] + for item in content: + if isinstance(item, dict): + if isinstance(item.get("text"), str): + parts.append(item["text"]) + elif isinstance(item.get("content"), str): + parts.append(item["content"]) + elif isinstance(item, str): + parts.append(item) + return "\n".join(parts) + return str(content) + + def _sanitize(self, text: str) -> Tuple[str, list[str]]: + refs: list[str] = [] + + def ref_for(raw: str, kind: str = "credential") -> str: + digest = hashlib.sha256(raw.encode("utf-8", "ignore")).hexdigest()[:10] + ref = f"credential_ref:{kind}:{digest}" + refs.append(ref) + return ref + + def replace_key_value(match: re.Match[str]) -> str: + key = match.group(1) + raw = match.group(2) + return f"{key}=<{ref_for(raw)}>" + + sanitized = _KEY_VALUE_SECRET_RE.sub(replace_key_value, text) + for pattern in _TOKEN_PATTERNS: + sanitized = pattern.sub(lambda m: f"<{ref_for(m.group(0))}>", sanitized) + sanitized = _IPV4_RE.sub("[REDACTED_IP]", sanitized) + return sanitized, refs + + @staticmethod + def _snippet(text: str, limit: int = 220) -> str: + compact = " ".join(text.split()) + if len(compact) <= limit: + return compact + return compact[: limit - 1].rstrip() + "…" + + @staticmethod + def _dedupe(items: Iterable[str]) -> list[str]: + seen: set[str] = set() + result: list[str] = [] + for item in items: + if item not in seen: + seen.add(item) + result.append(item) + return result + + @staticmethod + def _render_summary(ledger: dict[str, Any], focus_topic: Optional[str] = None) -> str: + sections: list[str] = [ + "Semantic RLE context ledger (deterministic, older turns compacted).", + f"Cold turns compacted: {ledger.get('cold_turns_compacted', 0)}.", + "Hot tail messages after this block are preserved verbatim.", + ] + if focus_topic: + sections.append(f"Compression focus: {focus_topic}") + + def add_section(title: str, lines: list[str]) -> None: + sections.append(f"\n## {title}") + sections.extend(lines or ["- None detected."]) + + add_section("Active facts", [f.line() for f in ledger.get("active_facts", [])]) + add_section("Decisions", ledger.get("decisions", [])) + add_section("Obligations", ledger.get("obligations", [])) + add_section("Superseded facts", [f.line() for f in ledger.get("superseded_facts", [])]) + add_section("Unresolved questions", ledger.get("unresolved_questions", [])) + add_section("Credential refs", [f"- {ref}" for ref in ledger.get("credential_refs", [])]) + add_section("Retrieval notes", ledger.get("retrieval_notes", [])) + return "\n".join(sections) + + +def register(ctx: Any) -> None: + ctx.register_context_engine(SemanticRLEEngine()) diff --git a/plugins/context_engine/semantic_rle/plugin.yaml b/plugins/context_engine/semantic_rle/plugin.yaml new file mode 100644 index 000000000..41e79394e --- /dev/null +++ b/plugins/context_engine/semantic_rle/plugin.yaml @@ -0,0 +1,4 @@ +name: semantic_rle +version: 0.1.0 +experimental: true +description: "Experimental deterministic Semantic RLE context engine: hot tail + factual ledger + supersession notes for long chats." diff --git a/run_agent.py b/run_agent.py index d2d65314f..13ba45a90 100644 --- a/run_agent.py +++ b/run_agent.py @@ -1526,7 +1526,15 @@ class AIAgent: # Fallback: truncate the raw string but give more room than 200 chars status_code = getattr(error, "status_code", None) - prefix = f"HTTP {status_code}: " if status_code else "" + prefix_parts = [] + if status_code: + prefix_parts.append(f"HTTP {status_code}") + # For local bugs (TypeError, ValueError without status_code), include + # the exception type so logs clearly distinguish "provider rejected us" + # from "we have a code bug". + if not status_code and isinstance(error, (TypeError, ValueError)): + prefix_parts.append(type(error).__name__) + prefix = f"{': '.join(prefix_parts)}: " if prefix_parts else "" return f"{prefix}{raw[:500]}" def _mask_api_key_for_logs(self, key: Any) -> Optional[str]: diff --git a/scripts/semantic_rle_eval.py b/scripts/semantic_rle_eval.py new file mode 100644 index 000000000..55e573a25 --- /dev/null +++ b/scripts/semantic_rle_eval.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +"""Deterministic smoke-eval for the experimental semantic_rle context engine. + +This is not an LLM quality benchmark. It checks the minimum invariant we need +before trying live Telegram replay: after cold-history compaction, the +model-visible context still contains current facts/obligations, marks stale +facts as superseded, preserves the hot tail byte-for-byte, and does not leak +raw fake secrets from cold turns. +""" + +from __future__ import annotations + +import argparse +import json +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Callable + +REPO_ROOT = Path(__file__).resolve().parents[1] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +from plugins.context_engine.semantic_rle import SemanticRLEEngine # noqa: E402 + +Message = dict[str, Any] + + +@dataclass(frozen=True) +class Scenario: + name: str + messages: list[Message] + hot_tail_messages: int + checks: dict[str, Callable[[list[Message]], bool]] + + +def _render(messages: list[Message]) -> str: + return "\n".join(str(message.get("content", "")) for message in messages) + + +def _hot_tail_only(messages: list[Message], hot_tail_messages: int) -> list[Message]: + system_head = [dict(m) for m in messages if m.get("role") == "system"] + non_system = [dict(m) for m in messages if m.get("role") != "system"] + return [*system_head, *non_system[-hot_tail_messages:]] + + +def _semantic_rle(messages: list[Message], hot_tail_messages: int) -> list[Message]: + return SemanticRLEEngine(hot_tail_messages=hot_tail_messages).compress(messages) + + +def _scenarios() -> list[Scenario]: + fake_token = "sk-test1234567890abcdef1234567890" + fake_pat = "ghp_abcdef1234567890abcdef1234567890" + ip = "203.0.113.42" + + supersession_messages = [ + {"role": "system", "content": "You are Hermes."}, + {"role": "user", "content": "server alpha.example is the current deployment target"}, + {"role": "assistant", "content": "Noted: server alpha.example."}, + {"role": "user", "content": "decision: use postgres for the ledger"}, + {"role": "assistant", "content": "I will use postgres."}, + {"role": "user", "content": "todo: compare misses against baseline"}, + {"role": "assistant", "content": "Added comparison todo."}, + {"role": "user", "content": "unresolved question: how often to compact?"}, + {"role": "assistant", "content": "We can measure compaction cadence."}, + {"role": "user", "content": "server beta.example is the current deployment target now"}, + {"role": "assistant", "content": "Switched to beta.example."}, + {"role": "user", "content": "hot tail user message"}, + {"role": "assistant", "content": "hot tail assistant message"}, + ] + + secret_messages = [ + {"role": "system", "content": "You are Hermes."}, + {"role": "user", "content": f"api_key={fake_token} server {ip}"}, + {"role": "assistant", "content": f"token: {fake_pat}"}, + {"role": "user", "content": "todo: rotate the credential reference"}, + {"role": "assistant", "content": "Will track credential refs only."}, + {"role": "user", "content": "hot tail one"}, + {"role": "assistant", "content": "hot tail two"}, + ] + + return [ + Scenario( + name="supersession_and_obligation", + messages=supersession_messages, + hot_tail_messages=2, + checks={ + "current_fact_retained": lambda m: "server: beta.example" in _render(m), + "old_fact_marked_superseded": lambda m: "[superseded by beta.example] server: alpha.example" in _render(m), + "decision_retained": lambda m: "use postgres" in _render(m), + "obligation_retained": lambda m: "compare misses against baseline" in _render(m), + "question_retained": lambda m: "how often to compact" in _render(m), + "hot_tail_preserved": lambda m: m[-2:] == supersession_messages[-2:], + }, + ), + Scenario( + name="cold_secret_redaction", + messages=secret_messages, + hot_tail_messages=2, + checks={ + "raw_fake_token_absent": lambda m: fake_token not in _render(m) and fake_pat not in _render(m), + "raw_ip_absent": lambda m: ip not in _render(m), + "credential_ref_present": lambda m: "credential_ref:credential:" in _render(m), + "ip_redacted_marker_present": lambda m: "[REDACTED_IP]" in _render(m), + "obligation_retained": lambda m: "rotate the credential reference" in _render(m), + "hot_tail_preserved": lambda m: m[-2:] == secret_messages[-2:], + }, + ), + ] + + +def run_eval() -> dict[str, Any]: + engines: dict[str, Callable[[list[Message], int], list[Message]]] = { + "hot_tail_only_baseline": _hot_tail_only, + "semantic_rle": _semantic_rle, + } + results: list[dict[str, Any]] = [] + + for scenario in _scenarios(): + for engine_name, engine_fn in engines.items(): + compacted = engine_fn(scenario.messages, scenario.hot_tail_messages) + checks = {name: bool(check(compacted)) for name, check in scenario.checks.items()} + passed = sum(1 for ok in checks.values() if ok) + results.append( + { + "scenario": scenario.name, + "engine": engine_name, + "passed": passed, + "total": len(checks), + "misses": [name for name, ok in checks.items() if not ok], + "message_count_before": len(scenario.messages), + "message_count_after": len(compacted), + "char_count_after": len(_render(compacted)), + } + ) + + by_engine: dict[str, dict[str, int]] = {} + for row in results: + aggregate = by_engine.setdefault(row["engine"], {"passed": 0, "total": 0}) + aggregate["passed"] += int(row["passed"]) + aggregate["total"] += int(row["total"]) + + return {"results": results, "summary": by_engine} + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--json", action="store_true", help="print machine-readable JSON") + args = parser.parse_args() + + report = run_eval() + if args.json: + print(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)) + else: + print("Semantic RLE deterministic smoke-eval") + for engine, row in report["summary"].items(): + print(f"- {engine}: {row['passed']}/{row['total']} invariant checks passed") + print("Details:") + for row in report["results"]: + misses = ", ".join(row["misses"]) or "none" + print( + f"- {row['scenario']} / {row['engine']}: " + f"{row['passed']}/{row['total']}, misses={misses}, " + f"messages {row['message_count_before']}→{row['message_count_after']}, " + f"chars_after={row['char_count_after']}" + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/gateway/test_shutdown_pending_followup.py b/tests/gateway/test_shutdown_pending_followup.py new file mode 100644 index 000000000..69b58f6fc --- /dev/null +++ b/tests/gateway/test_shutdown_pending_followup.py @@ -0,0 +1,32 @@ +"""Regression tests for queued follow-ups during gateway shutdown. + +A user message can arrive while the gateway is still draining an active agent +turn (for example during a service restart after a code update). The active +turn may finish cleanly while the gateway is in draining mode. In that case +we must process the already-queued follow-up before exiting instead of dropping +it silently. +""" + +from gateway.run import _should_process_pending_during_drain + + +def test_clean_result_processes_pending_followup_during_drain(): + result = {"final_response": "done", "messages": [], "api_calls": 1} + + assert _should_process_pending_during_drain(result) is True + + +def test_interrupted_result_drops_pending_followup_during_drain(): + result = {"interrupted": True, "final_response": ""} + + assert _should_process_pending_during_drain(result) is False + + +def test_failed_result_drops_pending_followup_during_drain(): + result = {"failed": True, "final_response": "boom"} + + assert _should_process_pending_during_drain(result) is False + + +def test_missing_result_drops_pending_followup_during_drain(): + assert _should_process_pending_during_drain(None) is False diff --git a/tests/gateway/test_telegram_audio_vs_voice.py b/tests/gateway/test_telegram_audio_vs_voice.py index d8ad38e29..dd8d4b9f3 100644 --- a/tests/gateway/test_telegram_audio_vs_voice.py +++ b/tests/gateway/test_telegram_audio_vs_voice.py @@ -79,6 +79,77 @@ async def test_voice_message_still_transcribed(): assert "voice message" in result.lower() +@pytest.mark.asyncio +async def test_voice_stt_fallback_only_for_uncertain_openai_mini(monkeypatch): + """OpenAI mini-transcribe should retry whisper-1 only when the transcript looks uncertain.""" + runner = _make_runner(stt_enabled=True) + source = SessionSource(platform=Platform.TELEGRAM, chat_id="1", chat_type="dm") + event = _voice_event("/tmp/voice.ogg") + + monkeypatch.setattr( + "tools.transcription_tools._load_stt_config", + lambda: {"provider": "openai", "openai": {"model": "gpt-4o-mini-transcribe"}}, + ) + calls = [] + + def fake_transcribe(path, model=None): + calls.append((path, model)) + if model == "whisper-1": + return {"success": True, "transcript": "нормальный текст", "provider": "openai"} + return {"success": True, "transcript": "[inaudible]", "provider": "openai"} + + with patch("tools.transcription_tools.transcribe_audio", side_effect=fake_transcribe): + result = await runner._prepare_inbound_message_text(event=event, source=source, history=[]) + + assert calls == [("/tmp/voice.ogg", None), ("/tmp/voice.ogg", "whisper-1")] + assert "нормальный текст" in result + + +@pytest.mark.asyncio +async def test_voice_stt_no_fallback_for_confident_openai_mini(monkeypatch): + """Confident primary transcripts should not pay for a Whisper fallback.""" + runner = _make_runner(stt_enabled=True) + source = SessionSource(platform=Platform.TELEGRAM, chat_id="1", chat_type="dm") + event = _voice_event("/tmp/voice.ogg") + + monkeypatch.setattr( + "tools.transcription_tools._load_stt_config", + lambda: {"provider": "openai", "openai": {"model": "gpt-4o-mini-transcribe"}}, + ) + with patch( + "tools.transcription_tools.transcribe_audio", + return_value={"success": True, "transcript": "привет, проверь статус", "provider": "openai"}, + ) as mock_transcribe: + result = await runner._prepare_inbound_message_text(event=event, source=source, history=[]) + + mock_transcribe.assert_called_once_with("/tmp/voice.ogg") + assert "привет, проверь статус" in result + + +@pytest.mark.asyncio +async def test_voice_stt_uncertain_after_failed_fallback_requests_short_clarification(monkeypatch): + """If fallback cannot rescue an uncertain transcript, the agent should ask briefly instead of guessing.""" + runner = _make_runner(stt_enabled=True) + source = SessionSource(platform=Platform.TELEGRAM, chat_id="1", chat_type="dm") + event = _voice_event("/tmp/voice.ogg") + + monkeypatch.setattr( + "tools.transcription_tools._load_stt_config", + lambda: {"provider": "openai", "openai": {"model": "gpt-4o-mini-transcribe"}}, + ) + + def fake_transcribe(path, model=None): + if model == "whisper-1": + return {"success": False, "error": "temporary provider error", "provider": "openai"} + return {"success": True, "transcript": "[inaudible]", "provider": "openai"} + + with patch("tools.transcription_tools.transcribe_audio", side_effect=fake_transcribe): + result = await runner._prepare_inbound_message_text(event=event, source=source, history=[]) + + assert "[inaudible]" in result + assert "short clarification in Russian" in result + + # --------------------------------------------------------------------------- # 2. AUDIO file attachment bypasses STT # --------------------------------------------------------------------------- diff --git a/tests/gateway/test_telegram_reactions.py b/tests/gateway/test_telegram_reactions.py index 8b3b0686b..30d6addc6 100644 --- a/tests/gateway/test_telegram_reactions.py +++ b/tests/gateway/test_telegram_reactions.py @@ -17,6 +17,7 @@ def _make_adapter(**extra_env): adapter.platform = Platform.TELEGRAM adapter.config = PlatformConfig(enabled=True, token="fake-token") adapter._bot = AsyncMock() + adapter._bot.id = 999 adapter._bot.set_message_reaction = AsyncMock() return adapter @@ -315,3 +316,79 @@ def test_config_reactions_env_takes_precedence(monkeypatch, tmp_path): import os assert os.getenv("TELEGRAM_REACTIONS") == "false" + + +# ── inbound user reactions ─────────────────────────────────────────── + + +def _make_reaction_update(*, emoji="👎", old_emoji=None, user_id=42, bot_id=999): + new_reaction = [] if emoji is None else [SimpleNamespace(emoji=emoji)] + old_reaction = [] if old_emoji is None else [SimpleNamespace(emoji=old_emoji)] + return SimpleNamespace( + update_id=777, + message_reaction=SimpleNamespace( + chat=SimpleNamespace(id=123, type="private", title=None), + user=SimpleNamespace(id=user_id, first_name="Anton", username="toxblh"), + message_id=456, + message_thread_id=None, + new_reaction=new_reaction, + old_reaction=old_reaction, + ), + ) + + +@pytest.mark.asyncio +async def test_inbound_negative_reaction_becomes_mimo_feedback(): + adapter = _make_adapter() + adapter._is_callback_user_authorized = lambda *args, **kwargs: True + adapter.handle_message = AsyncMock() + + await adapter._handle_message_reaction(_make_reaction_update(emoji="👎"), None) + + adapter.handle_message.assert_awaited_once() + event = adapter.handle_message.await_args.args[0] + assert event.text == ( + "[Telegram reaction feedback: 👎 on message 456. " + "Interpretation: negative feedback / мимо. " + "If the target was a cron/news/digest item, use this as taste feedback.]" + ) + assert event.source.chat_id == "123" + assert event.source.user_id == "42" + assert event.source.message_id == "456" + assert event.message_id == "456" + assert event.platform_update_id == 777 + + +@pytest.mark.asyncio +async def test_inbound_positive_reaction_becomes_godno_feedback(): + adapter = _make_adapter() + adapter._is_callback_user_authorized = lambda *args, **kwargs: True + adapter.handle_message = AsyncMock() + + await adapter._handle_message_reaction(_make_reaction_update(emoji="👌"), None) + + event = adapter.handle_message.await_args.args[0] + assert "Telegram reaction feedback: 👌" in event.text + assert "Interpretation: positive feedback / годно" in event.text + + +@pytest.mark.asyncio +async def test_inbound_reaction_removal_is_ignored(): + adapter = _make_adapter() + adapter._is_callback_user_authorized = lambda *args, **kwargs: True + adapter.handle_message = AsyncMock() + + await adapter._handle_message_reaction(_make_reaction_update(emoji=None, old_emoji="👎"), None) + + adapter.handle_message.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_inbound_bot_own_reaction_is_ignored(): + adapter = _make_adapter() + adapter._is_callback_user_authorized = lambda *args, **kwargs: True + adapter.handle_message = AsyncMock() + + await adapter._handle_message_reaction(_make_reaction_update(emoji="👎", user_id=999), None) + + adapter.handle_message.assert_not_awaited() diff --git a/tests/gateway/test_telegram_voice_ack.py b/tests/gateway/test_telegram_voice_ack.py new file mode 100644 index 000000000..5f4ac5a00 --- /dev/null +++ b/tests/gateway/test_telegram_voice_ack.py @@ -0,0 +1,111 @@ +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from gateway.config import Platform +from gateway.platforms.base import MessageEvent, MessageType, SendResult +from gateway.session import SessionSource + + +def _voice_event(thread_id=None) -> MessageEvent: + return MessageEvent( + text="", + message_type=MessageType.VOICE, + source=SessionSource( + platform=Platform.TELEGRAM, + chat_id="123", + chat_type="dm", + thread_id=thread_id, + message_id="42", + ), + message_id="42", + ) + + +@pytest.mark.asyncio +async def test_pre_stt_ack_is_best_effort_and_platform_side(): + from gateway.platforms.telegram import TelegramAdapter + + adapter = TelegramAdapter.__new__(TelegramAdapter) + adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="99")) + + event = _voice_event() + await adapter._send_voice_pre_stt_ack(event) + + adapter.send.assert_awaited_once_with( + "123", + "🎧 получил, распознаю", + reply_to="42", + metadata=None, + ) + + +@pytest.mark.asyncio +async def test_pre_stt_ack_failure_does_not_raise(): + from gateway.platforms.telegram import TelegramAdapter + + adapter = TelegramAdapter.__new__(TelegramAdapter) + adapter.send = AsyncMock(side_effect=RuntimeError("telegram is temporarily unavailable")) + + await adapter._send_voice_pre_stt_ack(_voice_event()) + + adapter.send.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_voice_handler_sends_ack_before_dispatching_to_gateway(): + from gateway.platforms.telegram import TelegramAdapter + + adapter = TelegramAdapter.__new__(TelegramAdapter) + adapter.platform = Platform.TELEGRAM + adapter.config = MagicMock() + adapter.config.extra = {} + order = [] + adapter._should_process_message = lambda message: True + adapter._apply_telegram_group_observe_attribution = lambda event: event + adapter._send_voice_pre_stt_ack = AsyncMock(side_effect=lambda event: order.append("ack")) + adapter.handle_message = AsyncMock(side_effect=lambda event: order.append("handle")) + + file_obj = MagicMock() + file_obj.download_as_bytearray = AsyncMock(return_value=bytearray(b"ogg-data")) + voice = MagicMock() + voice.get_file = AsyncMock(return_value=file_obj) + + chat = MagicMock() + chat.id = 123 + chat.type = "private" + chat.title = None + chat.full_name = "Тохыч" + user = MagicMock() + user.id = 456 + user.full_name = "Тохыч" + + msg = MagicMock() + msg.chat = chat + msg.from_user = user + msg.message_id = 42 + msg.message_thread_id = None + msg.is_topic_message = False + msg.date = None + msg.caption = None + msg.text = "" + msg.reply_to_message = None + msg.sticker = None + msg.photo = [] + msg.video = None + msg.audio = None + msg.voice = voice + msg.document = None + msg.media_group_id = None + + update = MagicMock() + update.message = msg + update.update_id = 777 + + with patch("gateway.platforms.telegram.cache_audio_from_bytes", return_value="/tmp/voice.ogg"): + await adapter._handle_media_message(update, MagicMock()) + + assert order == ["ack", "handle"] + handled_event = adapter.handle_message.await_args.args[0] + assert handled_event.media_urls == ["/tmp/voice.ogg"] + assert handled_event.media_types == ["audio/ogg"] diff --git a/tests/plugins/test_semantic_rle_context_engine.py b/tests/plugins/test_semantic_rle_context_engine.py new file mode 100644 index 000000000..7abb9c9a9 --- /dev/null +++ b/tests/plugins/test_semantic_rle_context_engine.py @@ -0,0 +1,102 @@ +"""Tests for the experimental semantic_rle context engine plugin.""" + +from __future__ import annotations + +from agent.context_engine import ContextEngine +from plugins.context_engine import discover_context_engines, load_context_engine +from plugins.context_engine.semantic_rle import SemanticRLEEngine + + +def _messages() -> list[dict[str, str]]: + return [ + {"role": "system", "content": "You are Hermes."}, + {"role": "user", "content": "server alpha.example is the current deployment target"}, + {"role": "assistant", "content": "Noted: server alpha.example."}, + {"role": "user", "content": "decision: use postgres for the ledger"}, + {"role": "assistant", "content": "I will use postgres."}, + {"role": "user", "content": "todo: compare misses against baseline"}, + {"role": "assistant", "content": "Added comparison todo."}, + {"role": "user", "content": "unresolved question: how often to compact?"}, + {"role": "assistant", "content": "We can measure compaction cadence."}, + {"role": "user", "content": "server beta.example is the current deployment target now"}, + {"role": "assistant", "content": "Switched to beta.example."}, + {"role": "user", "content": "hot tail user message"}, + {"role": "assistant", "content": "hot tail assistant message"}, + ] + + +def test_semantic_rle_satisfies_abc_and_returns_valid_messages(): + engine = SemanticRLEEngine(hot_tail_messages=4) + assert isinstance(engine, ContextEngine) + assert engine.name == "semantic_rle" + + result = engine.compress(_messages()) + + assert isinstance(result, list) + assert all("role" in message and "content" in message for message in result) + assert result[0]["role"] == "system" + assert any("Semantic RLE context ledger" in message["content"] for message in result) + + +def test_hot_tail_preserved_verbatim(): + engine = SemanticRLEEngine(hot_tail_messages=4) + messages = _messages() + + result = engine.compress(messages) + + assert result[-4:] == messages[-4:] + + +def test_server_supersession_marks_old_fact_inactive(): + engine = SemanticRLEEngine(hot_tail_messages=2) + + result = engine.compress(_messages()) + summary = "\n".join(str(message["content"]) for message in result if message["role"] == "system") + + assert "server: beta.example" in summary + assert "[superseded by beta.example] server: alpha.example" in summary + + +def test_fake_tokens_and_ip_like_strings_are_redacted_to_refs(): + engine = SemanticRLEEngine(hot_tail_messages=2) + fake_token = "sk-test_abcdefghijklmnopqrstuvwxyz1234567890" + fake_pat = "ghp_abcdefghijklmnopqrstuvwxyz1234567890" + messages = [ + {"role": "user", "content": f"api_key={fake_token} server 203.0.113.9"}, + {"role": "assistant", "content": f"token: {fake_pat}"}, + {"role": "user", "content": "tail one"}, + {"role": "assistant", "content": "tail two"}, + {"role": "user", "content": "tail three"}, + ] + + result = engine.compress(messages) + rendered = "\n".join(str(message["content"]) for message in result) + + assert fake_token not in rendered + assert fake_pat not in rendered + assert "203.0.113.9" not in rendered + assert "credential_ref:credential:" in rendered + assert "[REDACTED_IP]" in rendered + + +def test_deterministic_failure_path_returns_original_messages(monkeypatch): + engine = SemanticRLEEngine(hot_tail_messages=2) + messages = _messages() + + def explode(*args, **kwargs): + raise RuntimeError("boom") + + monkeypatch.setattr(engine, "_build_ledger", explode) + + assert engine.compress(messages) == messages + + +def test_plugin_discovery_and_explicit_load_without_global_activation(): + discovered = {name: (description, available) for name, description, available in discover_context_engines()} + + assert "semantic_rle" in discovered + assert discovered["semantic_rle"][1] is True + + engine = load_context_engine("semantic_rle") + assert isinstance(engine, SemanticRLEEngine) + assert engine.name == "semantic_rle" diff --git a/tests/scripts/test_semantic_rle_eval.py b/tests/scripts/test_semantic_rle_eval.py new file mode 100644 index 000000000..72e530d82 --- /dev/null +++ b/tests/scripts/test_semantic_rle_eval.py @@ -0,0 +1,39 @@ +"""Tests for scripts/semantic_rle_eval.py.""" + +from __future__ import annotations + +import importlib.util +import sys +from pathlib import Path + + +def _load_eval_module(): + path = Path(__file__).resolve().parents[2] / "scripts" / "semantic_rle_eval.py" + spec = importlib.util.spec_from_file_location("semantic_rle_eval", path) + assert spec and spec.loader + module = importlib.util.module_from_spec(spec) + sys.modules["semantic_rle_eval"] = module + spec.loader.exec_module(module) + return module + + +def test_semantic_rle_eval_reports_baseline_misses_and_semantic_passes(): + module = _load_eval_module() + + report = module.run_eval() + + assert report["summary"]["semantic_rle"] == {"passed": 12, "total": 12} + assert report["summary"]["hot_tail_only_baseline"]["passed"] < 12 + + semantic_rows = [row for row in report["results"] if row["engine"] == "semantic_rle"] + assert all(row["misses"] == [] for row in semantic_rows) + + baseline_misses = { + miss + for row in report["results"] + if row["engine"] == "hot_tail_only_baseline" + for miss in row["misses"] + } + assert "current_fact_retained" in baseline_misses + assert "obligation_retained" in baseline_misses + assert "credential_ref_present" in baseline_misses