Flutter/Dart SSE 客户端的实现：要求请求携带 payload 和认证信息

76 min read

以下是一种可能的实现方式:

import 'dart:async';
import 'dart:convert';

import 'package:http/http.dart' as http;
import 'package:http_parser/http_parser.dart';

/// A minimal Server-Sent Events (SSE) client that opens a streaming HTTP
/// request carrying a JSON [payload] and caller-supplied [headers] (for
/// example an `Authorization` header).
class SSEClient {
  /// Address of the SSE endpoint.
  final String url;

  /// Extra request headers, typically the authentication credentials.
  final Map<String, String> headers;

  /// Request body; it is JSON-encoded before being sent.
  final Map<String, dynamic> payload;

  /// HTTP method used to open the stream.
  ///
  /// Defaults to `GET` to match the historical behavior of this class. Pass
  /// `'POST'` when the server has to read [payload]: a body on a `GET`
  /// request has no defined semantics and is ignored by many servers and
  /// proxies.
  final String method;

  SSEClient(this.url, this.headers, this.payload, {this.method = 'GET'});

  /// Opens the connection and prints every received event until the server
  /// closes the stream.
  ///
  /// Throws an [Exception] when the server answers with a status other than
  /// 200. The underlying HTTP client is closed on every exit path.
  Future<void> connect() async {
    final request = http.Request(method, Uri.parse(url))
      ..headers.addAll(headers)
      ..body = json.encode(payload);
    // Set last so the JSON content type wins over anything set earlier.
    request.headers['Content-Type'] = 'application/json';
    // Advertise the SSE media type unless the caller already chose one.
    request.headers.putIfAbsent('Accept', () => 'text/event-stream');

    final client = http.Client();
    try {
      final response = await client.send(request);

      // Literal 200 instead of `HttpStatus.ok`, which would need `dart:io`.
      if (response.statusCode != 200) {
        throw Exception('Failed to connect: ${response.statusCode}');
      }

      // Decode the raw byte stream into SSE events; stream errors are
      // logged rather than aborting the loop.
      final events = response.stream
          .transform(EventSourceTransformer())
          .handleError((Object error) => print('Error: $error'));

      await for (final event in events) {
        print('Event: ${event.type} ${event.data}');
      }
    } finally {
      // Release the socket even when the request fails or is rejected.
      client.close();
    }
  }
}

/// Decodes a UTF-8 byte stream in the `text/event-stream` format into
/// [Event] objects.
///
/// Each call to [bind] keeps its own parsing state, so one transformer
/// instance can safely be applied to several streams.
class EventSourceTransformer extends StreamTransformerBase<List<int>, Event> {
  const EventSourceTransformer();

  /// Parses [stream] line by line and emits one [Event] per block of fields
  /// terminated by a blank line.
  ///
  /// Blocks that carry no `data` field are dropped instead of being emitted
  /// as empty events. An event still pending when the stream ends is emitted
  /// if it has data, which tolerates servers that omit the final blank line.
  /// Errors from [stream] are forwarded to the returned stream.
  @override
  Stream<Event> bind(Stream<List<int>> stream) async* {
    var pending = Event();
    final lines =
        stream.transform(utf8.decoder).transform(const LineSplitter());

    await for (final line in lines) {
      if (line.isEmpty) {
        // A blank line terminates the current event.
        if (pending.data.isNotEmpty) yield pending;
        pending = Event();
        continue;
      }
      if (line.startsWith(':')) continue; // Comment / keep-alive line.

      // Split on the FIRST colon only, so values that themselves contain
      // colons (JSON, URLs, timestamps) are kept intact. A line without any
      // colon is a field name with an empty value.
      final colon = line.indexOf(':');
      final field = colon < 0 ? line : line.substring(0, colon);
      var value = colon < 0 ? '' : line.substring(colon + 1);
      // A single space after the colon is a separator, not part of the value.
      if (value.startsWith(' ')) value = value.substring(1);

      switch (field) {
        case 'event':
          pending.type = value;
          break;
        case 'data':
          pending.data.add(value);
          break;
        case 'id':
          pending.id = value;
          break;
        case 'retry':
          // Reconnection delay; ignored for simplicity.
          break;
        default:
          // Unknown field names are ignored.
          break;
      }
    }

    if (pending.data.isNotEmpty) yield pending;
  }
}

/// A single Server-Sent Event assembled from the fields of one block of the
/// event stream.
class Event {
  /// Value of the `event` field, or `null` when the block did not set one.
  String? type;

  /// Value of the `id` field, or `null` when the block did not set one.
  String? id;

  /// The `data` lines of the event, in the order they were received.
  List<String> data = [];

  /// Returns a debug representation with the data lines joined by `', '`.
  @override
  String toString() {
    return 'Event{type: $type, id: $id, data: ${data.join(', ')}}';
  }
}

使用示例:

/// Usage example: connects to a local SSE endpoint with a bearer token and a
/// small JSON payload, then waits until the stream ends.
void main() async {
  // Sample credentials for the example.
  final authHeaders = <String, String>{
    'Authorization': 'Bearer j6y7UJYi56jK7j6hUIg69IGtyh8P',
  };
  final body = <String, dynamic>{'name': 'Alice', 'age': 25};

  await SSEClient('http://localhost:8080/sse', authHeaders, body).connect();
}