SSE(Server-Sent Events,服务端推送事件)是一种允许服务端主动向客户端推送数据的技术,常用于实时数据更新和消息通知的场景。本文详细介绍了SSE的工作原理及其与WebSocket的区别,并通过代码示例展示了如何实现SSE。
SSE的工作原理
-
建立连接
客户端发起一个包含特殊头字段的HTTP请求以开启SSE会话:GET /events HTTP/1.1 Accept: text/event-stream
服务端看到该头字段后,返回包含以下内容的响应头,表示连接成功:
HTTP/1.1 200 OK Content-Type: text/event-stream Cache-Control: no-cache Connection: keep-alive
-
发送消息
服务端通过保持连接开放,随时向客户端推送数据。每条消息由以下部分组成:data
: 实际的消息数据。id
: 可选,消息的唯一标识符。event
: 可选,定义事件类型。retry
: 可选,自动重连时间。
每条消息以两个换行符结尾,例如:
data: Hello World\n\n
SSE与WebSocket的区别
-
通信方式
- WebSocket提供全双工通信,服务端和客户端都可以发送和接收数据。
- SSE仅提供服务端到客户端的单向通信。
-
协议和实现
- WebSocket使用自己的协议(ws://或wss://),实现复杂。
- SSE使用标准的HTTP协议,实现简单。
-
适用场景
- WebSocket适用于双向实时通信,如在线游戏、聊天应用。
- SSE适用于单向数据推送,如消息通知、数据更新。
-
复杂性和资源使用
- WebSocket更复杂,资源消耗更多。
- SSE简单,利用现有网络基础设施,更易实现和维护。
代码示例
服务端代码(Python)
使用asyncio
库实现SSE:
import asyncio from asyncio import StreamReader, StreamWriter class SSE: def __init__(self, host="0.0.0.0", port=9999): self.host = host self.port = port @staticmethod def parse_request_headers(data: bytes) -> dict: headers = data.split(b"\r\n\r\n")[0].split(b"\r\n") header_dict = {} for header in headers[1:]: key, val = header.decode("utf-8").split(":", 1) header_dict[key.lower()] = val.strip() return header_dict async def handler_requests(self, reader: StreamReader, writer: StreamWriter): data = await reader.readuntil(b"\r\n\r\n") request_headers = self.parse_request_headers(data) if request_headers.get("accept") != "text/event-stream": writer.close() return await writer.wait_closed() response_header = ( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/event-stream\r\n" b"Cache-Control: no-cache\r\n" b"Connection: keep-alive\r\n" b'Access-Control-Allow-Origin: *\r\n' b"\r\n" ) writer.write(response_header) await writer.drain() for _ in range(5): data = "data: 高老师总能分享出好东西\r\n\r\n".encode("utf-8") writer.write(data) await writer.drain() await asyncio.sleep(1) writer.close() await writer.wait_closed() async def __create_server(self): server = await asyncio.start_server(self.handler_requests, self.host, self.port) async with server: await server.serve_forever() def run_server(self): loop = asyncio.get_event_loop() loop.run_until_complete(self.__create_server()) if __name__ == '__main__': sse = SSE() sse.run_server()
前端代码(HTML)
与服务端建立SSE连接并接收数据:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> <style> #data { font-weight: bold; color: cadetblue; font-size: large; } </style> </head> <body> <h1>SSE Test</h1> <div id="data"></div> <script> document.addEventListener("DOMContentLoaded", function () { var eventSource = new EventSource("http://localhost:9999"); eventSource.onmessage = function (e) { var data = e.data + "\n"; document.getElementById('data').innerText += data; }; eventSource.onerror = function (e) { console.error('Error occurred:', e); eventSource.close(); }; }); </script> </body> </html>
以上代码展示了如何使用SSE进行实时数据推送。根据业务需求,选择适合的技术实现实时通信。