""" title: Multi Agent Collaboration System for Open WebUI Description: Allows for Multiple Models to act as Agents in collaboration version: 0.7.6 """ from pydantic import BaseModel, Field from fastapi import Request from typing import Callable, Any from open_webui.models.users import Users from open_webui.utils.chat import generate_chat_completion class EventEmitter: def __init__(self, event_emitter: Callable[[dict], Any] = None): self.event_emitter = event_emitter async def emit(self, description="Unknown state", status="in_progress", done=False): """ Send a status event to the event emitter. :param description: Event description :param status: Event status :param done: Whether the event is complete """ if self.event_emitter: await self.event_emitter( { "type": "status", "data": { "status": status, "description": description, "done": done, }, } ) class Pipe: class UserValves(BaseModel): agent_list: list = ( Field(default=[], description="List of Models to process as agents"), ) operator_model: str = Field( default="us.anthropic.claude-3-5-sonnet-20241022-v2:0", description="Default Operator Model to use", ) pass def __init__(self): pass async def pipe( self, body: dict, __user__: dict, __request__: Request, __event_emitter__: Callable[[dict], Any] = None, ) -> str: """ :param __user__: User dictionary containing the user ID :param __event_emitter__: Optional event emitter for tracking status" :param body: body dictionary containing the prompt and messages and other settings sent to the LLM(s) :param __request__: User Request object :return: The final response from the operator model after processing through all agents """ # Initialize Event Emitter emitter = EventEmitter(__event_emitter__) # Use the unified endpoint with the updated signature user = Users.get_user_by_id(__user__["id"]) agents = __user__["valves"].agent_list operator_model = __user__["valves"].operator_model number_of_agents = len(agents) if "### Task:" in body["messages"][0]["content"]: body["model"] = operator_model print("Internal Request") return await generate_chat_completion(__request__, body, user) # Capture Stream Setting original_stream = body["stream"] if number_of_agents > 0: # Process through each agent in the list for agent_model in agents: body["stream"] = False # Temporarily change the model to the agent model body["model"] = agent_model print(f"Model being use: {agent_model}") message = f"Processing request through agent model {agent_model}" await emitter.emit( description=message, status="agent_processing", done=True ) response = await generate_chat_completion(__request__, body, user) content = response["choices"][0]["message"]["content"] print(f"This is the content from {agent_model}: {content}") # Add Agent response as context body["messages"].append( { "role": "assistant", "content": f"{response} \n (Provided by Agent: {agent_model})", } ) body["model"] = operator_model body["stream"] = original_stream print(f"Model being use: {operator_model}") message = f"Final Response from {operator_model}" await emitter.emit(description=message, status="final_processing", done=True) return await generate_chat_completion(__request__, body, user)