流式输出技术SSE:前后端实践

74 min read

在本文中,我们介绍了ChatGPT使用的流式输出技术SSE(Server-Sent Events)的技术原理,并通过代码示例展示了如何在Web应用程序中有效地使用SSE实现实时数据推送和流式输出。

一、背景介绍

在使用ChatGPT时,模型的回复是逐字逐句生成的,而不是一次性生成整个回答。这是因为语言模型需要在每个时间步骤预测下一个最合适的单词或字符。逐字生成的方式可以实现更快的交互响应,提高用户体验,避免长时间等待。

二、ChatGPT 流式输出原理

ChatGPT的completion API支持流式输出,当stream参数设置为true时,将会使用SSE技术流式输出结果。

示例请求:

curl -i -X POST -H 'Content-Type: application/json' -H 'Authorization: Bearer sk-************************************************' https://api.openai.com/v1/chat/completions -d '{"model":"gpt-3.5-turbo","messages":[{"role": "user", "content": "3+5=?"}],"temperature":0.8,"stream":true}'

响应结果:

HTTP/2 200
content-type: text/event-stream
...

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","choices":[{"delta":{"content":"3"}}]}
data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","choices":[{"delta":{"content":" +"}}]}
data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","choices":[{"delta":{"content":"5"}}]}
data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","choices":[{"delta":{"content":" = 8"}}]}
data: [DONE]

三、SSE技术介绍

SSE(Server-Sent Events)是一种基于HTTP的技术,允许服务器主动推送数据到客户端。SSE使用标准的HTTP协议,简单易用,适用于需要实时更新和通知的应用场景。

SSE的优点包括:

  • 实时性:服务器可以主动推送数据,实时更新客户端。
  • 简单易用:基于HTTP协议,无需额外库或协议转换。
  • 可靠性:使用HTTP连接,兼容性好,支持自动重连。
  • 轻量级:不需要建立全双工连接,减少通信开销。

四、SSE前端实践

在前端使用JavaScript的EventSource来实现流式输出。

示例代码:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE Example</title>
</head>
<body>
    <h1>SSE Test</h1>
    <div id="data"></div>
    <script>
        const eventSource = new EventSource('/stream');

        eventSource.onmessage = function(event) {
            const data = JSON.parse(event.data);
            document.getElementById('data').innerText += data.message + "\n";
        };

        eventSource.onerror = function(event) {
            console.error('Error occurred:', event);
            eventSource.close();
        };
    </script>
</body>
</html>

五、SSE Java 实践

使用Spring WebFlux实现SSE流式输出。

示例代码:

import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.time.Duration;

@RestController
public class SSEController {

    @GetMapping("/stream")
    public Flux<ServerSentEvent<String>> streamData() {
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> ServerSentEvent.<String>builder()
                        .id(String.valueOf(sequence))
                        .event("data")
                        .data("Sample data " + sequence)
                        .build());
    }
}

六、SSE Python 实践

使用Flask和flask-sse库实现SSE流式输出。

示例代码:

from flask import Flask, render_template
from flask_sse import sse

app = Flask(__name__)
app.register_blueprint(sse, url_prefix="/stream")

@app.route("/")
def index():
    return render_template("index.html")

@app.route("/send_message")
def send_message():
    sse.publish({"message": "Hello, SSE!"}, type="message")
    return "Message sent!"

if __name__ == "__main__":
    app.run(debug=True)

七、实时消息推送实践

结合Redis pub/sub频道和SSE技术,实现实时消息推送。

示例代码:

import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;

@RestController
public class SSEController {

    @GetMapping("/stream-with-heart-beat")
    public Flux<ServerSentEvent<String>> streamWithHeartBeat() {
        return Flux.merge(
                Flux.interval(Duration.ofSeconds(1)).map(sequence -> ServerSentEvent.builder()
                        .event("data")
                        .data("Sample data " + sequence)
                        .build()),
                Flux.interval(Duration.ofSeconds(5)).map(sequence -> ServerSentEvent.builder()
                        .event("heartbeat")
                        .data("Heartbeat")
                        .build())
        );
    }
}

通过以上的示例代码,你可以了解如何使用SSE技术在前后端实现流式输出,并结合Redis实现实时消息推送。希望这些示例能够帮助你在实际项目中更好地应用SSE技术。