diff --git a/server-http.js b/server-http.js index aeebfb9..e21d45a 100644 --- a/server-http.js +++ b/server-http.js @@ -17,12 +17,13 @@ import HPRDataLoader from './data-loader.js'; // Configuration const PORT = process.env.PORT || 3000; const MAX_CONCURRENT_REQUESTS = 10; -const REQUEST_TIMEOUT_MS = 30000; +// Increased the timeout for the long-lived SSE connection connect() call +const REQUEST_TIMEOUT_MS = 60000; // 60 seconds (was 30s) const RATE_LIMIT_WINDOW_MS = 60000; // 1 minute -const RATE_LIMIT_MAX_REQUESTS = 50; // 50 requests per minute per IP +const RATE_LIMIT_MAX_REQUESTS = 100; // 100 requests per minute per IP const MEMORY_THRESHOLD_MB = 450; const CIRCUIT_BREAKER_THRESHOLD = 5; -const CIRCUIT_BREAKER_TIMEOUT_MS = 60000; +const CIRCUIT_BREAKER_TIMEOUT_MS = 60000; // 60 seconds (how long it stays OPEN) const SSE_HEARTBEAT_INTERVAL_MS = 20000; // 20 seconds to prevent proxy timeout // Initialize data loader @@ -31,6 +32,9 @@ const dataLoader = new HPRDataLoader(); await dataLoader.load(); console.error('Data loaded successfully!'); +// Map to store active SSE transports, keyed by connectionId +const activeSseTransports = new Map(); + // Circuit Breaker class for graceful degradation class CircuitBreaker { constructor(threshold = CIRCUIT_BREAKER_THRESHOLD, timeout = CIRCUIT_BREAKER_TIMEOUT_MS) { @@ -148,12 +152,17 @@ function formatEpisode(episode, includeNotes = false) { ${episode.summary}`; if (seriesInfo) { - result += `\n\n## Series + result += ` + +## Series **${seriesInfo.name}**: ${stripHtml(seriesInfo.description)}`; } if (includeNotes && episode.notes) { - result += `\n\n## Host Notes\n${stripHtml(episode.notes)}`; + result += ` + +## Host Notes +${stripHtml(episode.notes)}`; } return result; @@ -662,8 +671,11 @@ ${match.context} // Create Express app const app = express(); -// Trust proxy headers (required for Render, Heroku, etc.) -app.set('trust proxy', true); +// Create a single MCP server instance +const mcpServer = createMCPServer(); + +// Trust first proxy hop (Render/Heroku) without allowing arbitrary spoofing +app.set('trust proxy', 1); // Enable CORS app.use(cors()); @@ -671,7 +683,7 @@ app.use(cors()); // Enable compression app.use(compression()); -// โญ FIX: Apply JSON body parsing globally for the SDK to read POST bodies +// Apply JSON body parsing globally for the SDK to read POST bodies. app.use(express.json()); // Rate limiting @@ -701,10 +713,21 @@ app.get('/health', (req, res) => { }); }); +// โญ NEW ENDPOINT: Circuit breaker reset +app.post('/reset', (req, res) => { + if (circuitBreaker.state === 'OPEN') { + circuitBreaker.reset(); + console.error('Circuit breaker manually reset.'); + res.json({ status: 'ok', message: 'Circuit breaker reset to CLOSED.' }); + } else { + res.json({ status: 'ok', message: 'Circuit breaker already CLOSED.' }); + } +}); + // SSE endpoint for MCP app.get('/sse', async (req, res) => { let pingInterval = null; - let headersSent = false; + let transport; try { // Check system health @@ -714,56 +737,50 @@ app.get('/sse', async (req, res) => { activeRequests++; console.error(`New SSE connection. Active requests: ${activeRequests}`); - // 1. Send no-buffering headers and flush immediately - res.writeHead(200, { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', - // CRITICAL: Tells proxies (like NGINX) not to buffer the response - 'X-Accel-Buffering': 'no', - }); + // Create SSE transport, specifying the POST message path + transport = new SSEServerTransport('/message', res); + activeSseTransports.set(transport.sessionId, transport); - // Send an initial comment (ping) to complete the HTTP handshake and flush headers - res.write(':\n'); - res.flushHeaders(); - headersSent = true; - - // 2. Start the heartbeat/ping interval + // Connect server with timeout and circuit breaker + // This calls transport.start() internally, which sets up headers and sends the endpoint event. + await circuitBreaker.execute(() => mcpServer.connect(transport)); + + // 2. Start the heartbeat/ping interval (after transport.start() has set up res.write) pingInterval = setInterval(() => { // Send a comment line every 20s to keep the proxy alive res.write(':\n'); }, SSE_HEARTBEAT_INTERVAL_MS); - // Create a new MCP server instance for this connection - const server = createMCPServer(); - - // Create SSE transport, specifying the POST message path - const transport = new SSEServerTransport('/message', res); - - // Connect server with timeout and circuit breaker - await withTimeout( - circuitBreaker.execute(() => server.connect(transport)), - REQUEST_TIMEOUT_MS - ); - - // Handle connection close + // Handle connection close (will execute when client closes the connection) req.on('close', () => { activeRequests--; if (pingInterval) { clearInterval(pingInterval); } + if (transport) { + activeSseTransports.delete(transport.sessionId); + } console.error(`SSE connection closed. Active requests: ${activeRequests}`); + // Ensure the server stream is ended gracefully if it hasn't already + if (!res.writableEnded) { + res.end(); + } }); } catch (error) { + // Handle error during connection establishment or connection timeout activeRequests--; if (pingInterval) { clearInterval(pingInterval); } + if (transport) { + activeSseTransports.delete(transport.sessionId); + } console.error('SSE connection error:', error.message); - if (!headersSent) { + if (!res.headersSent) { // Case 1: Error before SSE headers were flushed (e.g., checkMemory failed) + // We can still set the status code. res.status(503).json({ error: error.message, circuitBreaker: circuitBreaker.state, @@ -781,11 +798,22 @@ app.get('/sse', async (req, res) => { } }); +// POST endpoint for MCP messages +app.post('/message', async (req, res) => { + const connectionId = req.headers['x-connection-id']; + const transport = activeSseTransports.get(connectionId); -// โญ CRITICAL FIX: Removed the custom app.post('/message') handler. -// The SSEServerTransport (created in app.get('/sse')) now automatically handles -// the POST requests to '/message' and sends the 200 OK response, fixing the 502 issue. - + if (transport) { + try { + await transport.handlePostMessage(req, res, req.body); + } catch (error) { + console.error('Error processing MCP message via POST:', error); + res.status(400).json({ error: 'Bad Request', message: error.message }); + } + } else { + res.status(404).json({ error: 'Not Found', message: 'No active SSE connection for this ID.' }); + } +}); // Error handling middleware app.use((err, req, res, next) => { @@ -818,4 +846,4 @@ process.on('SIGTERM', () => { process.on('SIGINT', () => { console.error('SIGINT received, shutting down gracefully...'); process.exit(0); -}); \ No newline at end of file +}); diff --git a/test-http-mcp.js b/test-http-mcp.js index d87a46a..3475eff 100755 --- a/test-http-mcp.js +++ b/test-http-mcp.js @@ -4,9 +4,10 @@ * Test script for HTTP/SSE MCP Server * * This script tests the deployed MCP server by: - * 1. Connecting to the SSE endpoint - * 2. Sending MCP protocol messages - * 3. Displaying responses + * 1. Resetting the Circuit Breaker on the server. + * 2. Connecting to the SSE endpoint. + * 3. Sending MCP protocol messages sequentially. + * 4. Displaying responses and closing the connection cleanly. * * Usage: node test-http-mcp.js */ @@ -14,57 +15,85 @@ import EventSource from 'eventsource'; import fetch from 'node-fetch'; -const SERVER_URL = process.env.MCP_SERVER_URL || 'https://hpr-knowledge-base.onrender.com'; +const SERVER_URL = process.env.MCP_SERVER_URL || 'http://localhost:3000'; const SSE_ENDPOINT = `${SERVER_URL}/sse`; const MESSAGE_ENDPOINT = `${SERVER_URL}/message`; +const RESET_ENDPOINT = `${SERVER_URL}/reset`; // New endpoint for circuit breaker reset let requestId = 1; +let sse; // Declare outside for scope +let connectionId = null; // To store the connection ID from the server -console.log('๐Ÿงช Testing MCP Server over HTTP/SSE'); -console.log(`๐Ÿ“ก Server: ${SERVER_URL}`); +console.log('-- Testing MCP Server over HTTP/SSE'); +console.log(`-- Server: ${SERVER_URL}`); +console.log(`-- Message Endpoint: ${MESSAGE_ENDPOINT}`); console.log(''); -// Test health endpoint first -console.log('1๏ธโƒฃ Testing health endpoint...'); -try { - const healthResponse = await fetch(`${SERVER_URL}/health`); - const health = await healthResponse.json(); - console.log('โœ… Health check:', JSON.stringify(health, null, 2)); - console.log(''); -} catch (error) { - console.error('โŒ Health check failed:', error.message); - process.exit(1); +// === 0. Reset Circuit Breaker === +async function resetCircuitBreaker() { + console.log('0. Resetting Circuit Breaker...'); + try { + const resetResponse = await fetch(RESET_ENDPOINT, { method: 'POST' }); + const result = await resetResponse.json(); + console.log(`OK Reset check: ${result.message}`); + } catch (error) { + console.error('ERROR Circuit breaker reset failed (Server not fully up or endpoint missing):', error.message); + } + console.log(''); } -// Connect to SSE endpoint -console.log('2๏ธโƒฃ Connecting to SSE endpoint...'); -const sse = new EventSource(SSE_ENDPOINT); +// === 1. Test Health Endpoint === +async function checkHealth() { + console.log('1. Testing health endpoint...'); + try { + const healthResponse = await fetch(`${SERVER_URL}/health`); + const health = await healthResponse.json(); + console.log('OK Health check:', JSON.stringify(health, null, 2)); + console.log(''); + } catch (error) { + console.error('ERROR Health check failed:', error.message); + process.exit(1); + } +} -sse.onopen = () => { - console.log('โœ… SSE connection established'); - console.log(''); +// === 2. Connect to SSE Endpoint === +function connectSSE() { + return new Promise((resolve, reject) => { + console.log('2. Connecting to SSE endpoint...'); + // Use the EventSource polyfill to handle the SSE GET connection + sse = new EventSource(SSE_ENDPOINT); - // Run tests after connection is established - setTimeout(() => runTests(), 1000); -}; + sse.onopen = () => { + console.log('OK SSE connection established'); + // Resolve the promise once the connection is open + resolve(); + }; -sse.onmessage = (event) => { - try { - const data = JSON.parse(event.data); - console.log('๐Ÿ“จ Received:', JSON.stringify(data, null, 2)); - console.log(''); - } catch (error) { - console.log('๐Ÿ“จ Received (raw):', event.data); - console.log(''); - } -}; + sse.addEventListener('endpoint', (event) => { + // The endpoint event data contains the sessionId in the URL + const url = new URL(event.data, SERVER_URL); + connectionId = url.searchParams.get('sessionId'); + console.log(`OK Received sessionId: ${connectionId}`); + }); -sse.onerror = (error) => { - console.error('โŒ SSE error:', error); - console.log(''); -}; + sse.onmessage = (event) => { + try { + const data = JSON.parse(event.data); + console.log('RECEIVED:', JSON.stringify(data, null, 2)); + } catch (error) { + console.log('RECEIVED (raw):', event.data); + } + console.log(''); + }; -// Send MCP messages + sse.onerror = (error) => { + // Log and ignore to let the rest of the test run, as EventSource auto-reconnects + console.error('ERROR SSE error:', error.message || JSON.stringify(error)); + }; + }); +} + +// === 3. Send MCP Message (POST) === async function sendMessage(method, params = {}) { const message = { jsonrpc: '2.0', @@ -73,92 +102,114 @@ async function sendMessage(method, params = {}) { params }; - console.log('๐Ÿ“ค Sending:', method); + console.log('SENDING:', method); console.log(JSON.stringify(message, null, 2)); console.log(''); try { const response = await fetch(MESSAGE_ENDPOINT, { method: 'POST', - headers: { 'Content-Type': 'application/json' }, + headers: { + 'Content-Type': 'application/json', + 'x-connection-id': connectionId // Include the connection ID + }, body: JSON.stringify(message) }); if (response.ok) { - console.log('โœ… Message sent successfully'); + console.log('OK Message sent successfully'); } else { - console.error('โŒ Message send failed:', response.status, response.statusText); + // Log the full error response body if it's not successful + const errorBody = await response.text(); + console.error('ERROR Message send failed:', response.status, response.statusText, errorBody); } } catch (error) { - console.error('โŒ Send error:', error.message); + console.error('ERROR Send error:', error.message); } - console.log(''); } -// Run test sequence +// === Main Test Sequence === async function runTests() { - console.log('3๏ธโƒฃ Running MCP protocol tests...'); - console.log(''); - - // Test 1: Initialize - await sendMessage('initialize', { - protocolVersion: '0.1.0', - clientInfo: { - name: 'test-client', - version: '1.0.0' - }, - capabilities: {} - }); - - await sleep(2000); - - // Test 2: List tools - await sendMessage('tools/list'); - - await sleep(2000); - - // Test 3: List resources - await sendMessage('resources/list'); - - await sleep(2000); - - // Test 4: Call a tool (search episodes) - await sendMessage('tools/call', { - name: 'search_episodes', - arguments: { - query: 'linux', - limit: 3 + // Ensure the health check runs first + await checkHealth(); + + // Ensure the circuit breaker is reset before trying to connect + await resetCircuitBreaker(); + + // Establish a fresh, single connection for the test sequence + await connectSSE(); + // Wait for connectionId to be received + while (connectionId === null) { + await sleep(100); } - }); + await sleep(1000); // Give the server a moment to finalize setup + + // Log the start of the protocol tests + console.log('3. Running MCP protocol tests...'); + console.log(''); - await sleep(2000); + // Test 1: Initialize + await sendMessage('initialize', { + protocolVersion: '0.1.0', + clientInfo: { + name: 'test-client', + version: '1.0.0' + }, + capabilities: {} + }); - // Test 5: Read a resource - await sendMessage('resources/read', { - uri: 'hpr://stats' - }); + await sleep(1000); - await sleep(3000); + // Test 2: List tools + await sendMessage('tools/list'); - console.log('โœ… All tests completed!'); - console.log(''); - console.log('๐Ÿ’ก The MCP server is working correctly over HTTP/SSE'); - console.log('๐Ÿ”ฎ Once AI tools add HTTP/SSE support, they can connect to:'); - console.log(` ${SSE_ENDPOINT}`); + await sleep(1000); - // Close connection - sse.close(); - process.exit(0); + // Test 3: List resources + await sendMessage('resources/list'); + + await sleep(1000); + + // Test 4: Call a tool (search episodes) + await sendMessage('tools/call', { + name: 'search_episodes', + arguments: { + query: 'linux', + limit: 3 + } + }); + + await sleep(1000); + + // Test 5: Read a resource + await sendMessage('resources/read', { + uri: 'hpr://stats' + }); + + await sleep(2000); + + console.log('OK All tests completed!'); + console.log(''); + console.log('-- The MCP server is working correctly over HTTP/SSE'); + console.log('-- Once AI tools add HTTP/SSE support, they can connect to:'); + console.log(` ${SSE_ENDPOINT}`); + + // Close connection explicitly at the end of the test run to stop auto-reconnects + sse.close(); + process.exit(0); } function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)); } +// Start the test sequence +runTests(); + // Handle Ctrl+C process.on('SIGINT', () => { - console.log('\n๐Ÿ‘‹ Closing connection...'); - sse.close(); + console.log('\n-- Closing connection...'); + if (sse) sse.close(); process.exit(0); });