manually send the initial SSE headers before the main server connection starts and add a background process to send keep-alive pings
This commit is contained in:
@@ -23,6 +23,7 @@ const RATE_LIMIT_MAX_REQUESTS = 50; // 50 requests per minute per IP
|
|||||||
const MEMORY_THRESHOLD_MB = 450;
|
const MEMORY_THRESHOLD_MB = 450;
|
||||||
const CIRCUIT_BREAKER_THRESHOLD = 5;
|
const CIRCUIT_BREAKER_THRESHOLD = 5;
|
||||||
const CIRCUIT_BREAKER_TIMEOUT_MS = 60000;
|
const CIRCUIT_BREAKER_TIMEOUT_MS = 60000;
|
||||||
|
const SSE_HEARTBEAT_INTERVAL_MS = 20000; // 20 seconds to prevent proxy timeout
|
||||||
|
|
||||||
// Initialize data loader
|
// Initialize data loader
|
||||||
console.error('Loading HPR knowledge base data...');
|
console.error('Loading HPR knowledge base data...');
|
||||||
@@ -662,7 +663,6 @@ ${match.context}
|
|||||||
const app = express();
|
const app = express();
|
||||||
|
|
||||||
// Trust proxy headers (required for Render, Heroku, etc.)
|
// Trust proxy headers (required for Render, Heroku, etc.)
|
||||||
// This allows rate limiting to work correctly behind reverse proxies
|
|
||||||
app.set('trust proxy', true);
|
app.set('trust proxy', true);
|
||||||
|
|
||||||
// Enable CORS
|
// Enable CORS
|
||||||
@@ -700,6 +700,8 @@ app.get('/health', (req, res) => {
|
|||||||
|
|
||||||
// SSE endpoint for MCP
|
// SSE endpoint for MCP
|
||||||
app.get('/sse', async (req, res) => {
|
app.get('/sse', async (req, res) => {
|
||||||
|
let pingInterval = null; // Declare pingInterval to be accessible in req.on('close')
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Check system health
|
// Check system health
|
||||||
checkMemory();
|
checkMemory();
|
||||||
@@ -708,6 +710,31 @@ app.get('/sse', async (req, res) => {
|
|||||||
activeRequests++;
|
activeRequests++;
|
||||||
console.error(`New SSE connection. Active requests: ${activeRequests}`);
|
console.error(`New SSE connection. Active requests: ${activeRequests}`);
|
||||||
|
|
||||||
|
// === START SSE FIXES FOR RENDER/PROXY TIMEOUTS ===
|
||||||
|
|
||||||
|
// 1. Send no-buffering headers and flush immediately
|
||||||
|
res.writeHead(200, {
|
||||||
|
'Content-Type': 'text/event-stream',
|
||||||
|
'Cache-Control': 'no-cache',
|
||||||
|
'Connection': 'keep-alive',
|
||||||
|
// CRITICAL: Tells proxies (like NGINX) not to buffer the response
|
||||||
|
'X-Accel-Buffering': 'no',
|
||||||
|
});
|
||||||
|
|
||||||
|
// Send an initial comment (ping) to complete the HTTP handshake and flush headers
|
||||||
|
res.write(':\n');
|
||||||
|
res.flushHeaders();
|
||||||
|
|
||||||
|
// 2. Start the heartbeat/ping interval
|
||||||
|
pingInterval = setInterval(() => {
|
||||||
|
// Send a comment line (which is ignored by clients) every 20s
|
||||||
|
res.write(':\n');
|
||||||
|
// NOTE: In a real-world scenario, you might also need to call res.flush()
|
||||||
|
// or ensure your underlying HTTP library flushes the write.
|
||||||
|
}, SSE_HEARTBEAT_INTERVAL_MS);
|
||||||
|
|
||||||
|
// === END SSE FIXES ===
|
||||||
|
|
||||||
// Create a new MCP server instance for this connection
|
// Create a new MCP server instance for this connection
|
||||||
const server = createMCPServer();
|
const server = createMCPServer();
|
||||||
|
|
||||||
@@ -723,19 +750,28 @@ app.get('/sse', async (req, res) => {
|
|||||||
// Handle connection close
|
// Handle connection close
|
||||||
req.on('close', () => {
|
req.on('close', () => {
|
||||||
activeRequests--;
|
activeRequests--;
|
||||||
|
if (pingInterval) {
|
||||||
|
clearInterval(pingInterval); // STOP THE PING ON CLOSE
|
||||||
|
}
|
||||||
console.error(`SSE connection closed. Active requests: ${activeRequests}`);
|
console.error(`SSE connection closed. Active requests: ${activeRequests}`);
|
||||||
});
|
});
|
||||||
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
activeRequests--;
|
activeRequests--;
|
||||||
|
if (pingInterval) {
|
||||||
|
clearInterval(pingInterval); // Stop ping on connection failure
|
||||||
|
}
|
||||||
console.error('SSE connection error:', error.message);
|
console.error('SSE connection error:', error.message);
|
||||||
|
|
||||||
if (!res.headersSent) {
|
if (!res.headersSent) {
|
||||||
|
// Send a 503 error if headers haven't been sent yet
|
||||||
res.status(503).json({
|
res.status(503).json({
|
||||||
error: error.message,
|
error: error.message,
|
||||||
circuitBreaker: circuitBreaker.state,
|
circuitBreaker: circuitBreaker.state,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
// If headers were sent (i.e., stream started but failed mid-way),
|
||||||
|
// simply let the connection close, which the client will see as an error.
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -769,6 +805,7 @@ app.listen(PORT, () => {
|
|||||||
console.error(` - Request timeout: ${REQUEST_TIMEOUT_MS}ms`);
|
console.error(` - Request timeout: ${REQUEST_TIMEOUT_MS}ms`);
|
||||||
console.error(` - Rate limit: ${RATE_LIMIT_MAX_REQUESTS} requests per ${RATE_LIMIT_WINDOW_MS / 1000}s`);
|
console.error(` - Rate limit: ${RATE_LIMIT_MAX_REQUESTS} requests per ${RATE_LIMIT_WINDOW_MS / 1000}s`);
|
||||||
console.error(` - Memory threshold: ${MEMORY_THRESHOLD_MB}MB`);
|
console.error(` - Memory threshold: ${MEMORY_THRESHOLD_MB}MB`);
|
||||||
|
console.error(` - SSE Heartbeat: ${SSE_HEARTBEAT_INTERVAL_MS / 1000}s`);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Graceful shutdown
|
// Graceful shutdown
|
||||||
@@ -780,4 +817,4 @@ process.on('SIGTERM', () => {
|
|||||||
process.on('SIGINT', () => {
|
process.on('SIGINT', () => {
|
||||||
console.error('SIGINT received, shutting down gracefully...');
|
console.error('SIGINT received, shutting down gracefully...');
|
||||||
process.exit(0);
|
process.exit(0);
|
||||||
});
|
});
|
||||||
Reference in New Issue
Block a user