以下是一种可能的实现方式:
import 'dart:async';
import 'dart:convert';
import 'package:http/http.dart' as http;
import 'package:http_parser/http_parser.dart';
/// A minimal Server-Sent Events client.
///
/// Sends a single `GET` request to [url] with the supplied [headers] (and an
/// optional JSON-encoded [payload] as the request body) and prints every
/// event decoded from the response.
class SSEClient {
  /// The endpoint to connect to.
  final String url;

  /// Extra request headers (for example an `Authorization` header).
  final Map<String, String> headers;

  /// Optional JSON body sent with the request; `null` means "no body".
  final Map<String, dynamic>? payload;

  SSEClient(this.url, this.headers, this.payload);

  /// Opens the connection and prints events until the server closes it.
  ///
  /// Throws an [Exception] when the server answers with a status other than
  /// 200. The underlying HTTP client is always closed, including when the
  /// request fails or the status check throws.
  Future<void> connect() async {
    final request = http.Request('GET', Uri.parse(url))
      ..headers.addAll(headers);

    // Copy the field to a local so the null check promotes it.
    final body = payload;
    if (body != null) {
      // Set the body first: assigning it installs a default text/plain
      // content type, which is then overridden with the JSON one.
      request
        ..body = json.encode(body)
        ..headers['content-type'] = 'application/json';
    }

    final client = http.Client();
    try {
      final response = await client.send(request);

      // Check if the response from the server is OK.
      if (response.statusCode != 200) {
        throw Exception('Failed to connect: ${response.statusCode}');
      }

      // EventSourceTransformer consumes a stream of responses, not raw
      // bytes, so wrap the single response before transforming it.
      final events = Stream.value(response)
          .transform(EventSourceTransformer())
          .handleError((Object error) => print('Error: $error'));

      // Handle the SSE events.
      await for (final event in events) {
        print('Event: ${event.type} ${event.data}');
      }
    } finally {
      // Release the connection whether the stream ended normally or not.
      client.close();
    }
  }
}
/// Decodes the body of each incoming [http.StreamedResponse] as a
/// `text/event-stream` and emits one [Event] per parsed event.
///
/// Parsing rules applied to each line of the UTF-8 decoded body:
///
/// * An empty line ends the current event. The event is emitted only if at
///   least one `event`, `data` or `id` field was recorded for it, so blank
///   keep-alive lines do not produce empty events.
/// * A line starting with `:` is a comment and is ignored.
/// * Any other line is split at the FIRST `:` into a field name and a value;
///   a single leading space in the value is removed. A line without `:` is
///   treated as a field name with an empty value.
/// * `event` sets [Event.type], `id` sets [Event.id], each `data` line is
///   appended to [Event.data]. `retry` and unknown fields are ignored.
///
/// An event still pending when the body ends is emitted as well, even
/// without a terminating blank line.
///
/// The transformer holds no state, so one instance may be bound to several
/// streams. `cast` is inherited from [StreamTransformerBase].
class EventSourceTransformer
    extends StreamTransformerBase<http.StreamedResponse, Event> {
  /// Creates a stateless SSE transformer.
  const EventSourceTransformer();

  @override
  Stream<Event> bind(Stream<http.StreamedResponse> stream) async* {
    // An async* generator gives pause/cancel propagation and error
    // forwarding for free, and guarantees the output only completes after
    // every response body has been fully read.
    await for (final response in stream) {
      final lines = response.stream
          .transform(utf8.decoder)
          .transform(const LineSplitter());

      var event = Event();
      var hasContent = false;

      await for (final line in lines) {
        if (line.isEmpty) {
          // Blank line: dispatch the current event and start a new one.
          if (hasContent) yield event;
          event = Event();
          hasContent = false;
          continue;
        }

        final colon = line.indexOf(':');
        if (colon == 0) continue; // Comment line.

        // Split on the first colon only so values may contain colons
        // (URLs, JSON, timestamps).
        final field = colon == -1 ? line : line.substring(0, colon);
        var value = colon == -1 ? '' : line.substring(colon + 1);
        if (value.startsWith(' ')) value = value.substring(1);

        switch (field) {
          case 'event':
            event.type = value;
            hasContent = true;
            break;
          case 'data':
            event.data.add(value);
            hasContent = true;
            break;
          case 'id':
            event.id = value;
            hasContent = true;
            break;
          case 'retry':
            // Ignored for simplicity.
            break;
        }
      }

      // Flush an event that was not terminated by a blank line.
      if (hasContent) yield event;
    }
  }
}
/// A single Server-Sent Event.
///
/// All fields are mutable so a parser can fill them in line by line.
class Event {
  /// Value of the `event` field, or `null` if the event did not set one.
  String? type;

  /// Value of the `id` field, or `null` if the event did not set one.
  String? id;

  /// The event's data lines, in the order they were added.
  List<String> data = [];

  @override
  String toString() {
    return 'Event{type: $type, id: $id, data: ${data.join(', ')}}';
  }
}
使用示例:
/// Usage example: connects to a local SSE endpoint with a bearer token and a
/// small JSON payload, then waits until the connection ends.
void main() async {
  const endpoint = 'http://localhost:8080/sse';
  final authHeaders = <String, String>{
    'Authorization': 'Bearer j6y7UJYi56jK7j6hUIg69IGtyh8P',
  };
  final body = <String, dynamic>{'name': 'Alice', 'age': 25};

  await SSEClient(endpoint, authHeaders, body).connect();
}