Skip to content

单节点流式输出

本例展示如何使用 WorkFlowManager.startStreaming() 对单个 CodeNode 进行流式执行与观测。

要点:只调用一次 setStartNode,并使用 doOnNext/onError/onComplete 做运行期打点。

java
import com.ai.agents.orchestrator.workflow.WorkFlowManager;
import com.ai.agents.orchestrator.node.CodeNode;
import reactor.core.publisher.Flux;

public class Demo {
    public static void main(String[] args) {
        WorkFlowManager<String> manager = WorkFlowManager.<String>builder().build();

        CodeNode<String> start = CodeNode.<String>builder()
                .inType(String.class)
                .outType(String.class)
                .code(in -> "hello " + in)
                .build("world");

        // 仅设置一次起始节点
        manager.setStartNode(start);

        Flux<Object> flux = manager.startStreaming()
                .doOnSubscribe(s -> System.out.println("[SUB] start"))
                .doOnNext(o -> System.out.println("[NEXT] " + o))
                .doOnError(e -> System.out.println("[ERR] " + e))
                .doOnComplete(() -> System.out.println("[DONE]"));

        // 触发执行(仅订阅后才会运行)
        flux.blockLast();

        // 结果池中保存了每个节点的“最后一个元素”
        System.out.println("pool = " + manager.getResultPool());
    }
}

常见问题

  • 看到多个 onNext?流式执行会“边执行边发射”,属于正常现象。
  • 想只关心完成:使用 StepVerifier.create(flux).thenConsumeWhile(o -> true).verifyComplete();

Released under the MIT License.