
专栏《Java后端工程师进阶之路》Day 14 / 90 主题从零搭建流式AI聊天后端SSE协议原理 WebFlux响应式流 前端EventSource接收一、当用户盯着“转圈圈”时我们在浪费什么流式输出的本质不是炫技而是对齐用户心理预期。人眼对“有反馈的等待”容忍度极高对“死寂的等待”零容忍。今天这篇我们就把 WebFlux SSE Spring AI 流式对话接口的完整实现拆清楚。二、先搞懂 SSE比 WebSocket 更省事的“服务器单向推送”SSEServer-Sent Events是一种让服务器向浏览器单向推送文本流的标准协议。它和 WebSocket 最大的区别特性SSEWebSocket通信方向服务器→客户端单向全双工协议层基于 HTTP天然支持心跳/重连需要单独握手数据格式纯文本默认 UTF-8二进制/文本皆可适用场景流式输出、股票行情、日志推送即时聊天、游戏、协同编辑AI 流式对话只需要服务器推给客户端SSE 完全够用而且心智负担小得多。Spring WebFlux 对 SSE 的支持非常自然返回FluxServerSentEventString即可。三、代码实战一最简 SSE 接口先把“流”跑起来依赖Spring Boot 3.2.xparent groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-parent/artifactId version3.2.5/version /parent dependencies !-- WebFlux -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-webflux/artifactId /dependency /dependencies最简 SSE 控制器import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import java.time.Duration; RestController public class SseDemoController { /** * 每 200ms 推送一个数字模拟后端持续产生数据 */ GetMapping(value /sse/numbers, produces MediaType.TEXT_EVENT_STREAM_VALUE) public FluxServerSentEventString numberStream() { return Flux.interval(Duration.ofMillis(200)) .take(20) // 只推送 20 条避免演示时停不下来 .map(seq - ServerSentEvent.Stringbuilder() .id(String.valueOf(seq)) .event(number) .data(当前序号 seq) .comment(心跳保持) // 浏览器端不会触发 onmessage但能保持连接活性 .build()); } }注意几个细节produces MediaType.TEXT_EVENT_STREAM_VALUE把响应头设为Content-Type: text/event-stream。ServerSentEvent.builder()可以显式设置id、event、data、comment这是 SSE 协议的标准字段。Flux.interval是冷的还是热的它是热的订阅后按时间轴发数据适合模拟真实推送。启动后浏览器访问 http://localhost:8080/sse/numbers如果看到一段段data:开头的文本说明 SSE 通路已通。四、代码实战二接入 Spring AI实现真正的流式对话Spring AI 从 0.8 开始就支持stream()方法返回FluxString或FluxChatResponse。我们把它桥接到 SSE。dependency groupIdorg.springframework.ai/groupId artifactIdspring-ai-openai-spring-boot-starter/artifactId version1.0.0-M1/version /dependency !-- 如果你用通义千问 / DeepSeek换成对应的 starter 即可 --配置文件application.ymlspring: ai: openai: api-key: ${OPENAI_API_KEY} base-url: https://api.openai.com/v1 # 国内环境通常需要代理或中转 chat: options: model: gpt-4o-mini temperature: 0.7流式对话控制器import org.springframework.ai.chat.client.ChatClient; import org.springframework.ai.chat.messages.UserMessage; import org.springframework.ai.chat.model.ChatResponse; import org.springframework.ai.chat.prompt.Prompt; import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; RestController public class ChatStreamController { private final ChatClient chatClient; public ChatStreamController(ChatClient.Builder chatClientBuilder) { this.chatClient chatClientBuilder.build(); } GetMapping(value /ai/chat/stream, produces MediaType.TEXT_EVENT_STREAM_VALUE) public FluxServerSentEventString chatStream(RequestParam String question) { // 1. 构造 Prompt Prompt prompt new Prompt(new UserMessage(question)); // 2. 调用大模型流式输出 FluxChatResponse responseFlux chatClient.prompt(prompt) .stream() .chatResponse(); // 3. 把 ChatResponse 转成 SSE 事件 return responseFlux .map(resp - { String content resp.getResult() ! null resp.getResult().getOutput() ! null ? resp.getResult().getOutput().getContent() : ; return ServerSentEvent.Stringbuilder() .event(message) .data(content) .build(); }) // 4. 流结束时推送一个 [DONE] 标记前端好做收尾 .concatWith(Flux.just( ServerSentEvent.Stringbuilder() .event(done) .data([DONE]) .build() )) // 5. 异常处理不要让整个流直接断掉 .onErrorResume(e - Flux.just( ServerSentEvent.Stringbuilder() .event(error) .data(调用模型失败 e.getMessage()) .build() )); } }这里有几个老兵经验不要直接返回FluxString给前端。用ServerSentEvent包装后前端可以通过event字段区分“正常内容”“结束标记”“错误消息”。一定要处理空 token。大模型流式接口有时会发空包直接getContent()可能 NPE。错误处理用onErrorResume。如果模型 API 超时或限流整个Flux会终止必须给用户一个错误事件而不是浏览器半天没反应。五、代码实战三前端 EventSource 接收与渲染SSE 在浏览器端用原生EventSource即可无需额外库!DOCTYPE html html head meta charsetUTF-8 titleAI 流式对话/title /head body input idquestion typetext value用 Java 写一个线程安全的单例模式 stylewidth: 400px; button onclickstartChat()发送/button pre idanswer stylebackground:#f5f5f5;padding:16px;border-radius:6px;min-height:60px;/pre script function startChat() { const question document.getElementById(question).value; const answerEl document.getElementById(answer); answerEl.textContent ; // 注意EventSource 只支持 GET且不能自定义请求头 const evtSource new EventSource(/ai/chat/stream?question${encodeURIComponent(question)}); evtSource.addEventListener(message, (event) { answerEl.textContent event.data; }); evtSource.addEventListener(done, () { console.log(流式输出完成); evtSource.close(); }); evtSource.addEventListener(error, (event) { console.error(发生错误, event.data); evtSource.close(); }); // 浏览器自动重连如果连接断开EventSource 会按指数退避重试 // 生产环境建议配合 last-event-id 做断点续传 } /script /body /html前端有三个注意点EventSource只支持 GET参数放在 URL 里。如果需要 POST 或复杂请求头得用fetch ReadableStream自己解析 SSE。收到done事件后一定要close()否则浏览器会按 SSE 规范自动重连。生产环境可以利用Last-Event-ID做断线续传但这需要后端在ServerSentEvent中设置id()。六、原理图解一条 AI 回复是怎么“流”到前端的┌─────────────┐ HTTP GET/SSE ┌─────────────────────┐ │ 浏览器 │ ◄───────────────────── │ Spring WebFlux │ │ EventSource │ text/event-stream │ ChatStreamController│ └─────────────┘ └──────────┬──────────┘ │ ▼ ┌──────────────────┐ │ Spring AI │ │ ChatClient │ └──────────┬───────┘ │ ▼ ┌───────────────┐ │ 大模型 API │ │ stream() │ └───────────────┘整个链路是异步非阻塞的WebFlux 用 Reactor Netty 接收请求不会为每个连接占一个线程。Spring AI 调用大模型流式接口拿到FluxChatResponse。每个 token 到达后立即通过 SSE 推送给浏览器。浏览器逐字渲染用户感知不到后端等待。在高并发场景下虚拟线程 WebFlux SSE 的组合能让一台 4C8G 的机器轻松支撑几千个并发长连接。换成传统 Servlet 每个连接一个线程的方案线程数早就被打爆了。七、建议超时与限流必须做。大模型 API 不是你家 MySQL延迟波动极大。给Flux加上.timeout(Duration.ofSeconds(30))网关层配 Token 桶限流避免一个慢请求拖垮连接池。日志不要打印完整流。我见过有人在doOnNext里把每个 token 都打印出来结果 3000 字的回复打了 3000 行日志ELK 直接爆表。只记录首包时间和总 token 数即可。简单问题别用流式。如果业务场景是“生成一句话摘要”直接同步返回更省资源。流式更适合长文本、低延迟感知、强交互感的场景。八、结尾AI 时代接口设计正在从“请求-响应”进化成“请求-流”。用户体验的差距往往不在模型本身而在你能不能让用户“看见”答案正在生成。下一篇预告Day 15《Spring IOC 容器启动全流程从 ApplicationContext 到 Bean 实例化》我们正式进入 Spring 源码深水区。