SSE的工作原理

61 min read

SSE(Server-Sent Events,服务端推送事件)是一种允许服务端主动向客户端推送数据的技术,常用于实时数据更新和消息通知的场景。本文详细介绍了SSE的工作原理及其与WebSocket的区别,并通过代码示例展示了如何实现SSE。

SSE的工作原理

  1. 建立连接
    客户端发起一个包含特殊头字段的HTTP请求以开启SSE会话:

    GET /events HTTP/1.1
    Accept: text/event-stream
    

    服务端看到该头字段后,返回包含以下内容的响应头,表示连接成功:

    HTTP/1.1 200 OK
    Content-Type: text/event-stream
    Cache-Control: no-cache
    Connection: keep-alive
    
  2. 发送消息
    服务端通过保持连接开放,随时向客户端推送数据。每条消息由以下部分组成:

    • data: 实际的消息数据。
    • id: 可选,消息的唯一标识符。
    • event: 可选,定义事件类型。
    • retry: 可选,自动重连时间。

    每条消息以两个换行符结尾,例如:

    data: Hello World\n\n
    

SSE与WebSocket的区别

  • 通信方式

    • WebSocket提供全双工通信,服务端和客户端都可以发送和接收数据。
    • SSE仅提供服务端到客户端的单向通信。
  • 协议和实现

    • WebSocket使用自己的协议(ws://或wss://),实现复杂。
    • SSE使用标准的HTTP协议,实现简单。
  • 适用场景

    • WebSocket适用于双向实时通信,如在线游戏、聊天应用。
    • SSE适用于单向数据推送,如消息通知、数据更新。
  • 复杂性和资源使用

    • WebSocket更复杂,资源消耗更多。
    • SSE简单,利用现有网络基础设施,更易实现和维护。

代码示例

服务端代码(Python)

使用asyncio库实现SSE:

import asyncio
from asyncio import StreamReader, StreamWriter

class SSE:

    def __init__(self, host="0.0.0.0", port=9999):
        self.host = host
        self.port = port

    @staticmethod
    def parse_request_headers(data: bytes) -> dict:
        headers = data.split(b"\r\n\r\n")[0].split(b"\r\n")
        header_dict = {}
        for header in headers[1:]:
            key, val = header.decode("utf-8").split(":", 1)
            header_dict[key.lower()] = val.strip()
        return header_dict

    async def handler_requests(self, reader: StreamReader, writer: StreamWriter):
        data = await reader.readuntil(b"\r\n\r\n")
        request_headers = self.parse_request_headers(data)
        if request_headers.get("accept") != "text/event-stream":
            writer.close()
            return await writer.wait_closed()
        
        response_header = (
            b"HTTP/1.1 200 OK\r\n"
            b"Content-Type: text/event-stream\r\n"
            b"Cache-Control: no-cache\r\n"
            b"Connection: keep-alive\r\n"
            b'Access-Control-Allow-Origin: *\r\n'
            b"\r\n"
        )
        writer.write(response_header)
        await writer.drain()

        for _ in range(5):
            data = "data: 高老师总能分享出好东西\r\n\r\n".encode("utf-8")
            writer.write(data)
            await writer.drain()
            await asyncio.sleep(1)
        writer.close()
        await writer.wait_closed()

    async def __create_server(self):
        server = await asyncio.start_server(self.handler_requests, self.host, self.port)
        async with server:
            await server.serve_forever()

    def run_server(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.__create_server())

if __name__ == '__main__':
    sse = SSE()
    sse.run_server()

前端代码(HTML)

与服务端建立SSE连接并接收数据:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <style>
        #data {
            font-weight: bold;
            color: cadetblue;
            font-size: large;
        }
    </style>
</head>
<body>
    <h1>SSE Test</h1>
    <div id="data"></div>
    <script>
        document.addEventListener("DOMContentLoaded", function () {
            var eventSource = new EventSource("http://localhost:9999");
            eventSource.onmessage = function (e) {
                var data = e.data + "\n";
                document.getElementById('data').innerText += data;
            };
            eventSource.onerror = function (e) {
                console.error('Error occurred:', e);
                eventSource.close();
            };
        });
    </script>
</body>
</html>

以上代码展示了如何使用SSE进行实时数据推送。根据业务需求,选择适合的技术实现实时通信。