fix: route Telegram DM topic deliveries directly

(cherry picked from commit ad8f97db6c9e1a93ec38c5d616b2e37941187ef3)
This commit is contained in:
stepanov1975 2026-05-16 18:00:05 +00:00 committed by Teknium
parent 0dee92df22
commit 415be55394
4 changed files with 140 additions and 20 deletions

View File

@ -25,6 +25,15 @@ from .config import Platform, GatewayConfig
from .session import SessionSource from .session import SessionSource
def _looks_like_telegram_private_chat_id(chat_id: Optional[str]) -> bool:
if chat_id is None:
return False
try:
return int(chat_id) > 0
except (TypeError, ValueError):
return False
@dataclass @dataclass
class DeliveryTarget: class DeliveryTarget:
""" """
@ -249,9 +258,22 @@ class DeliveryRouter:
) )
send_metadata = dict(metadata or {}) send_metadata = dict(metadata or {})
if target.thread_id and "thread_id" not in send_metadata: if target.thread_id:
send_metadata["thread_id"] = target.thread_id if (
return await adapter.send(target.chat_id, content, metadata=send_metadata or None) target.platform == Platform.TELEGRAM
and _looks_like_telegram_private_chat_id(target.chat_id)
and "thread_id" not in send_metadata
and "message_thread_id" not in send_metadata
and "direct_messages_topic_id" not in send_metadata
and "telegram_direct_messages_topic_id" not in send_metadata
):
send_metadata["telegram_direct_messages_topic_id"] = target.thread_id
elif "thread_id" not in send_metadata:
send_metadata["thread_id"] = target.thread_id
result = await adapter.send(target.chat_id, content, metadata=send_metadata or None)
if getattr(result, "success", True) is False:
raise RuntimeError(getattr(result, "error", None) or f"{target.platform.value} delivery failed")
return result

View File

