auth = new TWP_Mobile_Auth();
    }

    /**
     * Register the SSE REST endpoint.
     *
     * Hooks into `rest_api_init` and registers
     * `GET twilio-mobile/v1/stream/events`, served by stream_events() and
     * guarded by the auth object's verify_token() permission callback.
     */
    public function register_endpoints() {
        add_action('rest_api_init', function() {
            register_rest_route('twilio-mobile/v1', '/stream/events', array(
                'methods' => 'GET',
                'callback' => array($this, 'stream_events'),
                'permission_callback' => array($this->auth, 'verify_token')
            ));
        });
    }

    /**
     * Stream state-change events to the mobile app as Server-Sent Events.
     *
     * Polls the agent's state every 2 seconds for at most 5 minutes, emitting
     * change events plus a heartbeat every 15 seconds. On success this method
     * never returns: it terminates the request with exit once the stream ends.
     *
     * @param mixed $request REST request passed by the route callback (not read here).
     * @return WP_Error Only when no user id can be resolved from the token (HTTP 401).
     */
    public function stream_events($request) {
        $user_id = $this->auth->get_current_user_id();
        if (!$user_id) {
            return new WP_Error('unauthorized', 'Invalid token', array('status' => 401));
        }

        $max_duration = 300;      // 5 minutes max connection time
        $poll_interval = 2;       // seconds between state checks
        $heartbeat_interval = 15; // seconds between heartbeat events

        // Where max_execution_time counts wall-clock time (e.g. Windows), the
        // default limit would kill the stream long before $max_duration.
        if (function_exists('set_time_limit')) {
            @set_time_limit($max_duration + 30);
        }

        // Set headers for SSE
        header('Content-Type: text/event-stream');
        header('Cache-Control: no-cache');
        header('Connection: keep-alive');
        header('X-Accel-Buffering: no'); // Disable nginx buffering

        // Disable compression and PHP output buffering so events go out immediately
        if (function_exists('apache_setenv')) {
            @apache_setenv('no-gzip', '1');
        }
        @ini_set('zlib.output_compression', 0);
        @ini_set('implicit_flush', 1);
        ob_implicit_flush(1);
        while (ob_get_level() > 0) {
            ob_end_flush();
        }

        // Send initial connection event
        $this->send_event('connected', array('user_id' => $user_id, 'timestamp' => time()));

        // Baseline state that the first poll is compared against
        $last_heartbeat = time();
        $previous_state = $this->get_current_state($user_id);

        $start_time = time();
        $client_gone = false;

        while (time() - $start_time < $max_duration) {
            // Stop as soon as PHP reports the client has disconnected
            if (connection_aborted()) {
                $client_gone = true;
                break;
            }

            // Diff the fresh state against the previous poll and emit events
            $current_state = $this->get_current_state($user_id);
            $this->check_and_send_updates($previous_state, $current_state);
            $previous_state = $current_state;

            // Periodic heartbeat
            if (time() - $last_heartbeat >= $heartbeat_interval) {
                $this->send_event('heartbeat', array('timestamp' => time()));
                $last_heartbeat = time();
            }

            sleep($poll_interval);
        }

        // Only announce a timeout when the loop really ran out of time; if the
        // client already disconnected there is nobody left to notify.
        if (!$client_gone) {
            $this->send_event('disconnect', array('reason' => 'timeout', 'timestamp' => time()));
        }
        exit;
    }

    /**
     * Build the state snapshot for an agent.
     *
     * @param int $user_id User id of the agent.
     * @return array Keys: 'agent_status', 'queues', 'current_call'.
     */
    private function get_current_state($user_id) {
        return array(
            'agent_status' => $this->get_agent_status($user_id),
            'queues' => $this->get_queues_state($user_id),
            'current_call' => $this->get_current_call($user_id)
        );
    }

    /**
     * Read the agent's status row from the `twp_agent_status` table.
     *
     * @param int $user_id User id of the agent.
     * @return array Keys: 'status', 'is_logged_in' (bool), 'current_call_sid'.
     *               Defaults to offline / not logged in / null when no row exists.
     */
    private function get_agent_status($user_id) {
        global $wpdb;

        $table = $wpdb->prefix . 'twp_agent_status';

        $status = $wpdb->get_row($wpdb->prepare(
            "SELECT status, is_logged_in, current_call_sid FROM $table WHERE user_id = %d",
            $user_id
        ));

        if (!$status) {
            return array('status' => 'offline', 'is_logged_in' => false, 'current_call_sid' => null);
        }

        return array(
            'status' => $status->status,
            'is_logged_in' => (bool)$status->is_logged_in,
            'current_call_sid' => $status->current_call_sid
        );
    }

    /**
     * Get the waiting-call summary for every queue tied to the agent.
     *
     * Covers queues listed for the user in `twp_queue_assignments` plus rows
     * in `twp_call_queues` whose user_id matches.
     *
     * @param int $user_id User id of the agent.
     * @return array Map of queue id => array('id', 'name', 'waiting_count',
     *               'oldest_call_time'); empty array when the agent has no queues.
     */
    private function get_queues_state($user_id) {
        global $wpdb;

        $queues_table = $wpdb->prefix . 'twp_call_queues';
        $calls_table = $wpdb->prefix . 'twp_queued_calls';
        $assignments_table = $wpdb->prefix . 'twp_queue_assignments';

        // Queues the agent is assigned to
        $queue_ids = $wpdb->get_col($wpdb->prepare(
            "SELECT queue_id FROM $assignments_table WHERE user_id = %d",
            $user_id
        ));

        // Queues whose own user_id column matches the agent
        $personal_queue_ids = $wpdb->get_col($wpdb->prepare(
            "SELECT id FROM $queues_table WHERE user_id = %d",
            $user_id
        ));

        $all_queue_ids = array_unique(array_merge($queue_ids, $personal_queue_ids));
        if (empty($all_queue_ids)) {
            return array();
        }

        // Every id is forced through intval() before being placed in the IN (...) list
        $queue_ids_str = implode(',', array_map('intval', $all_queue_ids));

        $queues = $wpdb->get_results("
            SELECT q.id, q.queue_name,
                   COUNT(c.id) as waiting_count,
                   MIN(c.enqueued_at) as oldest_call_time
            FROM $queues_table q
            LEFT JOIN $calls_table c ON q.id = c.queue_id AND c.status = 'waiting'
            WHERE q.id IN ($queue_ids_str)
            GROUP BY q.id
        ");

        $result = array();
        foreach ($queues as $queue) {
            $result[$queue->id] = array(
                'id' => (int)$queue->id,
                'name' => $queue->queue_name,
                'waiting_count' => (int)$queue->waiting_count,
                'oldest_call_time' => $queue->oldest_call_time
            );
        }

        return $result;
    }

    /**
     * Get the agent's most recently joined call that is connecting or in progress.
     *
     * The call is looked up by the phone number stored in the user's
     * `twp_agent_phone` meta value.
     *
     * @param int $user_id User id of the agent.
     * @return array|null Keys: 'call_sid', 'from_number', 'queue_id', 'status',
     *                    'duration' (seconds since joined_at); null when the
     *                    agent has no phone number or no matching call.
     */
    private function get_current_call($user_id) {
        global $wpdb;

        $calls_table = $wpdb->prefix . 'twp_queued_calls';

        $agent_number = get_user_meta($user_id, 'twp_agent_phone', true);
        if (!$agent_number) {
            return null;
        }

        $call = $wpdb->get_row($wpdb->prepare(
            "SELECT call_sid, from_number, queue_id, status, joined_at FROM $calls_table WHERE agent_phone = %s AND status IN ('connecting', 'in_progress') ORDER BY joined_at DESC LIMIT 1",
            $agent_number
        ));

        if (!$call) {
            return null;
        }

        // strtotime() returns false for an unparseable value; without this
        // guard a missing joined_at would yield an epoch-sized duration.
        // NOTE(review): strtotime() uses PHP's default timezone - confirm that
        // matches how joined_at is written.
        $joined_ts = $call->joined_at ? strtotime($call->joined_at) : false;
        $duration = ($joined_ts === false) ? 0 : max(0, time() - $joined_ts);

        return array(
            'call_sid' => $call->call_sid,
            'from_number' => $call->from_number,
            'queue_id' => (int)$call->queue_id,
            'status' => $call->status,
            'duration' => $duration
        );
    }

    /**
     * Compare two state snapshots and emit an event for every difference.
     *
     * @param array $previous Snapshot from the previous poll (see get_current_state()).
     * @param array $current  Snapshot from the current poll.
     */
    private function check_and_send_updates($previous, $current) {
        // Agent status changes
        if ($previous['agent_status'] !== $current['agent_status']) {
            $this->send_event('agent_status_changed', $current['agent_status']);
        }

        // Queue changes
        $this->check_queue_changes($previous['queues'], $current['queues']);

        // Current call changes. The snapshots are not compared wholesale
        // because 'duration' differs on every poll.
        $previous_call = $previous['current_call'];
        $current_call = $current['current_call'];

        if ($current_call && !$previous_call) {
            // New call started
            $this->send_event('call_started', $current_call);
        } elseif (!$current_call && $previous_call) {
            // Call ended
            $this->send_event('call_ended', $previous_call);
        } elseif ($current_call && $previous_call) {
            if ($current_call['call_sid'] !== $previous_call['call_sid']) {
                // One call was replaced by another between two polls:
                // report the end of the old one and the start of the new one.
                $this->send_event('call_ended', $previous_call);
                $this->send_event('call_started', $current_call);
            } elseif ($current_call['status'] !== $previous_call['status']) {
                // Same call, status changed
                $this->send_event('call_status_changed', $current_call);
            }
        }
    }

    /**
     * Emit events for queues that were added, removed, or whose waiting count changed.
     *
     * @param array $previous_queues Queue map from the previous poll (see get_queues_state()).
     * @param array $current_queues  Queue map from the current poll.
     */
    private function check_queue_changes($previous_queues, $current_queues) {
        foreach ($current_queues as $queue_id => $current_queue) {
            if (!isset($previous_queues[$queue_id])) {
                // New queue added
                $this->send_event('queue_added', $current_queue);
                continue;
            }

            $previous_count = $previous_queues[$queue_id]['waiting_count'];
            $current_count = $current_queue['waiting_count'];
            if ($current_count === $previous_count) {
                continue;
            }

            // A growing count means a call joined the queue, a shrinking one that a call left
            $event_type = ($current_count > $previous_count) ? 'call_enqueued' : 'call_dequeued';
            $this->send_event($event_type, array(
                'queue_id' => $queue_id,
                'queue_name' => $current_queue['name'],
                'waiting_count' => $current_count
            ));
        }

        // Queues present last time but missing now
        foreach ($previous_queues as $queue_id => $previous_queue) {
            if (!isset($current_queues[$queue_id])) {
                $this->send_event('queue_removed', array('queue_id' => $queue_id));
            }
        }
    }

    /**
     * Write one SSE event to the response and flush it to the client.
     *
     * @param string $event_type Value for the `event:` field.
     * @param mixed  $data       Payload, JSON-encoded into the `data:` field.
     */
    private function send_event($event_type, $data) {
        // Partial output keeps the event well-formed when a single value
        // (e.g. a string with invalid UTF-8) cannot be encoded.
        $payload = json_encode($data, JSON_PARTIAL_OUTPUT_ON_ERROR);
        if ($payload === false) {
            $payload = 'null';
        }

        echo "event: $event_type\n";
        echo "data: " . $payload . "\n\n";

        if (ob_get_level() > 0) {
            ob_flush();
        }
        flush();
    }
}