# Relay Client for MacroPad Server
# Connects to relay server and forwards API requests to local server

import asyncio
import base64
import json
import threading
import time
from typing import Optional, Callable

import aiohttp


class RelayClient:
    """WebSocket client that connects to relay server and proxies requests.

    The client runs its own asyncio event loop on a daemon thread. After
    authenticating with the relay it forwards incoming ``api_request``
    messages to the local HTTP server and sends the result back as
    ``api_response`` messages.

    Callbacks (``on_connected``, ``on_disconnected``, ``on_session_id``) are
    invoked on the background loop thread, not on the thread that called
    ``start()``.
    """

    def __init__(
        self,
        relay_url: str,
        password: str,
        session_id: Optional[str] = None,
        local_port: int = 40000,
        on_connected: Optional[Callable] = None,
        on_disconnected: Optional[Callable] = None,
        on_session_id: Optional[Callable[[str], None]] = None
    ):
        """Store connection settings; no network activity happens here.

        Args:
            relay_url: Relay WebSocket URL. A trailing ``/`` is stripped and
                ``/desktop`` is appended unless already present.
            password: Password sent in the auth message.
            session_id: Session id to send in the auth message, or ``None``.
            local_port: Port of the local HTTP server on ``localhost``.
            on_connected: Called with no arguments after successful auth.
            on_disconnected: Called with no arguments whenever a connection
                attempt ends (including attempts that never connected).
            on_session_id: Called with the new id when the relay returns a
                session id different from the one currently stored.
        """
        self.relay_url = relay_url.rstrip('/')
        if not self.relay_url.endswith('/desktop'):
            self.relay_url += '/desktop'
        self.password = password
        self.session_id = session_id
        self.local_url = f"http://localhost:{local_port}"

        # Callbacks
        self.on_connected = on_connected
        self.on_disconnected = on_disconnected
        self.on_session_id = on_session_id

        # State
        self._ws = None
        self._session = None
        self._running = False
        self._connected = False
        self._thread = None
        self._loop = None
        self._main_task = None  # Task running _connection_loop(); cancelled by stop()
        self._reconnect_delay = 1

    def start(self):
        """Start the relay client in a background thread.

        Does nothing if the client is already running.
        """
        if self._running:
            return

        self._running = True
        self._thread = threading.Thread(target=self._run_async_loop, daemon=True)
        self._thread.start()

    def stop(self):
        """Stop the relay client.

        Cancels the connection task so that the WebSocket and HTTP session
        are closed by their ``async with`` blocks, then waits up to two
        seconds for the background thread to finish.
        """
        self._running = False

        loop, task = self._loop, self._main_task
        if loop is not None and task is not None and not loop.is_closed():
            try:
                # Cancel instead of loop.stop(): stopping the loop while
                # run_until_complete() is in flight raises RuntimeError and
                # skips all async cleanup.
                loop.call_soon_threadsafe(task.cancel)
            except RuntimeError:
                # Loop was closed between the check and the call.
                pass

        thread = self._thread
        # Callbacks run on the loop thread; a thread cannot join itself.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=2)
        self._thread = None

    def is_connected(self) -> bool:
        """Check if connected to relay server."""
        return self._connected

    def _run_async_loop(self):
        """Run the asyncio event loop in the background thread."""
        loop = asyncio.new_event_loop()
        self._loop = loop
        asyncio.set_event_loop(loop)

        try:
            self._main_task = loop.create_task(self._connection_loop())
            loop.run_until_complete(self._main_task)
        except asyncio.CancelledError:
            # Normal shutdown requested through stop().
            pass
        except Exception as e:
            print(f"Relay client error: {e}")
        finally:
            self._main_task = None
            loop.close()

    async def _connection_loop(self):
        """Main connection loop with reconnection logic."""
        while self._running:
            try:
                await self._connect_and_run()
            except Exception as e:
                print(f"Relay connection error: {e}")

            if self._running:
                # Exponential backoff for reconnection, capped at 30 seconds
                await asyncio.sleep(self._reconnect_delay)
                self._reconnect_delay = min(self._reconnect_delay * 2, 30)

    async def _connect_and_run(self):
        """Connect to relay server and handle messages.

        Returns when authentication fails or the WebSocket closes.
        ``aiohttp.ClientError`` is logged and swallowed; other exceptions
        propagate to the caller. ``on_disconnected`` is always invoked on
        exit, whether or not the connection was ever established.
        """
        try:
            async with aiohttp.ClientSession() as session:
                self._session = session

                async with session.ws_connect(self.relay_url) as ws:
                    self._ws = ws

                    # Authenticate
                    if not await self._authenticate():
                        return

                    self._connected = True
                    self._reconnect_delay = 1  # Reset backoff on successful connect

                    if self.on_connected:
                        self.on_connected()

                    # Message handling loop
                    async for msg in ws:
                        if msg.type == aiohttp.WSMsgType.TEXT:
                            try:
                                payload = json.loads(msg.data)
                            except json.JSONDecodeError as e:
                                # One bad frame must not tear down the connection.
                                print(f"Ignoring malformed relay message: {e}")
                                continue
                            await self._handle_message(payload)
                        elif msg.type == aiohttp.WSMsgType.ERROR:
                            print(f"WebSocket error: {ws.exception()}")
                            break
                        elif msg.type == aiohttp.WSMsgType.CLOSED:
                            break

        except aiohttp.ClientError as e:
            print(f"Relay connection failed: {e}")
        finally:
            self._connected = False
            self._ws = None
            self._session = None

            if self.on_disconnected:
                self.on_disconnected()

    async def _authenticate(self) -> bool:
        """Authenticate with the relay server.

        Sends an ``auth`` message and waits for a single reply.

        Returns:
            True if the reply is an ``auth_response`` with a truthy
            ``success`` field, False otherwise.
        """
        auth_msg = {
            "type": "auth",
            "sessionId": self.session_id,
            "password": self.password
        }

        await self._ws.send_json(auth_msg)

        # Wait for auth response
        response = await self._ws.receive_json()

        if not isinstance(response, dict) or response.get("type") != "auth_response":
            return False

        if not response.get("success"):
            print(f"Authentication failed: {response.get('error', 'Unknown error')}")
            return False

        # Mark as connected before callbacks so update_ip_label works
        self._connected = True

        new_session_id = response.get("sessionId")
        if new_session_id and new_session_id != self.session_id:
            self.session_id = new_session_id
            if self.on_session_id:
                self.on_session_id(new_session_id)
        return True

    async def _handle_message(self, msg: dict):
        """Handle a message from the relay server.

        Dispatches on ``msg["type"]``; unknown types and non-object payloads
        are ignored.
        """
        if not isinstance(msg, dict):
            return

        msg_type = msg.get("type")

        if msg_type == "api_request":
            await self._handle_api_request(msg)
        elif msg_type == "ws_message":
            # Forward WebSocket message from web client
            await self._handle_ws_message(msg)
        elif msg_type == "ping":
            await self._ws.send_json({"type": "pong"})

    async def _handle_api_request(self, msg: dict):
        """Forward API request to local server and send response back.

        The reply is an ``api_response`` message carrying the original
        ``requestId``, the HTTP status and a body. ``image/*`` responses are
        base64-encoded; other responses are parsed as JSON, falling back to
        ``{"text": ...}``. Any failure while talking to the local server is
        reported as status 500 with an ``error`` body.
        """
        request_id = msg.get("requestId")
        # "or" guards against explicit nulls in the incoming message.
        method = str(msg.get("method") or "GET").upper()
        path = str(msg.get("path") or "/")
        body = msg.get("body")
        headers = msg.get("headers") or {}

        # The path comes from a remote peer. Without a leading slash a value
        # such as "@host/x" would turn "http://localhost:PORT" into URL
        # userinfo and send the request to a different host.
        if not path.startswith("/"):
            path = "/" + path

        url = f"{self.local_url}{path}"

        try:
            # Forward request to local server
            async with self._session.request(
                method,
                url,
                json=body if body and method in ("POST", "PUT", "PATCH") else None,
                headers=headers
            ) as response:
                status = response.status
                # Handle binary responses (images)
                content_type = response.headers.get("Content-Type", "")
                if content_type.startswith("image/"):
                    # Base64 encode binary data
                    data = await response.read()
                    response_body = {
                        "base64": base64.b64encode(data).decode("utf-8"),
                        "contentType": content_type
                    }
                else:
                    try:
                        response_body = await response.json()
                    except (aiohttp.ContentTypeError, ValueError):
                        # Not JSON (wrong content type or undecodable body).
                        response_body = {"text": await response.text()}
        except Exception as e:
            status = 500
            response_body = {"error": str(e)}

        await self._ws.send_json({
            "type": "api_response",
            "requestId": request_id,
            "status": status,
            "body": response_body
        })

    async def _handle_ws_message(self, msg: dict):
        """Handle WebSocket message from web client."""
        # For now, we don't need to forward messages from web clients
        # to the local server because the local server broadcasts changes
        # The relay will handle broadcasting back to web clients
        pass

    async def broadcast(self, data: dict):
        """Broadcast a message to all connected web clients via relay.

        Silently does nothing when the client is not connected.
        """
        ws = self._ws
        if ws and self._connected:
            await ws.send_json({
                "type": "ws_broadcast",
                "data": data
            })