import os
import asyncio
import base64
import json
import discord
from discord.ext import commands
from openai import OpenAI
from dotenv import load_dotenv
import aiohttp
from typing import Dict, Any, List, Optional
import tiktoken
import httpx

# Load environment variables
load_dotenv()

# Get environment variables
DISCORD_TOKEN = os.getenv('DISCORD_TOKEN')
LITELLM_API_KEY = os.getenv('LITELLM_API_KEY')
LITELLM_API_BASE = os.getenv('LITELLM_API_BASE')
MODEL_NAME = os.getenv('MODEL_NAME')
SYSTEM_PROMPT_FILE = os.getenv('SYSTEM_PROMPT_FILE', './system_prompt.txt')
MAX_HISTORY_TOKENS = int(os.getenv('MAX_HISTORY_TOKENS', '3000'))
DEBUG_LOGGING = os.getenv('DEBUG_LOGGING', 'false').lower() == 'true'
ENABLE_TOOLS = os.getenv('ENABLE_TOOLS', 'false').lower() == 'true'


def debug_log(message: str):
    """Print debug message if DEBUG_LOGGING is enabled"""
    if DEBUG_LOGGING:
        print(f"[DEBUG] {message}")


def load_system_prompt():
    """Load system prompt from file, with fallback to default"""
    try:
        with open(SYSTEM_PROMPT_FILE, 'r', encoding='utf-8') as f:
            return f.read().strip()
    except FileNotFoundError:
        return "You are a helpful AI assistant integrated into Discord."


SYSTEM_PROMPT = load_system_prompt()

# Configure OpenAI client to point to LiteLLM
client = OpenAI(
    api_key=LITELLM_API_KEY,
    base_url=LITELLM_API_BASE  # e.g., "http://localhost:4000"
)

# Initialize tokenizer for token counting
try:
    encoding = tiktoken.encoding_for_model("gpt-4")
except KeyError:
    encoding = tiktoken.get_encoding("cl100k_base")

# Initialize Discord bot
intents = discord.Intents.default()
intents.message_content = True
intents.messages = True
bot = commands.Bot(command_prefix='!', intents=intents)

# Message history cache - stores recent conversations per channel (not yet used)
channel_history: Dict[int, List[Dict[str, Any]]] = {}


def count_tokens(text: str) -> int:
    """Count tokens in a text string"""
    try:
        return len(encoding.encode(text))
    except Exception:
        # Fallback: rough estimate (1 token ≈ 4 characters)
        return len(text) // 4


async def download_image(url: str) -> str | None:
    """Download image and convert to base64 using async aiohttp"""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                if response.status == 200:
                    image_data = await response.read()
                    base64_image = base64.b64encode(image_data).decode('utf-8')
                    return base64_image
    except Exception as e:
        print(f"Error downloading image from {url}: {e}")
    return None


async def get_available_mcp_tools():
    """Query LiteLLM for available MCP servers and tools, convert to OpenAI format"""
    try:
        base_url = LITELLM_API_BASE.rstrip('/')
        headers = {"x-litellm-api-key": LITELLM_API_KEY}
        async with httpx.AsyncClient(timeout=30.0) as http_client:
            # Get MCP server configuration
            server_response = await http_client.get(
                f"{base_url}/v1/mcp/server",
                headers=headers
            )
            if server_response.status_code == 200:
                server_info = server_response.json()
                debug_log(f"MCP server info: found {len(server_info) if isinstance(server_info, list) else 0} servers")

                # Get available MCP tools
                tools_response = await http_client.get(
                    f"{base_url}/v1/mcp/tools",
                    headers=headers
                )
                if tools_response.status_code == 200:
                    tools_data = tools_response.json()
                    # Tools come in format: {"tools": [...]}
                    mcp_tools = tools_data.get("tools", []) if isinstance(tools_data, dict) else tools_data
                    debug_log(f"Found {len(mcp_tools) if isinstance(mcp_tools, list) else 0} MCP tools")

                    # Convert MCP tools to OpenAI function calling format
                    openai_tools = []
                    for tool in mcp_tools[:50]:  # Limit to first 50 tools to avoid overwhelming the model
                        if isinstance(tool, dict) and "name" in tool:
                            openai_tool = {
                                "type": "function",
                                "function": {
                                    "name": tool["name"],
                                    "description": tool.get("description", ""),
                                    "parameters": tool.get("inputSchema", {})
                                }
                            }
                            openai_tools.append(openai_tool)
                    debug_log(f"Converted {len(openai_tools)} tools to OpenAI format")

                    # Return both server info and converted tools
                    return {
                        "server": server_info,
                        "tools": openai_tools,
                        "tool_count": len(openai_tools)
                    }
                else:
                    debug_log(f"MCP tools endpoint returned {tools_response.status_code}: {tools_response.text}")
            else:
                debug_log(f"MCP server endpoint returned {server_response.status_code}: {server_response.text}")
    except Exception as e:
        debug_log(f"Error fetching MCP tools: {e}")
    return None
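
# Illustrative only: the shape of one entry in the "tools" list returned by
# get_available_mcp_tools() above. The tool name and schema are hypothetical,
# not part of any real MCP server:
#
# {
#     "type": "function",
#     "function": {
#         "name": "search_docs",
#         "description": "Search the documentation index",
#         "parameters": {
#             "type": "object",
#             "properties": {"query": {"type": "string"}},
#             "required": ["query"]
#         }
#     }
# }
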
async def get_chat_history(channel, bot_user_id: int, limit: int = 50) -> List[Dict[str, Any]]:
    """
    Retrieve chat history and format as proper conversation messages.
    Only includes messages relevant to bot conversations.
    Returns list of message dicts with proper role attribution.
    Supports both regular channels and threads.
    """
    # Thread parent/starter context, collected in chronological order
    messages = []
    total_tokens = 0

    # Check if this is a thread
    is_thread = isinstance(channel, discord.Thread)
    debug_log(f"Fetching history - is_thread: {is_thread}, channel: {channel.name if hasattr(channel, 'name') else 'DM'}")

    # For threads, we want ALL messages in the thread (not just bot-related)
    # For channels, we only want bot-related messages
    message_count = 0
    skipped_system = 0

    # For threads, fetch the context including parent message if it exists
    if is_thread:
        try:
            # Get the starter message (first message in thread)
            if channel.starter_message:
                starter = channel.starter_message
            else:
                starter = await channel.fetch_message(channel.id)

            # If the starter message is replying to another message, fetch that parent
            if starter and starter.reference and starter.reference.message_id:
                try:
                    parent_message = await channel.parent.fetch_message(starter.reference.message_id)
                    if parent_message and (parent_message.type == discord.MessageType.default or parent_message.type == discord.MessageType.reply):
                        is_bot_parent = parent_message.author.id == bot_user_id
                        role = "assistant" if is_bot_parent else "user"
                        content = f"{parent_message.author.display_name}: {parent_message.content}" if not is_bot_parent else parent_message.content
                        # Remove bot mention if present
                        if not is_bot_parent and bot_user_id:
                            content = content.replace(f'<@{bot_user_id}>', '').strip()
                        msg = {"role": role, "content": content}
                        msg_tokens = count_tokens(content)
                        if total_tokens + msg_tokens <= MAX_HISTORY_TOKENS:
                            messages.append(msg)
                            total_tokens += msg_tokens
                            message_count += 1
                            debug_log(f"Added parent message: role={role}, content_preview={content[:50]}...")
                except Exception as e:
                    debug_log(f"Could not fetch parent message: {e}")

            # Add the starter message itself
            if starter and (starter.type == discord.MessageType.default or starter.type == discord.MessageType.reply):
                is_bot_starter = starter.author.id == bot_user_id
                role = "assistant" if is_bot_starter else "user"
                content = f"{starter.author.display_name}: {starter.content}" if not is_bot_starter else starter.content
                # Remove bot mention if present
                if not is_bot_starter and bot_user_id:
                    content = content.replace(f'<@{bot_user_id}>', '').strip()
                msg = {"role": role, "content": content}
                msg_tokens = count_tokens(content)
                if total_tokens + msg_tokens <= MAX_HISTORY_TOKENS:
                    messages.append(msg)
                    total_tokens += msg_tokens
                    message_count += 1
                    debug_log(f"Added thread starter: role={role}, content_preview={content[:50]}...")
        except Exception as e:
            debug_log(f"Could not fetch thread messages: {e}")

    # Fetch history from the channel/thread (channel.history yields newest first)
    history_messages: List[Dict[str, Any]] = []
    async for message in channel.history(limit=limit):
        message_count += 1

        # Skip system messages (thread starters, pins, etc.)
        if message.type != discord.MessageType.default and message.type != discord.MessageType.reply:
            skipped_system += 1
            debug_log(f"Skipping system message type: {message.type}")
            continue

        # Determine if we should include this message
        is_bot_message = message.author.id == bot_user_id
        is_bot_mentioned = any(mention.id == bot_user_id for mention in message.mentions)
        is_dm = isinstance(channel, discord.DMChannel)

        # In threads: include ALL messages for full context
        # In regular channels: only include bot-related messages
        # In DMs: include all messages
        if is_thread or is_dm:
            should_include = True
        else:
            should_include = is_bot_message or is_bot_mentioned

        if not should_include:
            continue

        # Determine role
        role = "assistant" if is_bot_message else "user"

        # Build content with author name in threads for multi-user context
        if is_thread and not is_bot_message:
            # Include username in threads for clarity
            content = f"{message.author.display_name}: {message.content}"
        else:
            content = message.content

        # Remove bot mention from user messages
        if not is_bot_message and is_bot_mentioned:
            content = content.replace(f'<@{bot_user_id}>', '').strip()

        # Note: We'll handle images separately in the main flow
        # For history, we just note that images were present
        if message.attachments:
            image_count = sum(1 for att in message.attachments if any(att.filename.lower().endswith(ext) for ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp']))
            if image_count > 0:
                content += f" [attached {image_count} image(s)]"

        # Add to messages with token counting
        msg = {"role": role, "content": content}
        msg_tokens = count_tokens(content)

        # Check if adding this message would exceed token limit
        if total_tokens + msg_tokens > MAX_HISTORY_TOKENS:
            break

        history_messages.append(msg)
        total_tokens += msg_tokens
        debug_log(f"Added message: role={role}, content_preview={content[:50]}...")

    debug_log(f"Processed {message_count} messages, skipped {skipped_system} system messages")
    debug_log(f"Total messages collected: {len(messages) + len(history_messages)}, total tokens: {total_tokens}")

    # Reverse only the fetched history (newest first) to get chronological order;
    # the parent/starter context, already chronological, stays at the front.
    return messages + list(reversed(history_messages))
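
# Illustrative only: the shape of the list get_chat_history() returns for a
# thread (author names and content are hypothetical). Thread messages from
# users carry a "display_name: " prefix and attachments are summarized inline:
#
# [
#     {"role": "user", "content": "alice: can you explain this traceback? [attached 1 image(s)]"},
#     {"role": "assistant", "content": "That error means the client timed out..."},
# ]
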
async def get_ai_response(history_messages: List[Dict[str, Any]], user_message: str, image_urls: Optional[List[str]] = None) -> str:
    """
    Get AI response using LiteLLM with proper conversation history and tool calling support.

    Args:
        history_messages: List of previous conversation messages with roles
        user_message: Current user message
        image_urls: Optional list of image URLs to include

    Returns:
        AI response string
    """
    # Start with system prompt
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]

    # Add conversation history
    messages.extend(history_messages)

    # Build current user message
    if image_urls:
        # Multi-modal message with text and images
        content_parts = [{"type": "text", "text": user_message}]
        for url in image_urls:
            base64_image = await download_image(url)
            if base64_image:
                content_parts.append({
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{base64_image}"
                    }
                })
        messages.append({"role": "user", "content": content_parts})
    else:
        # Text-only message
        messages.append({"role": "user", "content": user_message})

    try:
        # Build request parameters. Note: request_params["messages"] aliases the
        # `messages` list, so later appends are visible to follow-up API calls.
        request_params = {
            "model": MODEL_NAME,
            "messages": messages,
            "temperature": 0.7,
        }

        # Add MCP tools if enabled
        if ENABLE_TOOLS:
            debug_log("Tools enabled - fetching and converting MCP tools")
            # Query and convert MCP tools to OpenAI format
            mcp_info = await get_available_mcp_tools()
            if mcp_info and isinstance(mcp_info, dict):
                openai_tools = mcp_info.get("tools", [])
                if openai_tools and isinstance(openai_tools, list) and len(openai_tools) > 0:
                    request_params["tools"] = openai_tools
                    request_params["tool_choice"] = "auto"
                    debug_log(f"Added {len(openai_tools)} tools to request")
                else:
                    debug_log("No tools available to add to request")
            else:
                debug_log("Failed to fetch MCP tools")

        debug_log(f"Calling chat completions with {len(request_params.get('tools', []))} tools")
        # Run the blocking OpenAI client call in a worker thread so it does not
        # stall the Discord event loop
        response = await asyncio.to_thread(client.chat.completions.create, **request_params)

        # Handle tool calls if present
        response_message = response.choices[0].message
        tool_calls = getattr(response_message, 'tool_calls', None)

        if tool_calls and len(tool_calls) > 0:
            debug_log(f"Model requested {len(tool_calls)} tool calls")

            # Add assistant's response with tool calls to messages
            messages.append(response_message)

            # Execute each tool call - add placeholder responses
            # TODO: Implement actual MCP tool execution via LiteLLM proxy
            for tool_call in tool_calls:
                function_name = tool_call.function.name
                function_args = tool_call.function.arguments
                debug_log(f"Tool call requested: {function_name} with args: {function_args}")

                # Placeholder response - in production this would execute via MCP
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "name": function_name,
                    "content": f"Tool execution via MCP is being set up. Tool {function_name} was called with arguments: {function_args}"
                })

            # Get final response from model after tool execution
            debug_log("Getting final response after tool execution")
            final_response = await asyncio.to_thread(client.chat.completions.create, **request_params)
            return final_response.choices[0].message.content

        return response.choices[0].message.content

    except Exception as e:
        error_msg = f"Error calling LiteLLM API: {str(e)}"
        print(error_msg)
        debug_log(f"Exception details: {e}")
        return error_msg
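
# A minimal sketch for the TODO in get_ai_response() above: executing a tool
# call through the LiteLLM proxy instead of returning a placeholder. The
# endpoint path ("/v1/mcp/tools/call") and payload shape are ASSUMPTIONS
# extrapolated from the /v1/mcp/* endpoints queried in get_available_mcp_tools();
# verify them against your LiteLLM version before wiring this in.
async def execute_mcp_tool(function_name: str, function_args: str) -> str:
    """Sketch: call an MCP tool via the LiteLLM proxy (endpoint is assumed)."""
    try:
        base_url = LITELLM_API_BASE.rstrip('/')
        headers = {"x-litellm-api-key": LITELLM_API_KEY}
        payload = {
            "name": function_name,
            # The model supplies tool arguments as a JSON string
            "arguments": json.loads(function_args) if function_args else {},
        }
        async with httpx.AsyncClient(timeout=30.0) as http_client:
            response = await http_client.post(
                f"{base_url}/v1/mcp/tools/call",  # assumed endpoint path
                headers=headers,
                json=payload,
            )
            if response.status_code == 200:
                return response.text
            return f"Tool call failed: HTTP {response.status_code}"
    except Exception as e:
        return f"Tool execution error: {e}"
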
@bot.event
async def on_message(message):
    # Ignore messages from the bot itself
    if message.author == bot.user:
        return

    # Ignore system messages (thread starter, pins, etc.)
    if message.type != discord.MessageType.default and message.type != discord.MessageType.reply:
        return

    should_respond = False

    # Check if bot was mentioned
    if bot.user in message.mentions:
        should_respond = True

    # Check if message is a DM
    if isinstance(message.channel, discord.DMChannel):
        should_respond = True

    # Check if message is in a thread
    if isinstance(message.channel, discord.Thread):
        # Check if thread was started from a bot message
        try:
            starter = message.channel.starter_message
            if not starter:
                starter = await message.channel.fetch_message(message.channel.id)

            # If thread was started from bot's message, auto-respond
            if starter and starter.author.id == bot.user.id:
                should_respond = True
                debug_log("Thread started by bot - auto-responding")
            # If thread started from user message, only respond if mentioned
            elif bot.user in message.mentions:
                should_respond = True
                debug_log("Thread started by user - responding due to mention")
        except Exception as e:
            debug_log(f"Could not determine thread starter: {e}")
            # Default: only respond if mentioned
            if bot.user in message.mentions:
                should_respond = True

    if should_respond:
        async with message.channel.typing():
            # Get chat history with proper conversation format
            history_messages = await get_chat_history(message.channel, bot.user.id)

            # Remove bot mention from the message
            user_message = message.content.replace(f'<@{bot.user.id}>', '').strip()

            # Collect image URLs from the message
            image_urls = []
            for attachment in message.attachments:
                if any(attachment.filename.lower().endswith(ext) for ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp']):
                    image_urls.append(attachment.url)

            # Get AI response with proper conversation history
            response = await get_ai_response(history_messages, user_message, image_urls if image_urls else None)

            # Send response (split if too long for Discord's 2000 char limit)
            if len(response) > 2000:
                # Split into chunks
                chunks = [response[i:i+2000] for i in range(0, len(response), 2000)]
                for chunk in chunks:
                    await message.reply(chunk)
            else:
                await message.reply(response)

    await bot.process_commands(message)
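
# A minimal sketch (not wired in): a newline-aware splitter that could replace
# the fixed 2000-character slicing in on_message() above, so long responses
# break on line boundaries instead of mid-word or mid-code-block.
def split_for_discord(text: str, limit: int = 2000) -> List[str]:
    """Split text into chunks of at most `limit` chars, preferring newlines."""
    chunks = []
    while len(text) > limit:
        # Cut at the last newline inside the window; fall back to a hard cut
        cut = text.rfind('\n', 0, limit)
        if cut <= 0:
            cut = limit
        chunks.append(text[:cut])
        text = text[cut:].lstrip('\n')
    if text:
        chunks.append(text)
    return chunks
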
@bot.event
async def on_ready():
    print(f'{bot.user} has connected to Discord!')


def main():
    if not all([DISCORD_TOKEN, LITELLM_API_KEY, LITELLM_API_BASE, MODEL_NAME]):
        print("Error: Missing required environment variables")
        print(f"DISCORD_TOKEN: {'✓' if DISCORD_TOKEN else '✗'}")
        print(f"LITELLM_API_KEY: {'✓' if LITELLM_API_KEY else '✗'}")
        print(f"LITELLM_API_BASE: {'✓' if LITELLM_API_BASE else '✗'}")
        print(f"MODEL_NAME: {'✓' if MODEL_NAME else '✗'}")
        return

    print(f"System Prompt loaded from: {SYSTEM_PROMPT_FILE}")
    print(f"Max history tokens: {MAX_HISTORY_TOKENS}")
    bot.run(DISCORD_TOKEN)


if __name__ == "__main__":
    main()
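
# Example .env for this bot (all values are placeholders; MODEL_NAME must match
# a model configured on your LiteLLM proxy):
#
# DISCORD_TOKEN=your-discord-bot-token
# LITELLM_API_KEY=sk-your-litellm-key
# LITELLM_API_BASE=http://localhost:4000
# MODEL_NAME=your-configured-model-name
# SYSTEM_PROMPT_FILE=./system_prompt.txt
# MAX_HISTORY_TOKENS=3000
# DEBUG_LOGGING=false
# ENABLE_TOOLS=false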