Files
MP-Server/relay_client.py
jknapp f87dab6bc2 Add debug output for relay URL update issue
- Always trigger on_session_id callback when server returns session ID
- Add debug prints to trace URL update flow

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-05 20:56:30 -08:00

236 lines
8.1 KiB
Python

# Relay Client for MacroPad Server
# Connects to relay server and forwards API requests to local server
import asyncio
import json
import threading
import time
from typing import Optional, Callable
import aiohttp
class RelayClient:
    """WebSocket client that connects to relay server and proxies requests.

    The client runs its own asyncio event loop on a daemon thread. After
    authenticating with the relay it answers ``api_request`` messages by
    forwarding them to the local HTTP server and sending the result back as
    an ``api_response`` message.

    Callbacks (``on_connected``, ``on_disconnected``, ``on_session_id``) are
    invoked from the background thread, not from the thread that called
    ``start()``.
    """

    def __init__(
        self,
        relay_url: str,
        password: str,
        session_id: Optional[str] = None,
        local_port: int = 40000,
        on_connected: Optional[Callable] = None,
        on_disconnected: Optional[Callable] = None,
        on_session_id: Optional[Callable[[str], None]] = None
    ):
        """Store configuration; no network activity happens here.

        Args:
            relay_url: Base URL of the relay. A trailing ``/`` is stripped and
                ``/desktop`` is appended unless the URL already ends with it.
            password: Password sent in the ``auth`` message.
            session_id: Session ID sent in the ``auth`` message, if any.
            local_port: Port of the local server on ``localhost``.
            on_connected: Called with no arguments after authentication
                succeeds.
            on_disconnected: Called with no arguments every time a connection
                attempt ends, including attempts that never authenticated.
            on_session_id: Called with the session ID whenever the relay's
                auth response carries one.
        """
        self.relay_url = relay_url.rstrip('/')
        if not self.relay_url.endswith('/desktop'):
            self.relay_url += '/desktop'
        self.password = password
        self.session_id = session_id
        self.local_url = f"http://localhost:{local_port}"

        # Callbacks
        self.on_connected = on_connected
        self.on_disconnected = on_disconnected
        self.on_session_id = on_session_id

        # State
        self._ws = None
        self._session = None
        self._running = False
        self._connected = False
        self._thread = None
        self._loop = None
        self._main_task = None  # Task running _connection_loop(); cancelled by stop()
        self._reconnect_delay = 1

    def start(self):
        """Start the relay client in a background thread.

        Does nothing if the client is already running.
        """
        if self._running:
            return
        self._running = True
        self._thread = threading.Thread(target=self._run_async_loop, daemon=True)
        self._thread.start()

    def stop(self):
        """Stop the relay client.

        Cancels the connection task so that the websocket and HTTP session
        are closed through their normal context-manager exit and the
        disconnect bookkeeping (``_connected`` reset, ``on_disconnected``)
        runs. Waits up to two seconds for the background thread to finish.
        """
        self._running = False

        loop = self._loop
        task = self._main_task
        if loop is not None and task is not None and loop.is_running():
            try:
                # Cancelling the task (rather than stopping the loop) lets
                # run_until_complete() return cleanly after cleanup code ran.
                loop.call_soon_threadsafe(task.cancel)
            except RuntimeError:
                # The loop was closed between the check and the call; the
                # background thread is already on its way out.
                pass

        thread = self._thread
        if thread is not None:
            # A thread cannot join itself (e.g. stop() called from a callback).
            if thread is not threading.current_thread():
                thread.join(timeout=2)
            self._thread = None

    def is_connected(self) -> bool:
        """Check if connected to relay server."""
        return self._connected

    def _run_async_loop(self):
        """Run the asyncio event loop in the background thread."""
        loop = asyncio.new_event_loop()
        self._loop = loop
        asyncio.set_event_loop(loop)
        self._main_task = loop.create_task(self._connection_loop())
        try:
            loop.run_until_complete(self._main_task)
        except asyncio.CancelledError:
            # Normal shutdown requested through stop().
            pass
        except Exception as e:
            print(f"Relay client error: {e}")
        finally:
            loop.close()

    async def _connection_loop(self):
        """Main connection loop with reconnection logic.

        Keeps reconnecting while the client is running, sleeping between
        attempts with a delay that doubles up to a maximum of 30 seconds.
        """
        while self._running:
            try:
                await self._connect_and_run()
            except Exception as e:
                print(f"Relay connection error: {e}")

            if self._running:
                # Exponential backoff for reconnection
                await asyncio.sleep(self._reconnect_delay)
                self._reconnect_delay = min(self._reconnect_delay * 2, 30)

    async def _connect_and_run(self):
        """Connect to relay server and handle messages.

        Returns when authentication fails or the websocket ends. Connection
        state is reset and ``on_disconnected`` is invoked on every exit path.
        """
        try:
            async with aiohttp.ClientSession() as session:
                self._session = session
                async with session.ws_connect(self.relay_url) as ws:
                    self._ws = ws

                    # Authenticate
                    if not await self._authenticate():
                        return

                    self._connected = True
                    self._reconnect_delay = 1  # Reset backoff on successful connect
                    if self.on_connected:
                        self.on_connected()

                    # Message handling loop
                    async for msg in ws:
                        if msg.type == aiohttp.WSMsgType.TEXT:
                            try:
                                payload = json.loads(msg.data)
                            except ValueError as e:
                                # One malformed frame should not tear down
                                # the whole relay session.
                                print(f"Ignoring malformed relay message: {e}")
                                continue
                            await self._handle_message(payload)
                        elif msg.type == aiohttp.WSMsgType.ERROR:
                            print(f"WebSocket error: {ws.exception()}")
                            break
                        elif msg.type == aiohttp.WSMsgType.CLOSED:
                            break
        except aiohttp.ClientError as e:
            print(f"Relay connection failed: {e}")
        finally:
            self._connected = False
            self._ws = None
            self._session = None
            if self.on_disconnected:
                self.on_disconnected()

    async def _authenticate(self) -> bool:
        """Authenticate with the relay server.

        Sends an ``auth`` message and waits for the next JSON message.

        Returns:
            True if that message is a successful ``auth_response``; False if
            it reports failure or has a different type.
        """
        auth_msg = {
            "type": "auth",
            "sessionId": self.session_id,
            "password": self.password
        }
        await self._ws.send_json(auth_msg)

        # Wait for auth response
        response = await self._ws.receive_json()
        if response.get("type") == "auth_response":
            if response.get("success"):
                # Mark as connected before callbacks so update_ip_label works
                self._connected = True
                new_session_id = response.get("sessionId")
                # Always update session_id and trigger callback to ensure URL updates
                if new_session_id:
                    self.session_id = new_session_id
                    if self.on_session_id:
                        self.on_session_id(new_session_id)
                return True
            else:
                print(f"Authentication failed: {response.get('error', 'Unknown error')}")
                return False
        return False

    async def _handle_message(self, msg: dict):
        """Handle a message from the relay server.

        Dispatches on ``msg["type"]``; unknown types and non-object payloads
        are ignored.
        """
        if not isinstance(msg, dict):
            return

        msg_type = msg.get("type")
        if msg_type == "api_request":
            await self._handle_api_request(msg)
        elif msg_type == "ws_message":
            # Forward WebSocket message from web client
            await self._handle_ws_message(msg)
        elif msg_type == "ping":
            await self._ws.send_json({"type": "pong"})

    async def _handle_api_request(self, msg: dict):
        """Forward API request to local server and send response back.

        The reply is an ``api_response`` message carrying the original
        ``requestId``. ``image/*`` responses are returned base64-encoded;
        other responses are returned as parsed JSON, or as ``{"text": ...}``
        when the body cannot be parsed as JSON. A path that does not start
        with ``/`` is answered with status 400 and is not forwarded; any
        failure while forwarding is answered with status 500.
        """
        request_id = msg.get("requestId")
        # `or "GET"` also covers an explicit null method in the message.
        method = str(msg.get("method") or "GET").upper()
        path = msg.get("path", "/")
        body = msg.get("body")
        headers = msg.get("headers", {})

        # The path is appended directly to the local base URL. Without a
        # leading "/" a value such as "@host/" would turn "localhost:port"
        # into URL userinfo and redirect the request to another host.
        if not isinstance(path, str) or not path.startswith("/"):
            await self._ws.send_json({
                "type": "api_response",
                "requestId": request_id,
                "status": 400,
                "body": {"error": "Invalid request path"}
            })
            return

        url = f"{self.local_url}{path}"
        try:
            # Forward request to local server
            async with self._session.request(
                method,
                url,
                json=body if body and method in ("POST", "PUT", "PATCH") else None,
                headers=headers
            ) as response:
                # Handle binary responses (images)
                content_type = response.headers.get("Content-Type", "")
                if content_type.startswith("image/"):
                    # Base64 encode binary data
                    import base64
                    data = await response.read()
                    response_body = {
                        "base64": base64.b64encode(data).decode("utf-8"),
                        "contentType": content_type
                    }
                else:
                    try:
                        response_body = await response.json()
                    except Exception:
                        # Not JSON: fall back to the raw text. `Exception`
                        # (not a bare except) so task cancellation still
                        # propagates.
                        response_body = {"text": await response.text()}

                await self._ws.send_json({
                    "type": "api_response",
                    "requestId": request_id,
                    "status": response.status,
                    "body": response_body
                })
        except Exception as e:
            await self._ws.send_json({
                "type": "api_response",
                "requestId": request_id,
                "status": 500,
                "body": {"error": str(e)}
            })

    async def _handle_ws_message(self, msg: dict):
        """Handle WebSocket message from web client.

        Currently a no-op.
        """
        # For now, we don't need to forward messages from web clients
        # to the local server because the local server broadcasts changes
        # The relay will handle broadcasting back to web clients
        pass

    async def broadcast(self, data: dict):
        """Broadcast a message to all connected web clients via relay.

        Sends a ``ws_broadcast`` message; silently does nothing when the
        client is not connected.
        """
        if self._ws and self._connected:
            await self._ws.send_json({
                "type": "ws_broadcast",
                "data": data
            })