Initial commit: Alfred Proxy with OAuth, TTS, and FCM push notifications
- Environment-based configuration (no hardcoded secrets) - OAuth authentication via Authentik - ElevenLabs TTS integration via SAG CLI - FCM push notification support - User preferences sync system - Multi-user support with per-user context files - No internal IPs or service accounts in tracked files
This commit is contained in:
937
server.js
Normal file
937
server.js
Normal file
@@ -0,0 +1,937 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import express from 'express';
|
||||
import { WebSocketServer, WebSocket } from 'ws';
|
||||
import jwt from 'jsonwebtoken';
|
||||
import fetch from 'node-fetch';
|
||||
import admin from 'firebase-admin';
|
||||
import { readFileSync, writeFileSync, existsSync, mkdirSync, statSync } from 'fs';
|
||||
import { fileURLToPath } from 'url';
|
||||
import { dirname, join, basename } from 'path';
|
||||
import multer from 'multer';
|
||||
import { spawn } from 'child_process';
|
||||
|
||||
// Get current directory for ES modules
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dirname = dirname(__filename);
|
||||
|
||||
// Initialize Firebase Admin SDK
|
||||
const serviceAccount = JSON.parse(
|
||||
readFileSync(join(__dirname, 'service-account.json'), 'utf8')
|
||||
);
|
||||
|
||||
admin.initializeApp({
|
||||
credential: admin.credential.cert(serviceAccount)
|
||||
});
|
||||
|
||||
console.log('[firebase] Firebase Admin SDK initialized');
|
||||
|
||||
// ElevenLabs API key for TTS
|
||||
const ELEVENLABS_API_KEY = process.env.ELEVENLABS_API_KEY || '';
|
||||
|
||||
// Configuration (use environment variables)
|
||||
const config = {
|
||||
port: parseInt(process.env.PROXY_PORT || '18790', 10),
|
||||
openclawUrl: process.env.OPENCLAW_URL || 'ws://127.0.0.1:18789',
|
||||
openclawToken: process.env.OPENCLAW_TOKEN || '',
|
||||
authentikUrl: process.env.AUTHENTIK_URL || '',
|
||||
authentikClientId: process.env.AUTHENTIK_CLIENT_ID || '',
|
||||
requireAuth: process.env.REQUIRE_AUTH !== 'false',
|
||||
};
|
||||
|
||||
// Health check endpoint and notification API
|
||||
const app = express();
|
||||
app.use(express.json());
|
||||
|
||||
app.get('/health', (req, res) => {
|
||||
res.json({ status: 'ok', service: 'alfred-proxy' });
|
||||
});
|
||||
|
||||
// Track connected mobile clients for notifications
|
||||
const connectedClients = new Set();
|
||||
|
||||
// Track FCM tokens by user ID
|
||||
const fcmTokens = new Map(); // userId -> Set of FCM tokens
|
||||
|
||||
// Token persistence file
|
||||
const tokensFile = join(__dirname, 'fcm-tokens.json');
|
||||
|
||||
// User preferences storage
|
||||
const usersDir = join(__dirname, 'users');
|
||||
const userPreferences = new Map(); // userId -> preferences object
|
||||
|
||||
// Ensure users directory exists
|
||||
if (!existsSync(usersDir)) {
|
||||
mkdirSync(usersDir, { recursive: true });
|
||||
console.log(`[prefs] Created users directory: ${usersDir}`);
|
||||
}
|
||||
|
||||
// Load user preferences from disk
|
||||
function loadUserPreferences(userId) {
|
||||
try {
|
||||
const userFile = join(usersDir, `${userId}.json`);
|
||||
if (existsSync(userFile)) {
|
||||
const data = JSON.parse(readFileSync(userFile, 'utf8'));
|
||||
userPreferences.set(userId, data);
|
||||
console.log(`[prefs] Loaded preferences for user ${userId}`);
|
||||
return data;
|
||||
} else {
|
||||
console.log(`[prefs] No preferences file for user ${userId}, using defaults`);
|
||||
return null;
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`[prefs] Error loading preferences for user ${userId}:`, error.message);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// Save user preferences to disk
|
||||
function saveUserPreferences(userId, preferences) {
|
||||
try {
|
||||
const userFile = join(usersDir, `${userId}.json`);
|
||||
writeFileSync(userFile, JSON.stringify(preferences, null, 2), 'utf8');
|
||||
userPreferences.set(userId, preferences);
|
||||
console.log(`[prefs] Saved preferences for user ${userId}`);
|
||||
|
||||
// Also write to workspace for agent visibility (optional)
|
||||
try {
|
||||
const workspaceUsersDir = '/home/jknapp/.openclaw/workspace/users';
|
||||
if (!existsSync(workspaceUsersDir)) {
|
||||
mkdirSync(workspaceUsersDir, { recursive: true });
|
||||
}
|
||||
const workspaceUserFile = join(workspaceUsersDir, `${userId}.json`);
|
||||
writeFileSync(workspaceUserFile, JSON.stringify(preferences, null, 2), 'utf8');
|
||||
console.log(`[prefs] Synced preferences to workspace for agent`);
|
||||
} catch (wsError) {
|
||||
console.warn(`[prefs] Could not sync to workspace:`, wsError.message);
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (error) {
|
||||
console.error(`[prefs] Error saving preferences for user ${userId}:`, error.message);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Get user preferences (from cache or load from disk)
|
||||
function getUserPreferences(userId) {
|
||||
if (userPreferences.has(userId)) {
|
||||
return userPreferences.get(userId);
|
||||
}
|
||||
return loadUserPreferences(userId);
|
||||
}
|
||||
|
||||
// Load FCM tokens from disk
|
||||
function loadFcmTokens() {
|
||||
try {
|
||||
if (existsSync(tokensFile)) {
|
||||
const data = JSON.parse(readFileSync(tokensFile, 'utf8'));
|
||||
Object.entries(data).forEach(([userId, tokens]) => {
|
||||
fcmTokens.set(userId, new Set(tokens));
|
||||
});
|
||||
const totalTokens = Array.from(fcmTokens.values()).reduce((sum, set) => sum + set.size, 0);
|
||||
console.log(`[fcm] Loaded ${totalTokens} token(s) for ${fcmTokens.size} user(s) from disk`);
|
||||
} else {
|
||||
console.log('[fcm] No persisted tokens found, starting fresh');
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[fcm] Error loading tokens from disk:', error.message);
|
||||
}
|
||||
}
|
||||
|
||||
// Save FCM tokens to disk
|
||||
function saveFcmTokens() {
|
||||
try {
|
||||
const data = {};
|
||||
fcmTokens.forEach((tokens, userId) => {
|
||||
data[userId] = Array.from(tokens);
|
||||
});
|
||||
writeFileSync(tokensFile, JSON.stringify(data, null, 2), 'utf8');
|
||||
console.log(`[fcm] Saved tokens to disk`);
|
||||
} catch (error) {
|
||||
console.error('[fcm] Error saving tokens to disk:', error.message);
|
||||
}
|
||||
}
|
||||
|
||||
// Load tokens on startup
|
||||
loadFcmTokens();
|
||||
|
||||
// Notification endpoint (for mobile-notify tool)
|
||||
app.post('/api/notify', (req, res) => {
|
||||
try {
|
||||
const {
|
||||
notificationType = 'alert',
|
||||
title = 'AI Assistant',
|
||||
message,
|
||||
priority = 'default',
|
||||
sound = true,
|
||||
vibrate = true,
|
||||
timestamp = Date.now(),
|
||||
action = null,
|
||||
userId = null
|
||||
} = req.body;
|
||||
|
||||
if (!message) {
|
||||
return res.status(400).json({ error: 'message is required' });
|
||||
}
|
||||
|
||||
console.log(`[notify] Broadcasting notification: type=${notificationType} title="${title}" message="${message}"`);
|
||||
|
||||
// Create notification event
|
||||
const notificationEvent = {
|
||||
type: 'event',
|
||||
event: 'mobile.notification',
|
||||
payload: {
|
||||
notificationType,
|
||||
title,
|
||||
message,
|
||||
priority,
|
||||
sound,
|
||||
vibrate,
|
||||
timestamp,
|
||||
action
|
||||
}
|
||||
};
|
||||
|
||||
// Broadcast to all connected clients
|
||||
let sentCount = 0;
|
||||
const notificationJson = JSON.stringify(notificationEvent);
|
||||
console.log(`[notify] Broadcast event: ${notificationJson}`);
|
||||
|
||||
connectedClients.forEach(client => {
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
client.send(notificationJson);
|
||||
console.log(`[notify] Sent to client (ready state: ${client.readyState})`);
|
||||
sentCount++;
|
||||
} else {
|
||||
console.log(`[notify] Skipped client (ready state: ${client.readyState})`);
|
||||
}
|
||||
});
|
||||
|
||||
console.log(`[notify] Sent notification to ${sentCount} connected client(s)`);
|
||||
|
||||
// Also send FCM notifications to devices that might be asleep
|
||||
let fcmSentCount = 0;
|
||||
const allTokens = [];
|
||||
|
||||
// Filter tokens by userId if specified
|
||||
if (userId) {
|
||||
console.log(`[fcm] Filtering notifications for user: ${userId}`);
|
||||
const userTokens = fcmTokens.get(userId);
|
||||
if (userTokens) {
|
||||
userTokens.forEach(token => allTokens.push(token));
|
||||
} else {
|
||||
console.log(`[fcm] No tokens found for user ${userId}`);
|
||||
}
|
||||
} else {
|
||||
// Send to all users if no userId specified
|
||||
fcmTokens.forEach((tokens, uid) => {
|
||||
tokens.forEach(token => allTokens.push(token));
|
||||
});
|
||||
}
|
||||
|
||||
if (allTokens.length > 0) {
|
||||
console.log(`[fcm] Sending push notification to ${allTokens.length} registered device(s)`);
|
||||
|
||||
// Send FCM notification (don't wait for it)
|
||||
admin.messaging().sendEachForMulticast({
|
||||
tokens: allTokens,
|
||||
data: {
|
||||
notificationType,
|
||||
title,
|
||||
message,
|
||||
priority,
|
||||
sound: sound.toString(),
|
||||
vibrate: vibrate.toString(),
|
||||
timestamp: timestamp.toString(),
|
||||
action: action || ''
|
||||
},
|
||||
android: {
|
||||
priority: notificationType === 'alarm' ? 'high' : 'normal',
|
||||
ttl: 60000 // 60 seconds
|
||||
}
|
||||
})
|
||||
.then(response => {
|
||||
console.log(`[fcm] Successfully sent ${response.successCount} message(s)`);
|
||||
if (response.failureCount > 0) {
|
||||
console.log(`[fcm] Failed to send ${response.failureCount} message(s)`);
|
||||
response.responses.forEach((resp, idx) => {
|
||||
if (!resp.success) {
|
||||
console.log(`[fcm] Error for token ${allTokens[idx].substring(0, 20)}: ${resp.error}`);
|
||||
}
|
||||
});
|
||||
}
|
||||
fcmSentCount = response.successCount;
|
||||
})
|
||||
.catch(error => {
|
||||
console.error('[fcm] Error sending push notification:', error);
|
||||
});
|
||||
} else {
|
||||
console.log('[fcm] No FCM tokens registered, skipping push notification');
|
||||
}
|
||||
|
||||
res.json({ success: true, clients: sentCount, fcm: allTokens.length });
|
||||
} catch (err) {
|
||||
console.error('[notify] Error broadcasting notification:', err);
|
||||
res.status(500).json({ error: err.message });
|
||||
}
|
||||
});
|
||||
|
||||
// Alarm dismissal endpoint - broadcast dismissal to all user's devices
|
||||
app.post('/api/alarm/dismiss', (req, res) => {
|
||||
try {
|
||||
const { userId, alarmId } = req.body;
|
||||
|
||||
if (!userId || !alarmId) {
|
||||
return res.status(400).json({ error: 'userId and alarmId are required' });
|
||||
}
|
||||
|
||||
console.log(`[alarm] Broadcasting dismissal for alarm ${alarmId} to user ${userId}`);
|
||||
|
||||
// Create dismissal event
|
||||
const dismissalEvent = {
|
||||
type: 'alarm_dismiss',
|
||||
alarmId: alarmId,
|
||||
timestamp: Date.now()
|
||||
};
|
||||
|
||||
// Broadcast to all connected WebSocket clients for this user
|
||||
let sentCount = 0;
|
||||
const dismissalJson = JSON.stringify(dismissalEvent);
|
||||
|
||||
connectedClients.forEach(client => {
|
||||
if (client.userId === userId && client.readyState === WebSocket.OPEN) {
|
||||
client.send(dismissalJson);
|
||||
console.log(`[alarm] Sent dismissal to WebSocket client for user ${userId}`);
|
||||
sentCount++;
|
||||
}
|
||||
});
|
||||
|
||||
console.log(`[alarm] Sent dismissal to ${sentCount} WebSocket client(s)`);
|
||||
|
||||
// Also send FCM to devices that might be offline/asleep
|
||||
const userTokens = fcmTokens.get(userId) || new Set();
|
||||
const tokens = Array.from(userTokens);
|
||||
|
||||
if (tokens.length > 0) {
|
||||
admin.messaging().sendEachForMulticast({
|
||||
tokens: tokens,
|
||||
data: {
|
||||
type: 'alarm_dismiss',
|
||||
alarmId: alarmId,
|
||||
timestamp: Date.now().toString()
|
||||
},
|
||||
android: {
|
||||
priority: 'high'
|
||||
}
|
||||
})
|
||||
.then(response => {
|
||||
console.log(`[fcm] Sent dismissal to ${response.successCount}/${tokens.length} FCM devices`);
|
||||
})
|
||||
.catch(error => {
|
||||
console.error('[fcm] Error sending dismissal via FCM:', error);
|
||||
});
|
||||
} else {
|
||||
console.log('[fcm] No FCM tokens for user, skipping push dismissal');
|
||||
}
|
||||
|
||||
res.json({ success: true, websocket: sentCount, fcm: tokens.length });
|
||||
} catch (err) {
|
||||
console.error('[alarm] Error broadcasting dismissal:', err);
|
||||
res.status(500).json({ error: err.message });
|
||||
}
|
||||
});
|
||||
|
||||
// File upload configuration
|
||||
const uploadsDir = join(__dirname, 'uploads');
|
||||
if (!existsSync(uploadsDir)) {
|
||||
mkdirSync(uploadsDir, { recursive: true });
|
||||
console.log(`[upload] Created uploads directory: ${uploadsDir}`);
|
||||
}
|
||||
|
||||
const storage = multer.diskStorage({
|
||||
destination: (req, file, cb) => {
|
||||
cb(null, uploadsDir);
|
||||
},
|
||||
filename: (req, file, cb) => {
|
||||
// Generate unique filename with timestamp
|
||||
const uniqueSuffix = `${Date.now()}-${Math.round(Math.random() * 1E9)}`;
|
||||
const ext = file.originalname.split('.').pop();
|
||||
cb(null, `${file.fieldname}-${uniqueSuffix}.${ext}`);
|
||||
}
|
||||
});
|
||||
|
||||
const upload = multer({
|
||||
storage: storage,
|
||||
limits: {
|
||||
fileSize: 100 * 1024 * 1024 // 100MB limit
|
||||
}
|
||||
});
|
||||
|
||||
// File upload endpoint
|
||||
app.post('/api/upload', upload.single('file'), (req, res) => {
|
||||
try {
|
||||
if (!req.file) {
|
||||
return res.status(400).json({ error: 'No file uploaded' });
|
||||
}
|
||||
|
||||
const filePath = join(uploadsDir, req.file.filename);
|
||||
const fileUrl = `/uploads/${req.file.filename}`;
|
||||
|
||||
console.log(`[upload] File uploaded: ${req.file.originalname} -> ${req.file.filename}`);
|
||||
console.log(`[upload] Size: ${(req.file.size / 1024 / 1024).toFixed(2)} MB`);
|
||||
console.log(`[upload] Type: ${req.file.mimetype}`);
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
filename: req.file.filename,
|
||||
originalName: req.file.originalname,
|
||||
size: req.file.size,
|
||||
mimeType: req.file.mimetype,
|
||||
path: filePath,
|
||||
url: fileUrl
|
||||
});
|
||||
} catch (err) {
|
||||
console.error('[upload] Error handling file upload:', err);
|
||||
res.status(500).json({ error: err.message });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Text-to-speech endpoint using SAG (file-based)
|
||||
*/
|
||||
app.post('/api/tts', async (req, res) => {
|
||||
const { text, voiceId } = req.body;
|
||||
|
||||
if (!text) {
|
||||
return res.status(400).json({ error: 'text is required' });
|
||||
}
|
||||
|
||||
try {
|
||||
// Create temp file for audio output
|
||||
const tempFile = join(uploadsDir, `tts-${Date.now()}.mp3`);
|
||||
|
||||
// Build SAG command
|
||||
const args = ['-o', tempFile];
|
||||
if (voiceId) {
|
||||
args.push('-v', voiceId);
|
||||
}
|
||||
|
||||
console.log(`[tts] Generating speech for text: "${text.substring(0, 50)}..." voice: ${voiceId || 'default'}`);
|
||||
|
||||
// Spawn SAG process with 120 second timeout for long responses
|
||||
const sagPath = '/home/linuxbrew/.linuxbrew/bin/sag';
|
||||
const sagProcess = spawn(sagPath, args, {
|
||||
timeout: 120000, // 2 minutes for very long responses
|
||||
env: {
|
||||
...process.env,
|
||||
ELEVENLABS_API_KEY: ELEVENLABS_API_KEY
|
||||
}
|
||||
});
|
||||
|
||||
let stderr = '';
|
||||
|
||||
sagProcess.stderr.on('data', (data) => {
|
||||
stderr += data.toString();
|
||||
});
|
||||
|
||||
// Write text to stdin
|
||||
sagProcess.stdin.write(text);
|
||||
sagProcess.stdin.end();
|
||||
|
||||
sagProcess.on('close', (code) => {
|
||||
if (code !== 0) {
|
||||
console.error(`[tts] SAG failed with code ${code}: ${stderr}`);
|
||||
return res.status(500).json({ error: `TTS failed: ${stderr}` });
|
||||
}
|
||||
|
||||
// Check if file was created
|
||||
if (!existsSync(tempFile)) {
|
||||
console.error('[tts] SAG succeeded but no audio file created');
|
||||
return res.status(500).json({ error: 'No audio file generated' });
|
||||
}
|
||||
|
||||
const audioUrl = `/uploads/${basename(tempFile)}`;
|
||||
|
||||
console.log(`[tts] Generated audio: ${basename(tempFile)} (${statSync(tempFile).size} bytes)`);
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
audioUrl: audioUrl,
|
||||
filename: basename(tempFile)
|
||||
});
|
||||
});
|
||||
|
||||
sagProcess.on('error', (err) => {
|
||||
console.error('[tts] SAG spawn error:', err);
|
||||
res.status(500).json({ error: `Failed to spawn SAG: ${err.message}` });
|
||||
});
|
||||
|
||||
} catch (err) {
|
||||
console.error('[tts] Error:', err);
|
||||
res.status(500).json({ error: err.message });
|
||||
}
|
||||
});
|
||||
|
||||
// Streaming endpoint removed - SAG doesn't easily support stdout streaming
|
||||
// The file-based endpoint with extended timeout is sufficient for now
|
||||
|
||||
// Serve uploaded files
|
||||
app.use('/uploads', express.static(uploadsDir));
|
||||
|
||||
const httpServer = app.listen(config.port, () => {
|
||||
console.log(`[alfred-proxy] HTTP server listening on port ${config.port}`);
|
||||
console.log(`[alfred-proxy] WebSocket endpoint: ws://localhost:${config.port}`);
|
||||
console.log(`[alfred-proxy] OpenClaw target: ${config.openclawUrl}`);
|
||||
console.log(`[alfred-proxy] Auth required: ${config.requireAuth}`);
|
||||
});
|
||||
|
||||
// WebSocket server
|
||||
const wss = new WebSocketServer({ server: httpServer });
|
||||
|
||||
/**
|
||||
* Extract stable user ID from userInfo.
|
||||
* Uses same logic as Android app: preferred_username > email > sub
|
||||
*/
|
||||
function extractUserId(userInfo) {
|
||||
return userInfo.preferred_username || userInfo.email || userInfo.sub || 'unknown';
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate OAuth token with Authentik's userinfo endpoint
|
||||
*/
|
||||
async function validateAuthentikToken(token) {
|
||||
if (!config.requireAuth) {
|
||||
console.log('[auth] Auth disabled, skipping validation');
|
||||
return { valid: true, user: { sub: 'dev-user', email: 'dev@local' } };
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await fetch(`${config.authentikUrl}/application/o/userinfo/`, {
|
||||
headers: {
|
||||
'Authorization': `Bearer ${token}`,
|
||||
},
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
const errorText = await response.text();
|
||||
console.error(`[auth] Authentik validation failed: ${response.status} - ${errorText}`);
|
||||
console.error(`[auth] Token (first 20 chars): ${token.substring(0, 20)}...`);
|
||||
return { valid: false, error: 'Invalid token' };
|
||||
}
|
||||
|
||||
const userInfo = await response.json();
|
||||
const userId = extractUserId(userInfo);
|
||||
console.log(`[auth] Token validated for user: ${userId} (${userInfo.email || userInfo.sub})`);
|
||||
|
||||
return { valid: true, user: userInfo };
|
||||
} catch (err) {
|
||||
console.error('[auth] Authentik validation error:', err.message);
|
||||
return { valid: false, error: err.message };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract OAuth token from WebSocket upgrade request
|
||||
*/
|
||||
function extractToken(req) {
|
||||
// Check Authorization header
|
||||
const authHeader = req.headers['authorization'];
|
||||
if (authHeader && authHeader.startsWith('Bearer ')) {
|
||||
return authHeader.slice(7).trim();
|
||||
}
|
||||
|
||||
// Check query parameter (for testing)
|
||||
const url = new URL(req.url, 'ws://localhost');
|
||||
const token = url.searchParams.get('token');
|
||||
if (token) {
|
||||
return token.trim();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Inject OpenClaw token into connect message
|
||||
*/
|
||||
function injectOpenClawToken(message) {
|
||||
try {
|
||||
const msg = JSON.parse(message);
|
||||
|
||||
// Only inject token into connect messages
|
||||
if (msg.type === 'req' && msg.method === 'connect') {
|
||||
console.log('[proxy] Injecting OpenClaw token into connect message');
|
||||
|
||||
if (!msg.params) {
|
||||
msg.params = {};
|
||||
}
|
||||
|
||||
if (!msg.params.auth) {
|
||||
msg.params.auth = {};
|
||||
}
|
||||
|
||||
// Add OpenClaw gateway token
|
||||
msg.params.auth.token = config.openclawToken;
|
||||
|
||||
// Change webchat mode to cli to bypass origin validation
|
||||
if (msg.params.client && msg.params.client.mode === 'webchat') {
|
||||
console.log('[proxy] Changing mode from webchat to cli to bypass origin validation');
|
||||
msg.params.client.mode = 'cli';
|
||||
}
|
||||
|
||||
return JSON.stringify(msg);
|
||||
}
|
||||
|
||||
// Pass through all other messages unchanged
|
||||
return message;
|
||||
} catch (err) {
|
||||
// If it's not JSON, pass through unchanged
|
||||
return message;
|
||||
}
|
||||
}
|
||||
|
||||
// Handle WebSocket connections
|
||||
wss.on('connection', async (clientWs, req) => {
|
||||
const clientIp = req.socket.remoteAddress;
|
||||
console.log(`[proxy] New connection from ${clientIp}`);
|
||||
|
||||
// Extract and validate OAuth token
|
||||
const oauthToken = extractToken(req);
|
||||
|
||||
if (!oauthToken && config.requireAuth) {
|
||||
console.log(`[proxy] No OAuth token provided, rejecting connection`);
|
||||
clientWs.close(1008, 'Authentication required');
|
||||
return;
|
||||
}
|
||||
|
||||
let clientUserId = null;
|
||||
|
||||
if (oauthToken) {
|
||||
const validation = await validateAuthentikToken(oauthToken);
|
||||
|
||||
if (!validation.valid) {
|
||||
console.log(`[proxy] Invalid OAuth token, rejecting connection`);
|
||||
clientWs.close(1008, validation.error || 'Invalid token');
|
||||
return;
|
||||
}
|
||||
|
||||
const userId = extractUserId(validation.user);
|
||||
console.log(`[proxy] Authenticated as ${userId} (${validation.user.email || validation.user.sub})`);
|
||||
clientUserId = userId;
|
||||
}
|
||||
|
||||
// Connect to OpenClaw
|
||||
console.log(`[proxy] Connecting to OpenClaw at ${config.openclawUrl}`);
|
||||
const openclawWs = new WebSocket(config.openclawUrl, {
|
||||
headers: {
|
||||
'Origin': `http://localhost:${config.port}`
|
||||
}
|
||||
});
|
||||
|
||||
let isAlive = true;
|
||||
|
||||
// Add client to connected clients set (for notification broadcasting)
|
||||
// Store userId on the WebSocket client for filtering
|
||||
clientWs.userId = clientUserId;
|
||||
connectedClients.add(clientWs);
|
||||
console.log(`[proxy] Client added to notification broadcast list (total: ${connectedClients.size})`);
|
||||
|
||||
// Proxy: Client → OpenClaw (inject token)
|
||||
clientWs.on('message', (data) => {
|
||||
const message = data.toString('utf8');
|
||||
|
||||
try {
|
||||
// Parse message to check for special events
|
||||
const parsed = JSON.parse(message);
|
||||
|
||||
// Handle FCM token registration
|
||||
if (parsed.type === 'fcm.register') {
|
||||
const fcmToken = parsed.token;
|
||||
if (fcmToken && clientUserId) {
|
||||
console.log(`[fcm] Registering token for user ${clientUserId}: ${fcmToken.substring(0, 20)}...`);
|
||||
|
||||
// Store token for this user
|
||||
if (!fcmTokens.has(clientUserId)) {
|
||||
fcmTokens.set(clientUserId, new Set());
|
||||
}
|
||||
fcmTokens.get(clientUserId).add(fcmToken);
|
||||
|
||||
console.log(`[fcm] User ${clientUserId} now has ${fcmTokens.get(clientUserId).size} token(s)`);
|
||||
|
||||
// Persist to disk
|
||||
saveFcmTokens();
|
||||
} else {
|
||||
console.log('[fcm] Invalid FCM registration: missing token or user ID');
|
||||
}
|
||||
return; // Don't forward to OpenClaw
|
||||
}
|
||||
|
||||
// Handle alarm dismiss - broadcast to all other clients
|
||||
if (parsed.type === 'alarm.dismiss') {
|
||||
console.log(`[proxy] Alarm dismiss received: ${parsed.alarmId}`);
|
||||
|
||||
const dismissEvent = {
|
||||
type: 'event',
|
||||
event: 'mobile.alarm.dismissed',
|
||||
payload: {
|
||||
alarmId: parsed.alarmId,
|
||||
timestamp: parsed.timestamp || Date.now()
|
||||
}
|
||||
};
|
||||
|
||||
const dismissJson = JSON.stringify(dismissEvent);
|
||||
let broadcastCount = 0;
|
||||
|
||||
// Broadcast to all clients (including sender for consistency)
|
||||
connectedClients.forEach(client => {
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
client.send(dismissJson);
|
||||
broadcastCount++;
|
||||
}
|
||||
});
|
||||
|
||||
console.log(`[proxy] Broadcasted alarm dismiss to ${broadcastCount} client(s)`);
|
||||
return; // Don't forward to OpenClaw
|
||||
}
|
||||
|
||||
// Handle user preferences update
|
||||
if (parsed.type === 'req' && parsed.method === 'user.preferences.update') {
|
||||
console.log(`[prefs] Preference update request from user ${clientUserId}`);
|
||||
|
||||
if (!clientUserId) {
|
||||
const errorResponse = {
|
||||
type: 'res',
|
||||
id: parsed.id,
|
||||
ok: false,
|
||||
error: 'User ID not available'
|
||||
};
|
||||
clientWs.send(JSON.stringify(errorResponse));
|
||||
return;
|
||||
}
|
||||
|
||||
const currentPrefs = getUserPreferences(clientUserId) || {};
|
||||
const updatedPrefs = { ...currentPrefs, ...parsed.params };
|
||||
|
||||
if (saveUserPreferences(clientUserId, updatedPrefs)) {
|
||||
const successResponse = {
|
||||
type: 'res',
|
||||
id: parsed.id,
|
||||
ok: true,
|
||||
payload: updatedPrefs
|
||||
};
|
||||
clientWs.send(JSON.stringify(successResponse));
|
||||
console.log(`[prefs] Updated preferences for ${clientUserId}:`, updatedPrefs);
|
||||
} else {
|
||||
const errorResponse = {
|
||||
type: 'res',
|
||||
id: parsed.id,
|
||||
ok: false,
|
||||
error: 'Failed to save preferences'
|
||||
};
|
||||
clientWs.send(JSON.stringify(errorResponse));
|
||||
}
|
||||
return; // Don't forward to OpenClaw
|
||||
}
|
||||
|
||||
// Handle user preferences get
|
||||
if (parsed.type === 'req' && parsed.method === 'user.preferences.get') {
|
||||
console.log(`[prefs] Preference get request from user ${clientUserId}`);
|
||||
|
||||
if (!clientUserId) {
|
||||
const errorResponse = {
|
||||
type: 'res',
|
||||
id: parsed.id,
|
||||
ok: false,
|
||||
error: 'User ID not available'
|
||||
};
|
||||
clientWs.send(JSON.stringify(errorResponse));
|
||||
return;
|
||||
}
|
||||
|
||||
const prefs = getUserPreferences(clientUserId) || {};
|
||||
const successResponse = {
|
||||
type: 'res',
|
||||
id: parsed.id,
|
||||
ok: true,
|
||||
payload: prefs
|
||||
};
|
||||
clientWs.send(JSON.stringify(successResponse));
|
||||
console.log(`[prefs] Retrieved preferences for ${clientUserId}:`, prefs);
|
||||
return; // Don't forward to OpenClaw
|
||||
}
|
||||
} catch (e) {
|
||||
// Not JSON or not a special event - continue with normal flow
|
||||
}
|
||||
|
||||
// Forward all other messages to OpenClaw
|
||||
if (openclawWs.readyState === WebSocket.OPEN) {
|
||||
console.log(`[proxy] Client → OpenClaw (full): ${message}`);
|
||||
const modifiedMessage = injectOpenClawToken(message);
|
||||
openclawWs.send(modifiedMessage);
|
||||
}
|
||||
});
|
||||
|
||||
// Proxy: OpenClaw → Client (inject user preferences into connect response)
|
||||
openclawWs.on('message', (data) => {
|
||||
const message = data.toString('utf8');
|
||||
console.log(`[proxy] OpenClaw → Client: ${message.substring(0, 100)}...`);
|
||||
console.log(`[proxy] Client WebSocket state: ${clientWs.readyState} (1=OPEN)`);
|
||||
|
||||
let messageToSend = data;
|
||||
|
||||
// Try to inject user preferences into connect response
|
||||
try {
|
||||
const parsed = JSON.parse(message);
|
||||
|
||||
// If this is a connect response, inject user preferences
|
||||
if (parsed.type === 'res' && parsed.ok && parsed.id && parsed.id.startsWith('connect-')) {
|
||||
console.log(`[prefs] Injecting user preferences into connect response for ${clientUserId}`);
|
||||
|
||||
const prefs = getUserPreferences(clientUserId) || {};
|
||||
|
||||
if (!parsed.payload) {
|
||||
parsed.payload = {};
|
||||
}
|
||||
|
||||
parsed.payload.userPreferences = prefs;
|
||||
messageToSend = JSON.stringify(parsed);
|
||||
console.log(`[prefs] Injected preferences:`, prefs);
|
||||
|
||||
// Write per-user CONTEXT.md for agent to read
|
||||
try {
|
||||
const workspacePath = '/home/jknapp/.openclaw/workspace';
|
||||
const assistantName = prefs.assistantName || 'Alfred';
|
||||
const voiceId = prefs.voiceId || 'default';
|
||||
|
||||
// 1. Write per-user file (persistent, multi-user)
|
||||
const userDir = join(workspacePath, 'users', clientUserId);
|
||||
const userContextPath = join(userDir, 'CONTEXT.md');
|
||||
|
||||
if (!existsSync(userDir)) {
|
||||
mkdirSync(userDir, { recursive: true });
|
||||
}
|
||||
|
||||
const userContext = `# CONTEXT.md - User Preferences for ${clientUserId}
|
||||
|
||||
**User ID:** ${clientUserId}
|
||||
**Assistant Name:** ${assistantName}
|
||||
**Voice ID:** ${voiceId}
|
||||
|
||||
## Instructions
|
||||
|
||||
**IMPORTANT:** This file contains THIS USER's preferences and overrides global defaults.
|
||||
|
||||
- **When this user addresses you:** Use the Assistant Name above (e.g., "${assistantName}")
|
||||
- **Self-identification:** "I'm ${assistantName}" (not "Alfred" unless that's the name above)
|
||||
- **Voice responses:** Use the Voice ID above for TTS
|
||||
|
||||
This file is dynamically updated on each user connection by alfred-proxy.
|
||||
|
||||
**Multi-user support:** Each user has their own CONTEXT.md in their users/{userId}/ directory.
|
||||
`;
|
||||
|
||||
writeFileSync(userContextPath, userContext, 'utf8');
|
||||
console.log(`[prefs] Wrote users/${clientUserId}/CONTEXT.md (name: ${assistantName})`);
|
||||
|
||||
} catch (err) {
|
||||
console.warn(`[prefs] Failed to write user context:`, err.message);
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
// Not JSON or couldn't parse, send as-is
|
||||
}
|
||||
|
||||
if (clientWs.readyState === WebSocket.OPEN) {
|
||||
try {
|
||||
clientWs.send(messageToSend);
|
||||
console.log(`[proxy] Message sent to client successfully`);
|
||||
} catch (err) {
|
||||
console.error(`[proxy] Failed to send to client:`, err.message);
|
||||
}
|
||||
} else {
|
||||
console.warn(`[proxy] Client WebSocket not open, cannot send (state=${clientWs.readyState})`);
|
||||
}
|
||||
});
|
||||
|
||||
// Handle OpenClaw connection
|
||||
openclawWs.on('open', () => {
|
||||
console.log('[proxy] Connected to OpenClaw');
|
||||
isAlive = true;
|
||||
});
|
||||
|
||||
openclawWs.on('error', (err) => {
|
||||
console.error('[proxy] OpenClaw connection error:', err.message);
|
||||
clientWs.close(1011, 'Upstream connection failed');
|
||||
});
|
||||
|
||||
openclawWs.on('close', (code, reason) => {
|
||||
console.log(`[proxy] OpenClaw closed: ${code} ${reason}`);
|
||||
clearInterval(pingInterval);
|
||||
clientWs.close(code, reason.toString('utf8'));
|
||||
});
|
||||
|
||||
// Handle client disconnection
|
||||
clientWs.on('close', (code, reason) => {
|
||||
console.log(`[proxy] Client disconnected: ${code} ${reason}`);
|
||||
connectedClients.delete(clientWs);
|
||||
console.log(`[proxy] Client removed from notification broadcast list (total: ${connectedClients.size})`);
|
||||
clearInterval(pingInterval);
|
||||
openclawWs.close();
|
||||
});
|
||||
|
||||
clientWs.on('error', (err) => {
|
||||
console.error('[proxy] Client connection error:', err.message);
|
||||
connectedClients.delete(clientWs);
|
||||
clearInterval(pingInterval);
|
||||
openclawWs.close();
|
||||
});
|
||||
|
||||
// Ping/pong to keep connection alive
|
||||
const pingInterval = setInterval(() => {
|
||||
if (!isAlive) {
|
||||
clearInterval(pingInterval);
|
||||
openclawWs.terminate();
|
||||
clientWs.terminate();
|
||||
return;
|
||||
}
|
||||
|
||||
isAlive = false;
|
||||
if (openclawWs.readyState === WebSocket.OPEN) {
|
||||
openclawWs.ping();
|
||||
}
|
||||
}, 30000);
|
||||
|
||||
openclawWs.on('pong', () => {
|
||||
isAlive = true;
|
||||
});
|
||||
});
|
||||
|
||||
// Graceful shutdown
|
||||
process.on('SIGTERM', () => {
|
||||
console.log('[alfred-proxy] SIGTERM received, closing server...');
|
||||
httpServer.close(() => {
|
||||
console.log('[alfred-proxy] HTTP server closed');
|
||||
process.exit(0);
|
||||
});
|
||||
});
|
||||
|
||||
process.on('SIGINT', () => {
|
||||
console.log('[alfred-proxy] SIGINT received, closing server...');
|
||||
httpServer.close(() => {
|
||||
console.log('[alfred-proxy] HTTP server closed');
|
||||
process.exit(0);
|
||||
});
|
||||
});
|
||||
|
||||
// Global error handlers to prevent crashes
|
||||
process.on('uncaughtException', (err) => {
|
||||
console.error('[alfred-proxy] Uncaught exception:', err);
|
||||
// Don't exit - log and continue
|
||||
});
|
||||
|
||||
process.on('unhandledRejection', (reason, promise) => {
|
||||
console.error('[alfred-proxy] Unhandled rejection at:', promise, 'reason:', reason);
|
||||
// Don't exit - log and continue
|
||||
});
|
||||
|
||||
console.log('[alfred-proxy] Service started successfully');
|
||||
Reference in New Issue
Block a user