import 'dart:async';
import 'dart:convert';
import 'dart:math';

import 'package:dio/dio.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter_secure_storage/flutter_secure_storage.dart';

import '../config/app_config.dart';
import 'api_client.dart';

/// A single server event: its name plus the decoded JSON object payload.
class SseEvent {
  /// The event name (the `event:` field, or a synthesized name when polling).
  final String event;

  /// The decoded JSON object carried by the event.
  final Map<String, dynamic> data;

  SseEvent({required this.event, required this.data});
}

/// Delivers server events over a streamed `/stream/events` response, falling
/// back to polling `/stream/poll` after two consecutive streaming failures.
///
/// Listen to [events] for decoded events and to [connectionState] for
/// `true`/`false` connectivity updates. Call [connect] to start, [disconnect]
/// to stop, and [dispose] when the service is no longer needed.
class SseService {
  final ApiClient _api;
  final FlutterSecureStorage _storage = const FlutterSecureStorage();

  final StreamController<SseEvent> _eventController =
      StreamController<SseEvent>.broadcast();
  final StreamController<bool> _connectionController =
      StreamController<bool>.broadcast();

  CancelToken? _cancelToken;
  Timer? _reconnectTimer;
  int _reconnectAttempt = 0;
  bool _shouldReconnect = true;
  int _sseFailures = 0;
  Timer? _pollTimer;
  bool _pollInFlight = false;
  Map<String, dynamic>? _previousPollState;

  /// Broadcast stream of decoded events.
  Stream<SseEvent> get events => _eventController.stream;

  /// Broadcast stream of connectivity changes (`true` = connected).
  Stream<bool> get connectionState => _connectionController.stream;

  SseService(this._api);

  /// Starts (or restarts) event delivery, trying the streaming endpoint first.
  Future<void> connect() async {
    _shouldReconnect = true;
    _reconnectAttempt = 0;
    _sseFailures = 0;

    // A repeated connect() must not leave an earlier polling loop or pending
    // reconnect running next to the new stream, or events get duplicated.
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    _pollTimer?.cancel();
    _pollTimer = null;

    await _doConnect();
  }

