长任务Agent架构
核对日期:2026-05-09。
1. 定义与边界
长任务 Agent 架构支持任务跨分钟、小时甚至天执行,并能在进程重启、工具失败、人类审批、额度限制或用户中断后恢复。它的核心不是“上下文更长”,而是 durable execution、checkpoint、幂等副作用和可恢复状态。
长任务 Agent 可能采用单 Agent、Planner-Executor、Supervisor-Worker 或 workflow 混合架构;本文件关注跨时间执行能力。
2. 为什么重要
真实任务经常不是一次模型调用能完成:代码修改要读仓库、改文件、跑测试;研究报告要多轮检索;企业审批要等人;数据处理要等外部系统。没有长任务架构,系统会在超时、重启或中断时丢失进度,或者重复执行副作用。
3. 核心机制
可恢复状态示例:
{
"run_id": "run_001",
"status": "paused_for_approval",
"current_node": "commit_refund",
"plan_version": 4,
"completed_steps": ["s1", "s2"],
"pending_action": {
"type": "refund",
"amount": 120,
"idempotency_key": "refund:order_9:v1"
},
"checkpoint_id": "ckpt_789"
}
4. 架构模式
| 模式 | 机制 | 适用 |
|---|---|---|
| Checkpointed loop | 每轮保存状态和 observation | 中等复杂 Agent |
| Durable workflow | Temporal/Step Functions/LangGraph 等保存执行历史 | 长时间、人类在环、高可靠 |
| Event-sourced Agent | 所有状态由事件重放得到 | 强审计、可回放 |
| Saga Agent | 每个副作用有补偿动作 | 多系统写操作 |
5. 工程实现
def run_long_task(run_id):
state = checkpoint_store.load(run_id)
while not state.done:
step = state.next_step()
if step.requires_human and not state.has_approval(step.id):
checkpoint_store.save(state.pause(step))
return {"status": "paused", "resume_token": state.resume_token}
result = execute_idempotent(step, key=step.idempotency_key)
state.apply(result)
checkpoint_store.save(state)
return finalize(state)
设计要求:
- checkpoint 必须包含足够恢复的信息,而不是只保存聊天记录。
- 副作用工具必须支持幂等键。
- 中断恢复后不要重新执行已成功的不可逆动作。
- 计划、工具版本、提示词版本进入 run metadata。
- 对节点内中断,恢复时要明确是“从节点开始重跑”还是“从下一步继续”。
6. 生产实践
- 对每个步骤声明
retry_policy、timeout、compensation。 - 将长任务拆成可验证里程碑,避免单个步骤过大。
- 用户可查看进度、取消任务、审批 pending action。
- 对模型调用、工具调用和人工审批使用同一个 trace id。
- 长任务恢复时先做环境校验:工具版本、权限、外部资源是否仍存在。
7. 常见反模式
- 只把完整对话塞回模型,期望模型自己恢复。
- 恢复时重复执行已完成写操作。
- checkpoint 存在内存或本地临时文件,进程重启即丢失。
- 人类审批没有绑定具体参数,恢复后参数被替换。
- 长任务无取消和超时策略。
8. 评测方法
- Resume Correctness:从任意 checkpoint 恢复是否正确。
- Interruption Test:在每个关键节点强制中断后恢复。
- Idempotency Test:重复执行同一步是否不产生重复副作用。
- Long-run Cost:跨小时任务的 token、工具、存储成本。
- Human Approval Audit:审批参数与最终执行参数是否一致。
9. 安全与治理
- resume token 必须短期有效、不可预测、绑定用户和 run。
- checkpoint 中敏感数据加密,并设置保留期限。
- 恢复前重新校验用户权限,不能因旧 checkpoint 绕过新权限。
- 对外部内容跨时间污染计划保持警惕:恢复时重新标记数据来源。
- 对取消任务定义补偿或人工清理。
10. 工程手册补充
10.1 控制流、状态流、工具流
长任务 Agent 的核心能力是“可暂停、可恢复、可审计”,不是让模型一次上下文撑到底。
| 流 | 工程要求 | 关键字段 |
|---|---|---|
| 控制流 | 以 milestone 推进,允许暂停、恢复、取消、接管 | run_status、current_milestone、resume_token |
| 状态流 | checkpoint 保存事实、产物、计划版本和未完成步骤 | checkpoint_id、plan_version、artifact_refs |
| 工具流 | 长耗时工具异步化;写工具有 preview/commit 和幂等键 | job_id、idempotency_key、tool_status |
| 观测流 | 每个恢复点能解释“为什么停在这里” | pause_reason、next_action、blocked_by |
10.2 中断恢复伪代码
def resume_run(run_id):
state = store.load_latest_checkpoint(run_id)
assert state.run_status in {"paused", "running", "failed_retryable"}
if state.requires_human_approval and not state.approval_id:
return ask_for_approval(state.next_action)
for step in state.remaining_steps:
result = execute_with_timeout(step, state)
store.write_event(run_id, step.id, result)
if result.retryable:
store.checkpoint(run_id, status="failed_retryable")
return schedule_retry(run_id)
if result.needs_replan:
state = replan_from_checkpoint(state, result)
store.checkpoint(run_id, status="running")
return finalize(run_id)
失败恢复与上线清单:
- checkpoint 必须小而完整:能恢复控制状态,但不把全部对话无限塞进去。
- 外部工具状态未知时,先 reconciliation,再继续执行。
- 长任务要有预算闸门:时间、token、工具成本、重试次数、人工等待时长。
- 支持用户取消和管理员终止,终止后进入补偿或只读归档状态。
- 评测要模拟进程重启、工具超时、用户改目标、审批延迟和重复恢复。
- 安全上,恢复时重新计算权限,不复用过期授权令牌。
11. 权威资料
- LangGraph durable execution: https://docs.langchain.com/oss/python/langgraph/durable-execution
- LangGraph persistence: https://docs.langchain.com/oss/python/langgraph/persistence
- LangGraph interrupts: https://docs.langchain.com/oss/python/langgraph/interrupts
- Temporal durable execution: https://temporal.io/
- AWS Step Functions long-running workflows: https://docs.aws.amazon.com/step-functions/latest/dg/tutorial-continue-new.html
- AWS Step Functions callback task token: https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html
- OpenAI Agents SDK tracing: https://openai.github.io/openai-agents-python/tracing/