Files
hpr-knowledge-base/server-http.js

850 lines
24 KiB
JavaScript
Raw Normal View History

#!/usr/bin/env node
import express from 'express';
import cors from 'cors';
import compression from 'compression';
import rateLimit from 'express-rate-limit';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import {
CallToolRequestSchema,
ListToolsRequestSchema,
ListResourcesRequestSchema,
ReadResourceRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import HPRDataLoader from './data-loader.js';
// Configuration
const PORT = process.env.PORT || 3000;
const MAX_CONCURRENT_REQUESTS = 10;
// Increased the timeout for the long-lived SSE connection connect() call
const REQUEST_TIMEOUT_MS = 60000; // 60 seconds (was 30s)
const RATE_LIMIT_WINDOW_MS = 60000; // 1 minute
const RATE_LIMIT_MAX_REQUESTS = 100; // 100 requests per minute per IP
const MEMORY_THRESHOLD_MB = 450;
const CIRCUIT_BREAKER_THRESHOLD = 5;
const CIRCUIT_BREAKER_TIMEOUT_MS = 60000; // 60 seconds (how long it stays OPEN)
const SSE_HEARTBEAT_INTERVAL_MS = 20000; // 20 seconds to prevent proxy timeout
// Initialize data loader
console.error('Loading HPR knowledge base data...');
const dataLoader = new HPRDataLoader();
await dataLoader.load();
console.error('Data loaded successfully!');
// Map to store active SSE transports, keyed by connectionId
const activeSseTransports = new Map();
// Circuit Breaker class for graceful degradation
class CircuitBreaker {
constructor(threshold = CIRCUIT_BREAKER_THRESHOLD, timeout = CIRCUIT_BREAKER_TIMEOUT_MS) {
this.failures = 0;
this.threshold = threshold;
this.timeout = timeout;
this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
this.nextAttempt = Date.now();
}
async execute(fn) {
if (this.state === 'OPEN') {
if (Date.now() < this.nextAttempt) {
throw new Error('Service temporarily unavailable. Please try again later.');
}
this.state = 'HALF_OPEN';
}
try {
const result = await fn();
if (this.state === 'HALF_OPEN') {
this.state = 'CLOSED';
this.failures = 0;
}
return result;
} catch (error) {
this.failures++;
if (this.failures >= this.threshold) {
this.state = 'OPEN';
this.nextAttempt = Date.now() + this.timeout;
console.error(`Circuit breaker opened after ${this.failures} failures`);
}
throw error;
}
}
reset() {
this.failures = 0;
this.state = 'CLOSED';
}
}
const circuitBreaker = new CircuitBreaker();
// Request timeout wrapper
function withTimeout(promise, timeoutMs = REQUEST_TIMEOUT_MS) {
return Promise.race([
promise,
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Request timeout')), timeoutMs)
),
]);
}
// Concurrent request limiter
let activeRequests = 0;
function checkConcurrency() {
if (activeRequests >= MAX_CONCURRENT_REQUESTS) {
throw new Error('Server at capacity. Please try again later.');
}
}
// Memory monitoring
let memoryWarning = false;
function checkMemory() {
const usage = process.memoryUsage();
const heapUsedMB = usage.heapUsed / 1024 / 1024;
if (heapUsedMB > MEMORY_THRESHOLD_MB) {
if (!memoryWarning) {
console.error(`High memory usage: ${heapUsedMB.toFixed(2)}MB`);
memoryWarning = true;
}
throw new Error('Server under high load. Please try again later.');
} else if (memoryWarning && heapUsedMB < MEMORY_THRESHOLD_MB * 0.8) {
memoryWarning = false;
}
}
setInterval(() => {
const usage = process.memoryUsage();
const heapUsedMB = usage.heapUsed / 1024 / 1024;
console.error(`Memory: ${heapUsedMB.toFixed(2)}MB, Active requests: ${activeRequests}`);
}, 30000);
// Helper function to strip HTML tags
function stripHtml(html) {
return html
.replace(/<[^>]*>/g, '')
.replace(/&nbsp;/g, ' ')
.replace(/&lt;/g, '<')
.replace(/&gt;/g, '>')
.replace(/&amp;/g, '&')
.replace(/&quot;/g, '"')
.trim();
}
// Helper to format episode for display
function formatEpisode(episode, includeNotes = false) {
const host = dataLoader.getHost(episode.hostid);
const seriesInfo = episode.series !== 0 ? dataLoader.getSeries(episode.series) : null;
let result = `# HPR${String(episode.id).padStart(4, '0')}: ${episode.title}
**Date:** ${episode.date}
**Host:** ${host?.host || 'Unknown'} (ID: ${episode.hostid})
**Duration:** ${Math.floor(episode.duration / 60)}:${String(episode.duration % 60).padStart(2, '0')}
**Tags:** ${episode.tags}
**License:** ${episode.license}
**Downloads:** ${episode.downloads}
## Summary
${episode.summary}`;
if (seriesInfo) {
result += `
## Series
**${seriesInfo.name}**: ${stripHtml(seriesInfo.description)}`;
}
if (includeNotes && episode.notes) {
result += `
## Host Notes
${stripHtml(episode.notes)}`;
}
return result;
}
// Create MCP server factory
function createMCPServer() {
const server = new Server(
{
name: 'hpr-knowledge-base',
version: '1.0.0',
},
{
capabilities: {
tools: {},
resources: {},
},
}
);
// List available resources
server.setRequestHandler(ListResourcesRequestSchema, async () => {
return {
resources: [
{
uri: 'hpr://stats',
mimeType: 'text/plain',
name: 'HPR Statistics',
description: 'Overall statistics about the HPR knowledge base',
},
{
uri: 'hpr://episodes/recent',
mimeType: 'text/plain',
name: 'Recent Episodes',
description: 'List of 50 most recent HPR episodes',
},
{
uri: 'hpr://hosts/all',
mimeType: 'text/plain',
name: 'All Hosts',
description: 'List of all HPR hosts',
},
{
uri: 'hpr://series/all',
mimeType: 'text/plain',
name: 'All Series',
description: 'List of all HPR series',
},
],
};
});
// Read a resource
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const uri = request.params.uri;
if (uri === 'hpr://stats') {
const stats = dataLoader.getStats();
return {
contents: [
{
uri,
mimeType: 'text/plain',
text: `# Hacker Public Radio Statistics
**Total Episodes:** ${stats.totalEpisodes}
**Total Hosts:** ${stats.totalHosts}
**Total Comments:** ${stats.totalComments}
**Total Series:** ${stats.totalSeries}
**Transcripts Available:** ${stats.totalTranscripts}
**Date Range:** ${stats.dateRange.earliest} to ${stats.dateRange.latest}
Hacker Public Radio is a community-driven podcast released under Creative Commons licenses.
All content is contributed by the community, for the community.`,
},
],
};
}
if (uri === 'hpr://episodes/recent') {
const recent = dataLoader.searchEpisodes('', { limit: 50 });
const text = recent.map(ep => {
const host = dataLoader.getHost(ep.hostid);
return `**HPR${String(ep.id).padStart(4, '0')}** (${ep.date}) - ${ep.title} by ${host?.host || 'Unknown'}`;
}).join('\n');
return {
contents: [
{
uri,
mimeType: 'text/plain',
text: `# Recent Episodes\n\n${text}`,
},
],
};
}
if (uri === 'hpr://hosts/all') {
const hosts = dataLoader.hosts
.filter(h => h.valid === 1)
.map(h => {
const episodeCount = dataLoader.getEpisodesByHost(h.hostid).length;
return `**${h.host}** (ID: ${h.hostid}) - ${episodeCount} episodes`;
})
.join('\n');
return {
contents: [
{
uri,
mimeType: 'text/plain',
text: `# All HPR Hosts\n\n${hosts}`,
},
],
};
}
if (uri === 'hpr://series/all') {
const series = dataLoader.series
.filter(s => s.valid === 1 && s.private === 0)
.map(s => {
const episodeCount = dataLoader.getEpisodesInSeries(s.id).length;
return `**${s.name}** (ID: ${s.id}) - ${episodeCount} episodes\n ${stripHtml(s.description)}`;
})
.join('\n\n');
return {
contents: [
{
uri,
mimeType: 'text/plain',
text: `# All HPR Series\n\n${series}`,
},
],
};
}
throw new Error(`Unknown resource: ${uri}`);
});
// List available tools
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: 'search_episodes',
description: 'Search HPR episodes by keywords in title, summary, tags, or host notes. Can filter by host, series, tags, and date range.',
inputSchema: {
type: 'object',
properties: {
query: {
type: 'string',
description: 'Search query (searches title, summary, tags, and notes)',
},
limit: {
type: 'number',
description: 'Maximum number of results to return (default: 20)',
},
hostId: {
type: 'number',
description: 'Filter by host ID',
},
seriesId: {
type: 'number',
description: 'Filter by series ID',
},
tag: {
type: 'string',
description: 'Filter by tag',
},
fromDate: {
type: 'string',
description: 'Filter episodes from this date (YYYY-MM-DD)',
},
toDate: {
type: 'string',
description: 'Filter episodes to this date (YYYY-MM-DD)',
},
},
required: [],
},
},
{
name: 'get_episode',
description: 'Get detailed information about a specific HPR episode including transcript if available',
inputSchema: {
type: 'object',
properties: {
episodeId: {
type: 'number',
description: 'Episode ID number',
},
includeTranscript: {
type: 'boolean',
description: 'Include full transcript if available (default: true)',
},
includeComments: {
type: 'boolean',
description: 'Include community comments (default: true)',
},
},
required: ['episodeId'],
},
},
{
name: 'search_transcripts',
description: 'Search through episode transcripts for specific keywords or phrases',
inputSchema: {
type: 'object',
properties: {
query: {
type: 'string',
description: 'Search query to find in transcripts',
},
limit: {
type: 'number',
description: 'Maximum number of episodes to return (default: 20)',
},
contextLines: {
type: 'number',
description: 'Number of lines of context around matches (default: 3)',
},
},
required: ['query'],
},
},
{
name: 'get_host_info',
description: 'Get information about an HPR host including all their episodes',
inputSchema: {
type: 'object',
properties: {
hostId: {
type: 'number',
description: 'Host ID number',
},
hostName: {
type: 'string',
description: 'Host name (will search if hostId not provided)',
},
includeEpisodes: {
type: 'boolean',
description: 'Include list of all episodes by this host (default: true)',
},
},
required: [],
},
},
{
name: 'get_series_info',
description: 'Get information about an HPR series including all episodes in the series',
inputSchema: {
type: 'object',
properties: {
seriesId: {
type: 'number',
description: 'Series ID number',
},
},
required: ['seriesId'],
},
},
],
};
});
// Handle tool calls
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
try {
if (name === 'search_episodes') {
const results = dataLoader.searchEpisodes(args.query || '', {
limit: args.limit || 20,
hostId: args.hostId,
seriesId: args.seriesId,
tag: args.tag,
fromDate: args.fromDate,
toDate: args.toDate,
});
const text = results.length > 0
? results.map(ep => formatEpisode(ep, false)).join('\n\n---\n\n')
: 'No episodes found matching your search criteria.';
return {
content: [
{
type: 'text',
text: `# Search Results (${results.length} episodes found)\n\n${text}`,
},
],
};
}
if (name === 'get_episode') {
const episode = dataLoader.getEpisode(args.episodeId);
if (!episode) {
return {
content: [
{
type: 'text',
text: `Episode ${args.episodeId} not found.`,
},
],
};
}
let text = formatEpisode(episode, true);
// Add transcript if requested and available
if (args.includeTranscript !== false) {
const transcript = dataLoader.getTranscript(args.episodeId);
if (transcript) {
text += `\n\n## Transcript\n\n${transcript}`;
} else {
text += `\n\n## Transcript\n\n*No transcript available for this episode.*`;
}
}
// Add comments if requested
if (args.includeComments !== false) {
const comments = dataLoader.getCommentsForEpisode(args.episodeId);
if (comments.length > 0) {
text += `\n\n## Comments (${comments.length})\n\n`;
text += comments.map(c =>
`**${c.comment_author_name}** (${c.comment_timestamp})${c.comment_title ? ` - ${c.comment_title}` : ''}\n${c.comment_text}`
).join('\n\n---\n\n');
}
}
return {
content: [
{
type: 'text',
text,
},
],
};
}
if (name === 'search_transcripts') {
const results = dataLoader.searchTranscripts(args.query, {
limit: args.limit || 20,
contextLines: args.contextLines || 3,
});
if (results.length === 0) {
return {
content: [
{
type: 'text',
text: `No transcripts found containing "${args.query}".`,
},
],
};
}
const text = results.map(result => {
const { episode, matches } = result;
const host = dataLoader.getHost(episode.hostid);
let episodeText = `# HPR${String(episode.id).padStart(4, '0')}: ${episode.title}
**Host:** ${host?.host || 'Unknown'} | **Date:** ${episode.date}
**Matches found:** ${matches.length}
`;
matches.forEach(match => {
episodeText += `### Line ${match.lineNumber}
\`\`\`
${match.context}
\`\`\`
`;
});
return episodeText;
}).join('\n---\n\n');
return {
content: [
{
type: 'text',
text: `# Transcript Search Results (${results.length} episodes)\n\nSearching for: "${args.query}"\n\n${text}`,
},
],
};
}
if (name === 'get_host_info') {
let host;
if (args.hostId) {
host = dataLoader.getHost(args.hostId);
} else if (args.hostName) {
const hosts = dataLoader.searchHosts(args.hostName);
host = hosts[0];
}
if (!host) {
return {
content: [
{
type: 'text',
text: 'Host not found.',
},
],
};
}
let text = `# ${host.host}
**Host ID:** ${host.hostid}
**Email:** ${host.email}
**License:** ${host.license}
**Profile:** ${stripHtml(host.profile)}
`;
if (args.includeEpisodes !== false) {
const episodes = dataLoader.getEpisodesByHost(host.hostid);
text += `\n**Total Episodes:** ${episodes.length}\n\n## Episodes\n\n`;
// Sort by date (newest first)
episodes.sort((a, b) => b.date.localeCompare(a.date));
text += episodes.map(ep =>
`**HPR${String(ep.id).padStart(4, '0')}** (${ep.date}) - ${ep.title}\n ${ep.summary}`
).join('\n\n');
}
return {
content: [
{
type: 'text',
text,
},
],
};
}
if (name === 'get_series_info') {
const series = dataLoader.getSeries(args.seriesId);
if (!series) {
return {
content: [
{
type: 'text',
text: `Series ${args.seriesId} not found.`,
},
],
};
}
const episodes = dataLoader.getEpisodesInSeries(args.seriesId);
let text = `# ${series.name}
**Series ID:** ${series.id}
**Description:** ${stripHtml(series.description)}
**Total Episodes:** ${episodes.length}
## Episodes in Series
`;
// Sort by date
episodes.sort((a, b) => a.date.localeCompare(b.date));
text += episodes.map((ep, index) => {
const host = dataLoader.getHost(ep.hostid);
return `${index + 1}. **HPR${String(ep.id).padStart(4, '0')}** (${ep.date}) - ${ep.title} by ${host?.host || 'Unknown'}\n ${ep.summary}`;
}).join('\n\n');
return {
content: [
{
type: 'text',
text,
},
],
};
}
throw new Error(`Unknown tool: ${name}`);
} catch (error) {
return {
content: [
{
type: 'text',
text: `Error: ${error.message}`,
},
],
isError: true,
};
}
});
return server;
}
// Create Express app
const app = express();
// Create a single MCP server instance
const mcpServer = createMCPServer();
// Trust first proxy hop (Render/Heroku) without allowing arbitrary spoofing
app.set('trust proxy', 1);
// Enable CORS
app.use(cors());
// Enable compression
app.use(compression());
// Apply JSON body parsing globally for the SDK to read POST bodies.
2025-10-26 13:11:41 +00:00
app.use(express.json());
// Rate limiting
const limiter = rateLimit({
windowMs: RATE_LIMIT_WINDOW_MS,
max: RATE_LIMIT_MAX_REQUESTS,
message: 'Too many requests from this IP, please try again later.',
standardHeaders: true,
legacyHeaders: false,
});
app.use(limiter);
// Health check endpoint
app.get('/health', (req, res) => {
const usage = process.memoryUsage();
const heapUsedMB = usage.heapUsed / 1024 / 1024;
res.json({
status: 'ok',
memory: {
used: `${heapUsedMB.toFixed(2)}MB`,
threshold: `${MEMORY_THRESHOLD_MB}MB`,
},
activeRequests,
circuitBreaker: circuitBreaker.state,
});
});
// ⭐ NEW ENDPOINT: Circuit breaker reset
app.post('/reset', (req, res) => {
if (circuitBreaker.state === 'OPEN') {
circuitBreaker.reset();
console.error('Circuit breaker manually reset.');
res.json({ status: 'ok', message: 'Circuit breaker reset to CLOSED.' });
} else {
res.json({ status: 'ok', message: 'Circuit breaker already CLOSED.' });
}
});
// SSE endpoint for MCP
app.get('/sse', async (req, res) => {
let pingInterval = null;
let transport;
try {
2025-10-26 13:11:41 +00:00
// Check system health
checkMemory();
checkConcurrency();
activeRequests++;
console.error(`New SSE connection. Active requests: ${activeRequests}`);
// Create SSE transport, specifying the POST message path
transport = new SSEServerTransport('/message', res);
activeSseTransports.set(transport.sessionId, transport);
// Connect server with timeout and circuit breaker
// This calls transport.start() internally, which sets up headers and sends the endpoint event.
await circuitBreaker.execute(() => mcpServer.connect(transport));
// 2. Start the heartbeat/ping interval (after transport.start() has set up res.write)
pingInterval = setInterval(() => {
2025-10-26 13:11:41 +00:00
// Send a comment line every 20s to keep the proxy alive
res.write(':\n');
}, SSE_HEARTBEAT_INTERVAL_MS);
// Handle connection close (will execute when client closes the connection)
req.on('close', () => {
activeRequests--;
if (pingInterval) {
2025-10-26 13:11:41 +00:00
clearInterval(pingInterval);
}
if (transport) {
activeSseTransports.delete(transport.sessionId);
}
console.error(`SSE connection closed. Active requests: ${activeRequests}`);
// Ensure the server stream is ended gracefully if it hasn't already
if (!res.writableEnded) {
res.end();
}
});
} catch (error) {
// Handle error during connection establishment or connection timeout
activeRequests--;
if (pingInterval) {
2025-10-26 13:11:41 +00:00
clearInterval(pingInterval);
}
if (transport) {
activeSseTransports.delete(transport.sessionId);
}
console.error('SSE connection error:', error.message);
if (!res.headersSent) {
// Case 1: Error before SSE headers were flushed (e.g., checkMemory failed)
// We can still set the status code.
res.status(503).json({
error: error.message,
circuitBreaker: circuitBreaker.state,
});
} else {
// Case 2: Error after SSE headers were flushed (stream is open)
2025-10-26 13:11:41 +00:00
// Send an SSE 'error' event and end the connection.
const errorData = JSON.stringify({
message: error.message,
circuitBreaker: circuitBreaker.state
});
res.write(`event: error\ndata: ${errorData}\n\n`);
2025-10-26 13:11:41 +00:00
res.end();
}
}
});
// POST endpoint for MCP messages
app.post('/message', async (req, res) => {
const connectionId = req.headers['x-connection-id'];
const transport = activeSseTransports.get(connectionId);
2025-10-26 13:11:41 +00:00
if (transport) {
try {
await transport.handlePostMessage(req, res, req.body);
} catch (error) {
console.error('Error processing MCP message via POST:', error);
res.status(400).json({ error: 'Bad Request', message: error.message });
}
} else {
res.status(404).json({ error: 'Not Found', message: 'No active SSE connection for this ID.' });
}
});
// Error handling middleware
app.use((err, req, res, next) => {
console.error('Express error:', err);
res.status(500).json({
error: 'Internal server error',
message: err.message,
});
});
// Start server
app.listen(PORT, () => {
console.error(`HPR Knowledge Base MCP Server running on http://localhost:${PORT}`);
console.error(`SSE endpoint: http://localhost:${PORT}/sse`);
console.error(`Health check: http://localhost:${PORT}/health`);
console.error(`Configuration:`);
console.error(` - Max concurrent requests: ${MAX_CONCURRENT_REQUESTS}`);
console.error(` - Request timeout: ${REQUEST_TIMEOUT_MS}ms`);
console.error(` - Rate limit: ${RATE_LIMIT_MAX_REQUESTS} requests per ${RATE_LIMIT_WINDOW_MS / 1000}s`);
console.error(` - Memory threshold: ${MEMORY_THRESHOLD_MB}MB`);
console.error(` - SSE Heartbeat: ${SSE_HEARTBEAT_INTERVAL_MS / 1000}s`);
});
// Graceful shutdown
process.on('SIGTERM', () => {
console.error('SIGTERM received, shutting down gracefully...');
process.exit(0);
});
process.on('SIGINT', () => {
console.error('SIGINT received, shutting down gracefully...');
process.exit(0);
});