  /// Opens the event stream and consumes it until it ends or fails, then
  /// schedules a reconnect if this attempt is still the current one.
  Future<void> _doConnect() async {
    if (!_shouldReconnect) return;

    // After 2 SSE failures, fall back to polling.
    if (_sseFailures >= 2) {
      debugPrint('SSE: falling back to polling after $_sseFailures failures');
      _startPolling();
      return;
    }

    _cancelToken?.cancel();
    // Keep a local reference: the field may be replaced by a newer attempt
    // while this one is still winding down.
    final cancelToken = CancelToken();
    _cancelToken = cancelToken;

    // Detects a stream that never delivers data (e.g. a buffering proxy).
    Timer? firstDataTimer;
    var gotData = false;
    var timedOut = false;

    try {
      final token = await _storage.read(key: 'access_token');
      debugPrint('SSE: connecting via stream (attempt ${_sseFailures + 1})');

      firstDataTimer = Timer(const Duration(seconds: 8), () {
        if (!gotData) {
          debugPrint('SSE: no data received in 8s, cancelling');
          timedOut = true;
          cancelToken.cancel();
        }
      });

      final response = await _api.dio.get(
        '/stream/events',
        options: Options(
          // Never send a literal "Bearer null" when no token is stored.
          headers: {if (token != null) 'Authorization': 'Bearer $token'},
          responseType: ResponseType.stream,
          receiveTimeout: Duration.zero,
        ),
        cancelToken: cancelToken,
      );

      debugPrint('SSE: connected, status=${response.statusCode}');
      _emitConnection(true);

      final bytes =
          (response.data.stream as Stream<List<int>>).map((chunk) {
        if (!gotData) {
          gotData = true;
          firstDataTimer?.cancel();
          // Only data proves streaming works. Resetting the counters as soon
          // as headers arrive would keep the polling fallback from ever
          // triggering when the body is being buffered.
          _reconnectAttempt = 0;
          _sseFailures = 0;
          debugPrint('SSE: first data received');
        }
        return chunk;
      });

      // Streaming decode so multi-byte characters and lines split across
      // chunks are reassembled; LineSplitter accepts LF, CRLF and CR endings.
      final lines = const Utf8Decoder(allowMalformed: true)
          .bind(bytes)
          .transform(const LineSplitter());

      // Parser state lives outside the loop so that an event whose fields
      // arrive in different chunks is still assembled correctly.
      String? eventName;
      final dataLines = <String>[];

      await for (final line in lines) {
        if (line.isEmpty) {
          // A blank line terminates the pending event.
          if (eventName != null && dataLines.isNotEmpty) {
            _dispatchSseEvent(eventName, dataLines.join('\n'));
          }
          eventName = null;
          dataLines.clear();
        } else if (line.startsWith('event:')) {
          eventName = line.substring(6).trim();
        } else if (line.startsWith('data:')) {
          dataLines.add(line.substring(5).trim());
        }
        // Anything else (comments / heartbeats, id:, retry:) is ignored.
      }

      debugPrint('SSE: stream closed by server');
      _emitConnection(false);
    } catch (e) {
      if (e is DioException && e.type == DioExceptionType.cancel) {
        if (!timedOut || !_shouldReconnect) {
          // Cancelled by disconnect() or superseded by a newer attempt.
          return;
        }
        // Cancelled by our first-data timer: count as an SSE failure.
        debugPrint(
            'SSE: stream timed out (no data), failure ${_sseFailures + 1}');
      } else {
        debugPrint('SSE: stream error: $e');
      }
      _sseFailures++;
      _emitConnection(false);
    } finally {
      // Never let a stale timer fire after this attempt is over.
      firstDataTimer?.cancel();
    }

    // Only the current attempt may schedule the next one.
    if (_shouldReconnect && identical(_cancelToken, cancelToken)) {
      _scheduleReconnect();
    }
  }

  /// Decodes [payload] as a JSON object and emits it as an event named [name].
  ///
  /// Malformed or non-object payloads are logged and dropped so that one bad
  /// event does not tear down the stream.
  void _dispatchSseEvent(String name, String payload) {
    try {
      final decoded = jsonDecode(payload);
      if (decoded is Map<String, dynamic>) {
        _emitEvent(name, decoded);
      } else {
        debugPrint('SSE: ignoring "$name" event with non-object payload');
      }
    } on FormatException catch (e) {
      debugPrint('SSE: ignoring "$name" event with invalid JSON: ${e.message}');
    }
  }

  /// Schedules the next connection attempt with capped exponential backoff.
  void _scheduleReconnect() {
    _reconnectTimer?.cancel();
    final maxMs = AppConfig.sseMaxReconnect.inMilliseconds;
    final baseMs = AppConfig.sseReconnectBase.inMilliseconds;
    // Clamp the exponent so the multiplier cannot overflow and wrap to a
    // zero or negative delay after many attempts.
    final exponent = min(_reconnectAttempt, 20);
    final delay = Duration(milliseconds: min(maxMs, baseMs * (1 << exponent)));
    _reconnectAttempt++;
    _reconnectTimer = Timer(delay, _doConnect);
  }

  /// Polling fallback used when SSE streaming doesn't work.
  void _startPolling() {
    _pollTimer?.cancel();
    _previousPollState = null;
    unawaited(_poll());
    _pollTimer = Timer.periodic(
      const Duration(seconds: 5),
      (_) => unawaited(_poll()),
    );
  }

