s07
Sub-Agent 编排
智能层动态拆解与并行执行
Lead Agent 动态拆解 → 子 Agent 并行执行 → 结果合并硬性并发控制避免 API 过载,超限任务静默丢弃而非排队
动态拆解与并行执行
Lead Agent 动态拆解 → 子 Agent 并行执行 → 结果合并硬性并发控制避免 API 过载,超限任务静默丢弃而非排队
单个 Agent 串行执行复杂任务效率低下——搜索、编码、审查按顺序做可能需要几分钟。DeerFlow 通过 Lead Agent 动态拆解任务,并行拉起多个子 Agent 执行,配合 SubagentLimitMiddleware 硬性并发控制(截断而非排队),在效率与稳定性之间取得平衡。
子 Agent 编排策略 — Lead Agent 的 system prompt 定义分解→委派→合成三步流程
# subagent_system prompt 核心逻辑
# 1. DECOMPOSE: 将复杂任务拆为并行子任务
# 2. DELEGATE: 用 task() 工具并行启动子 Agent
# 3. SYNTHESIZE: 收集结果整合为最终回答
# 硬性并发限制
f"⛔ HARD CONCURRENCY LIMIT: MAXIMUM {n} task CALLS PER RESPONSE"
# 超出限制的 task 调用被系统静默丢弃
# 多批次执行策略:
# Turn 1: 启动前 n 个子任务 → 等待结果
# Turn 2: 启动下一批 → 等待结果
# Final: 合成所有结果SubagentLimitMiddleware — after_model 钩子硬性截断超额的 task 调用
class SubagentLimitMiddleware(AgentMiddleware):
def __init__(self, max_concurrent: int = 3):
self.max_concurrent = _clamp_subagent_limit(max_concurrent)
def after_model(self, state, runtime):
tool_calls = last_msg.tool_calls
task_indices = [i for i, tc in enumerate(tool_calls)
if tc.get("name") == "task"]
if len(task_indices) <= self.max_concurrent:
return None
# 截断超额调用,而非排队
indices_to_drop = set(task_indices[self.max_concurrent:])
truncated = [tc for i, tc in enumerate(tool_calls)
if i not in indices_to_drop]
return {"messages": [msg.model_copy(
update={"tool_calls": truncated})]}