Skip to content

架构概览

下图抽象展示 Easy-Agents 的核心:

  • WorkFlowManager(工作流协调器)
  • Node(节点抽象,包含 CodeNodeAIChatNode 等)
  • 结果池(每个节点的最后一个输出,聚合为 Map<UUID, NodeResult>)
  • 流式执行(Reactor Flux + Sinks 多播)
  • 并发/路由控制(多父节点阻塞 + 任一父允许放行)

关键职责

  • WorkFlowManager
    • 接收起始节点 setStartNode()(仅一次)
    • startBlocking():阻塞执行,返回完整结果池
    • startStreaming():流式执行,返回事件 Flux(订阅即执行)
    • 内部维护 parentsLeftallowedByAnyParent 控制多父节点的放行
  • Node
    • executeBlocking()executeStreaming() 两种执行通道
    • 可通过 inputResultId 从结果池复用上游节点的聚合结果

结果池与事件流

  • 事件流(Streaming)会发射每个节点产生的中间值,适合做在线观测与日志跟踪。
  • 结果池记录每个节点的“最后一个”输出,适合执行结束后的汇总与下游依赖输入。

为什么仅调用一次 setStartNode

  • 多次调用会更换根节点 UUID,导致子节点使用的旧 UUID 在结果池中查不到对应条目,引发 NPE。
  • 正确做法:
    1. EasyTree.TreeNode root = manager.setStartNode(start)
    2. 下游节点 build(root.getId())
    3. root.addChild(child) 接线

与 Spring AI 的集成

  • AIChatNode 通过 ChatClientRequestSpecprompt(input -> List<Message>) 将上游文本转为对话请求
  • 可流式消费大模型输出,用于对话、总结、代码生成等场景

典型使用场景

  • 数据预处理(CodeNode)→ 文本生成/洞察(AIChatNode)→ 结果持久化(CodeNode
  • 测试/调试时使用 Streaming 打点,生产时以结果池为下游任务输入

Released under the MIT License.