  /// Fetches one snapshot and emits events for what changed since the last.
  Future<void> _poll() async {
    // Skip when stopped, or when a slow request is still outstanding.
    if (!_shouldReconnect || _pollInFlight) return;
    _pollInFlight = true;
    try {
      final response = await _api.dio.get('/stream/poll');
      // disconnect()/dispose() may have run while the request was in flight.
      if (!_shouldReconnect) return;

      final body = response.data;
      if (body is! Map) {
        throw const FormatException('poll response is not a JSON object');
      }
      final data = Map<String, dynamic>.from(body);

      _emitConnection(true);
      final previous = _previousPollState;
      // Store the snapshot before diffing so a failure while diffing cannot
      // pin the old state and repeat the same error on every poll.
      _previousPollState = data;
      if (previous != null) {
        _diffAndEmit(previous, data);
      }
    } catch (e) {
      debugPrint('SSE poll error: $e');
      _emitConnection(false);
    } finally {
      _pollInFlight = false;
    }
  }

  /// Compares two poll snapshots and emits synthesized events for changes in
  /// `agent_status`, per-queue `waiting_count`, and `current_call`.
  void _diffAndEmit(Map<String, dynamic> prev, Map<String, dynamic> curr) {
    final prevStatus = prev['agent_status']?.toString();
    final currStatus = curr['agent_status']?.toString();
    if (prevStatus != currStatus) {
      _emitEvent(
        'agent_status_changed',
        _asJsonMap(curr['agent_status']) ?? <String, dynamic>{},
      );
    }

    final prevQueues = _asJsonMap(prev['queues']) ?? const <String, dynamic>{};
    final currQueues = _asJsonMap(curr['queues']) ?? const <String, dynamic>{};
    for (final entry in currQueues.entries) {
      final currQueue = _asJsonMap(entry.value);
      if (currQueue == null) continue; // Not a queue object; nothing to diff.

      final prevQueue = _asJsonMap(prevQueues[entry.key]);
      if (prevQueue == null) {
        _emitEvent('queue_added', currQueue);
        continue;
      }

      final currCount = _asInt(currQueue['waiting_count']);
      final prevCount = _asInt(prevQueue['waiting_count']);
      if (currCount > prevCount) {
        _emitEvent('call_enqueued', currQueue);
      } else if (currCount < prevCount) {
        _emitEvent('call_dequeued', currQueue);
      }
    }

    // Only null <-> non-null transitions are reported; a change between two
    // non-null values emits nothing.
    final prevCall = prev['current_call'];
    final currCall = curr['current_call'];
    if (prevCall == null && currCall != null) {
      _emitEvent('call_started', _asJsonMap(currCall) ?? <String, dynamic>{});
    } else if (prevCall != null && currCall == null) {
      _emitEvent('call_ended', _asJsonMap(prevCall) ?? <String, dynamic>{});
    }
  }

  /// Returns [value] as a string-keyed map, or `null` when it is not a map.
  static Map<String, dynamic>? _asJsonMap(Object? value) =>
      value is Map ? Map<String, dynamic>.from(value) : null;

  /// Returns [value] as an int, treating anything non-numeric as 0.
  static int _asInt(Object? value) => value is num ? value.toInt() : 0;

  /// Emits an event unless the service has already been disposed.
  void _emitEvent(String name, Map<String, dynamic> data) {
    if (_eventController.isClosed) return;
    _eventController.add(SseEvent(event: name, data: data));
  }

  /// Emits a connectivity update unless the service has been disposed.
  void _emitConnection(bool connected) {
    if (_connectionController.isClosed) return;
    _connectionController.add(connected);
  }

  /// Stops streaming, polling and any pending reconnect, and reports
  /// `false` on [connectionState]. [connect] may be called again afterwards.
  void disconnect() {
    _shouldReconnect = false;
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    _pollTimer?.cancel();
    _pollTimer = null;
    _cancelToken?.cancel();
    _emitConnection(false);
  }

  /// Disconnects and closes both output streams. Safe to call more than once.
  void dispose() {
    disconnect();
    _eventController.close();
    _connectionController.close();
  }
}