架构概览
下图抽象展示 Easy-Agents 的核心:
- WorkFlowManager(工作流协调器)
- Node(节点抽象,包含
CodeNode、AIChatNode等) - 结果池(每个节点的最后一个输出,聚合为 Map<UUID, NodeResult>)
- 流式执行(Reactor Flux + Sinks 多播)
- 并发/路由控制(多父节点阻塞 + 任一父允许放行)
关键职责
- WorkFlowManager
- 接收起始节点
setStartNode()(仅一次) startBlocking():阻塞执行,返回完整结果池startStreaming():流式执行,返回事件 Flux(订阅即执行)- 内部维护
parentsLeft、allowedByAnyParent控制多父节点的放行
- 接收起始节点
- Node
executeBlocking()与executeStreaming()两种执行通道- 可通过
inputResultId从结果池复用上游节点的聚合结果
结果池与事件流
- 事件流(Streaming)会发射每个节点产生的中间值,适合做在线观测与日志跟踪。
- 结果池记录每个节点的“最后一个”输出,适合执行结束后的汇总与下游依赖输入。
为什么仅调用一次 setStartNode
- 多次调用会更换根节点 UUID,导致子节点使用的旧 UUID 在结果池中查不到对应条目,引发 NPE。
- 正确做法:
EasyTree.TreeNode root = manager.setStartNode(start)- 下游节点
build(root.getId()) root.addChild(child)接线
与 Spring AI 的集成
AIChatNode通过ChatClientRequestSpec与prompt(input -> List<Message>)将上游文本转为对话请求- 可流式消费大模型输出,用于对话、总结、代码生成等场景
典型使用场景
- 数据预处理(
CodeNode)→ 文本生成/洞察(AIChatNode)→ 结果持久化(CodeNode) - 测试/调试时使用 Streaming 打点,生产时以结果池为下游任务输入
