Skip to content

工作流与节点

本文介绍 easy-agents 的核心抽象:WorkFlowManagerNode

WorkFlowManager

  • 负责组装、校验并执行工作流。
  • 提供两种运行模式:
    • startBlocking():阻塞执行,返回结果池(每个节点的聚合结果)。
    • startStreaming():流式执行,返回 Flux 事件流,边执行边发射。
  • 结果池(Map<UUID, NodeResult>):记录每个节点的最终结果(通常为该节点流的“最后一条”)。
  • 路由与并发:
    • 多父节点阻塞:只有当所有父节点都完成,且满足至少一个父节点放行规则时,子节点才执行。
    • 内部使用 parentsLeftallowedByAnyParent 进行调度。

Node 抽象

  • 基类:com.ai.agents.orchestrator.node.Node<IN>
  • 关键字段:
    • inType/outType:输入/输出类型(输出使用 Class<?> 表示)。
    • input:输入数据;也可通过 inputResultId 从结果池读取上游节点结果。
  • 两种执行方式:
    • executeBlocking():返回最终结果。
    • executeStreaming():返回 Flux,适合需要逐步产出内容的场景。

输入来源的两种方式

  • 直接输入:builder().build(input) 在构建时给定常量输入。
  • 结果池引用:builder().build(upstreamNodeId) 运行时从结果池读取上游“最后一个值”。

建议:在 DAG 场景优先采用“结果池引用”,使接线清晰且便于复用。

常用节点

  • CodeNode<IN>:用一段 Java 代码(Lambda)处理输入,适合快速拼装与调试。
  • AIChatNode<IN>:对接 Spring AI 的 ChatClient,支持阻塞与流式对话。

CodeNode 示例

java
CodeNode<String> start = CodeNode.<String>builder()
    .inType(String.class)
    .outType(String.class)
    .code(in -> "Hello " + in)
    .build("World");

AIChatNode 串联示例

java
EasyTree.TreeNode root = manager.setStartNode(start);

AIChatNode<String> ai = AIChatNode.<String>builder()
    .chatClientRequestSpec(chatClient.prompt().system("你是助手"))
    .prompt(input -> List.of(new UserMessage(input)))
    .outType(String.class)
    .build(root.getId()); // 从结果池读取 start 的聚合结果

root.addChild(ai);

结果复用(通过节点 ID)

  • 任何节点都可以通过 build(upstreamNodeId) 的方式,将上游节点的“聚合结果”作为自己的输入:
    1. 使用 manager.setStartNode(startNode) 拿到 rootNode 引用。
    2. 构造下游节点时传入 rootNode.getId()
    3. 通过 rootNode.addChild(child) 正确接线。
  • 注意:一个起始节点只调用一次 setStartNode,避免根节点 UUID 被替换导致结果池找不到上游结果。

路由(与 RouteOption 配合)

  • 在接线时可指定路由条件:root.addChild(child, routeOption)
  • RouteOption 通过 when/and/or 组合条件,运行期以 resultPool 判定放行。
java
RouteOption onlyIfOk = RouteOption
    .when(pool -> Objects.equals(pool.get(root.getId()).getValue(), "OK"))
    .build();
root.addChild(child, onlyIfOk);

调试建议

  • 流式执行时:在测试或应用中对返回的 Flux 增加 doOnNextdoOnErrordoOnComplete 打点。
  • 结果池检查:工作流完成后,打印 manager.getResultPool() 观察每个节点的聚合结果。

Released under the MIT License.