Go Gin 实现 SSE 服务端主动推流
package main import ( "fmt" "github.com/gin-gonic/gin" "time" ) func main() { r := gin.Default() // 创建SSE路由 r.GET("/sse", func(c *gin.Context) { // 配置响应头 c.Writer.Header().Set("Content-Type", "text/event-stream") c.Writer.Header().Set("Cache-Control", "no-cache") c.Writer.Header().Set("Connection", "keep-alive") // 循环发送数据 ticker := time.NewTicker(2 * time.Second) for _ = range ticker.C { // 构建SSE格式的数据 data := fmt.Sprintf("data: %s\n\n", time.Now().Format(time.RFC1123)) // 向客户端发送数据 _, err := c.Writer.Write([]byte(data)) if err != nil { // 如果写入发生错误,退出循环 break } // 刷新缓冲区,确保即时发送 c.Writer.Flush() } }) // 启动服务器 r.Run() // 默认在 localhost:8080 上运行 }
使用 Gin 自带的Stream SSEvent 方法来实现
package main import ( "fmt" "github.com/gin-gonic/gin" "io" "time" ) func main() { router := gin.Default() // SSE路由 router.GET("/stream", func(c *gin.Context) { c.Writer.Header().Set("Content-Type", "text/event-stream") c.Writer.Header().Set("Cache-Control", "no-cache") c.Writer.Header().Set("Connection", "keep-alive") // 创建消息通道 messageChan := make(chan string) // 后台定期发送消息 go func() { for { time.Sleep(time.Second * 10) currentTime := fmt.Sprintf("当前时间: %s", time.Now().Format("15:04:05")) messageChan <- currentTime } }() // 监听消息通道并发送SSE c.Stream(func(w io.Writer) bool { if msg, ok := <-messageChan; ok { c.SSEvent("message", msg) return true } return false }) }) router.Run(":8080") }
OpenaAI 的stream请求 CURL 测试脚本
curl https://api.openai.com/v1/chat/completions -H "Content-Type: application/json" -H "Authorization: Bearer sk-S0xkgox19mxQo " -d '{ "model": "gpt-3.5-turbo-1106", "stream":true, "messages": [ { "role": "system", "content": " " }, { "role": "user", "content": "apple" } ] }'
实际的返回结果为
... data: {"id":"chatcmpl-8hEoIdntJirx2wm4iCIT9u2fkECqe","object":"chat.completion.chunk","created":1705315726,"model":"gpt-3.5-turbo-1106","system_fingerprint":"fp_fe56e538d5","choices":[{"index":0,"delta":{"content":"."},"logprobs":null,"finish_reason":null}]} data: {"id":"chatcmpl-8hEoIdntJirx2wm4iCIT9u2fkECqe","object":"chat.completion.chunk","created":1705315726,"model":"gpt-3.5-turbo-1106","system_fingerprint":"fp_fe56e538d5","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]} data: [DONE]
Go转发请求到OpenAI并以SSE形式响应
package main import ( "bufio" "bytes" "encoding/json" "github.com/gin-gonic/gin" "io" "net/http" ) func main() { router := gin.Default() //转发请求到OpenAI并以SSE形式响应 router.GET("/stream", func(c *gin.Context) { // 创建到OpenAI API的请求 url := "https://api.openai.com/v1/chat/completions" requestBody, err := json.Marshal(map[string]interface{}{ "model": "gpt-3.5-turbo-1106", "stream": true, "messages": []map[string]string{ { "role": "system", "content": " ", }, { "role": "user", "content": "apple公司", }, }, }) if err != nil { c.AbortWithError(http.StatusInternalServerError, err) return } req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody)) req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer sk-S0AtQo") // 使用您的API密钥 // 发送请求 resp, err := http.DefaultClient.Do(req) if err != nil { c.AbortWithError(http.StatusInternalServerError, err) return } // 设置SSE响应头 c.Writer.Header().Set("Content-Type", "text/event-stream") c.Writer.Header().Set("Cache-Control", "no-cache") c.Writer.Header().Set("Connection", "keep-alive") // 使用c.Stream读取并转发数据 reader := bufio.NewReader(resp.Body) c.Stream(func(w io.Writer) bool { line, err := reader.ReadBytes('\n') if err != nil { return false // 结束流 } _, writeErr := c.Writer.Write(line) if writeErr != nil { return false // 结束流 } return true // 继续流 }) }) router.Run(":8080") }