架构概览
下图抽象展示 Easy-Agents 的核心:
- WorkFlowManager(工作流协调器)
- Node(节点抽象,包含
CodeNode
、AIChatNode
等) - 结果池(每个节点的最后一个输出,聚合为 Map<UUID, NodeResult>)
- 流式执行(Reactor Flux + Sinks 多播)
- 并发/路由控制(多父节点阻塞 + 任一父允许放行)
关键职责
- WorkFlowManager
- 接收起始节点
setStartNode()
(仅一次) startBlocking()
:阻塞执行,返回完整结果池startStreaming()
:流式执行,返回事件 Flux(订阅即执行)- 内部维护
parentsLeft
、allowedByAnyParent
控制多父节点的放行
- 接收起始节点
- Node
executeBlocking()
与executeStreaming()
两种执行通道- 可通过
inputResultId
从结果池复用上游节点的聚合结果
结果池与事件流
- 事件流(Streaming)会发射每个节点产生的中间值,适合做在线观测与日志跟踪。
- 结果池记录每个节点的“最后一个”输出,适合执行结束后的汇总与下游依赖输入。
为什么仅调用一次 setStartNode
- 多次调用会更换根节点 UUID,导致子节点使用的旧 UUID 在结果池中查不到对应条目,引发 NPE。
- 正确做法:
EasyTree.TreeNode root = manager.setStartNode(start)
- 下游节点
build(root.getId())
root.addChild(child)
接线
与 Spring AI 的集成
AIChatNode
通过ChatClientRequestSpec
与prompt(input -> List<Message>)
将上游文本转为对话请求- 可流式消费大模型输出,用于对话、总结、代码生成等场景
典型使用场景
- 数据预处理(
CodeNode
)→ 文本生成/洞察(AIChatNode
)→ 结果持久化(CodeNode
) - 测试/调试时使用 Streaming 打点,生产时以结果池为下游任务输入