单节点流式输出
本例展示如何使用 WorkFlowManager.startStreaming()
对单个 CodeNode
进行流式执行与观测。
要点:只调用一次
setStartNode
,并使用doOnNext/onError/onComplete
做运行期打点。
java
import com.ai.agents.orchestrator.workflow.WorkFlowManager;
import com.ai.agents.orchestrator.node.CodeNode;
import reactor.core.publisher.Flux;
public class Demo {
public static void main(String[] args) {
WorkFlowManager<String> manager = WorkFlowManager.<String>builder().build();
CodeNode<String> start = CodeNode.<String>builder()
.inType(String.class)
.outType(String.class)
.code(in -> "hello " + in)
.build("world");
// 仅设置一次起始节点
manager.setStartNode(start);
Flux<Object> flux = manager.startStreaming()
.doOnSubscribe(s -> System.out.println("[SUB] start"))
.doOnNext(o -> System.out.println("[NEXT] " + o))
.doOnError(e -> System.out.println("[ERR] " + e))
.doOnComplete(() -> System.out.println("[DONE]"));
// 触发执行(仅订阅后才会运行)
flux.blockLast();
// 结果池中保存了每个节点的“最后一个元素”
System.out.println("pool = " + manager.getResultPool());
}
}
常见问题
- 看到多个
onNext
?流式执行会“边执行边发射”,属于正常现象。 - 想只关心完成:使用
StepVerifier.create(flux).thenConsumeWhile(o -> true).verifyComplete();