import 'dart:async'; import 'dart:convert'; import 'dart:math'; import 'package:dio/dio.dart'; import 'package:flutter_secure_storage/flutter_secure_storage.dart'; import '../config/app_config.dart'; import 'api_client.dart'; class SseEvent { final String event; final Map data; SseEvent({required this.event, required this.data}); } class SseService { final ApiClient _api; final FlutterSecureStorage _storage = const FlutterSecureStorage(); final StreamController _eventController = StreamController.broadcast(); final StreamController _connectionController = StreamController.broadcast(); CancelToken? _cancelToken; Timer? _reconnectTimer; int _reconnectAttempt = 0; bool _shouldReconnect = true; Stream get events => _eventController.stream; Stream get connectionState => _connectionController.stream; SseService(this._api); Future connect() async { _shouldReconnect = true; _reconnectAttempt = 0; await _doConnect(); } Future _doConnect() async { _cancelToken?.cancel(); _cancelToken = CancelToken(); try { final token = await _storage.read(key: 'access_token'); final response = await _api.dio.get( '/stream/events', options: Options( headers: {'Authorization': 'Bearer $token'}, responseType: ResponseType.stream, ), cancelToken: _cancelToken, ); _connectionController.add(true); _reconnectAttempt = 0; final stream = response.data.stream as Stream>; String buffer = ''; await for (final chunk in stream) { buffer += utf8.decode(chunk); final lines = buffer.split('\n'); buffer = lines.removeLast(); // keep incomplete line in buffer String? eventName; String? dataStr; for (final line in lines) { if (line.startsWith('event:')) { eventName = line.substring(6).trim(); } else if (line.startsWith('data:')) { dataStr = line.substring(5).trim(); } else if (line.isEmpty && eventName != null && dataStr != null) { try { final data = jsonDecode(dataStr) as Map; _eventController.add(SseEvent(event: eventName, data: data)); } catch (_) {} eventName = null; dataStr = null; } } } } catch (e) { if (e is DioException && e.type == DioExceptionType.cancel) return; _connectionController.add(false); } if (_shouldReconnect) { _scheduleReconnect(); } } void _scheduleReconnect() { _reconnectTimer?.cancel(); final delay = Duration( milliseconds: min( AppConfig.sseMaxReconnect.inMilliseconds, AppConfig.sseReconnectBase.inMilliseconds * pow(2, _reconnectAttempt).toInt(), ), ); _reconnectAttempt++; _reconnectTimer = Timer(delay, _doConnect); } void disconnect() { _shouldReconnect = false; _reconnectTimer?.cancel(); _cancelToken?.cancel(); _connectionController.add(false); } void dispose() { disconnect(); _eventController.close(); _connectionController.close(); } }