fix: auto-create Telegram DM topics for delivery
(cherry picked from commit 5cde0614e894c73400bc7e4fe9df1fe523a2e547)
This commit is contained in:
parent
96c71d8c46
commit
dcd504cea4
@ -34,6 +34,16 @@ def _looks_like_telegram_private_chat_id(chat_id: Optional[str]) -> bool:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _looks_like_int(value: Optional[str]) -> bool:
|
||||||
|
if value is None:
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
int(value)
|
||||||
|
return True
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class DeliveryTarget:
|
class DeliveryTarget:
|
||||||
"""
|
"""
|
||||||
@ -263,30 +273,50 @@ class DeliveryRouter:
|
|||||||
"direct_messages_topic_id" in send_metadata
|
"direct_messages_topic_id" in send_metadata
|
||||||
or "telegram_direct_messages_topic_id" in send_metadata
|
or "telegram_direct_messages_topic_id" in send_metadata
|
||||||
)
|
)
|
||||||
if (
|
target_thread_id = target.thread_id
|
||||||
|
is_named_telegram_private_topic = (
|
||||||
|
target.platform == Platform.TELEGRAM
|
||||||
|
and _looks_like_telegram_private_chat_id(target.chat_id)
|
||||||
|
and not _looks_like_int(target_thread_id)
|
||||||
|
and "thread_id" not in send_metadata
|
||||||
|
and "message_thread_id" not in send_metadata
|
||||||
|
and not has_explicit_direct_topic
|
||||||
|
)
|
||||||
|
if is_named_telegram_private_topic:
|
||||||
|
ensure_dm_topic = getattr(adapter, "ensure_dm_topic", None)
|
||||||
|
if ensure_dm_topic is None:
|
||||||
|
raise RuntimeError(
|
||||||
|
"Telegram adapter cannot create named private DM topics"
|
||||||
|
)
|
||||||
|
created_thread_id = await ensure_dm_topic(target.chat_id, target_thread_id)
|
||||||
|
if not created_thread_id:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"Failed to create Telegram private DM topic '{target_thread_id}'"
|
||||||
|
)
|
||||||
|
target_thread_id = str(created_thread_id)
|
||||||
|
send_metadata["thread_id"] = target_thread_id
|
||||||
|
send_metadata["telegram_dm_topic_created_for_send"] = True
|
||||||
|
elif (
|
||||||
target.platform == Platform.TELEGRAM
|
target.platform == Platform.TELEGRAM
|
||||||
and _looks_like_telegram_private_chat_id(target.chat_id)
|
and _looks_like_telegram_private_chat_id(target.chat_id)
|
||||||
and "thread_id" not in send_metadata
|
and "thread_id" not in send_metadata
|
||||||
and "message_thread_id" not in send_metadata
|
and "message_thread_id" not in send_metadata
|
||||||
and not has_explicit_direct_topic
|
and not has_explicit_direct_topic
|
||||||
):
|
):
|
||||||
# Telegram has two similar-but-not-equivalent private topic modes:
|
# Legacy private topic/thread ids that were not created by this
|
||||||
# true Bot API Direct Messages topics use direct_messages_topic_id,
|
# send path may still need a reply anchor to stay visible in the
|
||||||
# while Hermes-created private DM lanes only route reliably with
|
# requested lane. Named targets are created above via
|
||||||
# message_thread_id plus a reply anchor to a message in that lane.
|
# createForumTopic and can use message_thread_id directly.
|
||||||
# DeliveryRouter often handles proactive/cron sends, so an anchor
|
|
||||||
# may not exist. Refuse the send rather than reporting success for
|
|
||||||
# a message that lands in General/All Messages or is invisible.
|
|
||||||
reply_anchor = send_metadata.get("telegram_reply_to_message_id")
|
reply_anchor = send_metadata.get("telegram_reply_to_message_id")
|
||||||
if reply_anchor is None:
|
if reply_anchor is None:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
"Telegram private DM topic delivery requires telegram_reply_to_message_id; "
|
"Telegram private DM topic delivery requires telegram_reply_to_message_id; "
|
||||||
"send to the bare chat or provide a reply anchor"
|
"send to the bare chat or provide a reply anchor"
|
||||||
)
|
)
|
||||||
send_metadata["thread_id"] = target.thread_id
|
send_metadata["thread_id"] = target_thread_id
|
||||||
send_metadata["telegram_dm_topic_reply_fallback"] = True
|
send_metadata["telegram_dm_topic_reply_fallback"] = True
|
||||||
elif "thread_id" not in send_metadata and "message_thread_id" not in send_metadata and not has_explicit_direct_topic:
|
elif "thread_id" not in send_metadata and "message_thread_id" not in send_metadata and not has_explicit_direct_topic:
|
||||||
send_metadata["thread_id"] = target.thread_id
|
send_metadata["thread_id"] = target_thread_id
|
||||||
result = await adapter.send(target.chat_id, content, metadata=send_metadata or None)
|
result = await adapter.send(target.chat_id, content, metadata=send_metadata or None)
|
||||||
if getattr(result, "success", True) is False:
|
if getattr(result, "success", True) is False:
|
||||||
raise RuntimeError(getattr(result, "error", None) or f"{target.platform.value} delivery failed")
|
raise RuntimeError(getattr(result, "error", None) or f"{target.platform.value} delivery failed")
|
||||||
|
|||||||
@ -584,6 +584,8 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||||||
) -> bool:
|
) -> bool:
|
||||||
if cls._metadata_direct_messages_topic_id(metadata) is not None:
|
if cls._metadata_direct_messages_topic_id(metadata) is not None:
|
||||||
return False
|
return False
|
||||||
|
if metadata and metadata.get("telegram_dm_topic_created_for_send"):
|
||||||
|
return False
|
||||||
return bool(
|
return bool(
|
||||||
thread_id
|
thread_id
|
||||||
and (
|
and (
|
||||||
@ -1190,6 +1192,59 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||||||
thread_id = await self._create_dm_topic(chat_id_int, name=name)
|
thread_id = await self._create_dm_topic(chat_id_int, name=name)
|
||||||
return str(thread_id) if thread_id else None
|
return str(thread_id) if thread_id else None
|
||||||
|
|
||||||
|
async def ensure_dm_topic(self, chat_id: str, topic_name: str) -> Optional[str]:
|
||||||
|
"""Return a private DM topic thread id, creating and persisting it if needed."""
|
||||||
|
name = str(topic_name or "").strip()
|
||||||
|
if not name:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
chat_id_int = int(chat_id)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
cache_key = f"{chat_id_int}:{name}"
|
||||||
|
cached = self._dm_topics.get(cache_key)
|
||||||
|
if cached:
|
||||||
|
return str(cached)
|
||||||
|
|
||||||
|
topic_conf: Optional[Dict[str, Any]] = None
|
||||||
|
chat_entry: Optional[Dict[str, Any]] = None
|
||||||
|
for entry in self._dm_topics_config:
|
||||||
|
if str(entry.get("chat_id")) != str(chat_id_int):
|
||||||
|
continue
|
||||||
|
chat_entry = entry
|
||||||
|
for candidate in entry.get("topics", []):
|
||||||
|
if candidate.get("name") == name:
|
||||||
|
topic_conf = candidate
|
||||||
|
break
|
||||||
|
break
|
||||||
|
|
||||||
|
if topic_conf and topic_conf.get("thread_id"):
|
||||||
|
thread_id = int(topic_conf["thread_id"])
|
||||||
|
self._dm_topics[cache_key] = thread_id
|
||||||
|
return str(thread_id)
|
||||||
|
|
||||||
|
if chat_entry is None:
|
||||||
|
chat_entry = {"chat_id": chat_id_int, "topics": []}
|
||||||
|
self._dm_topics_config.append(chat_entry)
|
||||||
|
if topic_conf is None:
|
||||||
|
topic_conf = {"name": name}
|
||||||
|
chat_entry.setdefault("topics", []).append(topic_conf)
|
||||||
|
|
||||||
|
thread_id = await self._create_dm_topic(
|
||||||
|
chat_id_int,
|
||||||
|
name=name,
|
||||||
|
icon_color=topic_conf.get("icon_color"),
|
||||||
|
icon_custom_emoji_id=topic_conf.get("icon_custom_emoji_id"),
|
||||||
|
)
|
||||||
|
if not thread_id:
|
||||||
|
return None
|
||||||
|
|
||||||
|
topic_conf["thread_id"] = thread_id
|
||||||
|
self._dm_topics[cache_key] = int(thread_id)
|
||||||
|
self._persist_dm_topic_thread_id(chat_id_int, name, int(thread_id))
|
||||||
|
return str(thread_id)
|
||||||
|
|
||||||
async def rename_dm_topic(
|
async def rename_dm_topic(
|
||||||
self,
|
self,
|
||||||
chat_id: int,
|
chat_id: int,
|
||||||
@ -1226,25 +1281,43 @@ class TelegramAdapter(BasePlatformAdapter):
|
|||||||
with open(config_path, "r", encoding="utf-8") as f:
|
with open(config_path, "r", encoding="utf-8") as f:
|
||||||
config = _yaml.safe_load(f) or {}
|
config = _yaml.safe_load(f) or {}
|
||||||
|
|
||||||
# Navigate to platforms.telegram.extra.dm_topics
|
# Navigate to platforms.telegram.extra.dm_topics, creating the path
|
||||||
dm_topics = (
|
# when a named delivery target asks us to create a topic that was
|
||||||
config.get("platforms", {})
|
# not predeclared in config.yaml.
|
||||||
.get("telegram", {})
|
platforms = config.setdefault("platforms", {})
|
||||||
.get("extra", {})
|
telegram_config = platforms.setdefault("telegram", {})
|
||||||
.get("dm_topics", [])
|
extra = telegram_config.setdefault("extra", {})
|
||||||
)
|
dm_topics = extra.setdefault("dm_topics", [])
|
||||||
if not dm_topics:
|
|
||||||
return
|
|
||||||
|
|
||||||
changed = False
|
changed = False
|
||||||
|
matching_chat_entry = None
|
||||||
for chat_entry in dm_topics:
|
for chat_entry in dm_topics:
|
||||||
if int(chat_entry.get("chat_id", 0)) != int(chat_id):
|
try:
|
||||||
|
chat_matches = int(chat_entry.get("chat_id", 0)) == int(chat_id)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
chat_matches = False
|
||||||
|
if not chat_matches:
|
||||||
continue
|
continue
|
||||||
for t in chat_entry.get("topics", []):
|
matching_chat_entry = chat_entry
|
||||||
if t.get("name") == topic_name and not t.get("thread_id"):
|
for t in chat_entry.setdefault("topics", []):
|
||||||
t["thread_id"] = thread_id
|
if t.get("name") == topic_name:
|
||||||
changed = True
|
if not t.get("thread_id"):
|
||||||
|
t["thread_id"] = thread_id
|
||||||
|
changed = True
|
||||||
break
|
break
|
||||||
|
else:
|
||||||
|
chat_entry.setdefault("topics", []).append(
|
||||||
|
{"name": topic_name, "thread_id": thread_id}
|
||||||
|
)
|
||||||
|
changed = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if matching_chat_entry is None:
|
||||||
|
dm_topics.append({
|
||||||
|
"chat_id": chat_id,
|
||||||
|
"topics": [{"name": topic_name, "thread_id": thread_id}],
|
||||||
|
})
|
||||||
|
changed = True
|
||||||
|
|
||||||
if changed:
|
if changed:
|
||||||
fd, tmp_path = tempfile.mkstemp(
|
fd, tmp_path = tempfile.mkstemp(
|
||||||
|
|||||||
@ -128,11 +128,16 @@ class TestPlatformNameCaseInsensitivity:
|
|||||||
class RecordingAdapter:
|
class RecordingAdapter:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.calls = []
|
self.calls = []
|
||||||
|
self.ensure_dm_topic_calls = []
|
||||||
|
|
||||||
async def send(self, chat_id, content, metadata=None):
|
async def send(self, chat_id, content, metadata=None):
|
||||||
self.calls.append({"chat_id": chat_id, "content": content, "metadata": metadata})
|
self.calls.append({"chat_id": chat_id, "content": content, "metadata": metadata})
|
||||||
return {"success": True}
|
return {"success": True}
|
||||||
|
|
||||||
|
async def ensure_dm_topic(self, chat_id, topic_name):
|
||||||
|
self.ensure_dm_topic_calls.append({"chat_id": chat_id, "topic_name": topic_name})
|
||||||
|
return "38049"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_explicit_telegram_private_thread_requires_reply_anchor(tmp_path, monkeypatch):
|
async def test_explicit_telegram_private_thread_requires_reply_anchor(tmp_path, monkeypatch):
|
||||||
@ -147,6 +152,30 @@ async def test_explicit_telegram_private_thread_requires_reply_anchor(tmp_path,
|
|||||||
assert adapter.calls == []
|
assert adapter.calls == []
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_named_telegram_private_topic_is_created_before_delivery(tmp_path, monkeypatch):
|
||||||
|
monkeypatch.setattr("gateway.delivery.get_hermes_home", lambda: tmp_path)
|
||||||
|
adapter = RecordingAdapter()
|
||||||
|
router = DeliveryRouter(GatewayConfig(), adapters={Platform.TELEGRAM: adapter})
|
||||||
|
target = DeliveryTarget.parse("telegram:722341991:Hermes API Test")
|
||||||
|
|
||||||
|
await router._deliver_to_platform(target, "hello", metadata=None)
|
||||||
|
|
||||||
|
assert adapter.ensure_dm_topic_calls == [
|
||||||
|
{"chat_id": "722341991", "topic_name": "Hermes API Test"}
|
||||||
|
]
|
||||||
|
assert adapter.calls == [
|
||||||
|
{
|
||||||
|
"chat_id": "722341991",
|
||||||
|
"content": "hello",
|
||||||
|
"metadata": {
|
||||||
|
"thread_id": "38049",
|
||||||
|
"telegram_dm_topic_created_for_send": True,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_explicit_telegram_private_thread_uses_reply_fallback_with_anchor(tmp_path, monkeypatch):
|
async def test_explicit_telegram_private_thread_uses_reply_fallback_with_anchor(tmp_path, monkeypatch):
|
||||||
monkeypatch.setattr("gateway.delivery.get_hermes_home", lambda: tmp_path)
|
monkeypatch.setattr("gateway.delivery.get_hermes_home", lambda: tmp_path)
|
||||||
|
|||||||
@ -205,6 +205,28 @@ async def test_create_dm_topic_returns_none_without_bot():
|
|||||||
assert result is None
|
assert result is None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_ensure_dm_topic_creates_on_demand_and_persists():
|
||||||
|
"""Named delivery targets should create missing private DM topics on demand."""
|
||||||
|
adapter = _make_adapter()
|
||||||
|
adapter._bot = AsyncMock()
|
||||||
|
adapter._bot.create_forum_topic.return_value = SimpleNamespace(message_thread_id=444)
|
||||||
|
adapter._persist_dm_topic_thread_id = MagicMock()
|
||||||
|
|
||||||
|
result = await adapter.ensure_dm_topic("111", "On Demand")
|
||||||
|
|
||||||
|
assert result == "444"
|
||||||
|
adapter._bot.create_forum_topic.assert_called_once_with(
|
||||||
|
chat_id=111,
|
||||||
|
name="On Demand",
|
||||||
|
)
|
||||||
|
assert adapter._dm_topics["111:On Demand"] == 444
|
||||||
|
assert adapter._dm_topics_config == [
|
||||||
|
{"chat_id": 111, "topics": [{"name": "On Demand", "thread_id": 444}]}
|
||||||
|
]
|
||||||
|
adapter._persist_dm_topic_thread_id.assert_called_once_with(111, "On Demand", 444)
|
||||||
|
|
||||||
|
|
||||||
# ── _persist_dm_topic_thread_id ──
|
# ── _persist_dm_topic_thread_id ──
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -597,6 +597,33 @@ async def test_send_uses_reply_fallback_for_hermes_dm_topics():
|
|||||||
assert "direct_messages_topic_id" not in call_log[0]
|
assert "direct_messages_topic_id" not in call_log[0]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_created_private_topic_uses_message_thread_without_anchor():
|
||||||
|
"""Topics created via createForumTopic are addressable by message_thread_id directly."""
|
||||||
|
adapter = _make_adapter()
|
||||||
|
call_log = []
|
||||||
|
|
||||||
|
async def mock_send_message(**kwargs):
|
||||||
|
call_log.append(kwargs)
|
||||||
|
return SimpleNamespace(message_id=781)
|
||||||
|
|
||||||
|
adapter._bot = SimpleNamespace(send_message=mock_send_message)
|
||||||
|
|
||||||
|
result = await adapter.send(
|
||||||
|
chat_id="123",
|
||||||
|
content="created topic message",
|
||||||
|
metadata={
|
||||||
|
"thread_id": "38049",
|
||||||
|
"telegram_dm_topic_created_for_send": True,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.success is True
|
||||||
|
assert call_log[0]["reply_to_message_id"] is None
|
||||||
|
assert call_log[0]["message_thread_id"] == 38049
|
||||||
|
assert "direct_messages_topic_id" not in call_log[0]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_send_uses_metadata_reply_fallback_for_streaming_dm_topics():
|
async def test_send_uses_metadata_reply_fallback_for_streaming_dm_topics():
|
||||||
"""Metadata-only sends still stay in Hermes-created Telegram DM topics."""
|
"""Metadata-only sends still stay in Hermes-created Telegram DM topics."""
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user