流式执行(Streaming)
Easy-Agents 支持以 Reactor Flux 为基础的流式执行:节点边执行边发射事件,便于在线观测、日志跟踪与用户实时反馈。
入口 API
WorkFlowManager.startStreaming()
:返回Flux<Object>
(或键值对事件,具体以实现为准)- 订阅即执行:未订阅不会触发工作流
java
Flux<Object> flux = manager.startStreaming()
.doOnSubscribe(s -> log.info("subscribe"))
.doOnNext(e -> log.info("NEXT: {}", e))
.doOnError(ex -> log.error("ERR", ex))
.doOnComplete(() -> log.info("DONE"));
flux.blockLast();
事件来源
CodeNode.executeStreaming()
:可多次onNext
AIChatNode.executeStreaming()
:模型增量输出逐步发射- 管理器将节点事件多播(Sinks),下游可同时观测
结果池与 Streaming 的关系
- 结果池保存“每个节点的最后一个元素”作为聚合结果
- Streaming 过程中可能会多次发射,但最终仅最后一次写入结果池
最佳实践
- 仅调用一次
manager.setStartNode(start)
- 使用
doOnNext/OnError/OnComplete
打点,生产环境也建议保留关键日志 - AI 输出具有非确定性,测试中放宽对具体
onNext
内容的断言,改为只校验完成
常见陷阱
- 未订阅:
startStreaming()
后不订阅,工作流不会执行 - 背压:默认简单场景无需关心,若处理高吞吐可基于 Reactor 策略调整
- 串流与阻塞混用:建议在一个用例中坚持一种模式,避免状态混乱