Skip to content

流式执行(Streaming)

Easy-Agents 支持以 Reactor Flux 为基础的流式执行:节点边执行边发射事件,便于在线观测、日志跟踪与用户实时反馈。

入口 API

  • WorkFlowManager.startStreaming():返回 Flux<Object>(或键值对事件,具体以实现为准)
  • 订阅即执行:未订阅不会触发工作流
java
Flux<Object> flux = manager.startStreaming()
    .doOnSubscribe(s -> log.info("subscribe"))
    .doOnNext(e -> log.info("NEXT: {}", e))
    .doOnError(ex -> log.error("ERR", ex))
    .doOnComplete(() -> log.info("DONE"));

flux.blockLast();

事件来源

  • CodeNode.executeStreaming():可多次 onNext
  • AIChatNode.executeStreaming():模型增量输出逐步发射
  • 管理器将节点事件多播(Sinks),下游可同时观测

结果池与 Streaming 的关系

  • 结果池保存“每个节点的最后一个元素”作为聚合结果
  • Streaming 过程中可能会多次发射,但最终仅最后一次写入结果池

最佳实践

  • 仅调用一次 manager.setStartNode(start)
  • 使用 doOnNext/OnError/OnComplete 打点,生产环境也建议保留关键日志
  • AI 输出具有非确定性,测试中放宽对具体 onNext 内容的断言,改为只校验完成

常见陷阱

  • 未订阅:startStreaming() 后不订阅,工作流不会执行
  • 背压:默认简单场景无需关心,若处理高吞吐可基于 Reactor 策略调整
  • 串流与阻塞混用:建议在一个用例中坚持一种模式,避免状态混乱

Released under the MIT License.