import os import discord from discord.ext import commands from openai import OpenAI import base64 from dotenv import load_dotenv import aiohttp from typing import Dict, Any, List import tiktoken import httpx # Load environment variables load_dotenv() # Get environment variables DISCORD_TOKEN = os.getenv('DISCORD_TOKEN') LITELLM_API_KEY = os.getenv('LITELLM_API_KEY') LITELLM_API_BASE = os.getenv('LITELLM_API_BASE') MODEL_NAME = os.getenv('MODEL_NAME') SYSTEM_PROMPT_FILE = os.getenv('SYSTEM_PROMPT_FILE', './system_prompt.txt') MAX_HISTORY_TOKENS = int(os.getenv('MAX_HISTORY_TOKENS', '3000')) DEBUG_LOGGING = os.getenv('DEBUG_LOGGING', 'false').lower() == 'true' ENABLE_TOOLS = os.getenv('ENABLE_TOOLS', 'false').lower() == 'true' def debug_log(message: str): """Print debug message if DEBUG_LOGGING is enabled""" if DEBUG_LOGGING: print(f"[DEBUG] {message}") # Load system prompt from file def load_system_prompt(): """Load system prompt from file, with fallback to default""" try: with open(SYSTEM_PROMPT_FILE, 'r', encoding='utf-8') as f: return f.read().strip() except FileNotFoundError: return "You are a helpful AI assistant integrated into Discord." SYSTEM_PROMPT = load_system_prompt() # Configure OpenAI client to point to LiteLLM client = OpenAI( api_key=LITELLM_API_KEY, base_url=LITELLM_API_BASE # e.g., "http://localhost:4000" ) # Initialize tokenizer for token counting try: encoding = tiktoken.encoding_for_model("gpt-4") except KeyError: encoding = tiktoken.get_encoding("cl100k_base") # Initialize Discord bot intents = discord.Intents.default() intents.message_content = True intents.messages = True bot = commands.Bot(command_prefix='!', intents=intents) # Message history cache - stores recent conversations per channel channel_history: Dict[int, List[Dict[str, Any]]] = {} def count_tokens(text: str) -> int: """Count tokens in a text string""" try: return len(encoding.encode(text)) except Exception: # Fallback: rough estimate (1 token ≈ 4 characters) return len(text) // 4 async def download_image(url: str) -> str | None: """Download image and convert to base64 using async aiohttp""" try: async with aiohttp.ClientSession() as session: async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response: if response.status == 200: image_data = await response.read() base64_image = base64.b64encode(image_data).decode('utf-8') return base64_image except Exception as e: print(f"Error downloading image from {url}: {e}") return None async def execute_mcp_tool(tool_name: str, arguments: dict) -> str: """Execute an MCP tool via LiteLLM's /mcp/call_tool endpoint""" import json try: base_url = LITELLM_API_BASE.rstrip('/') headers = { "Authorization": f"Bearer {LITELLM_API_KEY}", "Content-Type": "application/json", "Accept": "application/json" } debug_log(f"Executing MCP tool: {tool_name} with args: {arguments}") async with httpx.AsyncClient(timeout=60.0) as http_client: response = await http_client.post( f"{base_url}/mcp/call_tool", headers=headers, json={ "jsonrpc": "2.0", "method": "tools/call", "params": { "name": tool_name, "arguments": arguments }, "id": 1 } ) debug_log(f"MCP call_tool response status: {response.status_code}") if response.status_code == 200: result = response.json() debug_log(f"MCP tool result: {str(result)[:200]}...") # Handle JSON-RPC response format if isinstance(result, dict): # Check for JSON-RPC result if "result" in result: rpc_result = result["result"] # MCP tool results have a "content" array if isinstance(rpc_result, dict) and "content" in rpc_result: content = rpc_result["content"] if isinstance(content, list) and len(content) > 0: # Handle text content blocks first_content = content[0] if isinstance(first_content, dict) and "text" in first_content: return first_content["text"] return json.dumps(content) return json.dumps(content) if content else "Tool executed successfully" return json.dumps(rpc_result) # Fallback for non-RPC format if "content" in result: content = result["content"] if isinstance(content, list) and len(content) > 0: first_content = content[0] if isinstance(first_content, dict) and "text" in first_content: return first_content["text"] return json.dumps(content) return json.dumps(content) if content else "Tool executed successfully" return json.dumps(result) return str(result) else: error_text = response.text debug_log(f"MCP call_tool error: {response.status_code} - {error_text}") return f"Error executing tool: {response.status_code} - {error_text}" except Exception as e: debug_log(f"Exception calling MCP tool: {e}") import traceback debug_log(f"Traceback: {traceback.format_exc()}") return f"Error executing tool: {str(e)}" async def get_available_mcp_tools(): """Query LiteLLM for available MCP tools and convert to OpenAI function format""" try: base_url = LITELLM_API_BASE.rstrip('/') headers = {"Authorization": f"Bearer {LITELLM_API_KEY}"} async with httpx.AsyncClient(timeout=30.0) as http_client: # Get available MCP tools tools_response = await http_client.get( f"{base_url}/v1/mcp/tools", headers=headers ) if tools_response.status_code == 200: tools_data = tools_response.json() mcp_tools = tools_data.get("tools", []) if isinstance(tools_data, dict) else tools_data debug_log(f"Found {len(mcp_tools)} MCP tools") # Convert MCP tools to OpenAI function calling format openai_tools = [] for tool in mcp_tools: if isinstance(tool, dict) and tool.get("name") and tool.get("description"): openai_tool = { "type": "function", "function": { "name": tool["name"], "description": tool.get("description", ""), "parameters": tool.get("inputSchema", {"type": "object", "properties": {}}) } } openai_tools.append(openai_tool) debug_log(f"Converted {len(openai_tools)} tools to OpenAI format") return openai_tools else: debug_log(f"MCP tools endpoint returned {tools_response.status_code}") except Exception as e: debug_log(f"Error fetching MCP tools: {e}") return [] async def get_chat_history(channel, bot_user_id: int, limit: int = 50) -> List[Dict[str, Any]]: """ Retrieve chat history and format as proper conversation messages. Only includes messages relevant to bot conversations. Returns list of message dicts with proper role attribution. Supports both regular channels and threads. """ messages = [] total_tokens = 0 # Check if this is a thread is_thread = isinstance(channel, discord.Thread) debug_log(f"Fetching history - is_thread: {is_thread}, channel: {channel.name if hasattr(channel, 'name') else 'DM'}") # For threads, we want ALL messages in the thread (not just bot-related) # For channels, we only want bot-related messages message_count = 0 skipped_system = 0 # For threads, fetch the context including parent message if it exists if is_thread: try: # Get the starter message (first message in thread) if channel.starter_message: starter = channel.starter_message else: starter = await channel.fetch_message(channel.id) # If the starter message is replying to another message, fetch that parent if starter and starter.reference and starter.reference.message_id: try: parent_message = await channel.parent.fetch_message(starter.reference.message_id) if parent_message and (parent_message.type == discord.MessageType.default or parent_message.type == discord.MessageType.reply): is_bot_parent = parent_message.author.id == bot_user_id role = "assistant" if is_bot_parent else "user" content = f"{parent_message.author.display_name}: {parent_message.content}" if not is_bot_parent else parent_message.content # Remove bot mention if present if not is_bot_parent and bot_user_id: content = content.replace(f'<@{bot_user_id}>', '').strip() msg = {"role": role, "content": content} msg_tokens = count_tokens(content) if msg_tokens <= MAX_HISTORY_TOKENS: messages.append(msg) total_tokens += msg_tokens message_count += 1 debug_log(f"Added parent message: role={role}, content_preview={content[:50]}...") except Exception as e: debug_log(f"Could not fetch parent message: {e}") # Add the starter message itself if starter and (starter.type == discord.MessageType.default or starter.type == discord.MessageType.reply): is_bot_starter = starter.author.id == bot_user_id role = "assistant" if is_bot_starter else "user" content = f"{starter.author.display_name}: {starter.content}" if not is_bot_starter else starter.content # Remove bot mention if present if not is_bot_starter and bot_user_id: content = content.replace(f'<@{bot_user_id}>', '').strip() msg = {"role": role, "content": content} msg_tokens = count_tokens(content) if total_tokens + msg_tokens <= MAX_HISTORY_TOKENS: messages.append(msg) total_tokens += msg_tokens message_count += 1 debug_log(f"Added thread starter: role={role}, content_preview={content[:50]}...") except Exception as e: debug_log(f"Could not fetch thread messages: {e}") # Fetch history from the channel/thread async for message in channel.history(limit=limit): message_count += 1 # Skip system messages (thread starters, pins, etc.) if message.type != discord.MessageType.default and message.type != discord.MessageType.reply: skipped_system += 1 debug_log(f"Skipping system message type: {message.type}") continue # Determine if we should include this message is_bot_message = message.author.id == bot_user_id is_bot_mentioned = any(mention.id == bot_user_id for mention in message.mentions) is_dm = isinstance(channel, discord.DMChannel) # In threads: include ALL messages for full context # In regular channels: only include bot-related messages # In DMs: include all messages if is_thread or is_dm: should_include = True else: should_include = is_bot_message or is_bot_mentioned if not should_include: continue # Determine role role = "assistant" if is_bot_message else "user" # Build content with author name in threads for multi-user context if is_thread and not is_bot_message: # Include username in threads for clarity content = f"{message.author.display_name}: {message.content}" else: content = message.content # Remove bot mention from user messages if not is_bot_message and is_bot_mentioned: content = content.replace(f'<@{bot_user_id}>', '').strip() # Note: We'll handle images separately in the main flow # For history, we just note that images were present if message.attachments: image_count = sum(1 for att in message.attachments if any(att.filename.lower().endswith(ext) for ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp'])) if image_count > 0: content += f" [attached {image_count} image(s)]" # Add to messages with token counting msg = {"role": role, "content": content} msg_tokens = count_tokens(content) # Check if adding this message would exceed token limit if total_tokens + msg_tokens > MAX_HISTORY_TOKENS: break messages.append(msg) total_tokens += msg_tokens debug_log(f"Added message: role={role}, content_preview={content[:50]}...") # Reverse to get chronological order (oldest first) debug_log(f"Processed {message_count} messages, skipped {skipped_system} system messages") debug_log(f"Total messages collected: {len(messages)}, total tokens: {total_tokens}") return list(reversed(messages)) async def get_ai_response(history_messages: List[Dict[str, Any]], user_message: str, image_urls: List[str] = None) -> str: """ Get AI response using LiteLLM chat.completions with manual MCP tool execution. Uses manual tool execution loop since Responses API doesn't work with Bedrock + MCP. Args: history_messages: List of previous conversation messages with roles user_message: Current user message image_urls: Optional list of image URLs to include Returns: AI response string """ import json # Build messages array messages = [{"role": "system", "content": SYSTEM_PROMPT}] messages.extend(history_messages) # Build current user message if image_urls: content_parts = [{"type": "text", "text": user_message}] for url in image_urls: base64_image = await download_image(url) if base64_image: content_parts.append({ "type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"} }) messages.append({"role": "user", "content": content_parts}) else: messages.append({"role": "user", "content": user_message}) try: # Build request parameters request_params = { "model": MODEL_NAME, "messages": messages, "temperature": 0.7, } # Add MCP tools if enabled tools = [] if ENABLE_TOOLS: debug_log("Tools enabled - fetching MCP tools") tools = await get_available_mcp_tools() if tools: request_params["tools"] = tools request_params["tool_choice"] = "auto" debug_log(f"Added {len(tools)} tools to request") debug_log(f"Calling chat.completions with {len(tools)} tools") response = client.chat.completions.create(**request_params) # Handle tool calls if present response_message = response.choices[0].message tool_calls = getattr(response_message, 'tool_calls', None) # Tool execution loop (max 5 iterations to prevent infinite loops) max_iterations = 5 iteration = 0 while tool_calls and len(tool_calls) > 0 and iteration < max_iterations: iteration += 1 debug_log(f"Tool call iteration {iteration}: Model requested {len(tool_calls)} tool calls") # Add assistant's response with tool calls to messages messages.append({ "role": "assistant", "content": response_message.content, "tool_calls": [ { "id": tc.id, "type": "function", "function": { "name": tc.function.name, "arguments": tc.function.arguments } } for tc in tool_calls ] }) # Execute each tool call via MCP for tool_call in tool_calls: function_name = tool_call.function.name function_args_str = tool_call.function.arguments debug_log(f"Executing tool: {function_name}") # Parse arguments try: args_dict = json.loads(function_args_str) if isinstance(function_args_str, str) else function_args_str except json.JSONDecodeError: args_dict = {} debug_log(f"Failed to parse tool arguments: {function_args_str}") # Execute the tool via MCP tool_result = await execute_mcp_tool(function_name, args_dict) # Add tool result to messages messages.append({ "role": "tool", "tool_call_id": tool_call.id, "content": tool_result }) # Get next response from model debug_log("Getting model response after tool execution") request_params["messages"] = messages response = client.chat.completions.create(**request_params) response_message = response.choices[0].message tool_calls = getattr(response_message, 'tool_calls', None) if iteration >= max_iterations: debug_log(f"Warning: Reached max tool iterations ({max_iterations})") final_content = response.choices[0].message.content debug_log(f"Final response: {final_content[:100] if final_content else 'None'}...") return final_content or "I received a response but it was empty. Please try again." except Exception as e: error_msg = f"Error calling LiteLLM API: {str(e)}" print(error_msg) debug_log(f"Exception details: {e}") import traceback debug_log(f"Traceback: {traceback.format_exc()}") return error_msg @bot.event async def on_message(message): # Ignore messages from the bot itself if message.author == bot.user: return # Ignore system messages (thread starter, pins, etc.) if message.type != discord.MessageType.default and message.type != discord.MessageType.reply: return should_respond = False # Check if bot was mentioned if bot.user in message.mentions: should_respond = True # Check if message is a DM if isinstance(message.channel, discord.DMChannel): should_respond = True # Check if message is in a thread if isinstance(message.channel, discord.Thread): # Check if thread was started from a bot message try: starter = message.channel.starter_message if not starter: starter = await message.channel.fetch_message(message.channel.id) # If thread was started from bot's message, auto-respond if starter and starter.author.id == bot.user.id: should_respond = True debug_log("Thread started by bot - auto-responding") # If thread started from user message, only respond if mentioned elif bot.user in message.mentions: should_respond = True debug_log("Thread started by user - responding due to mention") except Exception as e: debug_log(f"Could not determine thread starter: {e}") # Default: only respond if mentioned if bot.user in message.mentions: should_respond = True if should_respond: async with message.channel.typing(): # Get chat history with proper conversation format history_messages = await get_chat_history(message.channel, bot.user.id) # Remove bot mention from the message user_message = message.content.replace(f'<@{bot.user.id}>', '').strip() # Collect image URLs from the message image_urls = [] for attachment in message.attachments: if any(attachment.filename.lower().endswith(ext) for ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp']): image_urls.append(attachment.url) # Get AI response with proper conversation history response = await get_ai_response(history_messages, user_message, image_urls if image_urls else None) # Send response (split if too long for Discord's 2000 char limit) if len(response) > 2000: # Split into chunks chunks = [response[i:i+2000] for i in range(0, len(response), 2000)] for chunk in chunks: await message.reply(chunk) else: await message.reply(response) await bot.process_commands(message) @bot.event async def on_ready(): print(f'{bot.user} has connected to Discord!') def main(): if not all([DISCORD_TOKEN, LITELLM_API_KEY, LITELLM_API_BASE, MODEL_NAME]): print("Error: Missing required environment variables") print(f"DISCORD_TOKEN: {'✓' if DISCORD_TOKEN else '✗'}") print(f"LITELLM_API_KEY: {'✓' if LITELLM_API_KEY else '✗'}") print(f"LITELLM_API_BASE: {'✓' if LITELLM_API_BASE else '✗'}") print(f"MODEL_NAME: {'✓' if MODEL_NAME else '✗'}") return print(f"System Prompt loaded from: {SYSTEM_PROMPT_FILE}") print(f"Max history tokens: {MAX_HISTORY_TOKENS}") bot.run(DISCORD_TOKEN) if __name__ == "__main__": main()