工作流与节点
本文介绍 easy-agents 的核心抽象:WorkFlowManager 与 Node。
WorkFlowManager
- 负责组装、校验并执行工作流。
- 提供两种运行模式:
startBlocking():阻塞执行,返回结果池(每个节点的聚合结果)。startStreaming():流式执行,返回 Flux 事件流,边执行边发射。
- 结果池(
Map<UUID, NodeResult>):记录每个节点的最终结果(通常为该节点流的“最后一条”)。 - 路由与并发:
- 多父节点阻塞:只有当所有父节点都完成,且满足至少一个父节点放行规则时,子节点才执行。
- 内部使用
parentsLeft与allowedByAnyParent进行调度。
Node 抽象
- 基类:
com.ai.agents.orchestrator.node.Node<IN> - 关键字段:
inType/outType:输入/输出类型(输出使用Class<?>表示)。input:输入数据;也可通过inputResultId从结果池读取上游节点结果。
- 两种执行方式:
executeBlocking():返回最终结果。executeStreaming():返回 Flux,适合需要逐步产出内容的场景。
输入来源的两种方式
- 直接输入:
builder().build(input)在构建时给定常量输入。 - 结果池引用:
builder().build(upstreamNodeId)运行时从结果池读取上游“最后一个值”。
建议:在 DAG 场景优先采用“结果池引用”,使接线清晰且便于复用。
常用节点
CodeNode<IN>:用一段 Java 代码(Lambda)处理输入,适合快速拼装与调试。AIChatNode<IN>:对接 Spring AI 的ChatClient,支持阻塞与流式对话。
CodeNode 示例
java
CodeNode<String> start = CodeNode.<String>builder()
.inType(String.class)
.outType(String.class)
.code(in -> "Hello " + in)
.build("World");AIChatNode 串联示例
java
EasyTree.TreeNode root = manager.setStartNode(start);
AIChatNode<String> ai = AIChatNode.<String>builder()
.chatClientRequestSpec(chatClient.prompt().system("你是助手"))
.prompt(input -> List.of(new UserMessage(input)))
.outType(String.class)
.build(root.getId()); // 从结果池读取 start 的聚合结果
root.addChild(ai);结果复用(通过节点 ID)
- 任何节点都可以通过
build(upstreamNodeId)的方式,将上游节点的“聚合结果”作为自己的输入:- 使用
manager.setStartNode(startNode)拿到rootNode引用。 - 构造下游节点时传入
rootNode.getId()。 - 通过
rootNode.addChild(child)正确接线。
- 使用
- 注意:一个起始节点只调用一次
setStartNode,避免根节点 UUID 被替换导致结果池找不到上游结果。
路由(与 RouteOption 配合)
- 在接线时可指定路由条件:
root.addChild(child, routeOption)。 RouteOption通过when/and/or组合条件,运行期以resultPool判定放行。
java
RouteOption onlyIfOk = RouteOption
.when(pool -> Objects.equals(pool.get(root.getId()).getValue(), "OK"))
.build();
root.addChild(child, onlyIfOk);调试建议
- 流式执行时:在测试或应用中对返回的 Flux 增加
doOnNext、doOnError、doOnComplete打点。 - 结果池检查:工作流完成后,打印
manager.getResultPool()观察每个节点的聚合结果。
