OpenWebUI-Discordbot/scripts/discordbot.py
Josh Knapp 240330cf3b Refactor to use LiteLLM Responses API for automatic MCP tool execution
Major refactoring to properly integrate with LiteLLM's Responses API, which handles
MCP tool execution automatically instead of requiring manual tool call loops.

Key changes:
- Switched from chat.completions.create() to client.responses.create() (see the sketch after this list)
- Used "server_url": "litellm_proxy" to leverage LiteLLM as the MCP gateway
- Set "require_approval": "never" for fully automatic tool execution
- Replaced get_available_mcp_tools() with a simpler get_available_mcp_servers()
- Removed manual OpenAI tool format conversion (LiteLLM handles this)
- Updated response extraction to use the output[0].content[0].text format
- Converted system prompts to the user role for Responses API compatibility
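
A minimal before/after sketch of the call-site change. The key, base URL, model name, and "github" server label are illustrative placeholders, not values pinned by this commit:

```python
from openai import OpenAI

# Hypothetical key/endpoint for illustration; the bot reads these from env vars
client = OpenAI(api_key="sk-litellm-example", base_url="http://localhost:4000")

# Before: chat.completions, with the tool-call loop managed by the bot itself
# response = client.chat.completions.create(model="my-model", messages=[...], tools=[...])

# After: a single responses.create() call; LiteLLM discovers and runs MCP tools
response = client.responses.create(
    model="my-model",
    input=[{"role": "user", "content": "What are my open GitHub issues?"}],
    tools=[{
        "type": "mcp",
        "server_label": "github",       # an MCP server registered in LiteLLM (assumed name)
        "server_url": "litellm_proxy",  # route tool traffic through the LiteLLM gateway
        "require_approval": "never",    # execute tools without a confirmation round-trip
    }],
)
```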

Technical improvements:
- LiteLLM now handles the complete tool calling loop automatically
- No more placeholder responses - actual MCP tools will execute
- Cleaner code with ~100 fewer lines
- Better separation between tools-enabled and tools-disabled paths
- Proper error handling for Responses API format

Responses API benefits:
- Single API call returns the final response with tool results integrated (see the extraction sketch after this list)
- Automatic tool discovery, execution, and result formatting
- No manual tracking of tool_call_ids or conversation state
- Native MCP support via server_label configuration
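
As a sketch of what "no manual tracking" means in practice, the caller only walks the final output items for the assistant text; tool calls and their results are already folded in. This defensive getattr-based helper mirrors the extraction logic in the script below and is a sketch, not the full Responses API schema:

```python
def extract_output_text(response) -> str | None:
    """Pull the final assistant text out of a Responses API result.

    LiteLLM has already executed any MCP tools by this point, so the only
    item of interest is the terminal "message" with "output_text" parts.
    """
    for item in getattr(response, "output", []) or []:
        if getattr(item, "type", None) == "message":
            for part in getattr(item, "content", []) or []:
                if getattr(part, "type", None) == "output_text":
                    return part.text
    return None
```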

Documentation:
- Added comprehensive litellm-mcp-research.md with API examples
- Documented Responses API vs chat.completions differences
- Included Discord bot migration patterns
- Covered authentication, streaming, and tool restrictions

Next steps:
- Test with actual Discord interactions
- Verify GitHub MCP tools execute correctly
- Monitor response extraction for edge cases

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-12 10:32:04 -08:00

import os
import discord
from discord.ext import commands
from openai import OpenAI
import base64
from dotenv import load_dotenv
import aiohttp
from typing import Dict, Any, List
import tiktoken
import httpx
# Load environment variables
load_dotenv()
# Get environment variables
DISCORD_TOKEN = os.getenv('DISCORD_TOKEN')
LITELLM_API_KEY = os.getenv('LITELLM_API_KEY')
LITELLM_API_BASE = os.getenv('LITELLM_API_BASE')
MODEL_NAME = os.getenv('MODEL_NAME')
SYSTEM_PROMPT_FILE = os.getenv('SYSTEM_PROMPT_FILE', './system_prompt.txt')
MAX_HISTORY_TOKENS = int(os.getenv('MAX_HISTORY_TOKENS', '3000'))
DEBUG_LOGGING = os.getenv('DEBUG_LOGGING', 'false').lower() == 'true'
ENABLE_TOOLS = os.getenv('ENABLE_TOOLS', 'false').lower() == 'true'
def debug_log(message: str):
    """Print debug message if DEBUG_LOGGING is enabled"""
    if DEBUG_LOGGING:
        print(f"[DEBUG] {message}")

# Load system prompt from file
def load_system_prompt():
    """Load system prompt from file, with fallback to default"""
    try:
        with open(SYSTEM_PROMPT_FILE, 'r', encoding='utf-8') as f:
            return f.read().strip()
    except FileNotFoundError:
        return "You are a helpful AI assistant integrated into Discord."

SYSTEM_PROMPT = load_system_prompt()

# Configure OpenAI client to point to LiteLLM
client = OpenAI(
    api_key=LITELLM_API_KEY,
    base_url=LITELLM_API_BASE  # e.g., "http://localhost:4000"
)

# Initialize tokenizer for token counting
try:
    encoding = tiktoken.encoding_for_model("gpt-4")
except KeyError:
    encoding = tiktoken.get_encoding("cl100k_base")

# Initialize Discord bot
intents = discord.Intents.default()
intents.message_content = True
intents.messages = True
bot = commands.Bot(command_prefix='!', intents=intents)

# Message history cache - stores recent conversations per channel
channel_history: Dict[int, List[Dict[str, Any]]] = {}

def count_tokens(text: str) -> int:
    """Count tokens in a text string"""
    try:
        return len(encoding.encode(text))
    except Exception:
        # Fallback: rough estimate (1 token ≈ 4 characters)
        return len(text) // 4

async def download_image(url: str) -> str | None:
    """Download image and convert to base64 using async aiohttp"""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                if response.status == 200:
                    image_data = await response.read()
                    base64_image = base64.b64encode(image_data).decode('utf-8')
                    return base64_image
    except Exception as e:
        print(f"Error downloading image from {url}: {e}")
    return None

async def get_available_mcp_servers():
    """Query LiteLLM for available MCP servers (used with Responses API)"""
    try:
        base_url = LITELLM_API_BASE.rstrip('/')
        headers = {"x-litellm-api-key": LITELLM_API_KEY}
        async with httpx.AsyncClient(timeout=30.0) as http_client:
            # Get MCP server configuration
            server_response = await http_client.get(
                f"{base_url}/v1/mcp/server",
                headers=headers
            )
            if server_response.status_code == 200:
                server_info = server_response.json()
                server_count = len(server_info) if isinstance(server_info, list) else 0
                debug_log(f"MCP server info: found {server_count} servers")
                if server_count > 0:
                    # Log server names for visibility
                    server_names = [s.get("server_name") for s in server_info if isinstance(s, dict) and s.get("server_name")]
                    debug_log(f"Available MCP servers: {server_names}")
                return {"server": server_info}
            else:
                debug_log(f"MCP server endpoint returned {server_response.status_code}: {server_response.text}")
    except Exception as e:
        debug_log(f"Error fetching MCP servers: {e}")
    return None

async def get_chat_history(channel, bot_user_id: int, limit: int = 50) -> List[Dict[str, Any]]:
    """
    Retrieve chat history and format as proper conversation messages.
    Only includes messages relevant to bot conversations.
    Returns list of message dicts with proper role attribution.
    Supports both regular channels and threads.
    """
    messages = []
    total_tokens = 0

    # Check if this is a thread
    is_thread = isinstance(channel, discord.Thread)
    debug_log(f"Fetching history - is_thread: {is_thread}, channel: {channel.name if hasattr(channel, 'name') else 'DM'}")

    # For threads, we want ALL messages in the thread (not just bot-related)
    # For channels, we only want bot-related messages
    message_count = 0
    skipped_system = 0

    # For threads, fetch the context including parent message if it exists.
    # Kept in a separate list so it stays at the start after the (newest-first)
    # history is reversed below, instead of landing after the newest message.
    thread_context = []
    if is_thread:
        try:
            # Get the starter message (first message in thread)
            if channel.starter_message:
                starter = channel.starter_message
            else:
                starter = await channel.fetch_message(channel.id)
            # If the starter message is replying to another message, fetch that parent
            if starter and starter.reference and starter.reference.message_id:
                try:
                    parent_message = await channel.parent.fetch_message(starter.reference.message_id)
                    if parent_message and (parent_message.type == discord.MessageType.default or parent_message.type == discord.MessageType.reply):
                        is_bot_parent = parent_message.author.id == bot_user_id
                        role = "assistant" if is_bot_parent else "user"
                        content = f"{parent_message.author.display_name}: {parent_message.content}" if not is_bot_parent else parent_message.content
                        # Remove bot mention if present
                        if not is_bot_parent and bot_user_id:
                            content = content.replace(f'<@{bot_user_id}>', '').strip()
                        msg = {"role": role, "content": content}
                        msg_tokens = count_tokens(content)
                        if total_tokens + msg_tokens <= MAX_HISTORY_TOKENS:
                            thread_context.append(msg)
                            total_tokens += msg_tokens
                            message_count += 1
                            debug_log(f"Added parent message: role={role}, content_preview={content[:50]}...")
                except Exception as e:
                    debug_log(f"Could not fetch parent message: {e}")
            # Add the starter message itself
            if starter and (starter.type == discord.MessageType.default or starter.type == discord.MessageType.reply):
                is_bot_starter = starter.author.id == bot_user_id
                role = "assistant" if is_bot_starter else "user"
                content = f"{starter.author.display_name}: {starter.content}" if not is_bot_starter else starter.content
                # Remove bot mention if present
                if not is_bot_starter and bot_user_id:
                    content = content.replace(f'<@{bot_user_id}>', '').strip()
                msg = {"role": role, "content": content}
                msg_tokens = count_tokens(content)
                if total_tokens + msg_tokens <= MAX_HISTORY_TOKENS:
                    thread_context.append(msg)
                    total_tokens += msg_tokens
                    message_count += 1
                    debug_log(f"Added thread starter: role={role}, content_preview={content[:50]}...")
        except Exception as e:
            debug_log(f"Could not fetch thread messages: {e}")

    # Fetch history from the channel/thread (newest first)
    async for message in channel.history(limit=limit):
        message_count += 1
        # Skip system messages (thread starters, pins, etc.)
        if message.type != discord.MessageType.default and message.type != discord.MessageType.reply:
            skipped_system += 1
            debug_log(f"Skipping system message type: {message.type}")
            continue
        # Determine if we should include this message
        is_bot_message = message.author.id == bot_user_id
        is_bot_mentioned = any(mention.id == bot_user_id for mention in message.mentions)
        is_dm = isinstance(channel, discord.DMChannel)
        # In threads: include ALL messages for full context
        # In regular channels: only include bot-related messages
        # In DMs: include all messages
        if is_thread or is_dm:
            should_include = True
        else:
            should_include = is_bot_message or is_bot_mentioned
        if not should_include:
            continue
        # Determine role
        role = "assistant" if is_bot_message else "user"
        # Build content with author name in threads for multi-user context
        if is_thread and not is_bot_message:
            # Include username in threads for clarity
            content = f"{message.author.display_name}: {message.content}"
        else:
            content = message.content
        # Remove bot mention from user messages
        if not is_bot_message and is_bot_mentioned:
            content = content.replace(f'<@{bot_user_id}>', '').strip()
        # Note: We'll handle images separately in the main flow
        # For history, we just note that images were present
        if message.attachments:
            image_count = sum(1 for att in message.attachments
                              if any(att.filename.lower().endswith(ext)
                                     for ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp']))
            if image_count > 0:
                content += f" [attached {image_count} image(s)]"
        # Add to messages with token counting
        msg = {"role": role, "content": content}
        msg_tokens = count_tokens(content)
        # Check if adding this message would exceed token limit
        if total_tokens + msg_tokens > MAX_HISTORY_TOKENS:
            break
        messages.append(msg)
        total_tokens += msg_tokens
        debug_log(f"Added message: role={role}, content_preview={content[:50]}...")

    debug_log(f"Processed {message_count} messages, skipped {skipped_system} system messages")
    debug_log(f"Total messages collected: {len(thread_context) + len(messages)}, total tokens: {total_tokens}")
    # Reverse the fetched history to chronological order (oldest first),
    # keeping the thread context (parent/starter) at the very start
    return thread_context + list(reversed(messages))

async def get_ai_response(history_messages: List[Dict[str, Any]], user_message: str, image_urls: List[str] = None) -> str:
    """
    Get AI response using LiteLLM Responses API with automatic MCP tool execution.

    Args:
        history_messages: List of previous conversation messages with roles
        user_message: Current user message
        image_urls: Optional list of image URLs to include

    Returns:
        AI response string
    """
    try:
        # When tools are enabled, use Responses API with MCP for automatic tool execution
        if ENABLE_TOOLS:
            debug_log("Tools enabled - using Responses API with MCP auto-execution")
            # Query MCP server info to get server_label
            mcp_info = await get_available_mcp_servers()
            # Build input array with system prompt, history, and current message
            input_messages = []
            # Add system prompt as developer role (newer models) or user role
            input_messages.append({
                "role": "user",  # System messages converted to user for Responses API
                "content": f"[System Instructions]\n{SYSTEM_PROMPT}"
            })
            # Add conversation history
            for msg in history_messages:
                input_messages.append({
                    "role": msg["role"],
                    "content": msg["content"]
                })
            # Build current user message
            if image_urls:
                # Multi-modal message with text and images
                content_parts = [{"type": "text", "text": user_message}]
                for url in image_urls:
                    base64_image = await download_image(url)
                    if base64_image:
                        content_parts.append({
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{base64_image}"
                            }
                        })
                input_messages.append({"role": "user", "content": content_parts})
            else:
                input_messages.append({"role": "user", "content": user_message})
            # Build MCP tools configuration
            tools_config = []
            if mcp_info and isinstance(mcp_info, dict):
                server_list = mcp_info.get("server", [])
                if isinstance(server_list, list) and len(server_list) > 0:
                    for server_info in server_list:
                        server_name = server_info.get("server_name")
                        if server_name:
                            tools_config.append({
                                "type": "mcp",
                                "server_label": server_name,
                                "server_url": "litellm_proxy",  # Use LiteLLM as MCP gateway
                                "require_approval": "never"  # Automatic tool execution
                            })
                            debug_log(f"Added MCP server '{server_name}' with auto-execution")
            if not tools_config:
                debug_log("No MCP servers found, falling back to standard chat completions")
                # Fall through to standard chat completions below
            else:
                # Use Responses API with MCP tools
                debug_log(f"Calling Responses API with {len(tools_config)} MCP servers")
                response = client.responses.create(
                    model=MODEL_NAME,
                    input=input_messages,
                    tools=tools_config,
                    stream=False
                )
                debug_log(f"Response status: {response.status}")
                # Extract text from Responses API format
                if hasattr(response, 'output') and len(response.output) > 0:
                    for output in response.output:
                        if hasattr(output, 'type') and output.type == "message":
                            if hasattr(output, 'content') and len(output.content) > 0:
                                for content in output.content:
                                    if hasattr(content, 'type') and content.type == "output_text":
                                        return content.text
                debug_log(f"Unexpected response format: {response}")
                return "I received a response but couldn't extract the text. Please try again."

        # Standard chat completions (when tools disabled or MCP not available)
        debug_log("Using standard chat completions")
        messages = [{"role": "system", "content": SYSTEM_PROMPT}]
        messages.extend(history_messages)
        if image_urls:
            content_parts = [{"type": "text", "text": user_message}]
            for url in image_urls:
                base64_image = await download_image(url)
                if base64_image:
                    content_parts.append({
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}
                    })
            messages.append({"role": "user", "content": content_parts})
        else:
            messages.append({"role": "user", "content": user_message})
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            temperature=0.7
        )
        return response.choices[0].message.content
    except Exception as e:
        error_msg = f"Error calling LiteLLM API: {str(e)}"
        print(error_msg)
        debug_log(f"Exception details: {e}")
        return error_msg

@bot.event
async def on_message(message):
    # Ignore messages from the bot itself
    if message.author == bot.user:
        return
    # Ignore system messages (thread starter, pins, etc.)
    if message.type != discord.MessageType.default and message.type != discord.MessageType.reply:
        return

    should_respond = False
    # Check if bot was mentioned
    if bot.user in message.mentions:
        should_respond = True
    # Check if message is a DM
    if isinstance(message.channel, discord.DMChannel):
        should_respond = True
    # Check if message is in a thread
    if isinstance(message.channel, discord.Thread):
        # Check if thread was started from a bot message
        try:
            starter = message.channel.starter_message
            if not starter:
                starter = await message.channel.fetch_message(message.channel.id)
            # If thread was started from bot's message, auto-respond
            if starter and starter.author.id == bot.user.id:
                should_respond = True
                debug_log("Thread started by bot - auto-responding")
            # If thread started from user message, only respond if mentioned
            elif bot.user in message.mentions:
                should_respond = True
                debug_log("Thread started by user - responding due to mention")
        except Exception as e:
            debug_log(f"Could not determine thread starter: {e}")
            # Default: only respond if mentioned
            if bot.user in message.mentions:
                should_respond = True

    if should_respond:
        async with message.channel.typing():
            # Get chat history with proper conversation format
            history_messages = await get_chat_history(message.channel, bot.user.id)
            # Remove bot mention from the message
            user_message = message.content.replace(f'<@{bot.user.id}>', '').strip()
            # Collect image URLs from the message
            image_urls = []
            for attachment in message.attachments:
                if any(attachment.filename.lower().endswith(ext) for ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp']):
                    image_urls.append(attachment.url)
            # Get AI response with proper conversation history
            response = await get_ai_response(history_messages, user_message, image_urls if image_urls else None)
            # Send response (split if too long for Discord's 2000 char limit)
            if len(response) > 2000:
                # Split into chunks
                chunks = [response[i:i+2000] for i in range(0, len(response), 2000)]
                for chunk in chunks:
                    await message.reply(chunk)
            else:
                await message.reply(response)

    await bot.process_commands(message)

@bot.event
async def on_ready():
    print(f'{bot.user} has connected to Discord!')

def main():
    if not all([DISCORD_TOKEN, LITELLM_API_KEY, LITELLM_API_BASE, MODEL_NAME]):
        print("Error: Missing required environment variables")
        print(f"DISCORD_TOKEN: {'✓' if DISCORD_TOKEN else '✗'}")
        print(f"LITELLM_API_KEY: {'✓' if LITELLM_API_KEY else '✗'}")
        print(f"LITELLM_API_BASE: {'✓' if LITELLM_API_BASE else '✗'}")
        print(f"MODEL_NAME: {'✓' if MODEL_NAME else '✗'}")
        return
    print(f"System Prompt loaded from: {SYSTEM_PROMPT_FILE}")
    print(f"Max history tokens: {MAX_HISTORY_TOKENS}")
    bot.run(DISCORD_TOKEN)

if __name__ == "__main__":
    main()