s10
IM 通道与开发者体验
集成层即插即用的多通道集成
Telegram/Slack/飞书一行配置 + Claude Code 原生集成通道层与核心解耦,开发者可在熟悉的界面中使用 Agent
即插即用的多通道集成
Telegram/Slack/飞书一行配置 + Claude Code 原生集成通道层与核心解耦,开发者可在熟悉的界面中使用 Agent
Agent 的核心逻辑不应与接入方式耦合——Telegram 的消息格式和 Slack 完全不同,飞书又是另一套 API。DeerFlow 通过 Channel ABC + MessageBus 发布/订阅模式,将 Agent 执行逻辑与通道实现完全解耦。新增 IM 通道只需实现 start/stop/send 三个方法,核心逻辑零修改。
receive_message → process → send_response
Channel ABC — 所有通道的统一接口:start/stop/send + MessageBus 集成
class Channel(ABC):
"""所有 IM 通道的基类。
1. 接收消息 → InboundMessage → 发布到 bus
2. 订阅 outbound → 发送回平台"""
def __init__(self, name: str, bus: MessageBus, config: dict):
self.name = name
self.bus = bus
self.config = config
@abstractmethod
async def start(self) -> None: ... # 开始监听
@abstractmethod
async def stop(self) -> None: ... # 优雅停止
@abstractmethod
async def send(self, msg: OutboundMessage) -> None: ...
async def send_file(self, msg, attachment) -> bool:
return False # 默认不支持文件上传
async def _on_outbound(self, msg: OutboundMessage):
if msg.channel_name == self.name:
await self.send(msg) # 先发文本
for att in msg.attachments: # 再传文件
await self.send_file(msg, att)MessageBus — 异步发布/订阅中枢,解耦 Channel 与 Agent Dispatcher
class MessageBus:
"""异步 pub/sub 中枢:
Channel → publish_inbound → Queue → Dispatcher
Dispatcher → publish_outbound → Callbacks → Channels"""
def __init__(self):
self._inbound_queue: asyncio.Queue[InboundMessage] = asyncio.Queue()
self._outbound_listeners: list[OutboundCallback] = []
async def publish_inbound(self, msg: InboundMessage):
await self._inbound_queue.put(msg)
async def get_inbound(self) -> InboundMessage:
return await self._inbound_queue.get()
def subscribe_outbound(self, callback: OutboundCallback):
self._outbound_listeners.append(callback)
async def publish_outbound(self, msg: OutboundMessage):
for callback in self._outbound_listeners:
await callback(msg)