- Filter chat events by sessionKey before forwarding to mobile clients
- Only forward messages matching the user's expected session (agent:main:{userId})
- Prevents desktop TUI messages from appearing on mobile devices
- Maintains backwards compatibility for main session messages
956 lines
30 KiB
JavaScript
956 lines
30 KiB
JavaScript
#!/usr/bin/env node
|
|
|
|
import express from 'express';
|
|
import { WebSocketServer, WebSocket } from 'ws';
|
|
import jwt from 'jsonwebtoken';
|
|
import fetch from 'node-fetch';
|
|
import admin from 'firebase-admin';
|
|
import { readFileSync, writeFileSync, existsSync, mkdirSync, statSync } from 'fs';
|
|
import { fileURLToPath } from 'url';
|
|
import { dirname, join, basename } from 'path';
|
|
import multer from 'multer';
|
|
import { spawn } from 'child_process';
|
|
|
|
// Get current directory for ES modules
|
|
const __filename = fileURLToPath(import.meta.url);
|
|
const __dirname = dirname(__filename);
|
|
|
|
// Initialize Firebase Admin SDK
|
|
const serviceAccount = JSON.parse(
|
|
readFileSync(join(__dirname, 'service-account.json'), 'utf8')
|
|
);
|
|
|
|
admin.initializeApp({
|
|
credential: admin.credential.cert(serviceAccount)
|
|
});
|
|
|
|
console.log('[firebase] Firebase Admin SDK initialized');
|
|
|
|
// ElevenLabs API key for TTS
|
|
const ELEVENLABS_API_KEY = process.env.ELEVENLABS_API_KEY || '';
|
|
|
|
// Configuration (use environment variables)
|
|
const config = {
|
|
port: parseInt(process.env.PROXY_PORT || '18790', 10),
|
|
openclawUrl: process.env.OPENCLAW_URL || 'ws://127.0.0.1:18789',
|
|
openclawToken: process.env.OPENCLAW_TOKEN || '',
|
|
authentikUrl: process.env.AUTHENTIK_URL || '',
|
|
authentikClientId: process.env.AUTHENTIK_CLIENT_ID || '',
|
|
requireAuth: process.env.REQUIRE_AUTH !== 'false',
|
|
};
|
|
|
|
// Health check endpoint and notification API
|
|
const app = express();
|
|
app.use(express.json());
|
|
|
|
app.get('/health', (req, res) => {
|
|
res.json({ status: 'ok', service: 'alfred-proxy' });
|
|
});
|
|
|
|
// Track connected mobile clients for notifications
|
|
const connectedClients = new Set();
|
|
|
|
// Track FCM tokens by user ID
|
|
const fcmTokens = new Map(); // userId -> Set of FCM tokens
|
|
|
|
// Token persistence file
|
|
const tokensFile = join(__dirname, 'fcm-tokens.json');
|
|
|
|
// User preferences storage
|
|
const usersDir = join(__dirname, 'users');
|
|
const userPreferences = new Map(); // userId -> preferences object
|
|
|
|
// Ensure users directory exists
|
|
if (!existsSync(usersDir)) {
|
|
mkdirSync(usersDir, { recursive: true });
|
|
console.log(`[prefs] Created users directory: ${usersDir}`);
|
|
}
|
|
|
|
// Load user preferences from disk
|
|
function loadUserPreferences(userId) {
|
|
try {
|
|
const userFile = join(usersDir, `${userId}.json`);
|
|
if (existsSync(userFile)) {
|
|
const data = JSON.parse(readFileSync(userFile, 'utf8'));
|
|
userPreferences.set(userId, data);
|
|
console.log(`[prefs] Loaded preferences for user ${userId}`);
|
|
return data;
|
|
} else {
|
|
console.log(`[prefs] No preferences file for user ${userId}, using defaults`);
|
|
return null;
|
|
}
|
|
} catch (error) {
|
|
console.error(`[prefs] Error loading preferences for user ${userId}:`, error.message);
|
|
return null;
|
|
}
|
|
}
|
|
|
|
// Save user preferences to disk
|
|
function saveUserPreferences(userId, preferences) {
|
|
try {
|
|
const userFile = join(usersDir, `${userId}.json`);
|
|
writeFileSync(userFile, JSON.stringify(preferences, null, 2), 'utf8');
|
|
userPreferences.set(userId, preferences);
|
|
console.log(`[prefs] Saved preferences for user ${userId}`);
|
|
|
|
// Also write to workspace for agent visibility (optional)
|
|
try {
|
|
const workspaceUsersDir = '/home/jknapp/.openclaw/workspace/users';
|
|
if (!existsSync(workspaceUsersDir)) {
|
|
mkdirSync(workspaceUsersDir, { recursive: true });
|
|
}
|
|
const workspaceUserFile = join(workspaceUsersDir, `${userId}.json`);
|
|
writeFileSync(workspaceUserFile, JSON.stringify(preferences, null, 2), 'utf8');
|
|
console.log(`[prefs] Synced preferences to workspace for agent`);
|
|
} catch (wsError) {
|
|
console.warn(`[prefs] Could not sync to workspace:`, wsError.message);
|
|
}
|
|
|
|
return true;
|
|
} catch (error) {
|
|
console.error(`[prefs] Error saving preferences for user ${userId}:`, error.message);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
// Get user preferences (from cache or load from disk)
|
|
function getUserPreferences(userId) {
|
|
if (userPreferences.has(userId)) {
|
|
return userPreferences.get(userId);
|
|
}
|
|
return loadUserPreferences(userId);
|
|
}
|
|
|
|
// Load FCM tokens from disk
|
|
function loadFcmTokens() {
|
|
try {
|
|
if (existsSync(tokensFile)) {
|
|
const data = JSON.parse(readFileSync(tokensFile, 'utf8'));
|
|
Object.entries(data).forEach(([userId, tokens]) => {
|
|
fcmTokens.set(userId, new Set(tokens));
|
|
});
|
|
const totalTokens = Array.from(fcmTokens.values()).reduce((sum, set) => sum + set.size, 0);
|
|
console.log(`[fcm] Loaded ${totalTokens} token(s) for ${fcmTokens.size} user(s) from disk`);
|
|
} else {
|
|
console.log('[fcm] No persisted tokens found, starting fresh');
|
|
}
|
|
} catch (error) {
|
|
console.error('[fcm] Error loading tokens from disk:', error.message);
|
|
}
|
|
}
|
|
|
|
// Save FCM tokens to disk
|
|
function saveFcmTokens() {
|
|
try {
|
|
const data = {};
|
|
fcmTokens.forEach((tokens, userId) => {
|
|
data[userId] = Array.from(tokens);
|
|
});
|
|
writeFileSync(tokensFile, JSON.stringify(data, null, 2), 'utf8');
|
|
console.log(`[fcm] Saved tokens to disk`);
|
|
} catch (error) {
|
|
console.error('[fcm] Error saving tokens to disk:', error.message);
|
|
}
|
|
}
|
|
|
|
// Load tokens on startup
|
|
loadFcmTokens();
|
|
|
|
// Notification endpoint (for mobile-notify tool)
|
|
app.post('/api/notify', (req, res) => {
|
|
try {
|
|
const {
|
|
notificationType = 'alert',
|
|
title = 'AI Assistant',
|
|
message,
|
|
priority = 'default',
|
|
sound = true,
|
|
vibrate = true,
|
|
timestamp = Date.now(),
|
|
action = null,
|
|
userId = null
|
|
} = req.body;
|
|
|
|
if (!message) {
|
|
return res.status(400).json({ error: 'message is required' });
|
|
}
|
|
|
|
console.log(`[notify] Broadcasting notification: type=${notificationType} title="${title}" message="${message}"`);
|
|
|
|
// Create notification event
|
|
const notificationEvent = {
|
|
type: 'event',
|
|
event: 'mobile.notification',
|
|
payload: {
|
|
notificationType,
|
|
title,
|
|
message,
|
|
priority,
|
|
sound,
|
|
vibrate,
|
|
timestamp,
|
|
action
|
|
}
|
|
};
|
|
|
|
// Broadcast to all connected clients
|
|
let sentCount = 0;
|
|
const notificationJson = JSON.stringify(notificationEvent);
|
|
console.log(`[notify] Broadcast event: ${notificationJson}`);
|
|
|
|
connectedClients.forEach(client => {
|
|
if (client.readyState === WebSocket.OPEN) {
|
|
client.send(notificationJson);
|
|
console.log(`[notify] Sent to client (ready state: ${client.readyState})`);
|
|
sentCount++;
|
|
} else {
|
|
console.log(`[notify] Skipped client (ready state: ${client.readyState})`);
|
|
}
|
|
});
|
|
|
|
console.log(`[notify] Sent notification to ${sentCount} connected client(s)`);
|
|
|
|
// Also send FCM notifications to devices that might be asleep
|
|
let fcmSentCount = 0;
|
|
const allTokens = [];
|
|
|
|
// Filter tokens by userId if specified
|
|
if (userId) {
|
|
console.log(`[fcm] Filtering notifications for user: ${userId}`);
|
|
const userTokens = fcmTokens.get(userId);
|
|
if (userTokens) {
|
|
userTokens.forEach(token => allTokens.push(token));
|
|
} else {
|
|
console.log(`[fcm] No tokens found for user ${userId}`);
|
|
}
|
|
} else {
|
|
// Send to all users if no userId specified
|
|
fcmTokens.forEach((tokens, uid) => {
|
|
tokens.forEach(token => allTokens.push(token));
|
|
});
|
|
}
|
|
|
|
if (allTokens.length > 0) {
|
|
console.log(`[fcm] Sending push notification to ${allTokens.length} registered device(s)`);
|
|
|
|
// Send FCM notification (don't wait for it)
|
|
admin.messaging().sendEachForMulticast({
|
|
tokens: allTokens,
|
|
data: {
|
|
notificationType,
|
|
title,
|
|
message,
|
|
priority,
|
|
sound: sound.toString(),
|
|
vibrate: vibrate.toString(),
|
|
timestamp: timestamp.toString(),
|
|
action: action || ''
|
|
},
|
|
android: {
|
|
priority: notificationType === 'alarm' ? 'high' : 'normal',
|
|
ttl: 60000 // 60 seconds
|
|
}
|
|
})
|
|
.then(response => {
|
|
console.log(`[fcm] Successfully sent ${response.successCount} message(s)`);
|
|
if (response.failureCount > 0) {
|
|
console.log(`[fcm] Failed to send ${response.failureCount} message(s)`);
|
|
response.responses.forEach((resp, idx) => {
|
|
if (!resp.success) {
|
|
console.log(`[fcm] Error for token ${allTokens[idx].substring(0, 20)}: ${resp.error}`);
|
|
}
|
|
});
|
|
}
|
|
fcmSentCount = response.successCount;
|
|
})
|
|
.catch(error => {
|
|
console.error('[fcm] Error sending push notification:', error);
|
|
});
|
|
} else {
|
|
console.log('[fcm] No FCM tokens registered, skipping push notification');
|
|
}
|
|
|
|
res.json({ success: true, clients: sentCount, fcm: allTokens.length });
|
|
} catch (err) {
|
|
console.error('[notify] Error broadcasting notification:', err);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// Alarm dismissal endpoint - broadcast dismissal to all user's devices
|
|
app.post('/api/alarm/dismiss', (req, res) => {
|
|
try {
|
|
const { userId, alarmId } = req.body;
|
|
|
|
if (!userId || !alarmId) {
|
|
return res.status(400).json({ error: 'userId and alarmId are required' });
|
|
}
|
|
|
|
console.log(`[alarm] Broadcasting dismissal for alarm ${alarmId} to user ${userId}`);
|
|
|
|
// Create dismissal event
|
|
const dismissalEvent = {
|
|
type: 'alarm_dismiss',
|
|
alarmId: alarmId,
|
|
timestamp: Date.now()
|
|
};
|
|
|
|
// Broadcast to all connected WebSocket clients for this user
|
|
let sentCount = 0;
|
|
const dismissalJson = JSON.stringify(dismissalEvent);
|
|
|
|
connectedClients.forEach(client => {
|
|
if (client.userId === userId && client.readyState === WebSocket.OPEN) {
|
|
client.send(dismissalJson);
|
|
console.log(`[alarm] Sent dismissal to WebSocket client for user ${userId}`);
|
|
sentCount++;
|
|
}
|
|
});
|
|
|
|
console.log(`[alarm] Sent dismissal to ${sentCount} WebSocket client(s)`);
|
|
|
|
// Also send FCM to devices that might be offline/asleep
|
|
const userTokens = fcmTokens.get(userId) || new Set();
|
|
const tokens = Array.from(userTokens);
|
|
|
|
if (tokens.length > 0) {
|
|
admin.messaging().sendEachForMulticast({
|
|
tokens: tokens,
|
|
data: {
|
|
type: 'alarm_dismiss',
|
|
alarmId: alarmId,
|
|
timestamp: Date.now().toString()
|
|
},
|
|
android: {
|
|
priority: 'high'
|
|
}
|
|
})
|
|
.then(response => {
|
|
console.log(`[fcm] Sent dismissal to ${response.successCount}/${tokens.length} FCM devices`);
|
|
})
|
|
.catch(error => {
|
|
console.error('[fcm] Error sending dismissal via FCM:', error);
|
|
});
|
|
} else {
|
|
console.log('[fcm] No FCM tokens for user, skipping push dismissal');
|
|
}
|
|
|
|
res.json({ success: true, websocket: sentCount, fcm: tokens.length });
|
|
} catch (err) {
|
|
console.error('[alarm] Error broadcasting dismissal:', err);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// File upload configuration
|
|
const uploadsDir = join(__dirname, 'uploads');
|
|
if (!existsSync(uploadsDir)) {
|
|
mkdirSync(uploadsDir, { recursive: true });
|
|
console.log(`[upload] Created uploads directory: ${uploadsDir}`);
|
|
}
|
|
|
|
const storage = multer.diskStorage({
|
|
destination: (req, file, cb) => {
|
|
cb(null, uploadsDir);
|
|
},
|
|
filename: (req, file, cb) => {
|
|
// Generate unique filename with timestamp
|
|
const uniqueSuffix = `${Date.now()}-${Math.round(Math.random() * 1E9)}`;
|
|
const ext = file.originalname.split('.').pop();
|
|
cb(null, `${file.fieldname}-${uniqueSuffix}.${ext}`);
|
|
}
|
|
});
|
|
|
|
const upload = multer({
|
|
storage: storage,
|
|
limits: {
|
|
fileSize: 100 * 1024 * 1024 // 100MB limit
|
|
}
|
|
});
|
|
|
|
// File upload endpoint
|
|
app.post('/api/upload', upload.single('file'), (req, res) => {
|
|
try {
|
|
if (!req.file) {
|
|
return res.status(400).json({ error: 'No file uploaded' });
|
|
}
|
|
|
|
const filePath = join(uploadsDir, req.file.filename);
|
|
const fileUrl = `/uploads/${req.file.filename}`;
|
|
|
|
console.log(`[upload] File uploaded: ${req.file.originalname} -> ${req.file.filename}`);
|
|
console.log(`[upload] Size: ${(req.file.size / 1024 / 1024).toFixed(2)} MB`);
|
|
console.log(`[upload] Type: ${req.file.mimetype}`);
|
|
|
|
res.json({
|
|
success: true,
|
|
filename: req.file.filename,
|
|
originalName: req.file.originalname,
|
|
size: req.file.size,
|
|
mimeType: req.file.mimetype,
|
|
path: filePath,
|
|
url: fileUrl
|
|
});
|
|
} catch (err) {
|
|
console.error('[upload] Error handling file upload:', err);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
/**
|
|
* Text-to-speech endpoint using SAG (file-based)
|
|
*/
|
|
app.post('/api/tts', async (req, res) => {
|
|
const { text, voiceId } = req.body;
|
|
|
|
if (!text) {
|
|
return res.status(400).json({ error: 'text is required' });
|
|
}
|
|
|
|
try {
|
|
// Create temp file for audio output
|
|
const tempFile = join(uploadsDir, `tts-${Date.now()}.mp3`);
|
|
|
|
// Build SAG command
|
|
const args = ['-o', tempFile];
|
|
if (voiceId) {
|
|
args.push('-v', voiceId);
|
|
}
|
|
|
|
console.log(`[tts] Generating speech for text: "${text.substring(0, 50)}..." voice: ${voiceId || 'default'}`);
|
|
|
|
// Spawn SAG process with 120 second timeout for long responses
|
|
const sagPath = '/home/linuxbrew/.linuxbrew/bin/sag';
|
|
const sagProcess = spawn(sagPath, args, {
|
|
timeout: 120000, // 2 minutes for very long responses
|
|
env: {
|
|
...process.env,
|
|
ELEVENLABS_API_KEY: ELEVENLABS_API_KEY
|
|
}
|
|
});
|
|
|
|
let stderr = '';
|
|
|
|
sagProcess.stderr.on('data', (data) => {
|
|
stderr += data.toString();
|
|
});
|
|
|
|
// Write text to stdin
|
|
sagProcess.stdin.write(text);
|
|
sagProcess.stdin.end();
|
|
|
|
sagProcess.on('close', (code) => {
|
|
if (code !== 0) {
|
|
console.error(`[tts] SAG failed with code ${code}: ${stderr}`);
|
|
return res.status(500).json({ error: `TTS failed: ${stderr}` });
|
|
}
|
|
|
|
// Check if file was created
|
|
if (!existsSync(tempFile)) {
|
|
console.error('[tts] SAG succeeded but no audio file created');
|
|
return res.status(500).json({ error: 'No audio file generated' });
|
|
}
|
|
|
|
const audioUrl = `/uploads/${basename(tempFile)}`;
|
|
|
|
console.log(`[tts] Generated audio: ${basename(tempFile)} (${statSync(tempFile).size} bytes)`);
|
|
|
|
res.json({
|
|
success: true,
|
|
audioUrl: audioUrl,
|
|
filename: basename(tempFile)
|
|
});
|
|
});
|
|
|
|
sagProcess.on('error', (err) => {
|
|
console.error('[tts] SAG spawn error:', err);
|
|
res.status(500).json({ error: `Failed to spawn SAG: ${err.message}` });
|
|
});
|
|
|
|
} catch (err) {
|
|
console.error('[tts] Error:', err);
|
|
res.status(500).json({ error: err.message });
|
|
}
|
|
});
|
|
|
|
// Streaming endpoint removed - SAG doesn't easily support stdout streaming
|
|
// The file-based endpoint with extended timeout is sufficient for now
|
|
|
|
// Serve uploaded files
|
|
app.use('/uploads', express.static(uploadsDir));
|
|
|
|
const httpServer = app.listen(config.port, () => {
|
|
console.log(`[alfred-proxy] HTTP server listening on port ${config.port}`);
|
|
console.log(`[alfred-proxy] WebSocket endpoint: ws://localhost:${config.port}`);
|
|
console.log(`[alfred-proxy] OpenClaw target: ${config.openclawUrl}`);
|
|
console.log(`[alfred-proxy] Auth required: ${config.requireAuth}`);
|
|
});
|
|
|
|
// WebSocket server
|
|
const wss = new WebSocketServer({ server: httpServer });
|
|
|
|
/**
|
|
* Extract stable user ID from userInfo.
|
|
* Uses same logic as Android app: preferred_username > email > sub
|
|
*/
|
|
function extractUserId(userInfo) {
|
|
return userInfo.preferred_username || userInfo.email || userInfo.sub || 'unknown';
|
|
}
|
|
|
|
/**
|
|
* Validate OAuth token with Authentik's userinfo endpoint
|
|
*/
|
|
async function validateAuthentikToken(token) {
|
|
if (!config.requireAuth) {
|
|
console.log('[auth] Auth disabled, skipping validation');
|
|
return { valid: true, user: { sub: 'dev-user', email: 'dev@local' } };
|
|
}
|
|
|
|
try {
|
|
const response = await fetch(`${config.authentikUrl}/application/o/userinfo/`, {
|
|
headers: {
|
|
'Authorization': `Bearer ${token}`,
|
|
},
|
|
});
|
|
|
|
if (!response.ok) {
|
|
const errorText = await response.text();
|
|
console.error(`[auth] Authentik validation failed: ${response.status} - ${errorText}`);
|
|
console.error(`[auth] Token (first 20 chars): ${token.substring(0, 20)}...`);
|
|
return { valid: false, error: 'Invalid token' };
|
|
}
|
|
|
|
const userInfo = await response.json();
|
|
const userId = extractUserId(userInfo);
|
|
console.log(`[auth] Token validated for user: ${userId} (${userInfo.email || userInfo.sub})`);
|
|
|
|
return { valid: true, user: userInfo };
|
|
} catch (err) {
|
|
console.error('[auth] Authentik validation error:', err.message);
|
|
return { valid: false, error: err.message };
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Extract OAuth token from WebSocket upgrade request
|
|
*/
|
|
function extractToken(req) {
|
|
// Check Authorization header
|
|
const authHeader = req.headers['authorization'];
|
|
if (authHeader && authHeader.startsWith('Bearer ')) {
|
|
return authHeader.slice(7).trim();
|
|
}
|
|
|
|
// Check query parameter (for testing)
|
|
const url = new URL(req.url, 'ws://localhost');
|
|
const token = url.searchParams.get('token');
|
|
if (token) {
|
|
return token.trim();
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* Inject OpenClaw token into connect message
|
|
*/
|
|
function injectOpenClawToken(message) {
|
|
try {
|
|
const msg = JSON.parse(message);
|
|
|
|
// Only inject token into connect messages
|
|
if (msg.type === 'req' && msg.method === 'connect') {
|
|
console.log('[proxy] Injecting OpenClaw token into connect message');
|
|
|
|
if (!msg.params) {
|
|
msg.params = {};
|
|
}
|
|
|
|
if (!msg.params.auth) {
|
|
msg.params.auth = {};
|
|
}
|
|
|
|
// Add OpenClaw gateway token
|
|
msg.params.auth.token = config.openclawToken;
|
|
|
|
// Change webchat mode to cli to bypass origin validation
|
|
if (msg.params.client && msg.params.client.mode === 'webchat') {
|
|
console.log('[proxy] Changing mode from webchat to cli to bypass origin validation');
|
|
msg.params.client.mode = 'cli';
|
|
}
|
|
|
|
return JSON.stringify(msg);
|
|
}
|
|
|
|
// Pass through all other messages unchanged
|
|
return message;
|
|
} catch (err) {
|
|
// If it's not JSON, pass through unchanged
|
|
return message;
|
|
}
|
|
}
|
|
|
|
// Handle WebSocket connections
|
|
wss.on('connection', async (clientWs, req) => {
|
|
const clientIp = req.socket.remoteAddress;
|
|
console.log(`[proxy] New connection from ${clientIp}`);
|
|
|
|
// Extract and validate OAuth token
|
|
const oauthToken = extractToken(req);
|
|
|
|
if (!oauthToken && config.requireAuth) {
|
|
console.log(`[proxy] No OAuth token provided, rejecting connection`);
|
|
clientWs.close(1008, 'Authentication required');
|
|
return;
|
|
}
|
|
|
|
let clientUserId = null;
|
|
|
|
if (oauthToken) {
|
|
const validation = await validateAuthentikToken(oauthToken);
|
|
|
|
if (!validation.valid) {
|
|
console.log(`[proxy] Invalid OAuth token, rejecting connection`);
|
|
clientWs.close(1008, validation.error || 'Invalid token');
|
|
return;
|
|
}
|
|
|
|
const userId = extractUserId(validation.user);
|
|
console.log(`[proxy] Authenticated as ${userId} (${validation.user.email || validation.user.sub})`);
|
|
clientUserId = userId;
|
|
}
|
|
|
|
// Connect to OpenClaw
|
|
console.log(`[proxy] Connecting to OpenClaw at ${config.openclawUrl}`);
|
|
const openclawWs = new WebSocket(config.openclawUrl, {
|
|
headers: {
|
|
'Origin': `http://localhost:${config.port}`
|
|
}
|
|
});
|
|
|
|
let isAlive = true;
|
|
|
|
// Add client to connected clients set (for notification broadcasting)
|
|
// Store userId on the WebSocket client for filtering
|
|
clientWs.userId = clientUserId;
|
|
connectedClients.add(clientWs);
|
|
console.log(`[proxy] Client added to notification broadcast list (total: ${connectedClients.size})`);
|
|
|
|
// Proxy: Client → OpenClaw (inject token)
|
|
clientWs.on('message', (data) => {
|
|
const message = data.toString('utf8');
|
|
|
|
try {
|
|
// Parse message to check for special events
|
|
const parsed = JSON.parse(message);
|
|
|
|
// Handle FCM token registration
|
|
if (parsed.type === 'fcm.register') {
|
|
const fcmToken = parsed.token;
|
|
if (fcmToken && clientUserId) {
|
|
console.log(`[fcm] Registering token for user ${clientUserId}: ${fcmToken.substring(0, 20)}...`);
|
|
|
|
// Store token for this user
|
|
if (!fcmTokens.has(clientUserId)) {
|
|
fcmTokens.set(clientUserId, new Set());
|
|
}
|
|
fcmTokens.get(clientUserId).add(fcmToken);
|
|
|
|
console.log(`[fcm] User ${clientUserId} now has ${fcmTokens.get(clientUserId).size} token(s)`);
|
|
|
|
// Persist to disk
|
|
saveFcmTokens();
|
|
} else {
|
|
console.log('[fcm] Invalid FCM registration: missing token or user ID');
|
|
}
|
|
return; // Don't forward to OpenClaw
|
|
}
|
|
|
|
// Handle alarm dismiss - broadcast to all other clients
|
|
if (parsed.type === 'alarm.dismiss') {
|
|
console.log(`[proxy] Alarm dismiss received: ${parsed.alarmId}`);
|
|
|
|
const dismissEvent = {
|
|
type: 'event',
|
|
event: 'mobile.alarm.dismissed',
|
|
payload: {
|
|
alarmId: parsed.alarmId,
|
|
timestamp: parsed.timestamp || Date.now()
|
|
}
|
|
};
|
|
|
|
const dismissJson = JSON.stringify(dismissEvent);
|
|
let broadcastCount = 0;
|
|
|
|
// Broadcast to all clients (including sender for consistency)
|
|
connectedClients.forEach(client => {
|
|
if (client.readyState === WebSocket.OPEN) {
|
|
client.send(dismissJson);
|
|
broadcastCount++;
|
|
}
|
|
});
|
|
|
|
console.log(`[proxy] Broadcasted alarm dismiss to ${broadcastCount} client(s)`);
|
|
return; // Don't forward to OpenClaw
|
|
}
|
|
|
|
// Handle user preferences update
|
|
if (parsed.type === 'req' && parsed.method === 'user.preferences.update') {
|
|
console.log(`[prefs] Preference update request from user ${clientUserId}`);
|
|
|
|
if (!clientUserId) {
|
|
const errorResponse = {
|
|
type: 'res',
|
|
id: parsed.id,
|
|
ok: false,
|
|
error: 'User ID not available'
|
|
};
|
|
clientWs.send(JSON.stringify(errorResponse));
|
|
return;
|
|
}
|
|
|
|
const currentPrefs = getUserPreferences(clientUserId) || {};
|
|
const updatedPrefs = { ...currentPrefs, ...parsed.params };
|
|
|
|
if (saveUserPreferences(clientUserId, updatedPrefs)) {
|
|
const successResponse = {
|
|
type: 'res',
|
|
id: parsed.id,
|
|
ok: true,
|
|
payload: updatedPrefs
|
|
};
|
|
clientWs.send(JSON.stringify(successResponse));
|
|
console.log(`[prefs] Updated preferences for ${clientUserId}:`, updatedPrefs);
|
|
} else {
|
|
const errorResponse = {
|
|
type: 'res',
|
|
id: parsed.id,
|
|
ok: false,
|
|
error: 'Failed to save preferences'
|
|
};
|
|
clientWs.send(JSON.stringify(errorResponse));
|
|
}
|
|
return; // Don't forward to OpenClaw
|
|
}
|
|
|
|
// Handle user preferences get
|
|
if (parsed.type === 'req' && parsed.method === 'user.preferences.get') {
|
|
console.log(`[prefs] Preference get request from user ${clientUserId}`);
|
|
|
|
if (!clientUserId) {
|
|
const errorResponse = {
|
|
type: 'res',
|
|
id: parsed.id,
|
|
ok: false,
|
|
error: 'User ID not available'
|
|
};
|
|
clientWs.send(JSON.stringify(errorResponse));
|
|
return;
|
|
}
|
|
|
|
const prefs = getUserPreferences(clientUserId) || {};
|
|
const successResponse = {
|
|
type: 'res',
|
|
id: parsed.id,
|
|
ok: true,
|
|
payload: prefs
|
|
};
|
|
clientWs.send(JSON.stringify(successResponse));
|
|
console.log(`[prefs] Retrieved preferences for ${clientUserId}:`, prefs);
|
|
return; // Don't forward to OpenClaw
|
|
}
|
|
} catch (e) {
|
|
// Not JSON or not a special event - continue with normal flow
|
|
}
|
|
|
|
// Forward all other messages to OpenClaw
|
|
if (openclawWs.readyState === WebSocket.OPEN) {
|
|
console.log(`[proxy] Client → OpenClaw (full): ${message}`);
|
|
const modifiedMessage = injectOpenClawToken(message);
|
|
openclawWs.send(modifiedMessage);
|
|
}
|
|
});
|
|
|
|
// Proxy: OpenClaw → Client (inject user preferences into connect response + filter by session)
|
|
openclawWs.on('message', (data) => {
|
|
const message = data.toString('utf8');
|
|
console.log(`[proxy] OpenClaw → Client: ${message.substring(0, 100)}...`);
|
|
console.log(`[proxy] Client WebSocket state: ${clientWs.readyState} (1=OPEN)`);
|
|
|
|
let messageToSend = data;
|
|
let shouldForward = true;
|
|
|
|
// Try to filter and modify messages
|
|
try {
|
|
const parsed = JSON.parse(message);
|
|
|
|
// Filter chat events by sessionKey - only forward if it's for this user's session
|
|
if (parsed.type === 'event' && parsed.event === 'chat') {
|
|
const payloadSessionKey = parsed.payload?.sessionKey;
|
|
const expectedSessionKey = `agent:main:${clientUserId}`;
|
|
|
|
// Only forward if sessionKey matches user's session (or is main/unspecified for backwards compat)
|
|
if (payloadSessionKey && payloadSessionKey !== 'agent:main:main' && payloadSessionKey !== expectedSessionKey) {
|
|
console.log(`[proxy] Filtering out chat event: sessionKey=${payloadSessionKey}, expected=${expectedSessionKey}`);
|
|
shouldForward = false;
|
|
} else {
|
|
console.log(`[proxy] Forwarding chat event: sessionKey=${payloadSessionKey || 'main'} for user ${clientUserId}`);
|
|
}
|
|
}
|
|
|
|
// If this is a connect response, inject user preferences
|
|
if (parsed.type === 'res' && parsed.ok && parsed.id && parsed.id.startsWith('connect-')) {
|
|
console.log(`[prefs] Injecting user preferences into connect response for ${clientUserId}`);
|
|
|
|
const prefs = getUserPreferences(clientUserId) || {};
|
|
|
|
if (!parsed.payload) {
|
|
parsed.payload = {};
|
|
}
|
|
|
|
parsed.payload.userPreferences = prefs;
|
|
messageToSend = JSON.stringify(parsed);
|
|
console.log(`[prefs] Injected preferences:`, prefs);
|
|
|
|
// Write per-user CONTEXT.md for agent to read
|
|
try {
|
|
const workspacePath = '/home/jknapp/.openclaw/workspace';
|
|
const assistantName = prefs.assistantName || 'Alfred';
|
|
const voiceId = prefs.voiceId || 'default';
|
|
|
|
// 1. Write per-user file (persistent, multi-user)
|
|
const userDir = join(workspacePath, 'users', clientUserId);
|
|
const userContextPath = join(userDir, 'CONTEXT.md');
|
|
|
|
if (!existsSync(userDir)) {
|
|
mkdirSync(userDir, { recursive: true });
|
|
}
|
|
|
|
const userContext = `# CONTEXT.md - User Preferences for ${clientUserId}
|
|
|
|
**User ID:** ${clientUserId}
|
|
**Assistant Name:** ${assistantName}
|
|
**Voice ID:** ${voiceId}
|
|
|
|
## Instructions
|
|
|
|
**IMPORTANT:** This file contains THIS USER's preferences and overrides global defaults.
|
|
|
|
- **When this user addresses you:** Use the Assistant Name above (e.g., "${assistantName}")
|
|
- **Self-identification:** "I'm ${assistantName}" (not "Alfred" unless that's the name above)
|
|
- **Voice responses:** Use the Voice ID above for TTS
|
|
|
|
This file is dynamically updated on each user connection by alfred-proxy.
|
|
|
|
**Multi-user support:** Each user has their own CONTEXT.md in their users/{userId}/ directory.
|
|
`;
|
|
|
|
writeFileSync(userContextPath, userContext, 'utf8');
|
|
console.log(`[prefs] Wrote users/${clientUserId}/CONTEXT.md (name: ${assistantName})`);
|
|
|
|
} catch (err) {
|
|
console.warn(`[prefs] Failed to write user context:`, err.message);
|
|
}
|
|
}
|
|
} catch (e) {
|
|
// Not JSON or couldn't parse, send as-is
|
|
}
|
|
|
|
// Only forward if not filtered
|
|
if (shouldForward && clientWs.readyState === WebSocket.OPEN) {
|
|
try {
|
|
clientWs.send(messageToSend);
|
|
console.log(`[proxy] Message sent to client successfully`);
|
|
} catch (err) {
|
|
console.error(`[proxy] Failed to send to client:`, err.message);
|
|
}
|
|
} else if (!shouldForward) {
|
|
console.log(`[proxy] Message filtered, not forwarding to client`);
|
|
} else {
|
|
console.warn(`[proxy] Client WebSocket not open, cannot send (state=${clientWs.readyState})`);
|
|
}
|
|
});
|
|
|
|
// Handle OpenClaw connection
|
|
openclawWs.on('open', () => {
|
|
console.log('[proxy] Connected to OpenClaw');
|
|
isAlive = true;
|
|
});
|
|
|
|
openclawWs.on('error', (err) => {
|
|
console.error('[proxy] OpenClaw connection error:', err.message);
|
|
clientWs.close(1011, 'Upstream connection failed');
|
|
});
|
|
|
|
openclawWs.on('close', (code, reason) => {
|
|
console.log(`[proxy] OpenClaw closed: ${code} ${reason}`);
|
|
clearInterval(pingInterval);
|
|
clientWs.close(code, reason.toString('utf8'));
|
|
});
|
|
|
|
// Handle client disconnection
|
|
clientWs.on('close', (code, reason) => {
|
|
console.log(`[proxy] Client disconnected: ${code} ${reason}`);
|
|
connectedClients.delete(clientWs);
|
|
console.log(`[proxy] Client removed from notification broadcast list (total: ${connectedClients.size})`);
|
|
clearInterval(pingInterval);
|
|
openclawWs.close();
|
|
});
|
|
|
|
clientWs.on('error', (err) => {
|
|
console.error('[proxy] Client connection error:', err.message);
|
|
connectedClients.delete(clientWs);
|
|
clearInterval(pingInterval);
|
|
openclawWs.close();
|
|
});
|
|
|
|
// Ping/pong to keep connection alive
|
|
const pingInterval = setInterval(() => {
|
|
if (!isAlive) {
|
|
clearInterval(pingInterval);
|
|
openclawWs.terminate();
|
|
clientWs.terminate();
|
|
return;
|
|
}
|
|
|
|
isAlive = false;
|
|
if (openclawWs.readyState === WebSocket.OPEN) {
|
|
openclawWs.ping();
|
|
}
|
|
}, 30000);
|
|
|
|
openclawWs.on('pong', () => {
|
|
isAlive = true;
|
|
});
|
|
});
|
|
|
|
// Graceful shutdown
|
|
process.on('SIGTERM', () => {
|
|
console.log('[alfred-proxy] SIGTERM received, closing server...');
|
|
httpServer.close(() => {
|
|
console.log('[alfred-proxy] HTTP server closed');
|
|
process.exit(0);
|
|
});
|
|
});
|
|
|
|
process.on('SIGINT', () => {
|
|
console.log('[alfred-proxy] SIGINT received, closing server...');
|
|
httpServer.close(() => {
|
|
console.log('[alfred-proxy] HTTP server closed');
|
|
process.exit(0);
|
|
});
|
|
});
|
|
|
|
// Global error handlers to prevent crashes
|
|
process.on('uncaughtException', (err) => {
|
|
console.error('[alfred-proxy] Uncaught exception:', err);
|
|
// Don't exit - log and continue
|
|
});
|
|
|
|
process.on('unhandledRejection', (reason, promise) => {
|
|
console.error('[alfred-proxy] Unhandled rejection at:', promise, 'reason:', reason);
|
|
// Don't exit - log and continue
|
|
});
|
|
|
|
console.log('[alfred-proxy] Service started successfully');
|