工作流与节点
本文介绍 easy-agents 的核心抽象:WorkFlowManager
与 Node
。
WorkFlowManager
- 负责组装、校验并执行工作流。
- 提供两种运行模式:
startBlocking()
:阻塞执行,返回结果池(每个节点的聚合结果)。startStreaming()
:流式执行,返回 Flux 事件流,边执行边发射。
- 结果池(
Map<UUID, NodeResult>
):记录每个节点的最终结果(通常为该节点流的“最后一条”)。 - 路由与并发:
- 多父节点阻塞:只有当所有父节点都完成,且满足至少一个父节点放行规则时,子节点才执行。
- 内部使用
parentsLeft
与allowedByAnyParent
进行调度。
Node 抽象
- 基类:
com.ai.agents.orchestrator.node.Node<IN>
- 关键字段:
inType
/outType
:输入/输出类型(输出使用Class<?>
表示)。input
:输入数据;也可通过inputResultId
从结果池读取上游节点结果。
- 两种执行方式:
executeBlocking()
:返回最终结果。executeStreaming()
:返回 Flux,适合需要逐步产出内容的场景。
输入来源的两种方式
- 直接输入:
builder().build(input)
在构建时给定常量输入。 - 结果池引用:
builder().build(upstreamNodeId)
运行时从结果池读取上游“最后一个值”。
建议:在 DAG 场景优先采用“结果池引用”,使接线清晰且便于复用。
常用节点
CodeNode<IN>
:用一段 Java 代码(Lambda)处理输入,适合快速拼装与调试。AIChatNode<IN>
:对接 Spring AI 的ChatClient
,支持阻塞与流式对话。
CodeNode 示例
java
CodeNode<String> start = CodeNode.<String>builder()
.inType(String.class)
.outType(String.class)
.code(in -> "Hello " + in)
.build("World");
AIChatNode 串联示例
java
EasyTree.TreeNode root = manager.setStartNode(start);
AIChatNode<String> ai = AIChatNode.<String>builder()
.chatClientRequestSpec(chatClient.prompt().system("你是助手"))
.prompt(input -> List.of(new UserMessage(input)))
.outType(String.class)
.build(root.getId()); // 从结果池读取 start 的聚合结果
root.addChild(ai);
结果复用(通过节点 ID)
- 任何节点都可以通过
build(upstreamNodeId)
的方式,将上游节点的“聚合结果”作为自己的输入:- 使用
manager.setStartNode(startNode)
拿到rootNode
引用。 - 构造下游节点时传入
rootNode.getId()
。 - 通过
rootNode.addChild(child)
正确接线。
- 使用
- 注意:一个起始节点只调用一次
setStartNode
,避免根节点 UUID 被替换导致结果池找不到上游结果。
路由(与 RouteOption 配合)
- 在接线时可指定路由条件:
root.addChild(child, routeOption)
。 RouteOption
通过when/and/or
组合条件,运行期以resultPool
判定放行。
java
RouteOption onlyIfOk = RouteOption
.when(pool -> Objects.equals(pool.get(root.getId()).getValue(), "OK"))
.build();
root.addChild(child, onlyIfOk);
调试建议
- 流式执行时:在测试或应用中对返回的 Flux 增加
doOnNext
、doOnError
、doOnComplete
打点。 - 结果池检查:工作流完成后,打印
manager.getResultPool()
观察每个节点的聚合结果。