@ -568,6 +568,34 @@ class TelegramAdapter(BasePlatformAdapter):
reply_to = metadata.get("telegram_reply_to_message_id") reply_to = metadata.get("telegram_reply_to_message_id")
return int(reply_to) if reply_to is not None else None return int(reply_to) if reply_to is not None else None
@staticmethod
def _looks_like_private_chat_id(chat_id: str) -> bool:
try:
return int(chat_id) > 0
except (TypeError, ValueError):
return False
@classmethod
def _is_private_dm_topic_send(
cls,
chat_id: str,
thread_id: Optional[str],
metadata: Optional[Dict[str, Any]],
) -> bool:
if cls._metadata_direct_messages_topic_id(metadata) is not None:
return False
return bool(
thread_id
and (
metadata and metadata.get("telegram_dm_topic_reply_fallback")
or cls._looks_like_private_chat_id(chat_id)
)
)
@staticmethod
def _dm_topic_missing_anchor_error() -> str:
return "Telegram DM topic delivery requires a reply anchor; refusing to send outside the requested topic"
@classmethod @classmethod
def _reply_to_message_id_for_send( def _reply_to_message_id_for_send(
cls, cls,
@ -1739,11 +1767,11 @@ class TelegramAdapter(BasePlatformAdapter):
for i, chunk in enumerate(chunks): for i, chunk in enumerate(chunks):
retried_thread_not_found = False retried_thread_not_found = False
metadata_reply_to = self._metadata_reply_to_message_id(metadata) metadata_reply_to = self._metadata_reply_to_message_id(metadata)
private_dm_topic_send = self._is_private_dm_topic_send(chat_id, thread_id, metadata)
reply_to_source = reply_to or ( reply_to_source = reply_to or (
str(metadata_reply_to) str(metadata_reply_to) if private_dm_topic_send and metadata_reply_to is not None else None
if metadata and metadata.get("telegram_dm_topic_reply_fallback") and metadata_reply_to is not None else None
) )
if metadata and metadata.get("telegram_dm_topic_reply_fallback"): if private_dm_topic_send:
should_thread = ( should_thread = (
reply_to_source is not None reply_to_source is not None
and self._reply_to_mode != "off" and self._reply_to_mode != "off"
@ -1751,6 +1779,12 @@ class TelegramAdapter(BasePlatformAdapter):
else: else:
should_thread = self._should_thread_reply(reply_to_source, i) should_thread = self._should_thread_reply(reply_to_source, i)
reply_to_id = int(reply_to_source) if should_thread and reply_to_source else None reply_to_id = int(reply_to_source) if should_thread and reply_to_source else None
if private_dm_topic_send and reply_to_id is None:
return SendResult(
success=False,
error=self._dm_topic_missing_anchor_error(),
retryable=False,
)
thread_kwargs = self._thread_kwargs_for_send( thread_kwargs = self._thread_kwargs_for_send(
chat_id, chat_id,
thread_id, thread_id,
@ -1801,6 +1835,12 @@ class TelegramAdapter(BasePlatformAdapter):
# specific cases instead of blindly retrying. # specific cases instead of blindly retrying.
if _BadReq and isinstance(send_err, _BadReq): if _BadReq and isinstance(send_err, _BadReq):
if self._is_thread_not_found_error(send_err) and effective_thread_id is not None: if self._is_thread_not_found_error(send_err) and effective_thread_id is not None:
if private_dm_topic_send or (metadata and metadata.get("telegram_dm_topic_created_for_send")):
return SendResult(
success=False,
error=str(send_err),
retryable=False,
)
# Telegram has been observed to return a # Telegram has been observed to return a
# one-off "thread not found" that recovers on # one-off "thread not found" that recovers on
# an immediate retry (transient flake — see # an immediate retry (transient flake — see
@ -1827,6 +1867,12 @@ class TelegramAdapter(BasePlatformAdapter):
continue continue
err_lower = str(send_err).lower() err_lower = str(send_err).lower()
if "message to be replied not found" in err_lower and reply_to_id is not None: if "message to be replied not found" in err_lower and reply_to_id is not None:
if private_dm_topic_send:
return SendResult(
success=False,
error=str(send_err),
retryable=False,
)
# Original message was deleted before we # Original message was deleted before we
# could reply. For private-topic fallback # could reply. For private-topic fallback
# sends, message_thread_id is only valid with # sends, message_thread_id is only valid with

View File

@ -1,7 +1,10 @@
"""Tests for the delivery routing module.""" """Tests for the delivery routing module."""
from gateway.config import Platform import pytest
from gateway.delivery import DeliveryTarget
from gateway.config import GatewayConfig, Platform
from gateway.delivery import DeliveryRouter, DeliveryTarget
from gateway.platforms.base import SendResult
from gateway.session import SessionSource from gateway.session import SessionSource
@ -122,5 +125,57 @@ class TestPlatformNameCaseInsensitivity:
assert target.platform == Platform.TELEGRAM assert target.platform == Platform.TELEGRAM
assert target.chat_id == "12345" assert target.chat_id == "12345"
class RecordingAdapter:
def __init__(self):
self.calls = []
async def send(self, chat_id, content, metadata=None):
self.calls.append({"chat_id": chat_id, "content": content, "metadata": metadata})
return {"success": True}
@pytest.mark.asyncio
async def test_explicit_telegram_private_thread_uses_direct_messages_topic_id(tmp_path, monkeypatch):
monkeypatch.setattr("gateway.delivery.get_hermes_home", lambda: tmp_path)
adapter = RecordingAdapter()
router = DeliveryRouter(GatewayConfig(), adapters={Platform.TELEGRAM: adapter})
target = DeliveryTarget.parse("telegram:722341991:32344")
await router._deliver_to_platform(target, "hello", metadata=None)
assert adapter.calls == [
{
"chat_id": "722341991",
"content": "hello",
"metadata": {
"telegram_direct_messages_topic_id": "32344",
},
}
]
@pytest.mark.asyncio
async def test_explicit_telegram_group_thread_does_not_mark_dm_fallback(tmp_path, monkeypatch):
monkeypatch.setattr("gateway.delivery.get_hermes_home", lambda: tmp_path)
adapter = RecordingAdapter()
router = DeliveryRouter(GatewayConfig(), adapters={Platform.TELEGRAM: adapter})
target = DeliveryTarget.parse("telegram:-100123:42")
await router._deliver_to_platform(target, "hello", metadata=None)
assert adapter.calls[0]["metadata"] == {"thread_id": "42"}
class FailingAdapter:
async def send(self, chat_id, content, metadata=None):
return SendResult(success=False, error="route failed", retryable=False)
@pytest.mark.asyncio
async def test_platform_send_failure_raises_for_delivery_result(tmp_path, monkeypatch):
monkeypatch.setattr("gateway.delivery.get_hermes_home", lambda: tmp_path)
router = DeliveryRouter(GatewayConfig(), adapters={Platform.TELEGRAM: FailingAdapter()})
target = DeliveryTarget.parse("telegram:722341991:32344")
with pytest.raises(RuntimeError, match="route failed"):
await router._deliver_to_platform(target, "hello", metadata=None)

View File

@ -388,7 +388,7 @@ async def test_send_retries_without_thread_on_thread_not_found():
adapter._bot = SimpleNamespace(send_message=mock_send_message) adapter._bot = SimpleNamespace(send_message=mock_send_message)
result = await adapter.send( result = await adapter.send(
chat_id="123", chat_id="-100123",
content="test message", content="test message",
metadata={"thread_id": "99999"}, metadata={"thread_id": "99999"},
) )
@ -716,16 +716,14 @@ async def test_send_dm_topic_fallback_without_anchor_does_not_crash():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_send_dm_topic_reply_not_found_retry_drops_thread_id(): async def test_send_dm_topic_reply_not_found_fails_closed():
"""If Telegram deletes the reply anchor, private-topic retry must drop thread id too.""" """If Telegram deletes the reply anchor, private-topic sends must not fall back elsewhere."""
adapter = _make_adapter() adapter = _make_adapter()
call_log = [] call_log = []
async def mock_send_message(**kwargs): async def mock_send_message(**kwargs):
call_log.append(dict(kwargs)) call_log.append(dict(kwargs))
if len(call_log) == 1: raise FakeBadRequest("Message to be replied not found")
raise FakeBadRequest("Message to be replied not found")
return SimpleNamespace(message_id=781)
adapter._bot = SimpleNamespace(send_message=mock_send_message) adapter._bot = SimpleNamespace(send_message=mock_send_message)
@ -739,12 +737,11 @@ async def test_send_dm_topic_reply_not_found_retry_drops_thread_id():
}, },
) )
assert result.success is True assert result.success is False
assert result.retryable is False
assert call_log[0]["reply_to_message_id"] == 462 assert call_log[0]["reply_to_message_id"] == 462
assert call_log[0]["message_thread_id"] == 20197 assert call_log[0]["message_thread_id"] == 20197
assert call_log[1]["reply_to_message_id"] is None assert len(call_log) == 1
assert "message_thread_id" not in call_log[1]
assert "direct_messages_topic_id" not in call_log[1]
@pytest.mark.asyncio @pytest.mark.asyncio
@ -1085,7 +1082,7 @@ async def test_send_raises_on_other_bad_request():
adapter._bot = SimpleNamespace(send_message=mock_send_message) adapter._bot = SimpleNamespace(send_message=mock_send_message)
result = await adapter.send( result = await adapter.send(
chat_id="123", chat_id="-100123",
content="test message", content="test message",
metadata={"thread_id": "99999"}, metadata={"thread_id": "99999"},
) )
@ -1246,7 +1243,7 @@ async def test_thread_fallback_only_fires_once():
# Send a long message that gets split into chunks # Send a long message that gets split into chunks
long_msg = "A" * 5000 # Exceeds Telegram's 4096 limit long_msg = "A" * 5000 # Exceeds Telegram's 4096 limit
result = await adapter.send( result = await adapter.send(
chat_id="123", chat_id="-100123",
content=long_msg, content=long_msg,
metadata={"thread_id": "99999"}, metadata={"thread_id": "99999"},
) )