- 什么是 SSE
SSE(Server-Sent Event,服务端推送事件)是一种用于客户端与服务器端实时通讯的技术。它允许服务器端发送事件到客户端,客户端可以通过 EventSource 接口来接收这些事件。通常情况下,SSE 是基于 HTTP 协议实现的,它不需要建立和维护长连接,但服务器可以长时间向客户端推送数据,而客户端只需要等待并处理收到的数据即可。
- Go 语言实现 SSE 服务端
实现 SSE 服务端需要使用 Go 的 net/http 包,具体步骤如下:
首先,定义一个结构体 ServerSentEvent,用于存储 SSE 事件的数据:
type ServerSentEvent struct {
EventType string
Data string
}
然后,定义一个函数 serveSSE,用于处理 SSE 请求。在函数中,我们首先要设置响应头,告知客户端返回数据的 MIME 类型为 text/event-stream,字符集为 utf-8:
func serveSSE(w http.ResponseWriter, r http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "")
// ...
}
接下来,我们可以在函数中添加一个 for 循环,不断向客户端推送数据。在循环中,我们可以使用 Fprintf 函数来向客户端写入数据,格式为:
id: 事件 ID\n
event: 事件类型\n
data: 事件数据\n\n
其中,事件 ID 和事件类型是可选的。
func serveSSE(w http.ResponseWriter, r *http.Request) {
// 设置响应头
for {
event := &ServerSentEvent{
EventType: "message",
Data: "this is a test message",
}
fmt.Fprintf(w, "id: %d\n", time.Now().Unix())
fmt.Fprintf(w, "event: %s\n", event.EventType)
fmt.Fprintf(w, "data: %s\n\n", event.Data)
// 在每次写入数据后需要刷新缓冲区
w.(http.Flusher).Flush()
time.Sleep(time.Second * 3)
}
}
完整的代码如下:
package main
import (
"fmt"
"net/http"
"time"
)
type ServerSentEvent struct {
EventType string
Data string
}
func serveSSE(w http.ResponseWriter, r http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "")
for {
event := &ServerSentEvent{
EventType: "message",
Data: "this is a test message",
}
fmt.Fprintf(w, "id: %d\n", time.Now().Unix())
fmt.Fprintf(w, "event: %s\n", event.EventType)
fmt.Fprintf(w, "data: %s\n\n", event.Data)
w.(http.Flusher).Flush()
time.Sleep(time.Second * 3)
}
}
func main() {
http.HandleFunc("/", serveSSE)
http.ListenAndServe(":8080", nil)
}
- Go 语言实现 SSE 客户端
实现 SSE 客户端同样需要使用 Go 的 net/http 包。在客户端中,我们可以使用 http.NewRequest 函数创建一个 HTTP 请求,并设置请求头。然后,我们可以使用 http.Client 的 Do 方法发送请求,得到一个响应,使用 bufio.NewReader 函数创建一个 Reader 对象以读取 SSE 事件的数据。
接下来,我们可以使用一个 for 循环不断从 Reader 对象中读取数据,并对传来的数据进行解析:
func main() {
req, _ := http.NewRequest("GET", "http://localhost:8080", nil)
req.Header.Set("Accept", "text/event-stream")
client := &http.Client{}
res, _ := client.Do(req)
defer res.Body.Close()
reader := bufio.NewReader(res.Body)
for {
line, _ := reader.ReadString('\n')
if line == "\n" || line == "\r\n" {
// 一个完整的事件读取完成
continue
}
fields := strings.SplitN(line, ":", 2)
if len(fields) < 2 {
continue
}
switch fields[0] {
case "event":
fmt.Printf("event: %s\n", fields[1])
case "data":
fmt.Printf("data: %s\n", fields[1])
case "id":
fmt.Printf("id: %s\n", fields[1])
case "retry":
fmt.Printf("retry: %s\n", fields[1])
}
}
}
完整的代码如下:
package main
import (
"bufio"
"fmt"
"net/http"
"strings"
)
func main() {
req, _ := http.NewRequest("GET", "http://localhost:8080", nil)
req.Header.Set("Accept", "text/event-stream")
client := &http.Client{}
res, _ := client.Do(req)
defer res.Body.Close()
reader := bufio.NewReader(res.Body)
for {
line, _ := reader.ReadString('\n')
if line == "\n" || line == "\r\n" {
continue
}
fields := strings.SplitN(line, ":", 2)
if len(fields) < 2 {
continue
}
switch fields[0] {
case "event":
fmt.Printf("event: %s\n", fields[1])
case "data":
fmt.Printf("data: %s\n", fields[1])
case "id":
fmt.Printf("id: %s\n", fields[1])
case "retry":
fmt.Printf("retry: %s\n", fields[1])
}
}
}
- 测试 SSE 服务端和客户端
在终端中运行 SSE 服务端代码,可以看到服务端不断向客户端推送事件:
$ go run server.go
id: 1636455068
event: message
data: this is a test message
id: 1636455071
event: message
data: this is a test message
...
在另一个终端中运行 SSE 客户端代码,可以看到客户端不断接收到事件:
$ go run client.go
event: message
data: this is a test message
event: message
data: this is a test message
event: message
data: this is a test message
...
表示 SSE 服务端和客户端都已经成功实现。