Go: Forwarding Requests to OpenAI and Responding via SSE


Implementing server-initiated SSE push with Go and Gin

package main

import (
	"fmt"
	"github.com/gin-gonic/gin"
	"time"
)

func main() {
	r := gin.Default()

	// SSE route
	r.GET("/sse", func(c *gin.Context) {
		// Set the response headers required for SSE
		c.Writer.Header().Set("Content-Type", "text/event-stream")
		c.Writer.Header().Set("Cache-Control", "no-cache")
		c.Writer.Header().Set("Connection", "keep-alive")

		// Send an event every two seconds
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			// Build the payload in SSE format
			data := fmt.Sprintf("data: %s\n\n", time.Now().Format(time.RFC1123))

			// Write the data to the client
			_, err := c.Writer.Write([]byte(data))
			if err != nil {
				// Stop on write error (e.g. the client disconnected)
				break
			}

			// Flush the buffer so the event is pushed immediately
			c.Writer.Flush()
		}
	})

	// Start the server
	r.Run() // listens on :8080 by default
}
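
A quick way to exercise this endpoint is a small Go client that opens the stream and prints each line as it arrives. This is only a test sketch, assuming the server above is running on localhost:8080:

package main

import (
	"bufio"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Connect to the SSE endpoint served by the program above.
	resp, err := http.Get("http://localhost:8080/sse")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// Print the raw SSE lines ("data: ..." plus blank separators) as they arrive.
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
	if err := scanner.Err(); err != nil {
		log.Println("stream ended:", err)
	}
}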

Implementation using Gin's built-in Stream and SSEvent methods

package main

import (
	"fmt"
	"github.com/gin-gonic/gin"
	"io"
	"time"
)

func main() {
	router := gin.Default()

	// SSE route
	router.GET("/stream", func(c *gin.Context) {
		c.Writer.Header().Set("Content-Type", "text/event-stream")
		c.Writer.Header().Set("Cache-Control", "no-cache")
		c.Writer.Header().Set("Connection", "keep-alive")

		// Channel carrying the messages to push
		messageChan := make(chan string)

		// Background producer: emit a message every 10 seconds
		go func() {
			for {
				time.Sleep(time.Second * 10)
				currentTime := fmt.Sprintf("current time: %s", time.Now().Format("15:04:05"))
				messageChan <- currentTime
			}
		}()

		// Read from the channel and write each message as an SSE event
		c.Stream(func(w io.Writer) bool {
			if msg, ok := <-messageChan; ok {
				c.SSEvent("message", msg)
				return true
			}
			return false
		})
	})

	router.Run(":8080")
}
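
One thing to be aware of in the example above: the producer goroutine never stops, so every request leaks a goroutine after the client goes away. A variant sketch (not from the original) that ties the producer to c.Request.Context(), which the underlying HTTP server cancels when the connection closes, could look like this:

package main

import (
	"fmt"
	"io"
	"time"

	"github.com/gin-gonic/gin"
)

func main() {
	router := gin.Default()

	router.GET("/stream", func(c *gin.Context) {
		messageChan := make(chan string)
		ctx := c.Request.Context()

		// Producer tied to the request context: it stops and closes the
		// channel when the client disconnects, so the goroutine does not
		// outlive the connection.
		go func() {
			defer close(messageChan)
			ticker := time.NewTicker(10 * time.Second)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case t := <-ticker.C:
					select {
					case messageChan <- fmt.Sprintf("current time: %s", t.Format("15:04:05")):
					case <-ctx.Done():
						return
					}
				}
			}
		}()

		// Consumer: forward each message as an SSE event; once the channel
		// is closed, returning false ends the stream.
		c.Stream(func(w io.Writer) bool {
			if msg, ok := <-messageChan; ok {
				c.SSEvent("message", msg)
				return true
			}
			return false
		})
	})

	router.Run(":8080")
}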

curl test script for an OpenAI streaming request

curl https://api.openai.com/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer sk-S0xkgox19mxQo" \
  -d '{
    "model": "gpt-3.5-turbo-1106",
    "stream":true,
    "messages": [
      {
        "role": "system",
        "content": " "
      },
      {
        "role": "user",
        "content": "apple"
      }
    ]
}'

The actual streamed response looks like this (earlier chunks elided):

...

data: {"id":"chatcmpl-8hEoIdntJirx2wm4iCIT9u2fkECqe","object":"chat.completion.chunk","created":1705315726,"model":"gpt-3.5-turbo-1106","system_fingerprint":"fp_fe56e538d5","choices":[{"index":0,"delta":{"content":"."},"logprobs":null,"finish_reason":null}]}

data: {"id":"chatcmpl-8hEoIdntJirx2wm4iCIT9u2fkECqe","object":"chat.completion.chunk","created":1705315726,"model":"gpt-3.5-turbo-1106","system_fingerprint":"fp_fe56e538d5","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}

data: [DONE]
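
Each event is a single line of the form data: {...} carrying a chat.completion.chunk, events are separated by blank lines, and data: [DONE] marks the end of the stream. As a rough sketch (not part of the original), the following program reads that format from stdin, so the curl output above can be piped into it, and prints the concatenated delta.content:

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
	"strings"
)

// chunk models only the part of a chat.completion.chunk read here.
type chunk struct {
	Choices []struct {
		Delta struct {
			Content string `json:"content"`
		} `json:"delta"`
	} `json:"choices"`
}

func main() {
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		// Keep only the "data: ..." lines; blank lines separate events.
		if !strings.HasPrefix(line, "data: ") {
			continue
		}
		payload := strings.TrimPrefix(line, "data: ")
		// "[DONE]" is the sentinel that terminates the stream.
		if payload == "[DONE]" {
			break
		}
		var ck chunk
		if err := json.Unmarshal([]byte(payload), &ck); err != nil {
			continue // skip anything that is not a JSON chunk
		}
		if len(ck.Choices) > 0 {
			fmt.Print(ck.Choices[0].Delta.Content)
		}
	}
	fmt.Println()
}

Usage would be something like piping the curl command above into go run parse_stream.go (the file name is assumed).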

Forwarding a request to OpenAI in Go and responding as SSE

package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"github.com/gin-gonic/gin"
	"io"
	"net/http"
)

func main() {
	router := gin.Default()

	// Forward the request to OpenAI and respond as SSE
	router.GET("/stream", func(c *gin.Context) {
		// Build the request body for the OpenAI chat completions API
		url := "https://api.openai.com/v1/chat/completions"
		requestBody, err := json.Marshal(map[string]interface{}{
			"model":  "gpt-3.5-turbo-1106",
			"stream": true,
			"messages": []map[string]string{
				{
					"role":    "system",
					"content": " ",
				},
				{
					"role":    "user",
					"content": "apple公司",
				},
			},
		})
		if err != nil {
			c.AbortWithError(http.StatusInternalServerError, err)
			return
		}

		req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
		if err != nil {
			c.AbortWithError(http.StatusInternalServerError, err)
			return
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("Authorization", "Bearer sk-S0AtQo") // use your own API key here

		// Send the request
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			c.AbortWithError(http.StatusInternalServerError, err)
			return
		}
		defer resp.Body.Close()

		// Set the SSE response headers
		c.Writer.Header().Set("Content-Type", "text/event-stream")
		c.Writer.Header().Set("Cache-Control", "no-cache")
		c.Writer.Header().Set("Connection", "keep-alive")

		// Read the upstream body line by line and forward it with c.Stream
		reader := bufio.NewReader(resp.Body)
		c.Stream(func(w io.Writer) bool {
			line, err := reader.ReadBytes('\n')
			if err != nil {
				return false // upstream finished or failed; end the stream
			}
			_, writeErr := c.Writer.Write(line)
			if writeErr != nil {
				return false // client write failed; end the stream
			}
			return true // keep streaming
		})
	})

	router.Run(":8080")
}
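
With the proxy running, the local endpoint can be tested the same way as the OpenAI endpoint itself, for example with curl -N http://localhost:8080/stream, and it should relay the same data: chunks ending in data: [DONE]. Two hardening steps worth considering are reading the key from the environment rather than the source code, and tying the upstream request to the client's context so it is canceled on disconnect. A sketch of just the request-construction part with those changes (OPENAI_API_KEY is an assumed variable name, the "os" import would be needed, and the rest of the handler stays as above):

		// Sketch: hardened request construction for the handler above.
		// The key comes from an environment variable (name assumed here) and
		// the upstream call is canceled when the client disconnects.
		req, err := http.NewRequestWithContext(c.Request.Context(), "POST", url, bytes.NewBuffer(requestBody))
		if err != nil {
			c.AbortWithError(http.StatusInternalServerError, err)
			return
		}
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("Authorization", "Bearer "+os.Getenv("OPENAI_API_KEY"))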